forked from google/fhir-data-pipes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.py
236 lines (202 loc) · 8.75 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This is the main higher level library to query FHIR resources.
The public interface of this library is intended to be independent of the actual
query engine, e.g., Spark, SQL/BigQuery, etc. The only exception is a single
function that defines the source of the data.
"""
# See https://stackoverflow.com/questions/33533148 for why this import is
# needed (it lets methods use their own class name in annotations).
from __future__ import annotations
import typing as tp
import pandas
class ObsConstraints:
    """An abstraction layer around observation constraints for a single code.

    This class only stores the constraints; it does not render or evaluate
    them itself. Conditions derived from these constraints are assumed to be
    applied on an already flattened observation view.
    """

    def __init__(
        self,
        code: str,
        values: tp.Optional[tp.List[str]] = None,
        value_sys: tp.Optional[str] = None,
        min_value: tp.Optional[float] = None,
        max_value: tp.Optional[float] = None,
        min_time: tp.Optional[str] = None,
        max_time: tp.Optional[str] = None,
    ) -> None:
        """Records the constraints for one observation code.

        Args:
          code: The observation code these constraints belong to.
          values: The accepted values for the observation, or None for no
            value-list constraint.
          value_sys: The system string used to build `sys_str`; None or an
            empty string results in `IS NULL`.
          min_value: Lower bound for the observation value, or None.
          max_value: Upper bound for the observation value, or None.
          min_time: Lower bound for the observation time, or None. The
            expected string format is not established here -- TODO confirm.
          max_time: Upper bound for the observation time, or None.
        """
        self.code = code
        # Pre-rendered comparison fragment: `="<value_sys>"` when a system is
        # given, `IS NULL` otherwise (an empty string counts as "not given").
        # NOTE(review): value_sys is interpolated without escaping; if this
        # fragment ends up inside a SQL statement the caller must only pass
        # trusted values -- confirm against the query engines that use it.
        self.sys_str = '="{}"'.format(value_sys) if value_sys else "IS NULL"
        self.values = values
        self.min_time = min_time
        self.max_time = max_time
        self.min_value = min_value
        self.max_value = max_value
class EncounterConstraints:
    """An abstraction layer around all encounter constraints.

    This class only stores the constraints and answers which kinds of
    constraint are set. Conditions derived from these constraints are assumed
    to be applied on an already flattened encounter view.
    """

    def __init__(
        self,
        locationId: tp.Optional[tp.List[str]] = None,
        typeSystem: tp.Optional[str] = None,
        typeCode: tp.Optional[tp.List[str]] = None,
    ) -> None:
        """Records the encounter constraints; None means "no constraint".

        Args:
          locationId: The locations to keep, or None.
          typeSystem: The encounter type system, or None.
          typeCode: The encounter type codes to keep, or None.
        """
        self.location_id = locationId
        self.type_system = typeSystem
        self.type_code = typeCode

    def has_location(self) -> bool:
        """Returns True iff a location constraint was given (even if empty)."""
        # Identity check on purpose: `!= None` would call the operand's
        # `__ne__`, which may return a non-bool (e.g. for array-like inputs).
        return self.location_id is not None

    def has_type(self) -> bool:
        """Returns True iff a type code or a type system constraint is set."""
        return (self.type_code is not None) or (self.type_system is not None)
# TODO: Add patient filtering criteria to this query API.
class PatientQuery:
    """The main class for specifying a patient query.

    The expected usage flow is:
      - The user specifies where the data comes from and what query engine
        should be used, e.g., Parquet files with Spark, a SQL engine like
        BigQuery, or even a FHIR server/API (future).
      - Constraints are set, e.g., observation codes, values, date, etc.
      - The query is run on the underlying engine and a Pandas DataFrame
        is created.
      - The DataFrame is fetched or more manipulation is done on it by the
        library.

    This base class only collects the constraints; the `get_*_view` methods
    raise NotImplementedError and must be provided by sub-classes. All
    constraint-setting methods return `self` so that calls can be chained.
    """

    def __init__(self, code_system: tp.Optional[str] = None) -> None:
        """Creates a query with no constraints.

        Args:
          code_system: Forwarded as `value_sys` to every observation
            constraint created by this query; may be None.
        """
        # Maps an observation code to its ObsConstraints (one entry per code).
        self._code_constraint = {}
        self._enc_constraint = EncounterConstraints()
        self._include_all_codes = False
        self._all_codes_min_time = None
        self._all_codes_max_time = None
        self._code_system = code_system

    def _require_new_code(self, code: str) -> None:
        """Raises ValueError if `code` already has an observation constraint."""
        if code in self._code_constraint:
            raise ValueError("Duplicate constraints for code {}".format(code))

    def include_obs_in_value_and_time_range(
        self,
        code: str,
        min_val: tp.Optional[float] = None,
        max_val: tp.Optional[float] = None,
        min_time: tp.Optional[str] = None,
        max_time: tp.Optional[str] = None,
    ) -> PatientQuery:
        """Adds a value-range and time-range constraint for one code.

        Args:
          code: The observation code to constrain.
          min_val: Lower bound for the observation value, or None.
          max_val: Upper bound for the observation value, or None.
          min_time: Lower bound for the observation time, or None.
          max_time: Upper bound for the observation time, or None.

        Returns:
          This query, to allow chaining.

        Raises:
          ValueError: If `code` already has a constraint.
        """
        self._require_new_code(code)
        self._code_constraint[code] = ObsConstraints(
            code,
            value_sys=self._code_system,
            min_value=min_val,
            max_value=max_val,
            min_time=min_time,
            max_time=max_time,
        )
        return self

    def include_obs_values_in_time_range(
        self,
        code: str,
        values: tp.Optional[tp.List[str]] = None,
        min_time: tp.Optional[str] = None,
        max_time: tp.Optional[str] = None,
    ) -> PatientQuery:
        """Adds a value-list and time-range constraint for one code.

        Args:
          code: The observation code to constrain.
          values: The accepted observation values, or None.
          min_time: Lower bound for the observation time, or None.
          max_time: Upper bound for the observation time, or None.

        Returns:
          This query, to allow chaining.

        Raises:
          ValueError: If `code` already has a constraint.
        """
        self._require_new_code(code)
        self._code_constraint[code] = ObsConstraints(
            code,
            values=values,
            value_sys=self._code_system,
            min_time=min_time,
            max_time=max_time,
        )
        return self

    def include_all_other_codes(
        self,
        include: bool = True,
        min_time: tp.Optional[str] = None,
        max_time: tp.Optional[str] = None,
    ) -> PatientQuery:
        """Sets whether codes without their own constraint are included.

        Calling this overwrites the values set by any previous call.

        Args:
          include: Whether to include all other codes.
          min_time: Lower time bound recorded for those codes, or None.
          max_time: Upper time bound recorded for those codes, or None.

        Returns:
          This query, to allow chaining.
        """
        self._include_all_codes = include
        self._all_codes_min_time = min_time
        self._all_codes_max_time = max_time
        return self

    def encounter_constraints(
        self,
        locationId: tp.Optional[tp.List[str]] = None,
        typeSystem: tp.Optional[str] = None,
        typeCode: tp.Optional[tp.List[str]] = None,
    ) -> PatientQuery:
        """Specifies constraints on encounters to be included.

        Note calling this erases previous encounter constraints. Any
        constraint that is None is ignored.

        Args:
          locationId: The list of locations that should be kept or None if
            there are no location constraints.
          typeSystem: A string representing the type system or None.
          typeCode: A list of encounter type codes that should be kept or
            None if there are no type constraints.

        Returns:
          This query, to allow chaining like the other constraint setters.
        """
        self._enc_constraint = EncounterConstraints(
            locationId, typeSystem, typeCode
        )
        return self

    def get_patient_obs_view(
        self, sample_count: tp.Optional[int] = None
    ) -> pandas.DataFrame:
        """Creates a patient * observation code aggregated view.

        For each patient and observation code, groups all such observations
        and returns some aggregated values. Loads the data if that is
        necessary.

        Args:
          sample_count: Count of records to return. Used for quick discovery.

        Returns:
          A Pandas DataFrame with the following columns:
            - `patientId` the patient for whom the aggregation is done
            - `birthDate` the patient's birth date
            - `gender` the patient's gender
            - `code` the code of the observation in the `code_system`
            - `num_obs` number of observations with above spec
            - `min_value` the minimum obs value in the specified period
              or `None` if this observation does not have a numeric value.
            - `max_value` the maximum obs value in the specified period
              or `None`
            - `min_date` the first time that an observation with the given
              code was observed in the specified period.
            - `max_date` ditto for last time
            - `first_value` the value corresponding to `min_date`
            - `last_value` the value corresponding to `max_date`
            - `first_value_code` the coded value corresponding to `min_date`
            - `last_value_code` the coded value corresponding to `max_date`

        Raises:
          NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This should be implemented by sub-classes!")

    def get_patient_encounter_view(
        self,
        force_location_type_columns: bool = True,
        sample_count: tp.Optional[int] = None,
    ) -> pandas.DataFrame:
        """Aggregates encounters for each patient based on location, type, etc.

        For each patient and encounter attributes (e.g., location, type, etc.)
        finds aggregate values. Loads the data if that is necessary.

        Args:
          force_location_type_columns: whether to include location and type
            related columns regardless of the constraints. Note this can
            duplicate a single encounter to many rows if that row has
            multiple locations and types.
          sample_count: Count of records to return. Used for quick discovery.

        Returns:
          A Pandas DataFrame with the following columns:
            - `patientId` the patient for whom the aggregation is done
            - `locationId` the location ID of where the encounters took
              place; this and the next one are provided only if there is a
              location constraint or `force_location_type_columns` is `True`.
            - `locationDisplay` the human readable name of the location
            - `encTypeSystem` the encounter type system; this and the next
              one are provided only if there is a type constraint or
              `force_location_type_columns` is `True`.
            - `encTypeCode` the encounter type code
            - `numEncounters` number of encounters with that type and location
            - `firstDate` the first date such an encounter happened
            - `lastDate` the last date such an encounter happened

        Raises:
          NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This should be implemented by sub-classes!")