diff --git a/README.md b/README.md
index 2e1ccd9..2572dee 100644
--- a/README.md
+++ b/README.md
@@ -267,10 +267,82 @@ from (select timestamp, bundleUUID, explode(MessageHeader) as messageheader from
 limit 10
 ```
 
-## Usage: Writing FHIR Data
+## Usage: Writing FHIR Data Using No Code/Low Code
+
+Writing FHIR data from DataFrames into standard FHIR schemas is supported thanks to contributions from our partners at [XponentL Data](https://xponentl.ai/). All that is needed is a mapping from source columns to FHIR target fields; each row of the DataFrame is then exported as a FHIR bundle.
+
+e.g.
+
+``` python
+from dbignite.writer.bundler import *
+from dbignite.writer.fhir_encoder import *
+
+# Create a dummy DataFrame with 2 rows of data
+data = spark.createDataFrame([('CLM123', 'PAT01', 'COH123'), ('CLM345', 'PAT02', 'COH123')],['CLAIM_ID', 'PATIENT_ID', 'PATIENT_COHORT_NUM'])
+
+# Define a mapping from DataFrame columns to the FHIR schema, including a hardcoded value for Patient.identifier.system
+maps = [Mapping('CLAIM_ID', 'Claim.id'),
+        Mapping('PATIENT_COHORT_NUM', 'Patient.identifier.value'),
+        Mapping('', 'Patient.identifier.system', True),
+        Mapping('PATIENT_ID', 'Patient.id')]
+
+# Instance of the encoder & bundle writer
+#  - the encoder transforms the data to a valid FHIR format in Spark
+#  - the bundler maps the encoded data to JSON
+m = MappingManager(maps, data.schema)
+b = Bundle(m)
+result = b.df_to_fhir(data)
+
+# Pretty print the resulting RDD
+import json
+result.map(lambda x: json.loads(x)).foreach(lambda x: print(json.dumps(x, indent=4)))
+"""
+# Row 1 in FHIR format
+{
+    "resourceType": "Bundle",
+    "entry": [
+        {
+            "resourceType": "Claim",
+            "id": "CLM123"
+        },
+        {
+            "resourceType": "Patient",
+            "id": "PAT01",
+            "identifier": [
+                {
+                    "system": "",
+                    "value": "COH123"
+                }
+            ]
+        }
+    ]
+}
+# Row 2 in FHIR format
+{
+    "resourceType": "Bundle",
+    "entry": [
+        {
+            "resourceType": "Claim",
+            "id": "CLM345"
+        },
+        {
+            "resourceType": "Patient",
+            "id": "PAT02",
+            "identifier": [
+                {
+                    "system": "",
+                    "value": "COH123"
+                }
+            ]
+        }
+    ]
+}
+"""
+```
+
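+Individual target fields can also be given custom encoders by passing a `FhirEncoderManager` with `override_encoders` into the `MappingManager`. A minimal sketch continuing the example above (the upper-casing of the cohort value is purely illustrative):
+
+``` python
+# Hypothetical override: upper-case Patient.identifier.value on export
+override = {'Patient.identifier.value': FhirEncoder(False, False, lambda x: x.upper())}
+m = MappingManager(maps, data.schema, FhirEncoderManager(override_encoders = override))
+result = Bundle(m).df_to_fhir(data)
+```
+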
+For limitations and more advanced usage, see the [sample notebook](https://github.com/databrickslabs/dbignite/blob/main/notebooks/dbignite_patient_sample.py).
+
-> **Warning**
-> This section is under construction
 
 ## Internal Representation of a FHIR Bundle in DBIgnite
 
diff --git a/dbignite/writer/bundler.py b/dbignite/writer/bundler.py
new file mode 100644
index 0000000..7f7481e
--- /dev/null
+++ b/dbignite/writer/bundler.py
@@ -0,0 +1,44 @@
+import json
+
+class Bundle():
+
+    #
+    # Create bundles from FHIR resources
+    #
+    def __init__(self, mm):
+        self.mm = mm
+
+    #
+    # Return a new FHIR bundle (as a JSON string) for each row of df
+    #
+    def df_to_fhir(self, df):
+        return self._encode_to_json(self._encode_df(df))
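+
+    #
+    # Pipeline overview: _encode_df turns each Row into a list of encoded
+    # resource dicts (one dict per mapped FHIR resource type), and
+    # _encode_to_json wraps each list in a Bundle envelope and serializes
+    # it to a JSON string.
+    #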
+    def _encode_df(self, df):
+        return (df
+                .rdd
+                .map(lambda row: list(map(lambda resourceType: self.mm.encode(row, resourceType), self.mm.fhir_resource_list())))
+                )
+
+    #
+    # Given an RDD of encoded rows, return an RDD of FHIR bundles as JSON strings
+    #
+    def _encode_to_json(self, rdd):
+        return (
+            rdd
+            .map(lambda row: [self._resource_to_fhir(x) for x in row])
+            .map(lambda row: {'resourceType': 'Bundle', 'entry': row})
+            .map(lambda row: json.dumps(row))
+        )
+
+    #
+    # Given an encoded row, return a single resource value
+    #
+    def _resource_to_fhir(self, resource):
+        return {'resourceType': list(resource.keys())[0], **resource[list(resource.keys())[0]]}
diff --git a/dbignite/writer/fhir_encoder.py b/dbignite/writer/fhir_encoder.py
new file mode 100644
index 0000000..664d759
--- /dev/null
+++ b/dbignite/writer/fhir_encoder.py
@@ -0,0 +1,218 @@
+from pyspark.sql.types import *
+from pyspark.sql.types import _infer_type
+from dbignite.fhir_mapping_model import FhirSchemaModel
+from itertools import groupby, chain
+from collections import ChainMap
+
+
+class MappingManager():
+
+    #
+    # Mappings
+    #
+    def __init__(self, mappings, src_schema, em = None):
+        self.mappings = mappings
+        self.src_schema = src_schema
+        if len([x.fhir_resource() for x in self.mappings]) > 1:
+            Exception("Support for only 1 FHIR resource within a mapping at a time")
+        self.em = em if em is not None else FhirEncoderManager()
+
+    #
+    # Given a target FHIR resource type, return the encoded values for this row
+    #
+    def encode(self, row, fhirResourceType):
+        data = [(resourceType, mappings) for resourceType, mappings in self.level(0) if resourceType[0] == fhirResourceType][0]
+        return self.to_fhir(data[0], data[1], row.asDict())
+
+    #
+    # Given a target field, return the source mapping, else None
+    #  tgt = array of ["Patient","identifier","value"]
+    #
+    def get_src(self, tgt):
+        return None if len([x for x in self.mappings if x.tgt == ".".join(tgt)]) == 0 else [x for x in self.mappings if x.tgt == ".".join(tgt)][0]
+
+    #
+    # Get the function needed to transform x to y
+    #  @param tgt = ["Patient", "identifier", "value"]
+    #
+    def get_func(self, tgt):
+        return self.em.get_encoder(
+            src_type = None if self.get_src(tgt) is None else SchemaDataType.traverse_schema(self.get_src(tgt).src.split("."), self.src_schema),
+            tgt_name = tgt
+        )
+
+    #
+    # fhir_resources to be mapped
+    #
+    def fhir_resource_list(self):
+        return list(set([x.fhir_resource() for x in self.mappings]))
+
+    """
+    # Return a grouping of all resources that match at a level
+    #  @param level - numeric value starting at 0 = FHIR_RESOURCE
+    #
+    #  @return - list of tuples at the same level,
+    #   tuple - (<target prefix as a list>, <list of Mapping objects>)
+    #
+    e.g. level=3 [(['Patient', 'identifier', 'value'], [<Mapping>]),
+                  (['Patient', 'identifier', 'system'], [<Mapping>])]
+
+    e.g. level=2 [(['Patient', 'identifier'],
+                   [<Mapping for Patient.identifier.value>,
+                    <Mapping for Patient.identifier.system>])]
+    """
+
+    #
+    # Level number to match
+    #  if resources are provided, only return levels that intersect with those mappings
+    #
+    def level(self, level, resources = None):
+        mappings = resources if resources is not None else self.mappings
+        return [(k, list(g)) for k, g in groupby(mappings, lambda x: x.tgt.split(".")[:level+1]) if len(k) >= level+1]
+
+    def to_fhir(self, tgt_prefix, resource_list, row_dict):
+        field_name = tgt_prefix[-1:][0]
+        # converting an individual field to FHIR
+        if len(resource_list) == 1 and ".".join(tgt_prefix) == resource_list[0].tgt:
+            return {
+                field_name: (self.get_func(tgt_prefix).f(row_dict.get(resource_list[0].src)) if resource_list[0].hardcoded == False else resource_list[0].src)
+            }
+        # converting multiple fields to a single field in FHIR
+        elif len(resource_list) > 1 and ".".join(tgt_prefix) == resource_list[0].tgt:
+            return {
+                field_name: self.em.get_encoder("array<" + _infer_type(type(resource_list[0].src).__name__).simpleString() + ">", tgt_prefix).f([row_dict.get(x.src) for x in resource_list if row_dict.get(x.src) is not None])
+            }
+        else:
+            return {field_name: self.get_func(tgt_prefix).f(
+                [self.to_fhir(prefix, resources, row_dict) for prefix, resources in self.level(len(tgt_prefix), resource_list)]
+            )}
+
+class Mapping():
+
+    def __init__(self, src, tgt, hardcoded = False):
+        self.src = src
+        self.tgt = tgt
+        self.hardcoded = hardcoded
+
+    def fhir_resource(self):
+        return self.tgt.split(".")[0]
+
+    def __str__(self):
+        return "src:" + str(self.src) + ", tgt:" + str(self.tgt)
+
+#
+# Holds logic for a single encoding
+#
+class FhirEncoder():
+
+    #
+    # @param one_to_one - exact match true/false
+    # @param precision_loss - true/false if converting from source to target loses precision
+    # @param f - the conversion function
+    # @param default - value returned when the conversion raises an error
+    #
+    def __init__(self, one_to_one, precision_loss, f, default = None):
+        self.one_to_one = one_to_one
+        self.precision_loss = precision_loss
+        self.f = self.handle(f)
+        self.default = default
+
+    def handle(self, f):
+        def wrapper_func(*args, **kw):
+            try:
+                return f(*args, **kw)
+            except Exception:
+                return self.default
+        return wrapper_func
+
+#
+# Logic for all bindings in FHIR translation logic
+#
+class FhirEncoderManager():
+    """
+    A class for converting values between different types.
+    """
+
+    #
+    # @param map - dictionary of key/value pairs for encoding values through lambda functions
+    # @param override_encoders - override functions to run when encountering a tgt value,
+    #  e.g. Patient.name.given
+    #
+    def __init__(self, map = None, override_encoders = {}, fhir_schema = FhirSchemaModel()):
+        self.map = map if map is not None else self.DEFAULT_ENCODERS
+        self.override_encoders = override_encoders
+        self.fhir_schema = fhir_schema
+
+    # src python binding, tgt Spark Schema.typeName() binding
+    DEFAULT_ENCODERS = {
+        "IDENTITY": FhirEncoder(True, False, lambda x: x),
+        "string": {
+            "string": FhirEncoder(True, False, lambda x: x),
+            "integer": FhirEncoder(False, False, lambda x: int(x.strip())),
+            "float": FhirEncoder(False, False, lambda x: float(x.strip())),
+            "double": FhirEncoder(False, False, lambda x: float(x.strip())),
+            "bool": FhirEncoder(False, False, lambda x: bool(x.strip())),
+            "array": FhirEncoder(False, False, lambda x: [x])
+        },
+        "integer": {
+            "string": FhirEncoder(False, True, lambda x: str(x))
+        },
+        "struct": FhirEncoder(False, True, lambda l: dict(ChainMap(*l))),
+        "array": FhirEncoder(False, True, lambda l: [dict(ChainMap(*l))])  # default behavior: union the dictionaries
+    }
+
+    #
+    # Get encoders for src->tgt type. If src_type is None, default to just the tgt type
+    #
+    def _get_encoder(self, src_type, tgt_type):
+        return self.map.get("IDENTITY") if src_type == tgt_type else self.map.get(src_type, {}).get(tgt_type, self.map.get(tgt_type))
+
+    #
+    # @param src_type - the source Spark data type
+    # @param tgt_name - target field name
+    #
+    def get_encoder(self, src_type, tgt_name):
+        return (self.override_encoders.get('.'.join(tgt_name), None)
+                if self.override_encoders.get('.'.join(tgt_name), None) is not None
+                else self._get_encoder(src_type, SchemaDataType.traverse_schema(tgt_name[1:], self.fhir_schema.schema(tgt_name[0]))))
+
+class SchemaDataType:
+
+    #
+    # @param field - the target path as a list, e.g. ["name", "given"]
+    # @param struct - the schema (StructType/StructField/ArrayType) to walk
+    #
+    @staticmethod
+    def traverse_schema(field, struct):
+        if not field and type(struct) != StructField and type(struct) != StructType:
+            return struct.dataType
+        elif not field and type(struct) == StructField:
+            if struct.dataType.typeName() == "array":
+                return "array<" + struct.dataType.elementType.typeName() + ">"
+            return struct.dataType.typeName()
+        elif not field and type(struct) == StructType:
+            return "struct"
+        else:
+            if type(struct) == StructType:
+                return SchemaDataType.traverse_schema(field[1:], struct[field[0]])
+            elif type(struct) == StructField:
+                return SchemaDataType.traverse_schema(field, struct.dataType)
+            elif type(struct) == ArrayType:
+                return SchemaDataType.traverse_schema(field, struct.elementType)
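+    # Illustrative results against the FHIR Patient schema (patient_struct
+    # is a placeholder for FhirSchemaModel().schema("Patient")):
+    #   traverse_schema(["id"], patient_struct)            -> "string"
+    #   traverse_schema(["name", "given"], patient_struct) -> "array<string>"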
+
+    @staticmethod
+    def schema_to_python(schema):
+        return schema.simpleString()
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..1291581
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,12 @@
+# 🔥 dbignite
+__Health Data Interoperability__
+
+This library is designed to provide a low-friction entry to performing analytics on
+[FHIR](https://hl7.org/fhir/bundle.html) bundles by extracting resources and flattening them.
+
+# Running Tests
+
+``` bash
+python -m unittest tests/*py
+```
diff --git a/tests/base.py b/tests/base.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_base.py b/tests/test_base.py
new file mode 100644
index 0000000..f1946ad
--- /dev/null
+++ b/tests/test_base.py
@@ -0,0 +1,14 @@
+import unittest
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+
+class PysparkBaseTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.spark = SparkSession.builder.appName("DBIgniteTests").config("spark.driver.memory", "8g").getOrCreate()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.spark.stop()
diff --git a/tests/test_readers.py b/tests/test_readers.py
index 17f76ca..90e6028 100644
--- a/tests/test_readers.py
+++ b/tests/test_readers.py
@@ -1,16 +1,8 @@
 import unittest
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import *
-from pyspark.sql.types import *
-
-class PysparkBaseTest(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        cls.spark = SparkSession.builder.appName("TestReaders").config("spark.driver.memory", "8g").getOrCreate()
-
-    @classmethod
-    def tearDownClass(cls):
-        cls.spark.stop()
+from pyspark.sql.types import *
+from .test_base import PysparkBaseTest
 
 class TestReaders(PysparkBaseTest):
     def test_FhirFormat(self):
diff --git a/tests/test_writers.py b/tests/test_writers.py
new file mode 100644
index 0000000..a89a365
--- /dev/null
+++ b/tests/test_writers.py
@@ -0,0 +1,69 @@
+import unittest
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+from .test_base import PysparkBaseTest
+
+class TestWriters(PysparkBaseTest):
+
+    def test_encoder(self):
+        from dbignite.writer.fhir_encoder import FhirEncoder
+        e = FhirEncoder(False, False, lambda x: int(x.strip()))
+        assert(e.f("123") == 123)
+        assert(e.f("12a") is None)
+
+    def test_encoder_manager(self):
+        from dbignite.writer.fhir_encoder import FhirEncoderManager, SchemaDataType, FhirEncoder
+        em = FhirEncoderManager()
+        assert(em.get_encoder("string", ["Patient", "multipleBirthInteger"]).f("1234") == 1234)
+        assert(em.get_encoder("string", ["Patient", "multipleBirthInteger"]).f("abcdefg") == None)
+
+        # test overrides
+        em = FhirEncoderManager(override_encoders = {"Patient.multipleBirthInteger": FhirEncoder(False, False, lambda x: float(x) + 1)})
+        assert(em.get_encoder("string", ["Patient", "multipleBirthInteger"]).f("1234") == 1235)
+
+    def test_target_datatype(self):
+        from dbignite.writer.fhir_encoder import SchemaDataType
+        from dbignite.fhir_mapping_model import FhirSchemaModel
+        field = "Patient.name.given"
+        tgt_schema = FhirSchemaModel.custom_fhir_resource_mapping(field.split(".")[0]).schema(field.split(".")[0])
+        result = SchemaDataType.traverse_schema(field.split(".")[1:], tgt_schema)
+        assert(result == "array<string>")
+
+    def test_hardcoded_values(self):
+        from dbignite.writer.bundler import Bundle
+        from dbignite.writer.fhir_encoder import MappingManager, Mapping
+        data = self.spark.createDataFrame([('CLM123', 'PAT01', 'COH123'), ('CLM345', 'PAT02', 'COH123')], ['CLAIM_ID', 'PATIENT_ID', 'PATIENT_COHORT_NUM'])
+        maps = [Mapping('CLAIM_ID', 'Claim.id'),
+                Mapping('PATIENT_COHORT_NUM', 'Patient.identifier.value'),
+                Mapping('', 'Patient.identifier.system', True),
+                Mapping('PATIENT_ID', 'Patient.id')]
+        m = MappingManager(maps, data.schema)
+        b = Bundle(m)
+        result = b._encode_df(data).take(2)
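+        # each element of result is a list of encoded resource dicts, ordered
+        # to match m.fhir_resource_list(); index 1 is the Patient resource here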
+        assert(result[0][1].get('Patient').get('identifier')[0].get('system') == '')
+
+    #
+    # Test mapping multiple source columns into a single FHIR array value
+    #
+    def test_multiple_resources_to_single_value(self):
+        from dbignite.writer.bundler import Bundle
+        from dbignite.writer.fhir_encoder import MappingManager, Mapping
+        # Create a dummy DataFrame with 2 rows of data
+        data = self.spark.createDataFrame([('CLM123', 'Emma', 'Maya'),
+                                           ('CLM345', 'E', 'J')],
+                                          ['CLAIM_ID', 'SECOND_BORN', 'FIRST_BORN'])
+
+        maps = [Mapping('CLAIM_ID', 'Patient.id'),
+                Mapping('FIRST_BORN', 'Patient.name.given'),
+                Mapping('SECOND_BORN', 'Patient.name.given')]
+
+        m = MappingManager(maps, data.schema)
+        b = Bundle(m)
+        result = b._encode_df(data).take(2)
+        assert(result[0][0].get('Patient').get('name')[0].get('given') == ['Maya', 'Emma'])
+        assert(result[1][0].get('Patient').get('name')[0].get('given') == ['J', 'E'])