Merge pull request #50 from databricks-industry-solutions/main
writing fhir data
zavoraad authored Aug 30, 2024
2 parents 939b72b + d3c6dcb commit ca15a87
Showing 8 changed files with 434 additions and 13 deletions.
78 changes: 75 additions & 3 deletions README.md
@@ -267,10 +267,82 @@ from (select timestamp, bundleUUID, explode(MessageHeader) as messageheader from
limit 10
```

## Usage: Writing FHIR Data
## Usage: Writing FHIR Data Using No Code/Low Code

Writing FHIR data from DataFrames into standard FHIR schemas is supported, thanks to contributions from our partners at [XponentL Data](https://xponentl.ai/). All that is required is a mapping of source columns to FHIR columns; each row is then exported as a FHIR bundle.

e.g.

``` python
from dbignite.writer.bundler import *
from dbignite.writer.fhir_encoder import *

# Create a dummy DataFrame with 2 rows of data
data = spark.createDataFrame([('CLM123', 'PAT01', 'COH123'), ('CLM345', 'PAT02', 'COH123')], ['CLAIM_ID', 'PATIENT_ID', 'PATIENT_COHORT_NUM'])

# Define a mapping from DF columns to the FHIR schema, including a hardcoded value for Patient.identifier.system
maps = [Mapping('CLAIM_ID', 'Claim.id'),
        Mapping('PATIENT_COHORT_NUM', 'Patient.identifier.value'),
        Mapping('<url of a hardcoded system reference>', 'Patient.identifier.system', True),
        Mapping('PATIENT_ID', 'Patient.id')]

# Create the mapping manager & bundle writer
#  - MappingManager encodes each row into valid FHIR structures in Spark
#  - Bundle maps the encoded data to JSON format
m = MappingManager(maps, data.schema)
b = Bundle(m)
result = b.df_to_fhir(data)

# Pretty print the resulting RDD
import json
result.map(lambda x: json.loads(x)).foreach(lambda x: print(json.dumps(x, indent=4)))
"""
# Row 1 in FHIR format
{
    "resourceType": "Bundle",
    "entry": [
        {
            "resourceType": "Claim",
            "id": "CLM123"
        },
        {
            "resourceType": "Patient",
            "id": "PAT01",
            "identifier": [
                {
                    "system": "<url of a hardcoded system reference>",
                    "value": "COH123"
                }
            ]
        }
    ]
}
# Row 2 in FHIR format
{
    "resourceType": "Bundle",
    "entry": [
        {
            "resourceType": "Claim",
            "id": "CLM345"
        },
        {
            "resourceType": "Patient",
            "id": "PAT02",
            "identifier": [
                {
                    "system": "<url of a hardcoded system reference>",
                    "value": "COH123"
                }
            ]
        }
    ]
}
"""
```
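
`df_to_fhir` returns an RDD of JSON strings, one FHIR bundle per source row. A minimal sketch of persisting or re-reading that output (the output path and the `bundles_df` variable are illustrative, not part of the library):

``` python
# Illustrative only: write each bundle as one line of JSON text (hypothetical output path)
result.saveAsTextFile("/tmp/fhir_bundles")

# Or load the bundle JSON back into a DataFrame for inspection
bundles_df = spark.read.json(result)
```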

For limitations and more advanced usage, see the [sample notebook](https://github.com/databrickslabs/dbignite/blob/main/notebooks/dbignite_patient_sample.py).
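
As one example of more advanced usage, the default type conversions can be overridden by passing a `FhirEncoderManager` to `MappingManager`. A minimal sketch, assuming you want to transform `Patient.identifier.value` before it is written (the upper-casing lambda is purely illustrative):

``` python
from dbignite.writer.fhir_encoder import FhirEncoderManager, FhirEncoder

# Hypothetical override, keyed by the target FHIR path
em = FhirEncoderManager(override_encoders = {
    "Patient.identifier.value": FhirEncoder(True, False, lambda x: x.upper())
})
m = MappingManager(maps, data.schema, em)
```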


> **Warning**
> This section is under construction

## Internal Representation of a FHIR Bundle in DBIgnite

44 changes: 44 additions & 0 deletions dbignite/writer/bundler.py
@@ -0,0 +1,44 @@
import json

class Bundle():

    #
    # Create bundles from FHIR resources
    #
    def __init__(self, mm):
        self.mm = mm


    #
    # Return a new FHIR bundle (as a JSON string) for each row
    #
    def df_to_fhir(self, df):
        return self._encode_to_json(self._encode_df(df))

    #
    # Encode each row into a list of FHIR resources, one per mapped resource type
    #
    def _encode_df(self, df):
        return (df
                .rdd
                .map(lambda row:
                     list(map(lambda resourceType: self.mm.encode(row, resourceType),
                              self.mm.fhir_resource_list()))
                     )
                )


    #
    # Given an RDD of encoded rows, return an RDD of FHIR Bundle JSON strings
    #
    def _encode_to_json(self, rdd):
        return (
            rdd
            .map(lambda row: [self._resource_to_fhir(x) for x in row])
            .map(lambda row: {'resourceType': 'Bundle', 'entry': row})
            .map(lambda row: json.dumps(row))
        )


    #
    # Given an encoded resource, return a single resource value with its resourceType inlined
    #
    def _resource_to_fhir(self, resource):
        return {'resourceType': list(resource.keys())[0], **resource[list(resource.keys())[0]]}

218 changes: 218 additions & 0 deletions dbignite/writer/fhir_encoder.py
@@ -0,0 +1,218 @@
from pyspark.sql.types import *
from pyspark.sql.types import _infer_type
from dbignite.fhir_mapping_model import FhirSchemaModel
from itertools import groupby, chain
from collections import ChainMap


class MappingManager():

    #
    # Manages the mappings from source DataFrame columns to FHIR target fields
    #
    def __init__(self, mappings, src_schema, em = None):
        self.mappings = mappings
        self.src_schema = src_schema
        if len([x.fhir_resource() for x in self.mappings]) > 1:
            Exception("Support for only 1 FHIR resource within a mapping at a time")
        self.em = em if em is not None else FhirEncoderManager()

    #
    # Given a tgt_resource type, return all encoded values
    #
    def encode(self, row, fhirResourceType):
        data = [(resourceType, mappings) for resourceType, mappings in self.level(0) if resourceType[0] == fhirResourceType][0]
        return self.to_fhir(data[0], data[1], row.asDict())

    #
    # Given a target field, get the source mapping, else None
    # tgt = array of ["Patient","identifier","value"]
    #
    def get_src(self, tgt):
        return None if len([x for x in self.mappings if x.tgt == ".".join(tgt)]) == 0 else [x for x in self.mappings if x.tgt == ".".join(tgt)][0]



    #
    # Get the func needed to transform x to y
    # @param tgt = ["Patient", "identifier", "value"]
    #
    def get_func(self, tgt):
        return self.em.get_encoder(
            src_type = None if self.get_src(tgt) is None else SchemaDataType.traverse_schema(self.get_src(tgt).src.split("."), self.src_schema),
            tgt_name = tgt
        )

    #
    # FHIR resources to be mapped
    #
    def fhir_resource_list(self):
        return list(set([x.fhir_resource() for x in self.mappings]))

    """
    # Return a grouping of all resources that match at a level
    # @param level = numeric value starting at 0 = FHIR_RESOURCE
    #
    # @return - list of lists that are at the same level, each value is a tuple
    #   tuple - (<matching fhir path>, <list of mapping objects matched>)
    #
    e.g. level=3 [(['Patient', 'identifier', 'value'], [<dbignite.writer.fhir_encoder.Mapping object at 0x104ef81f0>]),
                  (['Patient', 'identifier', 'system'], [<dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])]
    e.g. level=2 [(['Patient', 'identifier'],
                  [<dbignite.writer.fhir_encoder.Mapping object at 0x104ef81f0>,
                   <dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])]
    e.g. level=2 [(['Patient', 'extension'],
                  [<dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>,
                   <dbignite.writer.fhir_encoder.Mapping object at 0x104eaef40>])]
    """

    #
    # Level number to match
    # if mapping provided, only provide level that intersects with this mapping
    #
    def level(self, level, resources = None):
        mappings = resources if resources is not None else self.mappings
        return [(k, list(g)) for k, g in groupby(mappings, lambda x: x.tgt.split(".")[:level+1]) if len(k) >= level+1]

    def to_fhir(self, tgt_prefix, resource_list, row_dict):
        field_name = tgt_prefix[-1:][0]
        # converting an individual field to fhir
        if len(resource_list) == 1 and ".".join(tgt_prefix) == resource_list[0].tgt:
            return {
                field_name: (self.get_func(tgt_prefix).f(row_dict.get(resource_list[0].src)) if resource_list[0].hardcoded == False else resource_list[0].src)
            }
        # converting multiple fields to a single field in FHIR
        elif len(resource_list) > 1 and ".".join(tgt_prefix) == resource_list[0].tgt:
            return {
                field_name: self.em.get_encoder("array<" + _infer_type(type(resource_list[0].src).__name__).simpleString() + ">", tgt_prefix).f([row_dict.get(x.src) for x in resource_list if row_dict.get(x.src) is not None])
            }
        else:
            return {field_name: self.get_func(tgt_prefix).f(
                [self.to_fhir(prefix, resources, row_dict) for prefix, resources in self.level(len(tgt_prefix), resource_list)]
            )}

class Mapping():

    def __init__(self, src, tgt, hardcoded = False):
        self.src = src
        self.tgt = tgt
        self.hardcoded = hardcoded

    def fhir_resource(self):
        return self.tgt.split(".")[0]

    def __str__(self):
        return "src:" + str(self.src) + ", tgt:" + str(self.tgt)

#
# Holds logic for a single encoding
#
class FhirEncoder():

    #
    # @param one_to_one - exact match true/false
    # @param precision_loss - true/false if converting from source to target loses value
    # @param f - function that converts the source value to the target value
    # @param default - value returned if the conversion fails
    #
    def __init__(self, one_to_one, precision_loss, f, default = ''):
        self.one_to_one = one_to_one
        self.precision_loss = precision_loss
        self.f = self.handle(f)
        self.default = default

    def handle(self, f):
        def wrapper_func(*args, **kw):
            try:
                return f(*args, **kw)
            except:
                return self.default
        return wrapper_func

#
# Logic for all bindings in fhir translation logic
#
class FhirEncoderManager():
    """
    A class for converting values between different types.
    """

    #
    # @param map - dictionary of key/value pairs for encoding values through lambda functions
    # @param override_encoders - override functions to run when encountering a tgt value,
    #   - e.g. patient.name.given
    #
    def __init__(self, map = None, override_encoders = {}, fhir_schema = FhirSchemaModel()):
        self.map = map if map is not None else self.DEFAULT_ENCODERS
        self.override_encoders = override_encoders
        self.fhir_schema = fhir_schema

    # src python binding, tgt Spark Schema.typeName() binding
    DEFAULT_ENCODERS = {
        "IDENTITY": FhirEncoder(True, False, lambda x: x),
        "string": {
            "string": FhirEncoder(True, False, lambda x: x),
            "integer": FhirEncoder(False, False, lambda x: int(x.strip())),
            "float": FhirEncoder(False, False, lambda x: float(x.strip())),
            "double": FhirEncoder(False, False, lambda x: float(x.strip())),
            "bool": FhirEncoder(False, False, lambda x: bool(x.strip())),
            "array<string>": FhirEncoder(False, False, lambda x: [x])
        },
        "array<string>": {
            "string": FhirEncoder(False, False, lambda x: ','.join(x))
        },
        "integer": {
            "string": FhirEncoder(False, True, lambda x: str(x))
        },
        "struct": FhirEncoder(False, True, lambda l: dict(ChainMap(*l))),
        "array<struct>": FhirEncoder(False, True, lambda l: [dict(ChainMap(*l))])  # default behavior is to union the dictionaries
    }


    #
    # Get encoder for src->tgt type. If src_type is None, default to just getting tgt type
    #
    def _get_encoder(self, src_type, tgt_type):
        return self.map.get("IDENTITY") if src_type == tgt_type else self.map.get(src_type, {}).get(tgt_type, self.map.get(tgt_type))

    #
    # @param tgt_name - target field name
    # @param src_type - the source spark data type
    #
    def get_encoder(self, src_type, tgt_name):
        return (self.override_encoders.get('.'.join(tgt_name), None)
                if self.override_encoders.get('.'.join(tgt_name), None) is not None
                else self._get_encoder(src_type, SchemaDataType.traverse_schema(tgt_name[1:], self.fhir_schema.schema(tgt_name[0]))))

class SchemaDataType:

    #
    # field = list of path components, e.g. ["Patient", "identifier", "value"]
    #
    @staticmethod
    def traverse_schema(field, struct):
        if not field and type(struct) != StructField and type(struct) != StructType:
            return struct.dataType
        elif not field and type(struct) == StructField:
            if struct.dataType.typeName() == "array":
                return "array<" + struct.dataType.elementType.typeName() + ">"
            return struct.dataType.typeName()
        elif not field and type(struct) == StructType:
            return "struct"
        else:
            if type(struct) == StructType:
                return SchemaDataType.traverse_schema(field[1:], struct[field[0]])
            elif type(struct) == StructField:
                return SchemaDataType.traverse_schema(field, struct.dataType)
            elif type(struct) == ArrayType:
                return SchemaDataType.traverse_schema(field, struct.elementType)

    @staticmethod
    def schema_to_python(schema):
        return schema.simpleString()

12 changes: 12 additions & 0 deletions tests/README.md
@@ -0,0 +1,12 @@
# 🔥 dbignite
__Health Data Interoperability__

This library is designed to provide a low-friction entry point for performing analytics on
[FHIR](https://hl7.org/fhir/bundle.html) bundles by extracting and flattening resources.

# Running Tests


``` bash
python -m unittest tests/*py
```
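
The tests can also be run programmatically through the standard `unittest` discovery API (a minimal sketch; it assumes the `tests` directory is importable as a package from the repository root):

``` python
# Illustrative: discover and run the test suite with verbose output
import unittest

suite = unittest.defaultTestLoader.discover("tests", pattern="test_*.py")
unittest.TextTestRunner(verbosity=2).run(suite)
```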
Empty file added tests/base.py
Empty file.
14 changes: 14 additions & 0 deletions tests/test_base.py
@@ -0,0 +1,14 @@
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

class PysparkBaseTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.appName("DBIgniteTests").config("spark.driver.memory", "8g").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

12 changes: 2 additions & 10 deletions tests/test_readers.py
@@ -1,16 +1,8 @@
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

class PysparkBaseTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.appName("TestReaders").config("spark.driver.memory", "8g").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
from pyspark.sql.types import *
from .test_base import PysparkBaseTest

class TestReaders(PysparkBaseTest):
    def test_FhirFormat(self):