forked from google/fhir-data-pipes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindicator_lib.py
312 lines (259 loc) · 13.2 KB
/
indicator_lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Set of functions to work with Spark DataFrames containing FHIR resources.
See test_spark.ipynb for real examples of how to create/use these functions.
"""
# TODO move common and query related parts to `query_lib.py` and only keep
# indicator calculation logic that is independent of Spark here.
from typing import List
from datetime import datetime
from dateutil import parser as date_parser
import pandas as pd
import common
def _find_age_band(birth_date: str, end_date: datetime) -> str:
  """Given the birth date, finds the age_band for PEPFAR disaggregation.

  The age is the number of completed calendar years between `birth_date` and
  `end_date`, i.e., it increases exactly on the birthday.

  Args:
    birth_date: A date string that `dateutil.parser.parse` can parse.
    end_date: The date at which the age is evaluated.

  Returns:
    One of '0-1', '1-4', '5-9', '10-14', '15-19', '20-24', '25-49' or '50+'.
  """
  birth = date_parser.parse(birth_date)
  # Count completed calendar years instead of dividing a day count by 365.25;
  # the division under-counts by one on the birthday in most years (e.g., 365
  # days / 365.25 truncates to 0). Using only the date fields also avoids
  # subtracting a timezone-aware birth date from a naive `end_date`.
  had_birthday = (end_date.month, end_date.day) >= (birth.month, birth.day)
  age = end_date.year - birth.year - (0 if had_birthday else 1)
  # A birth date after `end_date` yields a negative age, which falls in the
  # first band.
  if age < 1:
    return '0-1'
  # Remaining bands, ordered by their inclusive upper age bound.
  for upper_bound, band in ((4, '1-4'), (9, '5-9'), (14, '10-14'),
                            (19, '15-19'), (24, '20-24'), (49, '25-49')):
    if age <= upper_bound:
      return band
  return '50+'
def _agg_buckets(birth_date: str, gender: str, end_date: datetime) -> List[str]:
  """Lists every PEPFAR disaggregation bucket that one patient falls into.

  Args:
    birth_date: The patient's birth date string, passed to `_find_age_band`.
    gender: The patient's gender, used verbatim in the bucket names.
    end_date: The date at which the patient's age is evaluated.

  Returns:
    Four bucket names of the form '<age>_<gender>', in this order: the
    patient's age band and gender, all ages and the patient's gender, the
    patient's age band and all genders, all ages and all genders.
  """
  band = _find_age_band(birth_date, end_date)
  # The outer loop is over genders so that the two gender-specific buckets come
  # before the two 'ALL-GENDERS' ones.
  return [
      age_key + '_' + gender_key
      for gender_key in (gender, 'ALL-GENDERS')
      for age_key in (band, 'ALL-AGES')
  ]
def _gen_counts_and_ratio(temp_df: pd.DataFrame, end_date: datetime,
                          ind_name: str) -> pd.DataFrame:
  """Generates aggregated dataframe when supplied with patient-level df.

  Every row of `temp_df` is counted once in each of its disaggregation buckets
  (see `_agg_buckets`), separately for each value of the indicator column.

  Args:
    temp_df: The patient-level DataFrame; the columns 'patientId', 'birthDate',
      'gender' and `ind_name` are read. It is left unmodified.
    end_date: The date at which patient ages are evaluated.
    ind_name: The name of the indicator column in `temp_df`.

  Returns:
    A DataFrame with the columns `ind_name`, 'buckets', `ind_name` + '_count'
    and `ind_name` + '_ratio'. The count is the number of non-null 'patientId'
    values in the group and the ratio is that count divided by the number of
    rows of `temp_df`. It has no rows when `temp_df` has no rows.
  """
  # Build the buckets with plain iteration rather than `apply(axis=1)`: on a
  # frame with no rows `apply` does not yield a Series of lists, so the result
  # cannot be stored as the single 'buckets' column.
  buckets = pd.Series(
      [_agg_buckets(birth_date, gender, end_date)
       for birth_date, gender in zip(temp_df['birthDate'], temp_df['gender'])],
      index=temp_df.index, dtype=object)
  # `assign` works on a copy, so the caller's DataFrame gets no extra column.
  exploded = temp_df.assign(buckets=buckets).explode('buckets')
  counts = exploded.groupby([ind_name, 'buckets'], as_index=False).count()
  counts = counts[[ind_name, 'buckets', 'patientId']].rename(
      columns={'patientId': ind_name + '_count'})
  # The denominator is the number of input rows, not the exploded row count.
  num_patients = len(temp_df.index)
  counts[ind_name + '_ratio'] = counts[ind_name + '_count'] / num_patients
  return counts
def calc_TX_PVLS(patient_agg_obs: pd.DataFrame, VL_code: str,
                 failure_threshold: int,
                 end_date_str: str = None) -> pd.DataFrame:
  """Computes the TX_PVLS indicator along with its disaggregations.

  A row is marked as suppressed ('sup_VL') when its latest viral load value is
  strictly below `failure_threshold`.

  Args:
    patient_agg_obs: An output from `patient_query.find_patient_aggregates()`.
    VL_code: The code for viral load values.
    failure_threshold: VL count threshold of failure.
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  is_vl_row = patient_agg_obs['code'] == VL_code
  # Work on a copy so the new columns are not set on a slice of the input.
  vl_df = patient_agg_obs[is_vl_row].copy()
  vl_df['latest_vl_value'] = vl_df['last_value'].astype(float)
  vl_df['sup_VL'] = vl_df['latest_vl_value'] < failure_threshold
  return _gen_counts_and_ratio(vl_df, end_date, 'sup_VL')
