Merge pull request #54 from elixir-europe/test/add-tranform-integr-tests
Test/add tranform integr tests
cmichotey authored Dec 14, 2021
2 parents c8bf637 + ea737b8 commit 717ab74
Showing 21 changed files with 2,071 additions and 87 deletions.
@@ -4,7 +4,10 @@

"document-transform": {
"@type": [ "Germplasm" ],
"entryType": "Germplasm",
"@id": "{.germplasmPUI}",
"identifier": "{.germplasmPUI}",
"name": "{.germplasmName}",

"schema:includedInDataCatalog": "{.source}",
"schema:identifier": "{.germplasmDbId}",
@@ -16,6 +19,7 @@
]
},
"schema:url": "{.documentationURL}",
"url": "{.documentationURL}",
"schema:description": {
"{join}": [
"\"",
@@ -30,6 +34,27 @@
" {.comment}"
]
},
"description": {
"{join}": [
{ "{or}": ["{.germplasmName}", "{.defaultDisplayName}"] },
" is a",
" {.genus + .species + .subtaxa}",
" ({.commonCropName})",
" accession",
" (number: {.accessionNumber})",
" managed by {.holdingInstitute.instituteName}",
".",
" {.comment}"
]
},

"species": {
"{or}": [
"{.genusSpecies}",
"{.genus}",
"{.species}"
]
},

"germplasm": {
"cropName": {
@@ -65,6 +90,13 @@
"{.studyURIs => .observationVariableDbIds}"
]
}
},
"node": "{.source}",
"databaseName": {
"{join}": [
"brapi@",
"{.source}"
]
}
}
}
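
Note on the germplasm changes above (this first diff is presumably config/transform-elasticsearch/documents/datadiscovery_germplasm.json, the germplasm counterpart of the study config below): the new entryType, identifier, name, description, species, node and databaseName fields feed the data-discovery index directly, alongside the existing schema:* properties. The description and databaseName templates rely on the transform DSL's {or} (first non-empty alternative) and {join} (concatenation) directives. The Python sketch below is an illustration only, not the ETL's resolver; the helper functions, the sample accession and the expected output are assumptions.

def first_non_empty(*values):
    # Rough stand-in for the {or} directive: the first non-empty value wins.
    return next((v for v in values if v), None)

def render_description(doc):
    # Rough stand-in for the {join} template behind the new "description" field.
    name = first_non_empty(doc.get("germplasmName"), doc.get("defaultDisplayName"))
    taxon = " ".join(filter(None, [doc.get("genus"), doc.get("species"), doc.get("subtaxa")]))
    parts = [
        name,
        " is a",
        f" {taxon}" if taxon else "",
        f" ({doc['commonCropName']})" if doc.get("commonCropName") else "",
        " accession",
        f" (number: {doc['accessionNumber']})" if doc.get("accessionNumber") else "",
        f" managed by {doc['holdingInstitute']['instituteName']}",
        ".",
    ]
    return "".join(filter(None, parts))

germplasm = {  # made-up example document
    "germplasmName": "Chinese Spring",
    "genus": "Triticum", "species": "aestivum",
    "commonCropName": "wheat",
    "accessionNumber": "TA3008",
    "holdingInstitute": {"instituteName": "INRAE"},
    "source": "URGI",
}
print(render_description(germplasm))
# Chinese Spring is a Triticum aestivum (wheat) accession (number: TA3008) managed by INRAE.
print("brapi@" + germplasm["source"])  # databaseName -> brapi@URGI
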
194 changes: 137 additions & 57 deletions config/transform-elasticsearch/documents/datadiscovery_study.json
@@ -1,77 +1,146 @@
{
"document-type": "datadiscovery",
"source-entity": "study",

"document-transform": {
"@type": {
"{list}": [
"entryType": {
"{or}": [
{
"{or}": [
{
"{replace}":{
"possible_terms": [
"Genotyping", "Genotyping Study",
"allele size", "genotype"
]
},
"{with}": {
"replaced_by": "Genotyping Study"
}
},
{
"{replace}":{
"possible_terms": [
"Phenotypes", "Phenotyping", "Field Experiement",
"Greenhouse (29\u00baC/20\u00baC)", "Green house",
"Growth chamber", "Phenotyping Study", "Provenance trial",
"Half sibling progeny trial", "Clonal trial", "Progeny trial",
"Other", "Provenance and half sibling progeny trial",
"Species comparison", "Seed orchard", "Demonstration test",
"Full sibling progeny trial", "Juveniles comparison",
"Clonal archiva, clone bank", "Conservation plot",
"Biomasse test - sylvabiom", "Response study", "raw"
]
},
"{with}": {
"replaced_by": "Phenotyping Study"
}
},
"Study"
]
}
"{replace}": {
"possible_terms": [
"Genotyping",
"Genotyping Study",
"allele size",
"genotype"
]
},
"{with}": {
"replaced_by": "Genotyping Study"
}
},
{
"{replace}": {
"possible_terms": [
"",
null,
"Phenotypes",
"Phenotyping",
"Field Experiement",
"Greenhouse (29\u00baC/20\u00baC)",
"Green house",
"Growth chamber",
"Phenotyping Study",
"Provenance trial",
"Half sibling progeny trial",
"Clonal trial",
"Progeny trial",
"Other",
"Provenance and half sibling progeny trial",
"Species comparison",
"Seed orchard",
"Demonstration test",
"Full sibling progeny trial",
"Juveniles comparison",
"Clonal archiva, clone bank",
"Conservation plot",
"Biomasse test - sylvabiom",
"Response study",
"raw"
]
},
"{with}": {
"replaced_by": "Phenotyping Study"
}
},
"Study"
]
},
"@id": "{.studyPUI}",

"identifier": "{.studyPUI}",
"schema:includedInDataCatalog": "{.source}",
"schema:identifier": "{.studyDbId}",
"schema:name": {
"{or}": ["{.studyName}", "{.name}"]
"{or}": [
"{.studyName}",
"{.name}"
]
},
"name": {
"{or}": [
"{.studyName}",
"{.name}"
]
},
"schema:url": "{.documentationURL}",
"schema:description": {
"url": "{.documentationURL}",
"description": {
"{join}": [
"\"",
{ "{or}": ["{.studyName}", "{.name}"] },
"\" is a ",
{ "{or}": ["{.studyTypeName}", "{.studyType}", "study"] },
{ "{or}": [" conducted from {.startDate} to {.endDate}", " running since {.startDate}"] },
{ "{or}": [
{ "{join}": [
" (seasons: ", {"{join}": ["{.seasons.season}", "{.seasons.year}"], "{separator}": ", "}, ")"],
"{accept_none}": false },
{ "{join}": [" (seasons: ", {"{join}": "{.seasons}", "{separator}": ", "}, ")"], "{accept_none}": false }
]},
{ "{or}": [
" in {.locationURI => .locationName} ({.locationURI => .countryName})",
" in {.locationURI => .countryName}"
]},
{
"{or}": [
"{.studyName}",
"{.name}"
]
},
" is a ",
{
"{or}": [
"{.studyTypeName}",
"{.studyType}",
"study"
]
},
{
"{or}": [
" conducted from {.startDate} to {.endDate}",
" running since {.startDate}"
]
},
{
"{or}": [
{
"{join}": [
" (seasons: ",
{
"{join}": [
"{.seasons.season}",
"{.seasons.year}"
],
"{separator}": ", "
},
")"
],
"{accept_none}": false
},
{
"{join}": [
" (seasons: ",
{
"{join}": "{.seasons}",
"{separator}": ", "
},
")"
],
"{accept_none}": false
}
]
},
{
"{or}": [
" in {.locationURI => .locationName} ({.locationURI => .countryName})",
" in {.locationURI => .countryName}"
]
},
". this study is part of the {.programName} program",
".",
" {.studyDescription}"
]
},

"species": {
"{or}": [
"{.germplasmURIs => .genus + .species + .subtaxa}",
"{.germplasmURIs => .genus + .species}",
"{.germplasmURIs => .genus}"
]
},
"germplasm": {
"cropName": {
"{list}": [
@@ -81,7 +150,12 @@
"{.germplasmURIs => .genus + .species}",
"{.germplasmURIs => .genus + .species + .subtaxa}",
"{.germplasmURIs => .taxonSynonyms}"
], "{transform}": ["capitalize", "flatten", "distinct"]
],
"{transform}": [
"capitalize",
"flatten",
"distinct"
]
},
"germplasmList": {
"{flatten_distinct}": [
@@ -100,13 +174,19 @@
]
}
},

"trait": {
"observationVariableIds": {
"{flatten_distinct}": [
"{.observationVariableDbIds}"
]
}
},
"node": "{.source}",
"databaseName": {
"{join}": [
"brapi@",
"{.source}"
]
}
}
}
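
The main rework in datadiscovery_study.json is the flattened entryType rule: the {list}/@type wrapper is gone, and a single {or} now tries a genotyping {replace}, then a phenotyping {replace} (whose possible_terms now also include the empty string and null), and finally the literal fallback "Study". Read that way, the rule behaves roughly like the sketch below; the term sets are copied from the config, while the matching function itself is an assumption about how {replace}/{with} resolve.

GENOTYPING_TERMS = {"Genotyping", "Genotyping Study", "allele size", "genotype"}
PHENOTYPING_TERMS = {
    "", None, "Phenotypes", "Phenotyping", "Field Experiement",
    "Greenhouse (29\u00baC/20\u00baC)", "Green house", "Growth chamber",
    "Phenotyping Study", "Provenance trial", "Half sibling progeny trial",
    "Clonal trial", "Progeny trial", "Other",
    "Provenance and half sibling progeny trial", "Species comparison",
    "Seed orchard", "Demonstration test", "Full sibling progeny trial",
    "Juveniles comparison", "Clonal archiva, clone bank", "Conservation plot",
    "Biomasse test - sylvabiom", "Response study", "raw",
}

def entry_type(study_type):
    # Hypothetical helper mirroring the {or} of two {replace} blocks plus the "Study" fallback.
    if study_type in GENOTYPING_TERMS:
        return "Genotyping Study"
    if study_type in PHENOTYPING_TERMS:
        return "Phenotyping Study"
    return "Study"

assert entry_type("allele size") == "Genotyping Study"
assert entry_type(None) == "Phenotyping Study"  # empty or missing study types now map to phenotyping
assert entry_type("Demonstration test") == "Phenotyping Study"
assert entry_type("GWAS panel") == "Study"      # anything unmatched falls back to the generic type
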
63 changes: 33 additions & 30 deletions etl/transform/elasticsearch.py
@@ -132,43 +132,49 @@ def validate_documents(document_tuples, validation_schemas, logger):
logger.debug(f"Validated {document_count} documents.")


def generate_bulk_headers(document_tuples):
def dump_clean_in_json_files(source_dir, logger, documents_tuples):
"""
Consumes an iterable of document type and document tuples and produces an iterable of tuples
(of bulk index header and document)
Consumes an iterable of document tuples and cleans email addresses
"""
for document_type, document in document_tuples:
document_id = document['@id']
bulk_header = {'index': {'_type': document_type, '_id': document_id}}
yield bulk_header, document


def dump_in_bulk_files(source_bulk_dir, logger, documents_tuples):
"""
Consumes an iterable of header and document tuples and dump into the JSONSplitStore
"""
logger.debug("Saving documents to bulk files...")
logger.debug("Saving documents to json files...")

json_stores = dict()
json_dict = dict()
document_count = 0
for document_header, document in documents_tuples:
document_type = document_header['index']['_type']
if document_type not in json_stores:
json_stores[document_type] = JSONSplitStore(source_bulk_dir, document_type)

json_store = json_stores[document_type]
# Hide email
if ("email" in document):
document["email"]= document["email"].replace('@', '_')

if ("contacts" in document):
for contact in document["contacts"]:
if "email" in contact :
contact["email"]= contact["email"].replace('@', '_')

if document_header not in json_dict:
json_dict[document_header] = []

json_dict[document_header].append(document)

document_count += 1
if is_checkpoint(document_count):
logger.debug(f"checkpoint: {document_count} documents saved")

# Dump batch of headers and documents in bulk file
json_store.dump(document_header, document)
save_json(source_dir, json_dict)

logger.debug(f"Total of {document_count} documents saved in json files.")

# Close all json stores
for json_store in json_stores.values():
json_store.close()
logger.debug(f"Total of {document_count} documents saved in bulk files.")

def save_json(source_dir, json_dict):
for type, document in json_dict.items():
file_number = 1
saved_documents = 0
while saved_documents < len(document):
with open(source_dir + "/" + type + '-' + str(file_number) + '.json', 'w') as f:
json.dump(document[saved_documents:file_number*10000], f, ensure_ascii=False)
f.close()
file_number += 1
saved_documents += 10000


def get_document_configs_by_entity(document_configs):
@@ -273,11 +279,8 @@ def transform_source(source, transform_config, source_json_dir, source_bulk_dir,
# Validate the document schemas
validated_documents = validate_documents(documents, validation_schemas, logger)

# Generate Elasticsearch bulk headers before each documents
documents_with_headers = generate_bulk_headers(validated_documents)

# Write the documents in bulk files
dump_in_bulk_files(source_bulk_dir, logger, documents_with_headers)
# Write the documents in JSON files
dump_clean_in_json_files(source_bulk_dir, logger, validated_documents)
# shutil.rmtree(tmp_index_dir, ignore_errors=True)

logger.info(f"SUCCEEDED Transforming BrAPI {source_name}.")
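In etl/transform/elasticsearch.py the Elasticsearch bulk headers are gone: instead of header/document pairs streamed through JSONSplitStore, dump_clean_in_json_files now groups the validated documents by type, masks the '@' in any top-level or contact email, and hands everything to save_json, which writes plain JSON arrays of at most 10 000 documents per file named <type>-<n>.json. The snippet below is a minimal, self-contained restatement of that batching idea; the write_chunked name and the demo comment are hypothetical, not the module's API.

import json
import os

CHUNK = 10_000  # save_json() starts a new file every 10 000 documents

def write_chunked(out_dir, doc_type, documents):
    # Write the documents of one type to <type>-1.json, <type>-2.json, ...
    os.makedirs(out_dir, exist_ok=True)
    for file_number, start in enumerate(range(0, len(documents), CHUNK), start=1):
        path = os.path.join(out_dir, f"{doc_type}-{file_number}.json")
        with open(path, "w") as f:
            json.dump(documents[start:start + CHUNK], f, ensure_ascii=False)

# e.g. 25 000 germplasm documents -> germplasm-1.json, germplasm-2.json, germplasm-3.json
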
4 changes: 4 additions & 0 deletions etl/transform/uri.py
@@ -292,6 +292,10 @@ def get_in_index(link_id):
# upper() to solve case sensitive issues (eg. "ea00371") in WUR data
return uri_index[link_id.upper()].decode()
except KeyError as e:
print(
f"Could not find '{alias or linked_entity}' with id '{link_id}' "
f"found in '{link_path}' of object:\n{data}"
)
raise MissingDataLink(
f"Could not find '{alias or linked_entity}' with id '{link_id}' "
f"found in '{link_path}' of object:\n{data}"
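The uri.py change only adds a print of the same message that MissingDataLink already carries, so failures are visible in the console as well as in the raised exception. For context, the lookup it guards is keyed on upper-cased ids; below is a tiny illustrative sketch with made-up data, where a plain KeyError stands in for MissingDataLink.

uri_index = {"EA00371": b"urn:demo:germplasm/EA00371"}  # made-up entry; real values come from the URI index

def get_uri(link_id):
    try:
        # upper() tolerates case differences such as "ea00371" in WUR data
        return uri_index[link_id.upper()].decode()
    except KeyError:
        # the commit now prints this context before raising MissingDataLink
        print(f"Could not find id '{link_id}' in the URI index")
        raise

print(get_uri("ea00371"))  # urn:demo:germplasm/EA00371
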
