From 552d5f3fae6e6422a29d6ce4704a1462963482e3 Mon Sep 17 00:00:00 2001 From: Neil Shephard Date: Mon, 13 Jan 2025 17:13:38 +0000 Subject: [PATCH] tests: Meaningful tests for refactored filtering Identified instances where meaningful results are obtained when filtering for instances where the start/end are within introns or there is splicing. Tests are not fully comprehensive yet as not all logic statements are covered, in particular when a transcript spans a whole region but I'm currently a bit hazy on whether this will always happen as the `query_length` is always `150` (`read_length` is longer and was the cause of previous errors). --- isoslam/all_introns_counts_and_info.py | 16 +++ isoslam/isoslam.py | 20 +-- tests/conftest.py | 160 +++++++++++++++++----- tests/test_all_introns_counts_and_info.py | 2 + tests/test_isoslam.py | 99 ++++++++----- 5 files changed, 208 insertions(+), 89 deletions(-) diff --git a/isoslam/all_introns_counts_and_info.py b/isoslam/all_introns_counts_and_info.py index 706966c..0fdda89 100644 --- a/isoslam/all_introns_counts_and_info.py +++ b/isoslam/all_introns_counts_and_info.py @@ -165,6 +165,15 @@ def fragment_iterator(read_iterator): # RETAINED read1_within_intron = {} + def print_info(chr, start, end, transcript_id, strand, blocks, text, count=i_total_progress) -> None: + print(f"\n{count} =========================== {text=}\n") + print(f"{transcript_id=}") + print(f"{chr=}") + print(f"{start=}") + print(f"{end=}") + print(f"{strand=}") + print(f"\n{blocks}\n") + for chr, start, end, transcript_id, strand in utrons1: # if starts/ends in intron/(s) if ( @@ -174,6 +183,8 @@ def fragment_iterator(read_iterator): and start not in block_ends2 and end not in block_starts2 ): + # if transcript_id == "ENST00000442898": + # print_info(chr, start, end, transcript_id, strand, blocks, text="read1_within_intron") if transcript_id not in read1_within_intron: read1_within_intron[transcript_id] = [] read1_within_intron[transcript_id].append((start, end, chr, strand)) @@ -189,6 +200,8 @@ def fragment_iterator(read_iterator): and start not in block_ends2 and end not in block_starts2 ): + # if transcript_id == "ENST00000442898": + # print_info(chr, start, end, transcript_id, strand, blocks, text="spans_intron") if transcript_id not in read1_within_intron: read1_within_intron[transcript_id] = [] read1_within_intron[transcript_id].append((start, end, chr, strand)) @@ -201,6 +214,8 @@ def fragment_iterator(read_iterator): for chr, start, end, transcript_id, strand in utrons1: if start in block_ends1 and end in block_starts1: + # if transcript_id == "ENST00000442898": + # print_info(chr, start, end, transcript_id, strand, blocks, text="spliced") if transcript_id not in read1_spliced_3UI: read1_spliced_3UI[transcript_id] = [] read1_spliced_3UI[transcript_id].append((start, end, chr, strand)) @@ -214,6 +229,7 @@ def fragment_iterator(read_iterator): for chr, start, end, transcript_id, strand in utrons2: # if starts/ends in intron/(s) + if ( (start <= read2_start <= end or start <= read2_end <= end) and start not in block_ends2 diff --git a/isoslam/isoslam.py b/isoslam/isoslam.py index 4a964eb..45d37ed 100644 --- a/isoslam/isoslam.py +++ b/isoslam/isoslam.py @@ -222,13 +222,6 @@ def filter_within_introns( """ within_intron: dict[str, Any] = {} for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: - # print("\n=========================") - # print(f"{chr=}") - # print(f"{start=}") - # print(f"{end=}") - # print(f"{transcript_id=}") - # print(f"{strand=}") - # Check if starts/ends are within introns start_end_within_intron = ( start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end ) @@ -247,7 +240,7 @@ def filter_within_introns( and end not in blocks["read2"]["starts"] ): # Why add an empty list and append a tuple? - # Should we not be getting the keys of within_intron to check transcript_id is not in? + # TODO: Check where there are multiple matches that these are extracted if transcript_id not in within_intron: within_intron[transcript_id] = [] within_intron[transcript_id].append((chromosome, start, end, strand)) @@ -260,7 +253,7 @@ def filter_spliced_utrons( read: str = "read1", ) -> dict[str, list[Any]]: """ - Filter utrons that are within spliced 3UI's. + Filter utrons where start is in the block ends or end is in the block start. Parameters ---------- @@ -278,16 +271,9 @@ def filter_spliced_utrons( """ spliced_3ui: dict[str, list[Any]] = {} for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: - # print("\n=========================") - # print(f"{chr=}") - # print(f"{start=}") - # print(f"{end=}") - # print(f"{transcript_id=}") - # print(f"{strand=}") - # if start in block_ends1 and end in block_starts1: if start in blocks[read]["ends"] and end in blocks[read]["starts"]: # Why add an empty list and append a tuple? - # Should we not be getting the keys of within_intron to check transcript_id is not in? + # TODO: Check where there are multiple matches that these are extracted if transcript_id not in spliced_3ui: spliced_3ui[transcript_id] = [] spliced_3ui[transcript_id].append((chromosome, start, end, strand)) diff --git a/tests/conftest.py b/tests/conftest.py index f9c214d..b1cf9c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -151,18 +151,31 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe @pytest.fixture() -def feature_pair_14770_17814( - bam_sorted_assigned_file_no4sU: list[AlignedSegment], +def start_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment: + """Fixture where the start (16061) is within the intron (16011-16161).""" + return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 16061 and x.reference_end == 16717][0] + + +@pytest.fixture() +def end_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment: + """Fixture where the end (24850) is within the intron (24715-24865).""" + return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 18492 and x.reference_end == 24850][0] + + +def _feature_pair( + bam_file: list[AlignedSegment], + read1_start: int, + read2_start: int, extract_transcript: dict[str, list[int | str]], extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], ) -> tuple[dict["str", Any], list, list, list, list]: - """Feature pair for 17814 and 14770 augment with utrons and return along with block start/ends.""" - aligned_segment_14770 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 14770][0] - aligned_segment_17814 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 17814][0] + """Construct feature pairs.""" + aligned_segment1 = [x for x in bam_file if x.reference_start == read1_start][0] + aligned_segment2 = [x for x in bam_file if x.reference_start == read2_start][0] pair_features = isoslam.extract_features_from_pair( [ - aligned_segment_14770, - aligned_segment_17814, + aligned_segment1, + aligned_segment2, ] ) # ...and get the utron for both reads and add them to the dictionary @@ -174,45 +187,124 @@ def feature_pair_14770_17814( pair_features["read2"], gene_transcript, coordinates=extract_transcript ) # ...then we can get the block_starts/ends - block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_14770) - block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_17814) + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment1) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment2) blocks = { "read1": {"starts": block_starts1, "ends": block_ends1}, "read2": {"starts": block_starts2, "ends": block_ends2}, } - # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) return (pair_features, blocks) +# Fixtures for within introns +@pytest.fixture() +def feature_pair_within_15967_24715( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 15967 and 24715 augment with utrons and return along with block start/ends. + + Both of these have a start within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=15967, + read2_start=24715, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + @pytest.fixture() -def feature_pair_20906_21051( +def feature_pair_within_17790_18093( bam_sorted_assigned_file_0hr1: list[AlignedSegment], extract_transcript: dict[str, list[int | str]], extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], ) -> tuple[dict["str", Any], list, list, list, list]: - """Feature pair for 21051 and 20906 augment with utrons and return along with block start/ends.""" - aligned_segment_20906 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] - aligned_segment_21051 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 21051][0] - pair_features = isoslam.extract_features_from_pair( - [ - aligned_segment_20906, - aligned_segment_21051, - ] + """ + Feature pair for 17790 and 18093 augment with utrons and return along with block start/ends. + + Both of these have a start within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17790, + read2_start=18093, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, ) - # ...and get the utron for both reads and add them to the dictionary - _, gene_transcript = extract_strand_transcript - pair_features["read1"]["utron"] = isoslam.extract_utron( - pair_features["read1"], gene_transcript, coordinates=extract_transcript + + +@pytest.fixture() +def feature_pair_within_17709_18290( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17709 and 18290 augment with utrons and return along with block start/ends. + + 17709 has a start within ''read1'' and end within ''read2'' whilst 18290 has a start within ''read2'' and an end + within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17709, + read2_start=18290, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, ) - pair_features["read2"]["utron"] = isoslam.extract_utron( - pair_features["read2"], gene_transcript, coordinates=extract_transcript + + +# Fixtures for spliced matches +@pytest.fixture() +def feature_pair_spliced_17739_17814( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17739 and 17814 augment with utrons and return along with block start/ends. + + 17739 has an end that matches the end of ''read1'' whilst 17814 has a start that matches the end of ''read1''. + + Used in test_filter_spliced_utrons() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=17739, + read2_start=17814, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +@pytest.fixture() +def feature_pair_spliced_17430_18155( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17430 and 18155 augment with utrons and return along with block start/ends. + + 17430 has an end that matches ''read1'' start whilst 18155 has an end that matches the start of ''read1'' and an end + within ''read1''. + + Used in test_filter_spliced_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17430, + read2_start=18155, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, ) - # ...then we can get the block_starts/ends - block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_20906) - block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_21051) - blocks = { - "read1": {"starts": block_starts1, "ends": block_ends1}, - "read2": {"starts": block_starts2, "ends": block_ends2}, - } - # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) - return (pair_features, blocks) diff --git a/tests/test_all_introns_counts_and_info.py b/tests/test_all_introns_counts_and_info.py index ca2f111..d01356a 100644 --- a/tests/test_all_introns_counts_and_info.py +++ b/tests/test_all_introns_counts_and_info.py @@ -39,3 +39,5 @@ def test_main(file_path: Path, tmp_path: Path, regtest) -> None: # Ideally would like to use syrupy to test snapshots but pd.DataFrame() are not yet supported # https://github.com/syrupy-project/syrupy/issues/887 print(results.to_string(float_format="{:.4e}".format), file=regtest) + # Uncomment to print out debugging information when running tests + # assert False diff --git a/tests/test_isoslam.py b/tests/test_isoslam.py index 54f3af1..9f9c104 100644 --- a/tests/test_isoslam.py +++ b/tests/test_isoslam.py @@ -403,28 +403,55 @@ def test_zip_blocks( ), [ pytest.param( # type: ignore[misc] - "feature_pair_14770_17814", + "feature_pair_within_15967_24715", "read1", - {}, - id="14770 and 17814 for read1 - Assigned to MSTRG.63147", - ), - pytest.param( - "feature_pair_14770_17814", - "read2", - {}, - id="14770 and 17814 for read2 - Assigned to MSTRG.63147", + { + "ENST00000442898": [ + ( + "9", + 16061, + 16717, + "-", + ), + ] + }, + id="15967 and 24715 feature pair from no4sU", ), pytest.param( - "feature_pair_20906_21051", + "feature_pair_within_17790_18093", "read1", - {}, - id="20906 and 21051 for read1 - Unassigned", + { + "ENST00000442898": [ + ( + "9", + 17855, + 18027, + "-", + ), + ] + }, + id="17790 and 18093 feature pair from 0hr1", ), pytest.param( - "feature_pair_20906_21051", - "read2", - {}, - id="20906 and 21051 for read2 - Unassigned", + "feature_pair_within_17709_18290", + "read1", + { + "ENST00000442898": [ + ( + "9", + 17479, + 17718, + "-", + ), + ( + "9", + 17855, + 18027, + "-", + ), + ] + }, + id="17709 and 18290 feature pair from 0hr1", ), ], ) @@ -434,8 +461,13 @@ def test_filter_within_introns( expected: dict[str, tuple[Any]], request: pytest.FixtureRequest, ) -> None: - """Test filtering within introns.""" - # We first extract the required pair_features and block start/ends + """ + Test filtering within introns. + + Tests are very basic and not all conditional statements are covered, in particular instances where transcripts span + the whole region are not covered. This may be challenging as the length is _always_ 150. + """ + # We first extract the required pair_features and block start/ends from the fixture pair_features, blocks = request.getfixturevalue(feature_pair) within_introns = isoslam.filter_within_introns( pair_features, @@ -454,28 +486,16 @@ def test_filter_within_introns( ), [ pytest.param( # type: ignore[misc] - "feature_pair_14770_17814", - "read1", - {"ENST00000442898": [(18174, 18380, "9", "-")]}, - id="14770 and 17814 for read1 - Assigned to MSTRG.63147", - ), - pytest.param( - "feature_pair_14770_17814", - "read2", - {"ENST00000442898": [(17855, 18027, "9", "-")]}, - id="14770 and 17814 for read2 - Assigned to MSTRG.63147", - ), - pytest.param( - "feature_pair_20906_21051", + "feature_pair_spliced_17739_17814", "read1", - {}, - id="20906 and 21051 for read1 - Unassigned", + {"ENST00000442898": [("9", 17855, 18027, "-")]}, + id="14770 and 17814 for read1 from no4sU", ), pytest.param( - "feature_pair_20906_21051", + "feature_pair_spliced_17430_18155", "read2", - {}, - id="20906 and 21051 for read2 - Unassigned", + {"ENST00000442898": [("9", 18174, 18380, "-"), ("9", 18492, 24850, "-")]}, + id="17430 and 18155 for from 0hr1", ), ], ) @@ -485,8 +505,11 @@ def test_filter_spliced_utrons( expected: dict[str, list[Any]], request: pytest.FixtureRequest, ) -> None: - """Test filtering within introns.""" - # We first extract the required pair_features and block start/ends + """Test filtering spliced utrons. + + Tests are very basic and not all conditional statements are covered. + """ + # We first extract the required pair_features and block start/ends from the fixture pair_features, blocks = request.getfixturevalue(feature_pair) within_introns = isoslam.filter_spliced_utrons( pair_features,