def calc_TX_NEW(patient_agg_obs: pd.DataFrame, ARV_plan: str,
                start_drug: List[str],
                end_date_str: str = None) -> pd.DataFrame:
  """Computes the TX_NEW indicator along with its disaggregations.

  TX_NEW counts the number of adults and children newly enrolled on
  antiretroviral therapy (ART) prior to the provided end-date. Here a row is
  flagged when the last value code of its ARV plan is one of `start_drug`.

  Args:
    patient_agg_obs: A DataFrame generated by
      `patient_query.find_patient_aggregates()`.
    ARV_plan: The concept question code for ANTIRETROVIRAL PLAN
    start_drug: The concept answer codes for START DRUG
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  is_arv_row = patient_agg_obs['code'] == ARV_plan
  # Work on a copy so the new column is not set on a slice of the input.
  arv_df = patient_agg_obs[is_arv_row].copy()
  arv_df['TX_NEW'] = arv_df['last_value_code'].isin(start_drug)
  return _gen_counts_and_ratio(arv_df, end_date, 'TX_NEW')
def calc_TX_CURR(patient_agg_obs: pd.DataFrame, ARV_plan: str,
                 ARV_plan_answer: List[str],
                 end_date_str: str = None) -> pd.DataFrame:
  """Computes the TX_CURR indicator along with its disaggregations.

  TX_CURR counts the number of adults and children currently receiving
  antiretroviral therapy (ART). Here a row is flagged when the last value code
  of its ARV plan is one of `ARV_plan_answer`.

  Args:
    patient_agg_obs: A DataFrame generated by
      `patient_query.find_patient_aggregates()`.
    ARV_plan: The concept question code for ANTIRETROVIRAL PLAN
    ARV_plan_answer: The concept answer codes for START DRUG, CONTINUE
      REGIMEN, REFILLED
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  is_arv_row = patient_agg_obs['code'] == ARV_plan
  # Work on a copy so the new column is not set on a slice of the input.
  arv_df = patient_agg_obs[is_arv_row].copy()
  arv_df['TX_CURR'] = arv_df['last_value_code'].isin(ARV_plan_answer)
  return _gen_counts_and_ratio(arv_df, end_date, 'TX_CURR')
