From 743b33941882d32ceed484ec7898ca70fb5a4b9b Mon Sep 17 00:00:00 2001 From: Neil Shephard Date: Fri, 10 Jan 2025 17:10:26 +0000 Subject: [PATCH] refactor(wip): building within_intron and spliced subsets Closes #91 Closes #92 Extracts the duplicated code that builds subsets of reads that are within introns and within spliced_3ui and abstracts out to functions with tests. Currently the tests fail and I don't think they should so I need to work out why and perhaps add some additional fixtures to use here. --- isoslam/all_introns_counts_and_info.py | 21 ++++- isoslam/isoslam.py | 88 +++++++++++++++++++++ tests/conftest.py | 68 +++++++++++++++++ tests/test_isoslam.py | 102 +++++++++++++++++++++++++ 4 files changed, 278 insertions(+), 1 deletion(-) diff --git a/isoslam/all_introns_counts_and_info.py b/isoslam/all_introns_counts_and_info.py index c36a3a6..706966c 100644 --- a/isoslam/all_introns_counts_and_info.py +++ b/isoslam/all_introns_counts_and_info.py @@ -113,7 +113,7 @@ def fragment_iterator(read_iterator): # Extract features pair_features = isoslam.extract_features_from_pair(pair) # DEBUGGING - Get information on features - if i_total_progress in (484): + if i_total_progress == 484: print(f"{pair_features=}") # Temporary code sets up variables from the returned dictionary to match those currently used read1_start = pair_features["read1"]["start"] @@ -155,6 +155,13 @@ def fragment_iterator(read_iterator): ## we will just ignore those few block_starts1, block_ends1 = isoslam.zip_blocks(read1) block_starts2, block_ends2 = isoslam.zip_blocks(read2) + + # Build a dictionary as its cleaner to work with + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + # RETAINED read1_within_intron = {} @@ -185,6 +192,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read1_within_intron: read1_within_intron[transcript_id] = [] read1_within_intron[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read1_within_intron=}") # SPLICED read1_spliced_3UI = {} @@ -194,6 +204,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read1_spliced_3UI: read1_spliced_3UI[transcript_id] = [] read1_spliced_3UI[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read1_spliced_3UI=}") ## READ 2 # RETAINED @@ -226,6 +239,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read2_within_intron: read2_within_intron[transcript_id] = [] read2_within_intron[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read2_within_intron=}") # SPLICED read2_spliced_3UI = {} @@ -235,6 +251,9 @@ def fragment_iterator(read_iterator): if transcript_id not in read2_spliced_3UI: read2_spliced_3UI[transcript_id] = [] read2_spliced_3UI[transcript_id].append((start, end, chr, strand)) + # DEBUGGING - Get information on features + if i_total_progress == 484: + print(f"{read2_spliced_3UI=}") all_dicts = [read1_within_intron, read2_within_intron, read1_spliced_3UI, read2_spliced_3UI] all_empty = all(not contents for contents in all_dicts) diff --git a/isoslam/isoslam.py b/isoslam/isoslam.py index 17e9fb3..6834024 100644 --- a/isoslam/isoslam.py +++ b/isoslam/isoslam.py @@ -196,3 +196,91 @@ def zip_blocks(read: AlignedSegment) -> Iterator[tuple[Any, ...]]: Tuple of two lists of integers the first is start location, the second is the end location. """ return zip(*read.get_blocks()) + + +def filter_within_introns( + pair_features: dict[str, dict[str, Any]], + blocks: dict[str, dict[str, set[int]]], + read: str = "read1", +) -> dict[str, tuple[Any]]: + """ + Filter utrons that are within introns. + + Parameters + ---------- + pair_features : dict[str, dict] + Dictionary of extracted features and utron in both read directions. + blocks : dic[str: dict[str, set]] + Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end. + read : str + Direction of read to filter on, default is ''read1'' but can also use ''read2''. + + Returns + ------- + dict[str, tuple(Any)] + Dictionary of the chromosome, start, end and strand of transcripts that are within introns. + """ + within_intron: dict[str, Any] = {} + for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: + # print("\n=========================") + # print(f"{chr=}") + # print(f"{start=}") + # print(f"{end=}") + # print(f"{transcript_id=}") + # print(f"{strand=}") + # Check if starts/ends are within introns + if ( # pylint: disable=too-many-boolean-expressions + (start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end) + # Start should not be in ends and ends should not be in start, can we combine the start and end block + # sets I wonder? + and start not in blocks["read1"]["ends"] + and end not in blocks["read1"]["starts"] + and start not in blocks["read2"]["ends"] + and end not in blocks["read2"]["starts"] + ): + # Why add an empty list and append a tuple? + # Should we not be getting the keys of within_intron to check transcript_id is not in? + if transcript_id not in within_intron: + within_intron[transcript_id] = [] + within_intron[transcript_id].append((chromosome, start, end, strand)) + return within_intron + + +def filter_spliced_utrons( + pair_features: dict[str, dict[str, Any]], + blocks: dict[str, dict[str, set[int]]], + read: str = "read1", +) -> dict[str, list[Any]]: + """ + Filter utrons that are within spliced 3UI's. + + Parameters + ---------- + pair_features : dict[str, dict] + Dictionary of extracted features and utron in both read directions. + blocks : dic[str: dict[str, set]] + Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end. + read : str + Direction of read to filter on, default is ''read1'' but can also use ''read2''. + + Returns + ------- + dict[str, tuple(Any)] + Dictionary of the chromosome, start, end and strand of transcripts that are within introns. + """ + spliced_3ui: dict[str, list[Any]] = {} + for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: + # print("\n=========================") + # print(f"{chr=}") + # print(f"{start=}") + # print(f"{end=}") + # print(f"{transcript_id=}") + # print(f"{strand=}") + # if start in block_ends1 and end in block_starts1: + if start in blocks[read]["ends"] and end in blocks[read]["starts"]: + # Why add an empty list and append a tuple? + # Should we not be getting the keys of within_intron to check transcript_id is not in? + if transcript_id not in spliced_3ui: + spliced_3ui[transcript_id] = [] + spliced_3ui[transcript_id].append((chromosome, start, end, strand)) + return spliced_3ui diff --git a/tests/conftest.py b/tests/conftest.py index 0625d5d..f9c214d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -148,3 +148,71 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe aligned segment to be testing because of this filtering. """ return [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] + + +@pytest.fixture() +def feature_pair_14770_17814( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """Feature pair for 17814 and 14770 augment with utrons and return along with block start/ends.""" + aligned_segment_14770 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 14770][0] + aligned_segment_17814 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 17814][0] + pair_features = isoslam.extract_features_from_pair( + [ + aligned_segment_14770, + aligned_segment_17814, + ] + ) + # ...and get the utron for both reads and add them to the dictionary + _, gene_transcript = extract_strand_transcript + pair_features["read1"]["utron"] = isoslam.extract_utron( + pair_features["read1"], gene_transcript, coordinates=extract_transcript + ) + pair_features["read2"]["utron"] = isoslam.extract_utron( + pair_features["read2"], gene_transcript, coordinates=extract_transcript + ) + # ...then we can get the block_starts/ends + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_14770) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_17814) + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) + return (pair_features, blocks) + + +@pytest.fixture() +def feature_pair_20906_21051( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """Feature pair for 21051 and 20906 augment with utrons and return along with block start/ends.""" + aligned_segment_20906 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] + aligned_segment_21051 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 21051][0] + pair_features = isoslam.extract_features_from_pair( + [ + aligned_segment_20906, + aligned_segment_21051, + ] + ) + # ...and get the utron for both reads and add them to the dictionary + _, gene_transcript = extract_strand_transcript + pair_features["read1"]["utron"] = isoslam.extract_utron( + pair_features["read1"], gene_transcript, coordinates=extract_transcript + ) + pair_features["read2"]["utron"] = isoslam.extract_utron( + pair_features["read2"], gene_transcript, coordinates=extract_transcript + ) + # ...then we can get the block_starts/ends + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_20906) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_21051) + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + # return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2) + return (pair_features, blocks) diff --git a/tests/test_isoslam.py b/tests/test_isoslam.py index 3eb8c0b..54f3af1 100644 --- a/tests/test_isoslam.py +++ b/tests/test_isoslam.py @@ -392,3 +392,105 @@ def test_zip_blocks( start_blocks, end_blocks = isoslam.zip_blocks(request.getfixturevalue(aligned_segment)) assert start_blocks == expected_start assert end_blocks == expected_end + + +# Need to identify aligned segments that are within introns +@pytest.mark.parametrize( + ( + "feature_pair", + "read", + "expected", + ), + [ + pytest.param( # type: ignore[misc] + "feature_pair_14770_17814", + "read1", + {}, + id="14770 and 17814 for read1 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_14770_17814", + "read2", + {}, + id="14770 and 17814 for read2 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_20906_21051", + "read1", + {}, + id="20906 and 21051 for read1 - Unassigned", + ), + pytest.param( + "feature_pair_20906_21051", + "read2", + {}, + id="20906 and 21051 for read2 - Unassigned", + ), + ], +) +def test_filter_within_introns( + feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]], + read: str, + expected: dict[str, tuple[Any]], + request: pytest.FixtureRequest, +) -> None: + """Test filtering within introns.""" + # We first extract the required pair_features and block start/ends + pair_features, blocks = request.getfixturevalue(feature_pair) + within_introns = isoslam.filter_within_introns( + pair_features, + blocks, + read, + ) + assert within_introns == expected + + +# Not getting the same as from debugging all_introns_counts_and_info.py +@pytest.mark.parametrize( + ( + "feature_pair", + "read", + "expected", + ), + [ + pytest.param( # type: ignore[misc] + "feature_pair_14770_17814", + "read1", + {"ENST00000442898": [(18174, 18380, "9", "-")]}, + id="14770 and 17814 for read1 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_14770_17814", + "read2", + {"ENST00000442898": [(17855, 18027, "9", "-")]}, + id="14770 and 17814 for read2 - Assigned to MSTRG.63147", + ), + pytest.param( + "feature_pair_20906_21051", + "read1", + {}, + id="20906 and 21051 for read1 - Unassigned", + ), + pytest.param( + "feature_pair_20906_21051", + "read2", + {}, + id="20906 and 21051 for read2 - Unassigned", + ), + ], +) +def test_filter_spliced_utrons( + feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]], + read: str, + expected: dict[str, list[Any]], + request: pytest.FixtureRequest, +) -> None: + """Test filtering within introns.""" + # We first extract the required pair_features and block start/ends + pair_features, blocks = request.getfixturevalue(feature_pair) + within_introns = isoslam.filter_spliced_utrons( + pair_features, + blocks, + read, + ) + assert within_introns == expected