Skip to content

Commit

Permalink
[ENH] Allow unrecognized pipeline names and versions in processing st…
Browse files Browse the repository at this point in the history
…atus file (#401)

* refactor utils to include check if all pipeline names/vers are unrecognized

* rename utility functions

* update tests of unrecognized pipeline names, fix warning msg

* update tests of pipeline version check

* add e2e test of proc status file with some unrecognized pipelines

* fix message formatting

* test error when no pipeline-versions recognized

* add new example processing status TSVs

* update message template for unrecognized pipelines
  • Loading branch information
alyssadai authored Dec 11, 2024
1 parent 2d5839d commit f68468a
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 54 deletions.
15 changes: 3 additions & 12 deletions bagel/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,18 +414,9 @@ def derivatives(
f"We found missing values in the following rows (first row is zero): {row_indices}."
)

pipelines = status_df[PROC_STATUS_COLS["pipeline_name"]].unique()
derivative_utils.check_pipelines_are_recognized(pipelines)

# TODO: Do we need to check all versions across all pipelines first, and report all unrecognized versions together?
for pipeline in pipelines:
versions = status_df[
status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline
][PROC_STATUS_COLS["pipeline_version"]].unique()

derivative_utils.check_pipeline_versions_are_recognized(
pipeline, versions
)
derivative_utils.check_at_least_one_pipeline_version_is_recognized(
status_df=status_df
)

jsonld_dataset = model_utils.extract_and_validate_jsonld_dataset(
jsonld_path
Expand Down
2 changes: 1 addition & 1 deletion bagel/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_pipeline_catalog(url: str, path: Path) -> list[dict]:
) from e


def parse_pipeline_catalog():
def parse_pipeline_catalog() -> tuple[dict, dict]:
"""
Load the pipeline catalog and return a dictionary of pipeline names and their URIs in the Nipoppy namespace,
and a dictionary of pipeline names and their supported versions in Nipoppy.
Expand Down
92 changes: 76 additions & 16 deletions bagel/utilities/derivative_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import Iterable

import pandas as pd
Expand All @@ -17,35 +18,92 @@
}


def check_pipelines_are_recognized(pipelines: Iterable[str]):
"""Check that all pipelines in the processing status file are supported by Nipoppy."""
def get_recognized_pipelines(pipelines: Iterable[str]) -> list:
"""
Check that all pipelines in the processing status file are supported by Nipoppy.
Raise an error if all pipelines are unrecognized, otherwise warn about unrecognized pipelines.
"""
recognized_pipelines = list(
set(pipelines).intersection(mappings.KNOWN_PIPELINE_URIS)
)
unrecognized_pipelines = list(
set(pipelines).difference(mappings.KNOWN_PIPELINE_URIS)
)
if len(unrecognized_pipelines) > 0:

unrecognized_pipelines_details = (
f"Unrecognized processing pipelines: {unrecognized_pipelines}\n"
f"Supported pipelines are those in the Nipoppy pipeline catalog (https://github.com/nipoppy/pipeline-catalog):\n"
f"{list(mappings.KNOWN_PIPELINE_URIS.keys())}"
)
if not recognized_pipelines:
raise LookupError(
f"The processing status file contains unrecognized pipelines in the column '{PROC_STATUS_COLS['pipeline_name']}': "
f"{unrecognized_pipelines}. "
f"Allowed pipeline names are the following pipelines supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n"
f"{mappings.KNOWN_PIPELINE_URIS}"
f"The processing status file contains no recognized pipelines in the column: '{PROC_STATUS_COLS['pipeline_name']}'.\n"
f"{unrecognized_pipelines_details}"
)
if unrecognized_pipelines:
warnings.warn(
f"The processing status file contains unrecognized pipelines in the column: '{PROC_STATUS_COLS['pipeline_name']}'. These will be ignored.\n"
f"{unrecognized_pipelines_details}"
)
return recognized_pipelines


def check_pipeline_versions_are_recognized(
def validate_pipeline_versions(
pipeline: str, versions: Iterable[str]
):
) -> tuple[list, list]:
"""
Check that all pipeline versions in the processing status file are supported by Nipoppy.
Assumes that the input pipeline name is recognized.
For a given pipeline, return the recognized and unrecognized pipeline versions in the processing status file
based on the Nipoppy pipeline catalog, and return both as lists.
"""
recognized_versions = list(
set(versions).intersection(mappings.KNOWN_PIPELINE_VERSIONS[pipeline])
)
unrecognized_versions = list(
set(versions).difference(mappings.KNOWN_PIPELINE_VERSIONS[pipeline])
)
if len(unrecognized_versions) > 0:

return recognized_versions, unrecognized_versions


def check_at_least_one_pipeline_version_is_recognized(status_df: pd.DataFrame):
"""
Check that at least one pipeline name and version combination found in the processing status file is supported by Nipoppy.
"""
recognized_pipelines = get_recognized_pipelines(
status_df[PROC_STATUS_COLS["pipeline_name"]].unique()
)

any_recognized_versions = False
unrecognized_pipeline_versions = {}
for pipeline in recognized_pipelines:
versions = status_df[
status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline
][PROC_STATUS_COLS["pipeline_version"]].unique()

recognized_versions, unrecognized_versions = (
validate_pipeline_versions(pipeline, versions)
)
if recognized_versions:
any_recognized_versions = True
if unrecognized_versions:
unrecognized_pipeline_versions[pipeline] = unrecognized_versions

unrecognized_versions_details = (
f"Unrecognized processing pipeline versions: {unrecognized_pipeline_versions}\n"
"Supported pipeline versions are those in the Nipoppy pipeline catalog. "
"For a full list, see https://github.com/nipoppy/pipeline-catalog."
)
if not any_recognized_versions:
# TODO: Consider simply exiting with a message and no output instead?
raise LookupError(
f"The processing status file contains unrecognized {pipeline} versions in the column '{PROC_STATUS_COLS['pipeline_version']}': {unrecognized_versions}. "
f"Allowed {pipeline} versions are the following versions supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n"
f"{mappings.KNOWN_PIPELINE_VERSIONS[pipeline]}"
f"The processing status file contains no recognized versions of any pipelines in the column '{PROC_STATUS_COLS['pipeline_version']}'.\n"
f"{unrecognized_versions_details}"
)
if unrecognized_pipeline_versions:
warnings.warn(
f"The processing status file contains unrecognized versions of pipelines in the column '{PROC_STATUS_COLS['pipeline_version']}'. "
"These will be ignored.\n"
f"{unrecognized_versions_details}"
)


Expand All @@ -61,8 +119,10 @@ def create_completed_pipelines(session_proc_df: pd.DataFrame) -> list:
PROC_STATUS_COLS["pipeline_version"],
]
):
# Check that all pipeline steps have succeeded
if (
pipeline in mappings.KNOWN_PIPELINE_URIS
and version in mappings.KNOWN_PIPELINE_VERSIONS[pipeline]
) and (
session_pipe_df[PROC_STATUS_COLS["status"]].str.lower()
== "success"
).all():
Expand Down
3 changes: 3 additions & 0 deletions tests/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ _incomplete.tsv | Has a missing value in the `bids_participant_id` column | Fail
_unique_sessions.csv | Includes a unique subject-session (`sub-01`, `ses-03`) not found in the synthetic dataset | Pass
_missing_sessions.tsv | One subject (`sub-02`) is missing all session labels | Pass
_no_bids_sessions.tsv | Has session labels in all rows for `session_id`, but no values in `bids_session_id` column | Pass
_unrecognized_pipelines.tsv | Includes some pipeline names and versions not found in the pipeline catalog | Pass
_no_recognized_pipelines.tsv | Includes pipeline names found in the pipeline catalog, but no recognized versions | Fail



## Example expected CLI outputs
Expand Down
6 changes: 6 additions & 0 deletions tests/data/proc_status_no_recognized_pipelines.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
participant_id bids_participant_id session_id bids_session_id pipeline_name pipeline_version pipeline_step status
01 sub-01 01 ses-01 fmriprep unknown.version1 step1 FAIL
01 sub-01 01 ses-01 fmriprep unknown.version1 step2 INCOMPLETE
01 sub-01 01 ses-01 fmriprep unknown.version2 default SUCCESS
01 sub-01 01 ses-01 freesurfer unknown.version3 default SUCCESS
01 sub-01 02 ses-02 freesurfer unknown.version3 default UNAVAILABLE
6 changes: 6 additions & 0 deletions tests/data/proc_status_unrecognized_pipelines.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
participant_id bids_participant_id session_id bids_session_id pipeline_name pipeline_version pipeline_step status
01 sub-01 01 ses-01 fmriprep unknown.version step1 FAIL
01 sub-01 01 ses-01 fmriprep unknown.version step2 INCOMPLETE
01 sub-01 01 ses-01 unknown-pipeline 1.0.0 default SUCCESS
01 sub-01 01 ses-01 freesurfer 7.3.2 default SUCCESS
01 sub-01 02 ses-02 freesurfer 7.3.2 default UNAVAILABLE
96 changes: 96 additions & 0 deletions tests/integration/test_cli_derivatives.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from bagel import mappings
from bagel.cli import bagel


Expand Down Expand Up @@ -205,3 +206,98 @@ def test_custom_imaging_sessions_created_for_missing_session_labels(

# Note: order of items does not matter for dict comparison
assert custom_ses_completed_pipes == completed_pipes_for_missing_ses_sub


def test_unrecognized_pipelines_and_versions_excluded_from_output(
runner,
test_data,
test_data_upload_path,
default_derivatives_output_path,
load_test_json,
):
"""
Test that when a subset of pipelines or versions from a processing status file are unrecognized,
they are excluded from the output JSONLD with informative warnings, without causing the derivatives command to fail.
"""
with pytest.warns(UserWarning) as w:
result = runner.invoke(
bagel,
[
"derivatives",
"-t",
test_data / "proc_status_unrecognized_pipelines.tsv",
"-p",
test_data_upload_path / "example_synthetic.jsonld",
"-o",
default_derivatives_output_path,
],
catch_exceptions=False,
)

assert result.exit_code == 0, f"Errored out. STDOUT: {result.output}"

assert len(w) == 2
warnings = [warning.message.args[0] for warning in w]
for warning in warnings:
assert (
"unrecognized pipelines" in warning
and "unknown-pipeline" in warning
) or (
"unrecognized versions" in warning
and "{'fmriprep': ['unknown.version']}" in warning
)

output = load_test_json(default_derivatives_output_path)

sessions_with_completed_pipes = {}
for sub in output["hasSamples"]:
if sub["hasLabel"] == "sub-01":
for ses in sub["hasSession"]:
if (
ses["schemaKey"] == "ImagingSession"
and "hasCompletedPipeline" in ses
):
sessions_with_completed_pipes[ses["hasLabel"]] = ses[
"hasCompletedPipeline"
]

ses01_completed_pipes = sessions_with_completed_pipes.get("ses-01")
assert sessions_with_completed_pipes.keys() == {"ses-01"}
assert len(ses01_completed_pipes) == 1
assert (
ses01_completed_pipes[0]["hasPipelineName"]["identifier"]
== f"{mappings.NP.pf}:freesurfer"
)
assert ses01_completed_pipes[0]["hasPipelineVersion"] == "7.3.2"


def test_error_when_no_pipeline_version_combos_recognized(
runner,
test_data,
test_data_upload_path,
default_derivatives_output_path,
load_test_json,
):
"""
Test that when there is no recognized pipeline-version combination in the processing status file,
an error is raised and no output JSONLD is created.
"""
with pytest.raises(LookupError) as e:
runner.invoke(
bagel,
[
"derivatives",
"-t",
test_data / "proc_status_no_recognized_pipelines.tsv",
"-p",
test_data_upload_path / "example_synthetic.jsonld",
"-o",
default_derivatives_output_path,
],
catch_exceptions=False,
)

assert "no recognized versions" in str(e.value)
assert (
not default_derivatives_output_path.exists()
), "A JSONLD was created despite inputs being invalid."
63 changes: 38 additions & 25 deletions tests/unit/test_derivative_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,44 +114,57 @@ def test_pipeline_versions_are_loaded():
)


@pytest.mark.parametrize(
"pipelines, unrecog_pipelines",
[
(["fmriprep", "pipeline1"], ["pipeline1"]),
(["pipelineA", "pipelineB"], ["pipelineA", "pipelineB"]),
],
)
def test_unrecognized_pipeline_names_raise_error(pipelines, unrecog_pipelines):
"""Test that pipeline names not found in the pipeline catalog raise an informative error."""
with pytest.raises(LookupError) as e:
derivative_utils.check_pipelines_are_recognized(pipelines)
def test_warning_raised_when_some_pipeline_names_unrecognized():
"""
Test that when a subset of pipeline names are not found in the pipeline catalog,
an informative warning is raised but the recognized pipeline names are successfully returned.
"""
pipelines = ["fmriprep", "fakepipeline1"]

with pytest.warns(UserWarning) as w:
recognized_pipelines = derivative_utils.get_recognized_pipelines(
pipelines
)

assert all(
substr in str(e.value)
for substr in ["unrecognized pipelines"] + unrecog_pipelines
substr in str(w[0].message.args[0])
for substr in ["unrecognized pipelines", "fakepipeline1"]
)
assert recognized_pipelines == ["fmriprep"]


def test_error_raised_when_no_pipeline_names_recognized():
"""
Test that when no provided pipeline names are found in the pipeline catalog,
an informative error is raised.
"""
pipelines = ["fakepipeline1", "fakepipeline2"]

with pytest.raises(LookupError) as e:
derivative_utils.get_recognized_pipelines(pipelines)

assert "no recognized pipelines" in str(e.value)


@pytest.mark.parametrize(
"fmriprep_versions, unrecog_versions",
"fmriprep_versions, expected_recog_versions, expected_unrecog_versions",
[
(["20.2.7", "vA.B"], ["vA.B"]),
(["C.D.E", "F.G.H"], ["C.D.E", "F.G.H"]),
(["20.2.7", "vA.B"], ["20.2.7"], ["vA.B"]),
(["C.D.E", "F.G.H"], [], ["C.D.E", "F.G.H"]),
],
)
def test_unrecognized_pipeline_versions_raise_error(
fmriprep_versions, unrecog_versions
def test_pipeline_versions_classified_correctly(
fmriprep_versions, expected_recog_versions, expected_unrecog_versions
):
"""Test that versions of a pipeline not found in the pipeline catalog raise an informative error."""
with pytest.raises(LookupError) as e:
derivative_utils.check_pipeline_versions_are_recognized(
"""Test that versions of a pipeline are correctly classified as recognized or unrecognized according to the pipeline catalog."""
recog_versions, unrecog_versions = (
derivative_utils.validate_pipeline_versions(
"fmriprep", fmriprep_versions
)

assert all(
substr in str(e.value)
for substr in ["unrecognized fmriprep versions"] + unrecog_versions
)
# The order of the versions in the lists is not guaranteed
assert set(recog_versions) == set(expected_recog_versions)
assert set(unrecog_versions) == set(expected_unrecog_versions)


def test_create_completed_pipelines():
Expand Down

0 comments on commit f68468a

Please sign in to comment.