Skip to content

Commit

Permalink
push mat queries to ftp and staging
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesamcl committed Dec 25, 2024
1 parent b310947 commit 0669338
Showing 3 changed files with 90 additions and 4 deletions.
22 changes: 22 additions & 0 deletions dataload/08_run_queries/csvs_to_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import glob
import argparse
import sqlite3
import pandas as pd
from subprocess import Popen, PIPE, STDOUT

def main():

parser = argparse.ArgumentParser(description='Create sqlite from CSV files')
parser.add_argument('--out-sqlite-path', type=str, help='Path for output sqlite file', required=True)
args = parser.parse_args()

conn = sqlite3.connect(args.out_sqlite_path)

for csv in glob.glob("*.csv"):
df = pd.read_csv(csv)
df.to_sql(csv.replace(".csv", ""), conn)

conn.close()

if __name__=="__main__":
main()
4 changes: 2 additions & 2 deletions dataload/08_run_queries/run_queries.py
Original file line number Diff line number Diff line change
@@ -10,9 +10,9 @@

def main():

parser = argparse.ArgumentParser(description='Create Neo4j DB')
parser = argparse.ArgumentParser(description='Materialise Cypher queries as CSV')
parser.add_argument('--in-db-path', type=str, help='Path with the neo4j database to query', required=True)
parser.add_argument('--out-sqlites-path', type=str, help='Path for the output sqlite files of materialised results', required=True)
parser.add_argument('--out-csvs-path', type=str, help='Path for the output csv files of materialised results', required=True)
args = parser.parse_args()

has_singularity = os.system('which singularity') == 0
68 changes: 66 additions & 2 deletions dataload/nextflow/load_subgraph.nf
Original file line number Diff line number Diff line change
@@ -46,7 +46,8 @@ workflow {
ids_csv.collect()
)

mat_queries_sqlites = run_materialised_queries(neo_db)
mat_queries_csvs = run_materialised_queries(neo_db)
mat_queries_sqlite = csvs_to_sqlite(mat_queries_csvs.collect())

solr_inputs = prepare_solr(link.out.nodes, link.out.edges)
solr_nodes_core = create_solr_nodes_core(prepare_solr.out.nodes.collect(), indexed.names_txt)
@@ -61,12 +62,14 @@ workflow {
copy_solr_to_ftp(solr_tgz)
copy_neo_to_ftp(neo_tgz)
copy_sqlite_to_ftp(sqlite)
copy_mat_queries_to_ftp(mat_queries_csvs, mat_queries_sqlite)

copy_summary_to_staging(merge_summary_jsons.out)
copy_solr_config_to_staging()
copy_solr_cores_to_staging(solr_nodes_core.concat(solr_edges_core).concat(solr_autocomplete_core))
copy_sqlite_to_staging(sqlite)
copy_neo_to_staging(neo_db)
copy_mat_queries_to_staging(mat_queries_sqlite)
}
}

@@ -457,7 +460,31 @@ process run_materialised_queries {
cp -r ${neo_db}/* ${params.neo_tmp_path}
PYTHONUNBUFFERED=true python3 ${params.home}/08_run_queries/run_queries.py \
--in-db-path ${params.neo_tmp_path} \
--out-sqlites-path materialised_queries
--out-csvs-path materialised_queries
"""
}

process csvs_to_sqlite {
cache "lenient"
memory "64 GB"
time "12h"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

input:
path(csvs)

output:
path("materialised_queries.sqlite3.gz")

script:
"""
#!/usr/bin/env bash
set -Eeuo pipefail
cp -r ${neo_db}/* ${params.neo_tmp_path}
PYTHONUNBUFFERED=true python3 ${params.home}/08_run_queries/csvs_to_sqlite.py --out-sqlite-path materialised_queries.sqlite3
pigz --best materialised_queries.sqlite3
"""
}

@@ -671,6 +698,25 @@ process copy_sqlite_to_ftp {
"""
}

process copy_mat_queries_to_ftp {

cache "lenient"
memory "32 GB"
time "8h"
queue "datamover"

input:
path(csvs, sqlite)

script:
"""
#!/usr/bin/env bash
set -Eeuo pipefail
mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()}
cp -f ${csvs} ${sqlite} /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()}/
"""
}

process copy_summary_to_staging {

cache "lenient"
@@ -744,6 +790,24 @@ process copy_sqlite_to_staging {
"""
}

process copy_mat_queries_to_staging {
cache "lenient"
memory "32 GB"
time "8h"
queue "datamover"

input:
path(sqlite)

script:
"""
#!/usr/bin/env bash
set -Eeuo pipefail
mkdir -p /nfs/public/rw/ontoapps/grebi/staging/materialised_queries
cp -LR * /nfs/public/rw/ontoapps/grebi/staging/materialised_queries/
"""
}

process copy_neo_to_staging {
cache "lenient"
memory "32 GB"

0 comments on commit 0669338

Please sign in to comment.