def calc_TB_STAT(patient_agg_obs: pd.DataFrame, TB_TX_plan: str,
                 ARV_plan: str, TB_plan_answer: List[str],
                 end_date_str: str = None) -> pd.DataFrame:
  """Computes the TB_STAT indicator along with its disaggregations.

  TB_STAT counts the number of new and relapse TB cases with documented HIV
  status. Here a row is flagged when the last value code of the TB treatment
  plan is one of `TB_plan_answer` and the same patient has a non-null last
  value code for the ARV plan.

  Args:
    patient_agg_obs: A DataFrame generated by
      `patient_query.find_patient_aggregates()`.
    TB_TX_plan: The concept question code for TB treatment PLAN
    ARV_plan: The concept question code for ANTIRETROVIRAL PLAN
    TB_plan_answer: The concept answer codes for START DRUG, CONTINUE REGIMEN,
      REFILLED
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  codes = patient_agg_obs['code']
  # TB TREATMENT PLAN rows (START/RELAPSE means that diagnosis was done).
  tb_df = patient_agg_obs[codes == TB_TX_plan].copy()
  tb_df['TB_TX'] = tb_df['last_value_code'].isin(TB_plan_answer)
  # ARV plan rows; any recorded answer is taken as the patient being HIV+.
  hiv_df = patient_agg_obs[codes == ARV_plan].copy()
  hiv_df['HIV_positive'] = hiv_df['last_value_code'].notnull()
  # Join the two frames; only patients present in both are kept.
  joined_df = tb_df.merge(hiv_df[['patientId', 'HIV_positive']],
                          on='patientId')
  # Both flags are boolean columns, so they can be combined directly.
  joined_df['TB_STAT'] = joined_df['HIV_positive'] & joined_df['TB_TX']
  return _gen_counts_and_ratio(joined_df, end_date, 'TB_STAT')
def calc_TB_ART(patient_agg_obs: pd.DataFrame, TB_TX_plan: str, ARV_plan: str,
                TB_plan_answer: List[str], ARV_plan_answer: List[str],
                end_date_str: str = None) -> pd.DataFrame:
  """Computes the TB_ART indicator along with its disaggregations.

  TB_ART counts the number of ART patients screened for TB in the semiannual
  reporting period who start TB treatment. Here a row is flagged when the last
  value code of the TB treatment plan is one of `TB_plan_answer` and the last
  value code of the same patient's ARV plan is one of `ARV_plan_answer`.

  Args:
    patient_agg_obs: A DataFrame generated by
      `patient_query.find_patient_aggregates()`.
    TB_TX_plan: The concept question code for TB treatment PLAN
    ARV_plan: The concept question code for ANTIRETROVIRAL PLAN
    TB_plan_answer: The concept answer codes for START DRUG, CONTINUE REGIMEN,
      REFILLED
    ARV_plan_answer: The concept answer codes for START DRUG, CONTINUE
      REGIMEN, REFILLED
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  codes = patient_agg_obs['code']
  # TB TREATMENT PLAN rows (START/RELAPSE means that diagnosis was done).
  tb_df = patient_agg_obs[codes == TB_TX_plan].copy()
  tb_df['TB_TX'] = tb_df['last_value_code'].isin(TB_plan_answer)
  # ARV plan rows, flagged when the patient is on ART.
  art_df = patient_agg_obs[codes == ARV_plan].copy()
  art_df['ART_TX'] = art_df['last_value_code'].isin(ARV_plan_answer)
  # Join the two frames; only patients present in both are kept.
  joined_df = tb_df.merge(art_df[['patientId', 'ART_TX']], on='patientId')
  # Both flags are boolean columns, so they can be combined directly.
  joined_df['TB_ART'] = joined_df['ART_TX'] & joined_df['TB_TX']
  return _gen_counts_and_ratio(joined_df, end_date, 'TB_ART')
