Commit
Merge pull request #158 from lifeomic/data-lake-easy
Add DataLake to easy modules
rcdilorenzo authored Sep 21, 2021
2 parents f67401e + c066154 commit 0638200
Showing 5 changed files with 138 additions and 1 deletion.
2 changes: 2 additions & 0 deletions phc/easy/__init__.py
@@ -7,6 +7,7 @@
from phc.easy.composition import Composition
from phc.easy.condition import Condition
from phc.easy.consent import Consent
from phc.easy.data_lake import DataLake
from phc.easy.diagnostic_report import DiagnosticReport
from phc.easy.document_reference import DocumentReference
from phc.easy.encounter import Encounter
@@ -55,6 +56,7 @@
    "Composition",
    "Condition",
    "Consent",
    "DataLake",
    "DiagnosticReport",
    "DocumentReference",
    "Encounter",
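With this export in place, the new class can be imported straight from the top-level easy namespace, alongside the other easy modules:

# DataLake is now exposed via phc.easy like the rest of the easy-module classes.
from phc.easy import DataLake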
104 changes: 104 additions & 0 deletions phc/easy/data_lake/__init__.py
@@ -0,0 +1,104 @@
import os
import re
from typing import Callable, Optional
import pandas as pd
import warnings
from funcy import identity
from glob import glob

from phc.util import DataLakeQuery as DataLakeServiceQuery
from phc.services import Analytics
from phc.easy.auth import Auth
from phc.easy.util.api_cache import APICache
from phc.easy.util.api_cache import DIR as _API_CACHE_DIR

API_CACHE_DIR = os.path.expanduser(_API_CACHE_DIR)


class DataLake:
    @classmethod
    def execute_sql(
        cls,
        sql: str,
        transform: Callable[[pd.DataFrame], pd.DataFrame] = identity,
        ignore_cache: bool = False,
        auth_args: Auth = Auth.shared(),
        extension: str = "parquet",
    ):
        auth = Auth(auth_args)
        client = Analytics(auth.session())

        sql_plus_project = auth.project_id + ":" + sql

        cache_folder = "/tmp" if ignore_cache else API_CACHE_DIR
        csv_filename = APICache.filename_for_sql(sql_plus_project, "csv")
        output_filename = APICache.filename_for_sql(
            sql_plus_project, extension=extension
        )
        csv_cache_path = os.path.join(cache_folder, csv_filename)
        output_cache_path = os.path.join(cache_folder, output_filename)

        has_cache_file = APICache.does_cache_for_sql_exist(
            sql_plus_project, extension=extension
        )

        if not ignore_cache and extension == "csv" and has_cache_file:
            print(f'[CACHE] Loading from "{output_cache_path}"')
            return APICache.read_csv(output_cache_path)
        elif not ignore_cache and extension == "parquet" and has_cache_file:
            print(f'[CACHE] Loading from "{output_cache_path}"')
            return pd.read_parquet(output_cache_path)

        service_query = DataLakeServiceQuery(auth.project_id, sql, csv_filename)

        frame = client.execute_data_lake_query_to_dataframe(
            service_query, dest_dir=cache_folder
        )

        csv_cache_path = cls.find_cache_file(csv_filename)

        # Remove the raw CSV downloaded from the data lake now that it has
        # been loaded into a data frame
        if csv_cache_path:
            os.remove(csv_cache_path)
        else:
            warnings.warn(
                "Couldn't find downloaded data lake cache file for removal. Skipping..."
            )

        frame = transform(frame)

        if ignore_cache:
            return frame

        print(f'Writing to "{output_cache_path}"')

        if extension == "csv":
            frame.to_csv(output_cache_path, index=False)
        elif extension == "parquet":
            frame.to_parquet(output_cache_path, index=False)
        else:
            warnings.warn(
                f'Invalid cache extension "{extension}". Not writing to cache.'
            )

        return frame

    @staticmethod
    def find_cache_file(filename: str) -> Optional[str]:
        """Find files even if the data lake writes a file with an appended
        number (example: my_file.csv(2))
        """

        NON_OVERRIDE_REGEX = r"(\((\d+)\))?$"

        paths = glob(os.path.join(API_CACHE_DIR, filename + "*"))

        if len(paths) == 0:
            return None

        def sort_value(path: str) -> int:
            return int(
                next(re.finditer(NON_OVERRIDE_REGEX, path)).group(2) or "0"
            )

        return sorted(paths, key=sort_value)[-1]
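A usage sketch for the new module, assuming credentials have already been configured via Auth.shared(); the table name and transform below are hypothetical placeholders:

from phc.easy import DataLake

# Hypothetical table; substitute a table from your own data lake.
frame = DataLake.execute_sql(
    "SELECT * FROM my_table",
    # Optional post-processing applied before the result is cached.
    transform=lambda df: df.dropna(axis=1, how="all"),
    extension="parquet",  # cached under ~/Downloads/phc/api-cache
)

# Re-running the same SQL for the same project loads the cached parquet file
# instead of querying the data lake again; ignore_cache=True forces a re-query.
frame = DataLake.execute_sql("SELECT * FROM my_table")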
20 changes: 19 additions & 1 deletion phc/easy/util/api_cache.py
@@ -1,5 +1,6 @@
import hashlib
import json
import re
import os
from pathlib import Path
from typing import Callable, Optional
@@ -9,22 +10,39 @@
from phc.easy.query.fhir_aggregation import FhirAggregation
from phc.util.csv_writer import CSVWriter

TABLE_REGEX = r"^[^F]+FROM (\w+)"
DIR = "~/Downloads/phc/api-cache"
DATE_FORMAT_REGEX = (
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?([-+]\d{4}|Z)"
)

DATA_LAKE = "data_lake"
FHIR_DSL = "fhir_dsl"


class APICache:
    @staticmethod
    def filename_for_sql(sql: str, extension: str = "parquet"):
        results = re.findall(TABLE_REGEX, sql)
        table_name = results[0] if len(results) > 0 else "table"
        hexdigest = hashlib.sha256(sql.encode("utf-8")).hexdigest()[0:8]
        return "_".join([DATA_LAKE, table_name, hexdigest]) + "." + extension

    @staticmethod
    def does_cache_for_sql_exist(sql: str, extension: str = "parquet") -> bool:
        return (
            Path(DIR)
            .expanduser()
            .joinpath(APICache.filename_for_sql(sql, extension))
            .exists()
        )

    @staticmethod
    def filename_for_query(query: dict, namespace: Optional[str] = None):
        "Descriptive filename with hash of query for easy retrieval"
        is_aggregation = FhirAggregation.is_aggregation_query(query)

        agg_description = "agg" if is_aggregation else ""

        column_description = (
            f"{len(query.get('columns', []))}col"
            if not is_aggregation and isinstance(query.get("columns"), list)
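To make the naming scheme concrete: filename_for_sql takes the table name from the first FROM clause (falling back to "table") and appends the first eight hex characters of the SQL's SHA-256 digest, so distinct queries map to distinct cache files. A standalone sketch of the same computation (note that DataLake.execute_sql prefixes the project ID onto the SQL before hashing, so real cache names also vary per project):

import hashlib

sql = "SELECT * FROM my_table"
digest = hashlib.sha256(sql.encode("utf-8")).hexdigest()[0:8]
# Matches the filename asserted in the new test further down.
print(f"data_lake_my_table_{digest}.parquet")  # data_lake_my_table_f84bab09.parquet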
1 change: 1 addition & 0 deletions requirements.txt
@@ -8,3 +8,4 @@ lenses
toolz
pydantic
python-pmap
fastparquet
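fastparquet is added because the new parquet cache path goes through pandas' to_parquet and read_parquet, which need a parquet engine (fastparquet or pyarrow) installed. A quick sanity check, assuming fastparquet is the engine being relied on here:

import pandas as pd

# Round-trip a small frame through parquet using the fastparquet engine.
df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})
df.to_parquet("/tmp/example.parquet", engine="fastparquet", index=False)
print(pd.read_parquet("/tmp/example.parquet", engine="fastparquet"))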
12 changes: 12 additions & 0 deletions tests/test_api_cache.py
@@ -127,3 +127,15 @@ def test_reading_cache_file_with_invalid_date_does_not_raise():
    sample_file.seek(0)

    APICache.read_csv(sample_file)


def test_filename_for_sql():
    assert (
        APICache.filename_for_sql("SELECT * FROM my_table", extension="parquet")
        == "data_lake_my_table_f84bab09.parquet"
    )

    assert (
        APICache.filename_for_sql("unknown sql command", extension="parquet")
        == "data_lake_table_a61006f5.parquet"
    )
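Not part of this commit, but a small illustration of the suffix handling in DataLake.find_cache_file: when a download lands next to an existing file and picks up a numeric suffix, the highest-numbered copy is the one returned.

import re

NON_OVERRIDE_REGEX = r"(\((\d+)\))?$"

def sort_value(path: str) -> int:
    # Mirrors the sort key in DataLake.find_cache_file: an unnumbered file
    # sorts as 0, "my_file.csv(2)" as 2, and so on.
    return int(next(re.finditer(NON_OVERRIDE_REGEX, path)).group(2) or "0")

paths = ["my_file.csv", "my_file.csv(2)", "my_file.csv(10)"]
print(sorted(paths, key=sort_value)[-1])  # my_file.csv(10)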
