users can now add to DCR per variables. Add basic ontology
vemonet committed Mar 20, 2024
1 parent cc8e7b3 commit 23cb83b
Showing 24 changed files with 1,417 additions and 560 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/docs.yml
@@ -0,0 +1,57 @@
name: Update ontology documentation
on:
  push:
    branches: ["main"]
  workflow_dispatch:

# Allow one concurrent deployment
concurrency:
  group: "pages"
  cancel-in-progress: true

jobs:

  deploy:
    runs-on: ubuntu-latest
    permissions:
      pages: write
      id-token: write
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}

    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - uses: actions/setup-java@v4
        with:
          java-version: '17'
          distribution: 'zulu'

      - name: Install dependencies
        run: |
          bash scripts/install.sh
      - name: Build docs
        run: |
          bash scripts/build.sh
      - name: Setup Pages
        id: pages
        uses: actions/configure-pages@v4

      - name: Upload artifact
        uses: actions/upload-pages-artifact@v3
        with:
          path: './docs'

      - name: Deploy to GitHub Pages
        id: deployment
        uses: actions/deploy-pages@v4
5 changes: 4 additions & 1 deletion .gitignore
@@ -3,5 +3,8 @@
out/
data/
.env
docs/

__pycache__
__pycache__

*.jar
4 changes: 4 additions & 0 deletions README.md
@@ -210,3 +210,7 @@ docker compose exec backend curl -X POST -T /data/triplestore_dump_20240225.nq -
> [!CAUTION]
>
> The path given for `triplestore_dump.nq` is **inside** the docker container
### 🚚 Move the app
If you need to move the app to a different server, just copy the whole `data/` folder.
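A minimal sketch of that step, assuming the app is run from the repository root (the archive name below is a made-up example, not something the repo defines):

```python
# Sketch only: bundle data/ into an archive, then transfer and unpack it
# on the new server (e.g. with scp). The archive name is hypothetical.
import shutil

archive = shutil.make_archive("icare4cvd_data_backup", "gztar", root_dir=".", base_dir="data")
print(f"Archive ready to copy to the new server: {archive}")
```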
8 changes: 5 additions & 3 deletions backend/src/auth.py
@@ -41,7 +41,8 @@ async def __call__(self, request: Request) -> Optional[str]:


auth_params = {
"audience": "https://explorer.icare4cvd.eu",
# "audience": "https://explorer.icare4cvd.eu",
"audience": "https://other-ihi-app",
"redirect_uri": settings.redirect_uri,
}

@@ -110,14 +111,15 @@ async def auth_callback(code: str) -> RedirectResponse:
status_code=HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
# print("ACCESS PAYLOAD", access_payload)
print("ACCESS PAYLOAD", access_payload)
# NOTE: if needed user info can be retrieved later from the /userinfo endpoint using the provided access token
# resp = await client.get(f"{settings.authorization_endpoint}/userinfo", headers={"Authorization": f"Bearer {token['access_token']}"})
# print("user_info", resp.json())

# Check in payload if logged in user has the required permissions
if (
"https://explorer.icare4cvd.eu" in access_payload["aud"]
"https://explorer.icare4cvd.eu"
in access_payload["aud"]
# and "read:icare4cvd-dataset-descriptions" in access_payload["permissions"]
):
user_email = id_payload["email"]
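The permission check touched here boils down to asserting that the expected audience appears in the decoded access token. A standalone sketch of that logic, using an illustrative payload rather than a real token:

```python
from fastapi import HTTPException

REQUIRED_AUDIENCE = "https://explorer.icare4cvd.eu"

def check_audience(access_payload: dict) -> None:
    """Reject tokens that were not issued for the expected audience."""
    audiences = access_payload.get("aud", [])
    if isinstance(audiences, str):  # "aud" can be a string or a list of strings
        audiences = [audiences]
    if REQUIRED_AUDIENCE not in audiences:
        raise HTTPException(status_code=401, detail="Invalid token")

# Illustrative payload only, not a real token
check_audience({"aud": ["https://explorer.icare4cvd.eu", "https://other-ihi-app"]})
```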
4 changes: 2 additions & 2 deletions backend/src/config.py
@@ -11,14 +11,14 @@
@dataclass
class Settings:
frontend_url: str = field(default_factory=lambda: os.getenv("FRONTEND_URL", "http://localhost:3001"))
api_host: str = field(default_factory=lambda: os.getenv("VIRTUAL_HOST", "localhost:3001"))
api_host: str = field(default_factory=lambda: os.getenv("VIRTUAL_HOST", "localhost:3000"))
sparql_endpoint: str = field(default_factory=lambda: os.getenv("SPARQL_ENDPOINT", "http://localhost:7878"))

auth_endpoint: str = field(default_factory=lambda: os.getenv("AUTH_ENDPOINT", ""))
client_id: str = field(default_factory=lambda: os.getenv("CLIENT_ID", ""))
client_secret: str = field(default_factory=lambda: os.getenv("CLIENT_SECRET", ""))
response_type: str = field(default_factory=lambda: os.getenv("RESPONSE_TYPE", "code"))
scope: str = field(default_factory=lambda: os.getenv("SCOPE", "openid email read:icare4cvd-dataset-descriptions"))
scope: str = field(default_factory=lambda: os.getenv("SCOPE", "openid email read:permissions"))
jwt_secret: str = field(
default_factory=lambda: os.getenv("JWT_SECRET", "vCitcsPBwH4BMCwEqlO1aHJSIn--usrcyxPPRbeYdHM")
)
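Since every `Settings` field is read from the environment through a `default_factory`, overriding a value only requires setting the variable before the dataclass is instantiated. A small sketch of that pattern (the hostname is a made-up example):

```python
import os
from dataclasses import dataclass, field

@dataclass
class DemoSettings:
    # Same pattern as backend/src/config.py: env var wins, otherwise the default applies
    api_host: str = field(default_factory=lambda: os.getenv("VIRTUAL_HOST", "localhost:3000"))
    scope: str = field(default_factory=lambda: os.getenv("SCOPE", "openid email read:permissions"))

os.environ["VIRTUAL_HOST"] = "explorer.example.org"  # hypothetical override
settings = DemoSettings()
print(settings.api_host)  # explorer.example.org
print(settings.scope)     # openid email read:permissions (default kept)
```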
70 changes: 66 additions & 4 deletions backend/src/decentriq.py
@@ -1,8 +1,9 @@
from copy import deepcopy
from typing import Any

import decentriq_platform as dq
import decentriq_platform.sql as dqsql
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException

from src.auth import get_current_user
from src.config import settings
@@ -71,6 +72,34 @@ def create_provision_dcr(user: Any, cohort: Cohort) -> dict[str, Any]:
}


def pandas_script_filter_cohort_vars(cohort: Cohort, requested_vars: list[str], df_var: str) -> str:
"""Generate pandas script for filtering variables"""
if len(requested_vars) <= len(cohort.variables):
filter_columns_script = f"{df_var} = {df_var}[{requested_vars}]\n"
return filter_columns_script


def pandas_script_merge_cohorts(merged_cohorts: dict[str, list[str]], all_cohorts: dict[str, Cohort]) -> str:
"""Generate pandas script for merging cohorts on variables mapped_id"""
# TODO: to be fixed
merge_script = ""
dfs_to_merge = []
for cohort_id, vars_requested in merged_cohorts.items():
if cohort_id not in all_cohorts:
raise ValueError(f"Cohort {cohort_id} does not exist.")
# Assuming you have a way to get dataframe variable names (mapped_id) from vars_requested
df_name = f"df_{cohort_id}"
vars_mapped = [f"'{var}'" for var in vars_requested] # Example to generate a list of variable names
dfs_to_merge.append(df_name)
merge_script += (
f"{df_name} = pd.DataFrame({cohort_id})[{vars_mapped}]\n" # Placeholder for actual data retrieval
)

# Assuming all dataframes have a common column for merging
merge_script += f"merged_df = pd.concat([{', '.join(dfs_to_merge)}], ignore_index=True)\n"
return merge_script


@router.post(
"/create-dcr",
name="Create Data Clean Room for computing",
@@ -82,8 +111,40 @@ async def create_compute_dcr(
) -> dict[str, Any]:
"""Create a Data Clean Room for computing with the cohorts requested using Decentriq SDK"""
users = [user["email"]]
# Get metadata for selected cohorts
cohorts = {cohort_id: metadata for cohort_id, metadata in retrieve_cohorts_metadata(user["email"]).items() if cohort_id in cohorts_request["cohorts"]}
# TODO: cohorts_request could also be a dict of union of cohorts to merge
# {"cohorts": {"cohort_id": ["var1", "var2"], "merged_cohort3": {"cohort1": ["weight", "sex"], "cohort2": ["gender", "patient weight"]}}}
# We automatically merge the cohorts, figuring out which variables are the same thanks to mappings
all_cohorts = retrieve_cohorts_metadata(user["email"])

# Get metadata for selected cohorts and variables
selected_cohorts = {}
# We generate a pandas script to automatically prepare the data from the cohort based on known metadata
pandas_script = "import pandas as pd\n\n"

for cohort_id, requested_vars in cohorts_request["cohorts"].items():
cohort_meta = deepcopy(all_cohorts[cohort_id])
df_var = f"df_{cohort_id.replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '')}"
if isinstance(requested_vars, list):
# Direct cohort variables list
pandas_script += f"{df_var} = pd.read_csv('{cohort_id}.csv')\n"

if len(requested_vars) <= len(cohort_meta.variables):
# Add filter variables to pandas script
pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"
# Get all cohort and variables metadata for selected variables
for var in all_cohorts[cohort_id].variables:
if var not in requested_vars:
del cohort_meta.variables[var]
selected_cohorts[cohort_id] = cohort_meta
elif isinstance(requested_vars, dict):
# Merge operation, need to be implemented on the frontend
pandas_script += pandas_script_merge_cohorts(requested_vars, all_cohorts)
# TODO: add merged cohorts schema to selected_cohorts
else:
raise HTTPException(status_code=400, detail=f"Invalid structure for cohort {cohort_id}")

# TODO: Add pandas_script to the DCR?
# print(pandas_script)

# Establish connection to Decentriq
client = dq.create_client(settings.decentriq_email, settings.decentriq_token)
@@ -103,7 +164,7 @@ async def create_compute_dcr(
builder = dq.DataRoomBuilder(f"iCare4CVD DCR compute {dcr_count}", enclave_specs=enclave_specs)

# Convert cohort variables to decentriq schema
for cohort_id, cohort in cohorts.items():
for cohort_id, cohort in selected_cohorts.items():
# Create data node for cohort
data_node_builder = dqsql.TabularDataNodeBuilder(cohort_id, schema=get_cohort_schema(cohort))
data_node_builder.add_to_builder(builder, authentication=client.decentriq_pki_authentication, users=users)
@@ -131,5 +192,6 @@ async def create_compute_dcr(
"dcr_url": dcr_url,
"dcr_title": dcr_desc["title"],
"dcr": dcr_desc,
"merge_script": pandas_script,
**cohorts_request,
}
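For the plain list case above, `create_compute_dcr` turns the request payload into a small pandas preparation script: one `read_csv` plus a column filter per cohort. A self-contained sketch of that generation step, using an invented cohort id and variable names:

```python
# Sketch of the script-generation logic for a plain variables list;
# the cohort id and variables below are hypothetical examples.
cohorts_request = {"cohorts": {"my-cohort (test)": ["age", "sex"]}}

pandas_script = "import pandas as pd\n\n"
for cohort_id, requested_vars in cohorts_request["cohorts"].items():
    # Same sanitisation as the endpoint: spaces, dashes and parentheses stripped
    df_var = f"df_{cohort_id.replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '')}"
    if isinstance(requested_vars, list):
        pandas_script += f"{df_var} = pd.read_csv('{cohort_id}.csv')\n"
        pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"

print(pandas_script)
# import pandas as pd
#
# df_my_cohort_test = pd.read_csv('my-cohort (test).csv')
# df_my_cohort_test = df_my_cohort_test[['age', 'sex']]
```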
11 changes: 6 additions & 5 deletions backend/src/explore.py
@@ -1,4 +1,3 @@
from dataclasses import field
import os
from typing import Any

@@ -37,11 +36,13 @@ async def download_cohort_spreasheet(cohort_id: str, user: Any = Depends(get_cur


@router.get("/search-concepts")
async def search_concepts(query: str, domain: list[str] | None = Query(default=None), user: Any = Depends(get_current_user)):
async def search_concepts(
query: str, domain: list[str] | None = Query(default=None), user: Any = Depends(get_current_user)
):
"""Search for concepts in the Athena API and check how many time those concepts are use in our KG."""
if not domain:
domain = []
vocabs = ["LOINC", "ATC", "SNOMED"] # "RxNorm"
vocabs = ["LOINC", "ATC", "SNOMED"] # "RxNorm"
try:
response = requests.get(
"https://athena.ohdsi.org/api/v1/concepts",
@@ -97,12 +98,12 @@ async def search_concepts(query: str, domain: list[str] | None = Query(default=N
}}
GRAPH ?cohortVarGraph {{
?cohort icare:has_variable ?variable .
?variable a icare:Variable ;
dc:identifier ?varName ;
rdfs:label ?varLabel ;
icare:var_type ?varType ;
icare:index ?index ;
dcterms:isPartOf ?cohort .
icare:index ?index .
OPTIONAL {{ ?variable icare:omop ?omopDomain }}
}}
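The reworked `/search-concepts` query drops the `dcterms:isPartOf ?cohort` triple and keeps the rest of the variable pattern. As a hedged sketch, a query of that shape could be run against the configured endpoint with SPARQLWrapper; both the endpoint URL and the `icare:` prefix below are placeholders, not values taken from this commit:

```python
from SPARQLWrapper import JSON, SPARQLWrapper

# Placeholder endpoint; the backend reads the real one from SPARQL_ENDPOINT (config.py)
sparql = SPARQLWrapper("http://localhost:7878/query")
sparql.setReturnFormat(JSON)
sparql.setQuery("""
PREFIX icare: <http://example.org/icare#>
PREFIX dc: <http://purl.org/dc/elements/1.1/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?varName ?varLabel WHERE {
  GRAPH ?cohortVarGraph {
    ?cohort icare:has_variable ?variable .
    ?variable dc:identifier ?varName ;
        rdfs:label ?varLabel .
  }
}""")
results = sparql.query().convert()
for row in results["results"]["bindings"]:
    print(row["varName"]["value"], "|", row["varLabel"]["value"])
```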
3 changes: 1 addition & 2 deletions backend/src/models.py
@@ -1,4 +1,4 @@
from dataclasses import dataclass, field, asdict
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Optional


@@ -42,7 +42,6 @@ class Cohort:
cohort_id: str
cohort_type: Optional[str] = None
cohort_email: list[str] = field(default_factory=list)
owner: Optional[str] = None
institution: str = ""
study_type: Optional[str] = None
study_participants: Optional[str] = None
24 changes: 13 additions & 11 deletions backend/src/upload.py
@@ -57,9 +57,11 @@ def delete_existing_triples(graph_uri: str | URIRef, subject="?s", predicate="?p
def get_cohort_uri(cohort_id: str) -> URIRef:
return ICARE[f"cohort/{cohort_id.replace(' ', '_')}"]


def get_cohort_mapping_uri(cohort_id: str) -> URIRef:
return ICARE[f"cohort/{cohort_id.replace(' ', '_')}/mappings"]


def get_var_uri(cohort_id: str | URIRef, var_id: str) -> URIRef:
return ICARE[f"cohort/{cohort_id.replace(' ', '_')}/{var_id.replace(' ', '_')}"]

@@ -182,7 +184,6 @@ def load_cohort_dict_file(dict_path: str, cohort_id: str, user_email: str) -> Da
g = init_graph()
g.add((cohort_uri, RDF.type, ICARE.Cohort, cohort_uri))
g.add((cohort_uri, DC.identifier, Literal(cohort_id), cohort_uri))
# g.add((cohort_uri, ICARE["owner"], Literal(owner_email), cohort_uri))

# Record all errors and raise them at the end
errors = []
Expand All @@ -191,15 +192,17 @@ def load_cohort_dict_file(dict_path: str, cohort_id: str, user_email: str) -> Da
if not row["VARIABLE NAME"] or not row["VARIABLE LABEL"] or not row["VAR TYPE"]:
errors.append(f"Row {i} is missing required data: variable_name, variable_label, or var_type")
if row["VAR TYPE"] not in ACCEPTED_DATATYPES:
errors.append(f"Row {i} for variable {row['VARIABLE NAME']} is using a wrong datatype: {row['VAR TYPE']}. It should be one of: {', '.join(ACCEPTED_DATATYPES)}")
errors.append(
f"Row {i} for variable {row['VARIABLE NAME']} is using a wrong datatype: {row['VAR TYPE']}. It should be one of: {', '.join(ACCEPTED_DATATYPES)}"
)
# TODO: raise error when duplicate value for VARIABLE LABEL?

# Create a URI for the variable
variable_uri = get_var_uri(cohort_id, row["VARIABLE NAME"])
g.add((cohort_uri, ICARE.has_variable, variable_uri, cohort_uri))

# Add the type of the resource
g.add((variable_uri, RDF.type, ICARE.Variable, cohort_uri))
g.add((variable_uri, DCTERMS.isPartOf, cohort_uri, cohort_uri))
g.add((variable_uri, DC.identifier, Literal(row["VARIABLE NAME"]), cohort_uri))
g.add((variable_uri, RDFS.label, Literal(row["VARIABLE LABEL"]), cohort_uri))
g.add((variable_uri, ICARE["index"], Literal(i, datatype=XSD.integer), cohort_uri))
Expand All @@ -221,7 +224,9 @@ def load_cohort_dict_file(dict_path: str, cohort_id: str, user_email: str) -> Da
# Handle Category
if column in ["categories"]:
if len(value) == 1:
errors.append(f"Row {i} for variable {row['VARIABLE NAME']} has only one category {row['categories']}. It should have at least two.")
errors.append(
f"Row {i} for variable {row['VARIABLE NAME']} has only one category {row['categories']}. It should have at least two."
)
continue
for index, category in enumerate(value):
cat_uri = get_category_uri(variable_uri, index)
@@ -253,7 +258,7 @@ async def upload_cohort(
) -> dict[str, Any]:
"""Upload a cohort metadata file to the server and add its variables to the triplestore."""
user_email = user["email"]
cohort_info = retrieve_cohorts_metadata(user["email"]).get(cohort_id)
cohort_info = retrieve_cohorts_metadata(user_email).get(cohort_id)
# cohorts = retrieve_cohorts_metadata(user_email)
if not cohort_info:
raise HTTPException(
@@ -270,16 +275,11 @@ async def upload_cohort(
os.makedirs(cohorts_folder, exist_ok=True)
# Check if cohort already uploaded
if cohort_info and len(cohort_info.variables) > 0:
authorized_users = [*settings.admins_list, user_email]
if user["email"] not in authorized_users:
raise HTTPException(status_code=403, detail=f"You are not the owner of cohort {cohort_id}.")
# Make sure we keep the original owner in case an admin edits it
user_email = cohort_info.owner
# Check for existing data dictionary file and back it up
for file_name in os.listdir(cohorts_folder):
if file_name.endswith("_datadictionary.csv"):
# Construct the backup file name with the current date
backup_file_name = f"{file_name.rsplit('.', 1)[0]}_{datetime.now().strftime('%Y%m%d')}.csv"
backup_file_name = f"{file_name.rsplit('.', 1)[0]}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
backup_file_path = os.path.join(cohorts_folder, backup_file_name)
existing_file_path = os.path.join(cohorts_folder, file_name)
# Rename (backup) the existing file
@@ -319,6 +319,7 @@

COHORTS_METADATA_FILEPATH = os.path.join(settings.data_folder, "iCARE4CVD_Cohorts.xlsx")


@router.post(
"/upload-cohorts-metadata",
name="Upload metadata file for all cohorts",
Expand Down Expand Up @@ -373,6 +374,7 @@ def cohorts_metadata_file_to_graph(filepath: str) -> Dataset:
g.add((cohort_uri, ICARE.study_objective, Literal(row["Primary objective"]), cohorts_graph))
return g


def init_triplestore() -> None:
"""Initialize triplestore with the OMOP CDM ontology and the iCARE4CVD cohorts metadata."""
# If triples exist, skip initialization
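One of the smaller fixes above changes the data dictionary backup suffix from a date to a full timestamp, so uploading twice on the same day no longer overwrites the earlier backup. A quick sketch of the difference, with an invented file name:

```python
from datetime import datetime

file_name = "my_cohort_datadictionary.csv"  # hypothetical existing dictionary file

# Before: one backup per day; a second upload the same day overwrote it
daily = f"{file_name.rsplit('.', 1)[0]}_{datetime.now().strftime('%Y%m%d')}.csv"
# After: a seconds-level timestamp keeps every backup
unique = f"{file_name.rsplit('.', 1)[0]}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

print(daily)   # e.g. my_cohort_datadictionary_20240320.csv
print(unique)  # e.g. my_cohort_datadictionary_20240320_153045.csv
```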