def calc_TB_PREV(patient_agg_obs: pd.DataFrame, TB_PREV_plan: str,
                 ARV_plan: str, TB_PREV_plan_answer: List[str],
                 ART_plan_answer: List[str], TB_CURR_plan_answer: List[str],
                 end_date_str: str = None) -> pd.DataFrame:
  """Computes the TB_PREV indicator along with its disaggregations.

  TB_PREV counts the number of ART patients who started on a standard course
  of TB Preventive Treatment (TPT) in the previous reporting period who
  completed therapy. Here a row is flagged when the first value code of the TB
  prevention plan is one of `TB_PREV_plan_answer`, its last value code is one
  of `TB_CURR_plan_answer`, and the last value code of the same patient's ARV
  plan is one of `ART_plan_answer`.

  Args:
    patient_agg_obs: A DataFrame generated by
      `patient_query.find_patient_aggregates()`.
    TB_PREV_plan: The concept question code for TB prevention PLAN
    ARV_plan: The concept question code for ANTIRETROVIRAL PLAN
    TB_PREV_plan_answer: The concept answer codes for START DRUG, CONTINUE
      REGIMEN, REFILLED
    ART_plan_answer: The concept answer codes for START DRUG, CONTINUE
      REGIMEN, REFILLED
    TB_CURR_plan_answer: The concept answer codes for COMPLETED REGIMEN
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  codes = patient_agg_obs['code']
  # TB PREVENTION PLAN rows: completed means that the first answer was one of
  # the prevention plan answers and the last answer is a completion answer.
  prev_df = patient_agg_obs[codes == TB_PREV_plan].copy()
  started = prev_df['first_value_code'].isin(TB_PREV_plan_answer)
  finished = prev_df['last_value_code'].isin(TB_CURR_plan_answer)
  prev_df['Completed_TB_PREV'] = started & finished
  # ARV plan rows, flagged when the patient is on ART.
  art_df = patient_agg_obs[codes == ARV_plan].copy()
  art_df['ART_TX'] = art_df['last_value_code'].isin(ART_plan_answer)
  # Join the two frames; only patients present in both are kept.
  joined_df = prev_df.merge(art_df[['patientId', 'ART_TX']], on='patientId')
  # Both flags are boolean columns, so they can be combined directly.
  joined_df['TB_PREV'] = joined_df['ART_TX'] & joined_df['Completed_TB_PREV']
  return _gen_counts_and_ratio(joined_df, end_date, 'TB_PREV')
def calc_TX_TB(patient_agg_obs: pd.DataFrame, TX_TB_plan: str, ARV_plan: str,
               TX_TB_plan_answer: List[str], ART_plan_answer: List[str],
               TB_screening: str, YES_CODE: str,
               end_date_str: str = None) -> pd.DataFrame:
  """Computes the TX_TB indicator along with its disaggregations.

  TX_TB counts the number of ART patients screened for TB in the semiannual
  reporting period who start TB treatment. Here a row is flagged when the last
  value code of the TB treatment plan is one of `TX_TB_plan_answer`, the last
  value code of the same patient's ARV plan is one of `ART_plan_answer`, and
  the last value code of that patient's TB screening is `YES_CODE`.

  Args:
    patient_agg_obs: A DataFrame generated by
      `patient_query.find_patient_aggregates()`.
    TX_TB_plan: The concept question code for TB treatment PLAN
    ARV_plan: The concept question code for ANTIRETROVIRAL PLAN
    TX_TB_plan_answer: The concept answer codes for START DRUG, CONTINUE
      REGIMEN, REFILLED
    ART_plan_answer: The concept answer codes for START DRUG, CONTINUE
      REGIMEN, REFILLED
    TB_screening: The concept question code screened for TB
    YES_CODE: The concept answer codes for YES
    end_date_str: The string representation of the last date; the current
      date and time is used when it is not provided.

  Returns:
    The aggregated DataFrame with age/gender buckets.
  """
  end_date = (
      date_parser.parse(end_date_str) if end_date_str else datetime.today())
  codes = patient_agg_obs['code']
  # TB TREATMENT PLAN rows (START/RELAPSE means that diagnosis was done).
  tb_df = patient_agg_obs[codes == TX_TB_plan].copy()
  tb_df['TX_TB_status'] = tb_df['last_value_code'].isin(TX_TB_plan_answer)
  # ARV plan rows, flagged when the patient is on ART.
  art_df = patient_agg_obs[codes == ARV_plan].copy()
  art_df['ART_TX'] = art_df['last_value_code'].isin(ART_plan_answer)
  # TB screening rows, flagged when the answer was YES.
  screen_df = patient_agg_obs[codes == TB_screening].copy()
  screen_df['TB_screening'] = screen_df['last_value_code'].isin([YES_CODE])
  # Join the three frames; only patients present in all of them are kept.
  joined_df = tb_df.merge(art_df[['patientId', 'ART_TX']], on='patientId')
  joined_df = joined_df.merge(screen_df[['patientId', 'TB_screening']],
                              on='patientId')
  # All flags are boolean columns, so they can be combined directly.
  joined_df['TX_TB'] = (
      joined_df['ART_TX'] & joined_df['TX_TB_status']
      & joined_df['TB_screening'])
  row_count = joined_df.index.size
  common.custom_log('Number of rows in TX_TB temp_df= {}'.format(row_count))
  return _gen_counts_and_ratio(joined_df, end_date, 'TX_TB')