From ce1c18d64354a11271f4717a193dacfae54e6032 Mon Sep 17 00:00:00 2001 From: Neil Shephard Date: Fri, 10 Jan 2025 17:10:26 +0000 Subject: [PATCH 1/2] refactor: building within_intron and spliced subsets Closes #91 Closes #92 Extracts the duplicated code that builds subsets of reads that are within introns (or span introns) and within spliced_3ui and abstracts out to functions with tests. Currently the tests fail and I don't think they should so I need to work out why and perhaps add some additional fixtures to use here. --- isoslam/all_introns_counts_and_info.py | 21 ++++- isoslam/isoslam.py | 96 +++++++++++++++++++++++ tests/conftest.py | 68 +++++++++++++++++ tests/test_isoslam.py | 102 +++++++++++++++++++++++++ 4 files changed, 286 insertions(+), 1 deletion(-) diff --git a/isoslam/all_introns_counts_and_info.py b/isoslam/all_introns_counts_and_info.py index c36a3a6..706966c 100644 --- a/isoslam/all_introns_counts_and_info.py +++ b/isoslam/all_introns_counts_and_info.py @@ -113,7 +113,7 @@ def fragment_iterator(read_iterator): # Extract features pair_features = isoslam.extract_features_from_pair(pair) # DEBUGGING - Get information on features - if i_total_progress in (484): + if i_total_progress == 484: print(f"{pair_features=}") # Temporary code sets up variables from the returned dictionary to match those currently used read1_start = pair_features["read1"]["start"] @@ -155,6 +155,13 @@ def fragment_iterator(read_iterator): ## we will just ignore those few block_starts1, block_ends1 = isoslam.zip_blocks(read1) block_starts2, block_ends2 = isoslam.zip_blocks(read2) + + # Build a dictionary as its cleaner to work with + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + # RETAINED read1_within_intron = {} @@ -185,6 +192,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read1_within_intron: read1_within_intron[transcript_id] = [] read1_within_intron[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read1_within_intron=}") # SPLICED read1_spliced_3UI = {} @@ -194,6 +204,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read1_spliced_3UI: read1_spliced_3UI[transcript_id] = [] read1_spliced_3UI[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read1_spliced_3UI=}") ## READ 2 # RETAINED @@ -226,6 +239,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read2_within_intron: read2_within_intron[transcript_id] = [] read2_within_intron[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read2_within_intron=}") # SPLICED read2_spliced_3UI = {} @@ -235,6 +251,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read2_spliced_3UI: read2_spliced_3UI[transcript_id] = [] read2_spliced_3UI[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read2_spliced_3UI=}") all_dicts = [read1_within_intron, read2_within_intron, read1_spliced_3UI, read2_spliced_3UI] all_empty = all(not contents for contents in all_dicts) diff --git a/isoslam/isoslam.py b/isoslam/isoslam.py index 17e9fb3..4a964eb 100644 --- a/isoslam/isoslam.py +++ b/isoslam/isoslam.py @@ -196,3 +196,99 @@ def zip_blocks(read: AlignedSegment) -> Iterator[tuple[Any, ...]]: Tuple of two lists of integers the first is start location, the second is the end location. """ return zip(*read.get_blocks()) + + +def filter_within_introns( + pair_features: dict[str, dict[str, Any]], + blocks: dict[str, dict[str, set[int]]], + read: str = "read1", +) -> dict[str, tuple[Any]]: + """ + Filter utrons that are within introns. + + Parameters + ---------- + pair_features : dict[str, dict] + Dictionary of extracted features and utron in both read directions. + blocks : dic[str: dict[str, set]] + Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end. + read : str + Direction of read to filter on, default is ''read1'' but can also use ''read2''. + + Returns + ------- + dict[str, tuple(Any)] + Dictionary of the chromosome, start, end and strand of transcripts that are within introns. + """ + within_intron: dict[str, Any] = {} + for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: + # print("\n=========================") + # print(f"{chr=}") + # print(f"{start=}") + # print(f"{end=}") + # print(f"{transcript_id=}") + # print(f"{strand=}") + # Check if starts/ends are within introns + start_end_within_intron = ( + start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end + ) + spans_intron = ( + pair_features[read]["start"] < start + and pair_features[read]["end"] > end + and pair_features[read]["length"] < (end - start) + ) + if ( # pylint: disable=too-many-boolean-expressions + (start_end_within_intron or spans_intron) + # Start should not be in ends and ends should not be in start, can we combine the start and end block + # sets I wonder? + and start not in blocks["read1"]["ends"] + and end not in blocks["read1"]["starts"] + and start not in blocks["read2"]["ends"] + and end not in blocks["read2"]["starts"] + ): + # Why add an empty list and append a tuple? + # Should we not be getting the keys of within_intron to check transcript_id is not in? + if transcript_id not in within_intron: + within_intron[transcript_id] = [] + within_intron[transcript_id].append((chromosome, start, end, strand)) + return within_intron + + +def filter_spliced_utrons( + pair_features: dict[str, dict[str, Any]], + blocks: dict[str, dict[str, set[int]]], + read: str = "read1", +) -> dict[str, list[Any]]: + """ + Filter utrons that are within spliced 3UI's. + + Parameters + ---------- + pair_features : dict[str, dict] + Dictionary of extracted features and utron in both read directions. + blocks : dic[str: dict[str, set]] + Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end. + read : str + Direction of read to filter on, default is ''read1'' but can also use ''read2''. + + Returns + ------- + dict[str, tuple(Any)] + Dictionary of the chromosome, start, end and strand of transcripts that are within introns. + """ + spliced_3ui: dict[str, list[Any]] = {} + for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: + # print("\n=========================") + # print(f"{chr=}") + # print(f"{start=}") + # print(f"{end=}") + # print(f"{transcript_id=}") + # print(f"{strand=}") + # if start in block_ends1 and end in block_starts1: + if start in blocks[read]["ends"] and end in blocks[read]["starts"]: + # Why add an empty list and append a tuple? + # Should we not be getting the keys of within_intron to check transcript_id is not in? + if transcript_id not in spliced_3ui: + spliced_3ui[transcript_id] = [] + spliced_3ui[transcript_id].append((chromosome, start, end, strand)) + return spliced_3ui diff --git a/tests/conftest.py b/tests/conftest.py index 0625d5d..f9c214d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -148,3 +148,71 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe aligned segment to be testing because of this filtering. """ return [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] + + +@pytest.fixture() +def feature_pair_14770_17814( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """Feature pair for 17814 and 14770 augment with utrons and return along with block start/ends.""" + aligned_segment_14770 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 14770][0] + aligned_segment_17814 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 17814][0] + pair_features = isoslam.extract_features_from_pair( + [ + aligned_segment_14770, + aligned_segment_17814, + ] + ) + # ...and get the utron for both reads and add them to the dictionary + _, gene_transcript = extract_strand_transcript + pair_features["read1"]["utron"] = isoslam.extract_utron( + pair_features["read1"], gene_transcript, coordinates=extract_transcript + ) + pair_features["read2"]["utron"] = isoslam.extract_utron( + pair_features["read2"], gene_transcript, coordinates=extract_transcript + ) + # ...then we can get the block_starts/ends + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_14770) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_17814) + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) + return (pair_features, blocks) + + +@pytest.fixture() +def feature_pair_20906_21051( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """Feature pair for 21051 and 20906 augment with utrons and return along with block start/ends.""" + aligned_segment_20906 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] + aligned_segment_21051 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 21051][0] + pair_features = isoslam.extract_features_from_pair( + [ + aligned_segment_20906, + aligned_segment_21051, + ] + ) + # ...and get the utron for both reads and add them to the dictionary + _, gene_transcript = extract_strand_transcript + pair_features["read1"]["utron"] = isoslam.extract_utron( + pair_features["read1"], gene_transcript, coordinates=extract_transcript + ) + pair_features["read2"]["utron"] = isoslam.extract_utron( + pair_features["read2"], gene_transcript, coordinates=extract_transcript + ) + # ...then we can get the block_starts/ends + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_20906) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_21051) + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) + return (pair_features, blocks) diff --git a/tests/test_isoslam.py b/tests/test_isoslam.py index 3eb8c0b..54f3af1 100644 --- a/tests/test_isoslam.py +++ b/tests/test_isoslam.py @@ -392,3 +392,105 @@ def test_zip_blocks( start_blocks, end_blocks = isoslam.zip_blocks(request.getfixturevalue(aligned_segment)) assert start_blocks == expected_start assert end_blocks == expected_end + + +# Need to identify aligned segments that are within introns +@pytest.mark.parametrize( + ( + "feature_pair", + "read", + "expected", + ), + [ + pytest.param( # type: ignore[misc] + "feature_pair_14770_17814", + "read1", + {}, + id="14770 and 17814 for read1 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_14770_17814", + "read2", + {}, + id="14770 and 17814 for read2 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_20906_21051", + "read1", + {}, + id="20906 and 21051 for read1 - Unassigned", + ), + pytest.param( + "feature_pair_20906_21051", + "read2", + {}, + id="20906 and 21051 for read2 - Unassigned", + ), + ], +) +def test_filter_within_introns( + feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]], + read: str, + expected: dict[str, tuple[Any]], + request: pytest.FixtureRequest, +) -> None: + """Test filtering within introns.""" + # We first extract the required pair_features and block start/ends + pair_features, blocks = request.getfixturevalue(feature_pair) + within_introns = isoslam.filter_within_introns( + pair_features, + blocks, + read, + ) + assert within_introns == expected + + +# Not getting the same as from debugging all_introns_counts_and_info.py +@pytest.mark.parametrize( + ( + "feature_pair", + "read", + "expected", + ), + [ + pytest.param( # type: ignore[misc] + "feature_pair_14770_17814", + "read1", + {"ENST00000442898": [(18174, 18380, "9", "-")]}, + id="14770 and 17814 for read1 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_14770_17814", + "read2", + {"ENST00000442898": [(17855, 18027, "9", "-")]}, + id="14770 and 17814 for read2 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_20906_21051", + "read1", + {}, + id="20906 and 21051 for read1 - Unassigned", + ), + pytest.param( + "feature_pair_20906_21051", + "read2", + {}, + id="20906 and 21051 for read2 - Unassigned", + ), + ], +) +def test_filter_spliced_utrons( + feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]], + read: str, + expected: dict[str, list[Any]], + request: pytest.FixtureRequest, +) -> None: + """Test filtering within introns.""" + # We first extract the required pair_features and block start/ends + pair_features, blocks = request.getfixturevalue(feature_pair) + within_introns = isoslam.filter_spliced_utrons( + pair_features, + blocks, + read, + ) + assert within_introns == expected From 552d5f3fae6e6422a29d6ce4704a1462963482e3 Mon Sep 17 00:00:00 2001 From: Neil Shephard Date: Mon, 13 Jan 2025 17:13:38 +0000 Subject: [PATCH 2/2] tests: Meaningful tests for refactored filtering Identified instances where meaningful results are obtained when filtering for instances where the start/end are within introns or there is splicing. Tests are not fully comprehensive yet as not all logic statements are covered, in particular when a transcript spans a whole region but I'm currently a bit hazy on whether this will always happen as the `query_length` is always `150` (`read_length` is longer and was the cause of previous errors). --- isoslam/all_introns_counts_and_info.py | 16 +++ isoslam/isoslam.py | 20 +-- tests/conftest.py | 160 +++++++++++++++++----- tests/test_all_introns_counts_and_info.py | 2 + tests/test_isoslam.py | 99 ++++++++----- 5 files changed, 208 insertions(+), 89 deletions(-) diff --git a/isoslam/all_introns_counts_and_info.py b/isoslam/all_introns_counts_and_info.py index 706966c..0fdda89 100644 --- a/isoslam/all_introns_counts_and_info.py +++ b/isoslam/all_introns_counts_and_info.py @@ -165,6 +165,15 @@ def fragment_iterator(read_iterator): # RETAINED read1_within_intron = {} + def print_info(chr, start, end, transcript_id, strand, blocks, text, count=i_total_progress) -> None: + print(f"\n{count} =========================== {text=}\n") + print(f"{transcript_id=}") + print(f"{chr=}") + print(f"{start=}") + print(f"{end=}") + print(f"{strand=}") + print(f"\n{blocks}\n") + for chr, start, end, transcript_id, strand in utrons1: # if starts/ends in intron/(s) if ( @@ -174,6 +183,8 @@ def fragment_iterator(read_iterator): and start not in block_ends2 and end not in block_starts2 ): + # if transcript_id == "ENST00000442898": + # print_info(chr, start, end, transcript_id, strand, blocks, text="read1_within_intron") if transcript_id not in read1_within_intron: read1_within_intron[transcript_id] = [] read1_within_intron[transcript_id].append((start, end, chr, strand)) @@ -189,6 +200,8 @@ def fragment_iterator(read_iterator): and start not in block_ends2 and end not in block_starts2 ): + # if transcript_id == "ENST00000442898": + # print_info(chr, start, end, transcript_id, strand, blocks, text="spans_intron") if transcript_id not in read1_within_intron: read1_within_intron[transcript_id] = [] read1_within_intron[transcript_id].append((start, end, chr, strand)) @@ -201,6 +214,8 @@ def fragment_iterator(read_iterator): for chr, start, end, transcript_id, strand in utrons1: if start in block_ends1 and end in block_starts1: + # if transcript_id == "ENST00000442898": + # print_info(chr, start, end, transcript_id, strand, blocks, text="spliced") if transcript_id not in read1_spliced_3UI: read1_spliced_3UI[transcript_id] = [] read1_spliced_3UI[transcript_id].append((start, end, chr, strand)) @@ -214,6 +229,7 @@ def fragment_iterator(read_iterator): for chr, start, end, transcript_id, strand in utrons2: # if starts/ends in intron/(s) + if ( (start <= read2_start <= end or start <= read2_end <= end) and start not in block_ends2 diff --git a/isoslam/isoslam.py b/isoslam/isoslam.py index 4a964eb..45d37ed 100644 --- a/isoslam/isoslam.py +++ b/isoslam/isoslam.py @@ -222,13 +222,6 @@ def filter_within_introns( """ within_intron: dict[str, Any] = {} for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: - # print("\n=========================") - # print(f"{chr=}") - # print(f"{start=}") - # print(f"{end=}") - # print(f"{transcript_id=}") - # print(f"{strand=}") - # Check if starts/ends are within introns start_end_within_intron = ( start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end ) @@ -247,7 +240,7 @@ def filter_within_introns( and end not in blocks["read2"]["starts"] ): # Why add an empty list and append a tuple? - # Should we not be getting the keys of within_intron to check transcript_id is not in? + # TODO: Check where there are multiple matches that these are extracted if transcript_id not in within_intron: within_intron[transcript_id] = [] within_intron[transcript_id].append((chromosome, start, end, strand)) @@ -260,7 +253,7 @@ def filter_spliced_utrons( read: str = "read1", ) -> dict[str, list[Any]]: """ - Filter utrons that are within spliced 3UI's. + Filter utrons where start is in the block ends or end is in the block start. Parameters ---------- @@ -278,16 +271,9 @@ def filter_spliced_utrons( """ spliced_3ui: dict[str, list[Any]] = {} for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: - # print("\n=========================") - # print(f"{chr=}") - # print(f"{start=}") - # print(f"{end=}") - # print(f"{transcript_id=}") - # print(f"{strand=}") - # if start in block_ends1 and end in block_starts1: if start in blocks[read]["ends"] and end in blocks[read]["starts"]: # Why add an empty list and append a tuple? - # Should we not be getting the keys of within_intron to check transcript_id is not in? + # TODO: Check where there are multiple matches that these are extracted if transcript_id not in spliced_3ui: spliced_3ui[transcript_id] = [] spliced_3ui[transcript_id].append((chromosome, start, end, strand)) diff --git a/tests/conftest.py b/tests/conftest.py index f9c214d..b1cf9c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -151,18 +151,31 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe @pytest.fixture() -def feature_pair_14770_17814( - bam_sorted_assigned_file_no4sU: list[AlignedSegment], +def start_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment: + """Fixture where the start (16061) is within the intron (16011-16161).""" + return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 16061 and x.reference_end == 16717][0] + + +@pytest.fixture() +def end_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment: + """Fixture where the end (24850) is within the intron (24715-24865).""" + return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 18492 and x.reference_end == 24850][0] + + +def _feature_pair( + bam_file: list[AlignedSegment], + read1_start: int, + read2_start: int, extract_transcript: dict[str, list[int | str]], extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], ) -> tuple[dict["str", Any], list, list, list, list]: - """Feature pair for 17814 and 14770 augment with utrons and return along with block start/ends.""" - aligned_segment_14770 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 14770][0] - aligned_segment_17814 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 17814][0] + """Construct feature pairs.""" + aligned_segment1 = [x for x in bam_file if x.reference_start == read1_start][0] + aligned_segment2 = [x for x in bam_file if x.reference_start == read2_start][0] pair_features = isoslam.extract_features_from_pair( [ - aligned_segment_14770, - aligned_segment_17814, + aligned_segment1, + aligned_segment2, ] ) # ...and get the utron for both reads and add them to the dictionary @@ -174,45 +187,124 @@ def feature_pair_14770_17814( pair_features["read2"], gene_transcript, coordinates=extract_transcript ) # ...then we can get the block_starts/ends - block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_14770) - block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_17814) + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment1) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment2) blocks = { "read1": {"starts": block_starts1, "ends": block_ends1}, "read2": {"starts": block_starts2, "ends": block_ends2}, } - # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) return (pair_features, blocks) +# Fixtures for within introns +@pytest.fixture() +def feature_pair_within_15967_24715( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 15967 and 24715 augment with utrons and return along with block start/ends. + + Both of these have a start within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=15967, + read2_start=24715, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + @pytest.fixture() -def feature_pair_20906_21051( +def feature_pair_within_17790_18093( bam_sorted_assigned_file_0hr1: list[AlignedSegment], extract_transcript: dict[str, list[int | str]], extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], ) -> tuple[dict["str", Any], list, list, list, list]: - """Feature pair for 21051 and 20906 augment with utrons and return along with block start/ends.""" - aligned_segment_20906 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] - aligned_segment_21051 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 21051][0] - pair_features = isoslam.extract_features_from_pair( - [ - aligned_segment_20906, - aligned_segment_21051, - ] + """ + Feature pair for 17790 and 18093 augment with utrons and return along with block start/ends. + + Both of these have a start within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17790, + read2_start=18093, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, ) - # ...and get the utron for both reads and add them to the dictionary - _, gene_transcript = extract_strand_transcript - pair_features["read1"]["utron"] = isoslam.extract_utron( - pair_features["read1"], gene_transcript, coordinates=extract_transcript + + +@pytest.fixture() +def feature_pair_within_17709_18290( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17709 and 18290 augment with utrons and return along with block start/ends. + + 17709 has a start within ''read1'' and end within ''read2'' whilst 18290 has a start within ''read2'' and an end + within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17709, + read2_start=18290, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, ) - pair_features["read2"]["utron"] = isoslam.extract_utron( - pair_features["read2"], gene_transcript, coordinates=extract_transcript + + +# Fixtures for spliced matches +@pytest.fixture() +def feature_pair_spliced_17739_17814( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17739 and 17814 augment with utrons and return along with block start/ends. + + 17739 has an end that matches the end of ''read1'' whilst 17814 has a start that matches the end of ''read1''. + + Used in test_filter_spliced_utrons() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=17739, + read2_start=17814, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +@pytest.fixture() +def feature_pair_spliced_17430_18155( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17430 and 18155 augment with utrons and return along with block start/ends. + + 17430 has an end that matches ''read1'' start whilst 18155 has an end that matches the start of ''read1'' and an end + within ''read1''. + + Used in test_filter_spliced_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17430, + read2_start=18155, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, ) - # ...then we can get the block_starts/ends - block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_20906) - block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_21051) - blocks = { - "read1": {"starts": block_starts1, "ends": block_ends1}, - "read2": {"starts": block_starts2, "ends": block_ends2}, - } - # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) - return (pair_features, blocks) diff --git a/tests/test_all_introns_counts_and_info.py b/tests/test_all_introns_counts_and_info.py index ca2f111..d01356a 100644 --- a/tests/test_all_introns_counts_and_info.py +++ b/tests/test_all_introns_counts_and_info.py @@ -39,3 +39,5 @@ def test_main(file_path: Path, tmp_path: Path, regtest) -> None: # Ideally would like to use syrupy to test snapshots but pd.DataFrame() are not yet supported # https://github.com/syrupy-project/syrupy/issues/887 print(results.to_string(float_format="{:.4e}".format), file=regtest) + # Uncomment to print out debugging information when running tests + # assert False diff --git a/tests/test_isoslam.py b/tests/test_isoslam.py index 54f3af1..9f9c104 100644 --- a/tests/test_isoslam.py +++ b/tests/test_isoslam.py @@ -403,28 +403,55 @@ def test_zip_blocks( ), [ pytest.param( # type: ignore[misc] - "feature_pair_14770_17814", + "feature_pair_within_15967_24715", "read1", - {}, - id="14770 and 17814 for read1 - Assigned to MSTRG.63147", - ), - pytest.param( - "feature_pair_14770_17814", - "read2", - {}, - id="14770 and 17814 for read2 - Assigned to MSTRG.63147", + { + "ENST00000442898": [ + ( + "9", + 16061, + 16717, + "-", + ), + ] + }, + id="15967 and 24715 feature pair from no4sU", ), pytest.param( - "feature_pair_20906_21051", + "feature_pair_within_17790_18093", "read1", - {}, - id="20906 and 21051 for read1 - Unassigned", + { + "ENST00000442898": [ + ( + "9", + 17855, + 18027, + "-", + ), + ] + }, + id="17790 and 18093 feature pair from 0hr1", ), pytest.param( - "feature_pair_20906_21051", - "read2", - {}, - id="20906 and 21051 for read2 - Unassigned", + "feature_pair_within_17709_18290", + "read1", + { + "ENST00000442898": [ + ( + "9", + 17479, + 17718, + "-", + ), + ( + "9", + 17855, + 18027, + "-", + ), + ] + }, + id="17709 and 18290 feature pair from 0hr1", ), ], ) @@ -434,8 +461,13 @@ def test_filter_within_introns( expected: dict[str, tuple[Any]], request: pytest.FixtureRequest, ) -> None: - """Test filtering within introns.""" - # We first extract the required pair_features and block start/ends + """ + Test filtering within introns. + + Tests are very basic and not all conditional statements are covered, in particular instances where transcripts span + the whole region are not covered. This may be challenging as the length is _always_ 150. + """ + # We first extract the required pair_features and block start/ends from the fixture pair_features, blocks = request.getfixturevalue(feature_pair) within_introns = isoslam.filter_within_introns( pair_features, @@ -454,28 +486,16 @@ def test_filter_within_introns( ), [ pytest.param( # type: ignore[misc] - "feature_pair_14770_17814", - "read1", - {"ENST00000442898": [(18174, 18380, "9", "-")]}, - id="14770 and 17814 for read1 - Assigned to MSTRG.63147", - ), - pytest.param( - "feature_pair_14770_17814", - "read2", - {"ENST00000442898": [(17855, 18027, "9", "-")]}, - id="14770 and 17814 for read2 - Assigned to MSTRG.63147", - ), - pytest.param( - "feature_pair_20906_21051", + "feature_pair_spliced_17739_17814", "read1", - {}, - id="20906 and 21051 for read1 - Unassigned", + {"ENST00000442898": [("9", 17855, 18027, "-")]}, + id="14770 and 17814 for read1 from no4sU", ), pytest.param( - "feature_pair_20906_21051", + "feature_pair_spliced_17430_18155", "read2", - {}, - id="20906 and 21051 for read2 - Unassigned", + {"ENST00000442898": [("9", 18174, 18380, "-"), ("9", 18492, 24850, "-")]}, + id="17430 and 18155 for from 0hr1", ), ], ) @@ -485,8 +505,11 @@ def test_filter_spliced_utrons( expected: dict[str, list[Any]], request: pytest.FixtureRequest, ) -> None: - """Test filtering within introns.""" - # We first extract the required pair_features and block start/ends + """Test filtering spliced utrons. + + Tests are very basic and not all conditional statements are covered. + """ + # We first extract the required pair_features and block start/ends from the fixture pair_features, blocks = request.getfixturevalue(feature_pair) within_introns = isoslam.filter_spliced_utrons( pair_features,