diff --git a/exampleUsage.js b/exampleUsage.js index 21444ff..e87973d 100644 --- a/exampleUsage.js +++ b/exampleUsage.js @@ -3,12 +3,13 @@ let caching = new CacheFhirToES({ ESBaseURL: 'http://localhost:9200', ESUsername: '', ESPassword: '', - ESMaxCompilationRate: '10000/1m', + ESMaxCompilationRate: '60000/1m', FHIRBaseURL: 'http://localhost:8081/hapi4/fhir', FHIRUsername: '', FHIRPassword: '', - relationshipsIDs: ["ihris-es-report-mhero-send-message"], //if not specified then all relationships will be processed - reset: true + // relationshipsIDs: ["ihris-es-report-mhero-send-message"], //if not specified then all relationships will be processed + relationshipsIDs: ["testgroup"], //if not specified then all relationships will be processed + reset: false }) caching.cache().then(() => { console.log('Done') diff --git a/package.json b/package.json index 37ae724..36de927 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fhir2es", - "version": "1.3.0", + "version": "2.0.0", "description": "Reads and caches fhir data to elasticsearch", "main": "reports.js", "scripts": { @@ -11,19 +11,23 @@ "url": "git+https://github.com/intrahealth/fhir2es.git" }, "keywords": [ + "fhir to elasticsearch", + "fhir2es", "fhir", - "elasticsearch" + "elasticsearch", + "elasticsearch caching" ], - "author": "Ally Shaban and Luke Duncan", + "author": "Ally Shaban", "license": "ISC", "bugs": { - "url": "https://github.com/iHRIS/fhir2es/issues" + "url": "https://github.com/intrahealth/fhir2es/issues" }, "homepage": "https://github.com/intrahealth/fhir2es", "dependencies": { "async": "^3.2.0", "axios": "^0.19.2", "fhir": "^4.7.3", + "fhirpath": "^2.6.2", "lodash": "^4.17.15", "moment": "^2.27.0", "urijs": "^1.19.2", diff --git a/reports.js b/reports.js index 6cd8dec..4143f47 100644 --- a/reports.js +++ b/reports.js @@ -6,9 +6,8 @@ const logger = require('./winston') const fs = require('fs'); const URI = require('urijs'); const _ = require('lodash'); -const Fhir = require('fhir').Fhir; - -const fhir = new Fhir(); +const fhirpath = require('fhirpath'); +const { split } = require('lodash'); class CacheFhirToES { constructor({ ESBaseURL, @@ -107,7 +106,12 @@ class CacheFhirToES { } (async () => { if (Array.isArray(value)) { - let val = await this.getElementValFromExtension(value, element) + let val + try { + val = await this.getElementValFromExtension(value, element) + } catch (error) { + logger.error(error); + } if (val) { elementValue = val } @@ -125,44 +129,44 @@ class CacheFhirToES { }) } - getImmediateLinks(orderedResources, links, callback) { - if (orderedResources.length - 1 === links.length) { - return callback(orderedResources); + getImmediateLinks(links, callback) { + if (this.orderedResources.length - 1 === links.length) { + return callback(this.orderedResources); } let promises = []; for (let link of links) { promises.push( new Promise((resolve, reject) => { link = this.flattenComplex(link.extension); - let parentOrdered = orderedResources.find(orderedResource => { + let parentOrdered = this.orderedResources.find(orderedResource => { let linkToResource = link.linkTo.split('.').shift() return orderedResource.name === linkToResource; }); - let exists = orderedResources.find(orderedResource => { + let exists = this.orderedResources.find(orderedResource => { return JSON.stringify(orderedResource) === JSON.stringify(link); }); if (parentOrdered && !exists) { - orderedResources.push(link); + this.orderedResources.push(link); } resolve(); }) ); } Promise.all(promises).then(() => { - if (orderedResources.length - 1 !== links.length) { - this.getImmediateLinks(orderedResources, links, orderedResources => { - return callback(orderedResources); + if (this.orderedResources.length - 1 !== links.length) { + this.getImmediateLinks(links, () => { + return callback(); }); } else { - return callback(orderedResources); + return callback(); } }); }; getReportRelationship(callback) { let url = URI(this.FHIRBaseURL) - .segment('Basic'); - url.addQuery('code', 'iHRISRelationship'); + .segment('Basic') + .addQuery('code', 'iHRISRelationship'); for(let relationship of this.relationshipsIDs) { url.addQuery('_id', relationship); } @@ -222,6 +226,10 @@ class CacheFhirToES { axios({ url: URI(this.ESBaseURL).segment('syncdata').segment("_update_by_query").toString(), method: 'POST', + auth: { + username: this.ESUsername, + password: this.ESPassword + }, data: { script: { lang: "painless", @@ -250,11 +258,6 @@ class CacheFhirToES { getLastIndexingTime() { return new Promise((resolve, reject) => { logger.info('Getting lastIndexingTime') - if(this.reset) { - logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00') - this.lastIndexingTime = '1970-01-01T00:00:00' - return resolve() - } axios({ method: "GET", url: URI(this.ESBaseURL).segment('syncdata').segment("_search").toString(), @@ -263,6 +266,11 @@ class CacheFhirToES { password: this.ESPassword } }).then((response) => { + if(this.reset) { + logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00') + this.lastIndexingTime = '1970-01-01T00:00:00' + return resolve() + } if(response.data.hits.hits.length === 0) { logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00') this.lastIndexingTime = '1970-01-01T00:00:00' @@ -273,6 +281,7 @@ class CacheFhirToES { return resolve() }).catch((err) => { if (err.response && err.response.status && err.response.status === 404) { + this.lastIndexingTime = '1970-01-01T00:00:00' logger.info('Index not found, creating index syncData'); let mappings = { mappings: { @@ -297,13 +306,16 @@ class CacheFhirToES { logger.error('Something went wrong and index was not created'); logger.error(response.data); logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00') - this.lastIndexingTime = '1970-01-01T00:00:00' return reject() } else { logger.info('Index syncdata created successfully'); logger.info('Adding default lastIndexTime which is 1970-01-01T00:00:00') axios({ method: 'PUT', + auth: { + username: this.ESUsername, + password: this.ESPassword, + }, url: URI(this.ESBaseURL).segment('syncdata').segment("_doc").segment("518fad1f-3ab5-4488-a099-e7b6ab335bc1").toString(), data: { "lastIndexingTime": "1970-01-01T00:00:00" @@ -313,19 +325,28 @@ class CacheFhirToES { logger.info('Default lastIndexTime added') } else { logger.error('An error has occured while saving default lastIndexTime'); + return reject() } logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00') - this.lastIndexingTime = '1970-01-01T00:00:00' - return reject() - }).catch(() => { + return resolve() + }).catch((err) => { logger.error('An error has occured while saving default lastIndexTime'); + if (err.response && err.response.data) { + logger.error(err.response.data); + } + if (err.error) { + logger.error(err.error); + } + if (!err.response) { + logger.error(err); + } + return reject() }) } }) .catch(err => { logger.error('Error: ' + err); logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00') - this.lastIndexingTime = '1970-01-01T00:00:00' return reject() }); } else { @@ -448,6 +469,114 @@ class CacheFhirToES { }) }; + getResourcesFields(resources) { + let fields = [] + for(let resource of resources) { + if(resource["http://ihris.org/fhir/StructureDefinition/iHRISReportElement"]) { + for (let element of resource["http://ihris.org/fhir/StructureDefinition/iHRISReportElement"]) { + let fieldLabel + let fhirPathValue + let fieldAutogenerated = false + for (let el of element) { + let value = ''; + for (let key of Object.keys(el)) { + if (key !== 'url') { + value = el[key]; + } + } + if (el.url === "label") { + let fleldChars = value.split(' ') + //if label has space then format it + if (fleldChars.length > 1) { + fieldLabel = value.toLowerCase().split(' ').map(word => word.replace(word[0], word[0].toUpperCase())).join(''); + } else { + fieldLabel = value + } + } else if (el.url === "name") { + fhirPathValue = value + } else if (el.url === "autoGenerated") { + fieldAutogenerated = value + } + } + fields.push({ + resourceName: resource.name, + resourceType: resource.resource, + field: fieldLabel, + fhirPath: fhirPathValue, + fieldAutogenerated + }) + } + fields.push({ + resourceName: resource.name, + resourceType: resource.resource, + field: resource.name, + fhirPath: "id", + fieldAutogenerated: false + }) + } + } + return fields; + } + + getChildrenResources(resourceName) { + let childrenResources = [] + for(let orderedResource of this.orderedResources) { + if(orderedResource.linkTo === resourceName || (orderedResource.linkTo && orderedResource.linkTo.startsWith(resourceName + '.'))) { + childrenResources.push(orderedResource) + let grandChildren = this.getChildrenResources(orderedResource.name) + childrenResources = childrenResources.concat(grandChildren) + } + } + return childrenResources + } + + getESDocument(index, query, callback) { + let error = false + let documents = [] + if(!query) { + query = {} + } + query.size = 10000 + let url = URI(this.ESBaseURL).segment(index).segment('_search').addQuery('scroll', '1m').toString() + let scroll_id = null + async.doWhilst( + (callback) => { + axios({ + method: 'GET', + url, + data: query, + auth: { + username: this.ESUsername, + password: this.ESPassword + } + }).then((response) => { + if(response.data.hits.hits.length === 0 || !response.data._scroll_id) { + scroll_id = null + } else { + scroll_id = response.data._scroll_id + documents = documents.concat(response.data.hits.hits) + url = URI(this.ESBaseURL).segment('_search').segment('scroll').toString() + query = { + scroll_id: scroll_id + } + } + return callback(null) + }).catch((err) => { + error = err + logger.error(err); + scroll_id = null + return callback(null) + }) + }, + (callback) => { + return callback(null, scroll_id !== null) + }, + () => { + return callback(error, documents) + } + ) + } + deleteESDocument(query, index) { return new Promise(async(resolve, reject) => { // await this.refreshIndex(index); @@ -490,7 +619,8 @@ class CacheFhirToES { }) }; - async updateESDocument(body, record, index, orderedResource, resourceId, multiple, tryDeleting, callback) { + async updateESDocument(body, record, index, orderedResource, resourceData, tryDeleting, callback) { + let multiple = orderedResource.multiple // await this.refreshIndex(index); //this handles records that should be deleted instead of its fields being truncated let recordDeleted = false; @@ -500,22 +630,18 @@ class CacheFhirToES { if(!orderedResource.hasOwnProperty('linkElement') || !multiple) { return callback(null) } - let url = URI(this.ESBaseURL).segment(index).segment('_search').addQuery('size', 10000).toString() - axios({ - method: 'GET', - url, - auth: { - username: this.ESUsername, - password: this.ESPassword, - }, - data: { - query: { - terms: body.query.terms - } + let query = { + query: { + terms: body.query.terms + } + } + this.getESDocument(index, query, async(err, documents) => { + if(err) { + logger.error('Req Data: ' + JSON.stringify(query,0,2)); + return callback(null) } - }).then(async (response) => { //if field values for this record needs to be truncated and there are multiple records of the parent, then delete the one we are truncating instead of updating - if(response.data.hits.hits.length > 1 && tryDeleting) { + if(documents.length > 1 && tryDeleting) { let recordFields = Object.keys(record) let idField = recordFields[recordFields.length - 1] let termField = Object.keys(body.query.terms)[0] @@ -536,12 +662,17 @@ class CacheFhirToES { } must2.terms[idField] = [record[idField]] delQry.query.bool.must.push(must2) - await this.deleteESDocument(delQry, index) + try { + await this.deleteESDocument(delQry, index) + } catch (error) { + logger.error(error); + } recordDeleted = true; } if(recordDeleted) { return callback(null) } + //because this resource supports multiple rows, then we are trying to add new rows let newRowBody = {} // take the last field because it is the ID let recordFields = Object.keys(record) @@ -549,11 +680,11 @@ class CacheFhirToES { for(let linkField in body.query.terms) { for(let index in body.query.terms[linkField]) { // create new row only if there is no checkField or checkField exist but it is different - let updateThis = response.data.hits.hits.find((hit) => { + let updateThis = documents.find((hit) => { return hit['_source'][linkField] === body.query.terms[linkField][index] && (!hit['_source'][checkField] || hit['_source'][checkField] === record[checkField]) }) if(!updateThis) { - let hit = response.data.hits.hits.find((hit) => { + let hit = documents.find((hit) => { return hit['_source'][linkField] === body.query.terms[linkField][index] }) if(!hit) { @@ -583,14 +714,12 @@ class CacheFhirToES { return callback(null) }).catch((err) => { logger.error(err); + logger.error('Req Data: ' + JSON.stringify(newRowBody,0,2)); return callback(null) }) } else { return callback(null) } - }).catch((err) => { - logger.error(err); - return callback(null) }) }, updateRow: (callback) => { @@ -650,7 +779,7 @@ class CacheFhirToES { if (err.response && (err.response.statusText === 'Conflict' || err.response.status === 409)) { logger.warn('Conflict occured, rerunning this request'); setTimeout(() => { - this.updateESDocument(body, record, index, orderedResource, resourceId, multiple, tryDeleting, () => { + this.updateESDocument(body, record, index, orderedResource, resourceData, tryDeleting, () => { return callback(null) }) }, 2000) @@ -665,6 +794,7 @@ class CacheFhirToES { if (!err.response) { logger.error(err); } + logger.error('Req Data: ' + JSON.stringify(updBodyData,0,2)); return callback(null) } }); @@ -681,7 +811,7 @@ class CacheFhirToES { }).then(response => { // if nothing was updated and its from the primary (top) resource then create as new if (response.data.updated == 0 && !orderedResource.hasOwnProperty('linkElement')) { - logger.info('No record with id ' + resourceId + ' found on elastic search, creating new'); + logger.info('No record with id ' + resourceData.id + ' found on elastic search, creating new'); let url = URI(this.ESBaseURL) .segment(index) .segment('_doc') @@ -701,6 +831,7 @@ class CacheFhirToES { .catch(err => { logger.error('Error occured while saving document into ES'); logger.error(err); + logger.error('Req Data: ' + JSON.stringify(record,0,2)); return callback(null) }); } else { @@ -710,7 +841,7 @@ class CacheFhirToES { if (err.response && (err.response.statusText === 'Conflict' || err.response.status === 409)) { logger.warn('Conflict occured, rerunning this request'); setTimeout(() => { - this.updateESDocument(body, record, index, orderedResource, resourceId, multiple, tryDeleting, () => { + this.updateESDocument(body, record, index, orderedResource, resourceData, tryDeleting, () => { return callback(null) }) }, 2000) @@ -725,6 +856,7 @@ class CacheFhirToES { if (!err.response) { logger.error(err); } + logger.error('Req Data: ' + JSON.stringify(bodyData,0,2)); return callback(null) } }); @@ -732,6 +864,90 @@ class CacheFhirToES { }, () => { return callback(null) }) + }, + cleanBrokenLinks: (callback) => { + if(!orderedResource.hasOwnProperty('linkElement') || resourceData.meta.versionId == '1') { + return callback(null) + } + // get all documents that doesnt meet the search terms but are linked to this resource and truncate + let qry = { + query: { + bool: { + must_not: [], + must: [] + } + } + } + qry.query.bool.must_not = [{ + terms: body.query.terms + }] + let recordFields = Object.keys(record) + let idField = recordFields[recordFields.length - 1] + let term = {} + term[idField] = record[idField] + qry.query.bool.must = [{ + term + }] + let childrenResources = this.getChildrenResources(orderedResource.name); + childrenResources.unshift(orderedResource) + let fields = this.getResourcesFields(childrenResources) + let ctx = '' + for(let field of fields) { + ctx += 'ctx._source.' + field.field + "=null;"; + } + this.getESDocument(index, qry, (err, documents) => { + let ids = [] + for(let hit of documents) { + ids.push(hit._id) + } + if(ids.length > 0) { + let body = { + script: { + lang: 'painless', + source: ctx + }, + query: { + terms: { + _id: ids + } + }, + }; + let url = URI(this.ESBaseURL).segment(index).segment('_update_by_query').addQuery('conflicts', 'proceed').toString(); + axios({ + method: 'post', + url, + data: body, + auth: { + username: this.ESUsername, + password: this.ESPassword, + }, + }).then(response => { + return callback(null) + }).catch(err => { + if (err.response && (err.response.statusText === 'Conflict' || err.response.status === 409)) { + logger.warn('Conflict occured, rerunning this request'); + setTimeout(() => { + logger.error('rerun on conflict is not yet implemented'); + return callback(null) + }, 2000) + } else { + logger.error('Error Occured while truncating ES documents'); + if (err.response && err.response.data) { + logger.error(err.response.data); + } + if (err.error) { + logger.error(err.error); + } + if (!err.response) { + logger.error(err); + } + return callback(null) + } + }) + } else { + return callback(null) + } + }) } }, () => { return callback() @@ -740,7 +956,11 @@ class CacheFhirToES { cache() { return new Promise(async(resolve) => { - await this.getLastIndexingTime() + try { + await this.getLastIndexingTime() + } catch (error) { + logger.error(error); + } let newLastIndexingTime = moment() .subtract('1', 'minutes') .format('Y-MM-DDTHH:mm:ss'); @@ -752,7 +972,7 @@ class CacheFhirToES { logger.error('invalid resource returned'); return; } - async.each(relationships.entry, (relationship, nxtRelationship) => { + async.eachSeries(relationships.entry, (relationship, nxtRelationship) => { logger.info('processing relationship ID ' + relationship.resource.id); relationship = relationship.resource; let details = relationship.extension.find(ext => ext.url === 'http://ihris.org/fhir/StructureDefinition/iHRISReportDetails'); @@ -825,9 +1045,9 @@ class CacheFhirToES { } } reportDetails = this.flattenComplex(details.extension); - let orderedResources = []; + this.orderedResources = []; // reportDetails.resource = subject._type; - orderedResources.push(reportDetails); + this.orderedResources.push(reportDetails); IDFields.push(reportDetails.name); this.updateESCompilationsRate(() => { this.createESIndex(reportDetails.name, IDFields, err => { @@ -836,8 +1056,8 @@ class CacheFhirToES { return nxtRelationship(); } logger.info('Done creating ES Index'); - this.getImmediateLinks(orderedResources, links, () => { - async.eachSeries(orderedResources, (orderedResource, nxtResourceType) => { + this.getImmediateLinks(links, () => { + async.eachSeries(this.orderedResources, (orderedResource, nxtResourceType) => { let processedRecords = [] this.count = 1; let url = URI(this.FHIRBaseURL) @@ -880,9 +1100,15 @@ class CacheFhirToES { return callback(null, false) }); }, async() => { - await this.refreshIndex(reportDetails.name); + try { + await this.refreshIndex(reportDetails.name); + } catch (error) { + logger.error(error); + } logger.info('Done Writting resource data for resource ' + orderedResource.name + ' into elastic search'); - return nxtResourceType() + this.fixDataInconsistency(reportDetails, orderedResource, () => { + return nxtResourceType() + }) } ); }, () => { @@ -928,32 +1154,25 @@ class CacheFhirToES { for (let query of queries) { let limits = query.split('='); let limitParameters = limits[0]; - let limitValue = limits[1]; - if (!limitValue) { + let limitValue = this.dataTypeConversion(limits[1]); + if (!limitValue && limitValue !== false) { limitValue = '' } - let resourceValue = fhir.evaluate(data.resource, limitParameters); + let resourceValue + try { + resourceValue = fhirpath.evaluate(data.resource, limitParameters); + } catch (error) { + logger.error(error); + } if (Array.isArray(resourceValue) && !resourceValue.includes(limitValue)) { - //if this entry was previousely added and now doesnt meet filters then delete - if(processed) { - deleteRecord = true - } else { - return nxtResource(); - } + //delete this entry as it is no longer meet filters + deleteRecord = true } else if (limitValue && !resourceValue) { - //if this entry was previousely added and now doesnt meet filters then delete - if(processed) { - deleteRecord = true - } else { - return nxtResource(); - } + //delete this entry as it is no longer meet filters + deleteRecord = true } else if (resourceValue.toString() != limitValue.toString()) { - //if this entry was previousely added and now doesnt meet filters then delete - if(processed) { - deleteRecord = true - } else { - return nxtResource(); - } + //delete this entry as it is no longer meet filters + deleteRecord = true } } let record = {}; @@ -984,10 +1203,19 @@ class CacheFhirToES { fieldAutogenerated = value } } - let displayData = fhir.evaluate(data.resource, fieldName); + let displayData + try { + displayData = fhirpath.evaluate(data.resource, fieldName); + } catch (error) { + logger.error(error); + } let value if ((!displayData || (Array.isArray(displayData) && displayData.length === 1 && displayData[0] === undefined)) && data.resource.extension) { - value = await this.getElementValFromExtension(data.resource.extension, fieldName) + try { + value = await this.getElementValFromExtension(data.resource.extension, fieldName) + } catch (error) { + logger.error(error); + } } else if (Array.isArray(displayData) && displayData.length === 1 && displayData[0] === undefined) { value = undefined } else if (Array.isArray(displayData)) { @@ -1000,7 +1228,12 @@ class CacheFhirToES { if (value.reference && fieldAutogenerated) { value = value.reference } else if (value.reference && !fieldAutogenerated) { - let referencedResource = await this.getResourceFromReference(value.reference); + let referencedResource + try { + referencedResource = await this.getResourceFromReference(value.reference); + } catch (error) { + logger.error(error); + } if (referencedResource) { value = referencedResource.name } @@ -1012,6 +1245,8 @@ class CacheFhirToES { value = data.resource.resourceType + '/' + value } record[fieldLabel] = value + } else { + record[fieldLabel] = null } } } @@ -1019,7 +1254,12 @@ class CacheFhirToES { let match = {}; if (orderedResource.hasOwnProperty('linkElement')) { let linkElement = orderedResource.linkElement.replace(orderedResource.resource + '.', ''); - let linkTo = fhir.evaluate(data.resource, linkElement); + let linkTo + try { + linkTo = fhirpath.evaluate(data.resource, linkElement); + } catch (error) { + logger.error(error); + } if (linkElement === 'id') { linkTo = orderedResource.resource + '/' + linkTo } @@ -1048,12 +1288,20 @@ class CacheFhirToES { } record[field] = recordFieldArr.join(''); } - if(deleteRecord) { - ctx += 'ctx._source.' + field + "='';"; + if(deleteRecord || !record[field]) { + ctx += 'ctx._source.' + field + "='null';"; } else { ctx += 'ctx._source.' + field + "='" + record[field] + "';"; } } + // truncate fields of any other resources that are linked to this resource + if(deleteRecord) { + let childrenResources = this.getChildrenResources(orderedResource.name); + let fields = this.getResourcesFields(childrenResources) + for(let field of fields) { + ctx += 'ctx._source.' + field.field + "=null;"; + } + } let body = { script: { @@ -1064,18 +1312,21 @@ class CacheFhirToES { terms: match , }, }; - let multiple = orderedResource.multiple if(!deleteRecord) { - this.updateESDocument(body, record, reportDetails.name, orderedResource, data.resource.id, multiple, deleteRecord, () => { + this.updateESDocument(body, record, reportDetails.name, orderedResource, data.resource, deleteRecord, () => { return nxtResource(); }) } else { //if this is the primary resource then delete the whole document, otherwise delete respective fields data if(!orderedResource.hasOwnProperty('linkElement')) { - await this.deleteESDocument({query: body.query}, reportDetails.name) + try { + await this.deleteESDocument({query: body.query}, reportDetails.name) + } catch (error) { + logger.error(error); + } return nxtResource(); } else { - this.updateESDocument(body, record, reportDetails.name, orderedResource, data.resource.id, multiple, deleteRecord, () => { + this.updateESDocument(body, record, reportDetails.name, orderedResource, data.resource, deleteRecord, () => { return nxtResource(); }) } @@ -1085,6 +1336,163 @@ class CacheFhirToES { return callback() }); } + + fixDataInconsistency(reportDetails, orderedResource, callback) { + //these must be run in series + async.series({ + //this fix missing data i.e __location_link is available but location is missing + fixMissing: (callback) => { + let query = { + query: { + bool: { + must_not: { + exists: { + field: orderedResource.name + } + } + } + } + } + runCleaner(query, false, this, () => { + return callback(null) + }) + }, + //this fix invalid data i.e __location_link not equal to location, and location is always invalid, not __location_link + differences: (callback) => { + if(!orderedResource.linkElement) { + return callback(null) + } + let query = { + query: { + bool: { + must: { + script: { + script: { + source: `doc['__${orderedResource.name}_link'].value != doc['${orderedResource.name}'].value`, + lang: "painless" + } + } + } + } + } + } + runCleaner(query, true, this, () => { + return callback(null) + }) + } + }, () => { + return callback() + }) + + function runCleaner(query, ignoreReverseLinked, me, callback) { + try { + me.getESDocument(reportDetails.name, query, (err, documents) => { + if(documents.length === 0) { + return callback(null) + } + /** + * this is for reversed linked resources i.e Practitioner and PractitionerRole + */ + let reverseLink = false + // end of reversed linked resources + let resIds = '' + for(let doc of documents) { + logger.error(doc._source['__' + orderedResource.name + '_link']); + if(doc._source['__' + orderedResource.name + '_link']) { + if(!resIds) { + let linkResType = doc._source['__' + orderedResource.name + '_link'].split('/')[0] + if(linkResType !== orderedResource.resource) { + reverseLink = true + if(ignoreReverseLinked) { + break; + } + } + } + if(!resIds) { + resIds = doc._source['__' + orderedResource.name + '_link'] + } else { + resIds += ',' + doc._source['__' + orderedResource.name + '_link'] + } + } else { + logger.error(JSON.stringify(doc,0,2)); + logger.error('There is a serious data inconsistency that needs to be addressed on index ' + reportDetails.name + ' and index id ' + doc._id + ', field ' + '__' + orderedResource.name + '_link' + ' is missing'); + continue; + } + } + if(!resIds || (ignoreReverseLinked && reverseLink)) { + return callback() + } + let processedRecords = [] + me.count = 1; + let url = URI(me.FHIRBaseURL) + .segment(orderedResource.resource) + .addQuery('_count', 200) + if(!reverseLink) { + url = url.addQuery('_id', resIds) + } else { + url = url.addQuery(orderedResource.linkElementSearchParameter, resIds) + } + logger.error(resIds + ' ' + url.toString()); + url = url.toString() + async.whilst( + (callback) => { + return callback(null, url !== null) + }, + (callback) => { + axios.get(url, { + withCredentials: true, + auth: { + username: me.FHIRUsername, + password: me.FHIRPassword, + }, + }).then(response => { + me.totalResources = response.data.total; + url = null; + const next = response.data.link.find( + link => link.relation === 'next' + ); + if (next) { + url = next.url + } + if (response.data.total > 0 && response.data.entry && response.data.entry.length > 0) { + me.processResource(response.data.entry, orderedResource, reportDetails, processedRecords, () => { + return callback(null, url); + }) + } else { + return callback(null, url); + } + }).catch((err) => { + logger.error('Error occured while getting resource data'); + logger.error(err); + return callback(null, null) + }) + }, + async() => { + try { + await me.refreshIndex(reportDetails.name); + } catch (error) { + logger.error(error); + } + return callback() + } + ) + }) + } catch (error) { + logger.error(error); + return callback() + } + } + } + + dataTypeConversion(value) { + var v = Number (value); + return !isNaN(v) ? v : + value === "undefined" ? undefined + : value === "null" ? null + : value === "true" ? true + : value === "false" ? false + : value + } } module.exports = { CacheFhirToES