diff --git a/isoslam/all_introns_counts_and_info.py b/isoslam/all_introns_counts_and_info.py index 4ed0475..ae7d4ea 100644 --- a/isoslam/all_introns_counts_and_info.py +++ b/isoslam/all_introns_counts_and_info.py @@ -113,8 +113,8 @@ def fragment_iterator(read_iterator): # Extract features pair_features = isoslam.extract_features_from_pair(pair) # DEBUGGING - Get information on features - if i_total_progress == 484: - print(f"{pair_features=}") + # if i_total_progress == 484: + # print(f"{pair_features=}") # Temporary code sets up variables from the returned dictionary to match those currently used read1_start = pair_features["read1"]["start"] read1_end = pair_features["read1"]["end"] @@ -155,91 +155,35 @@ def fragment_iterator(read_iterator): ## we will just ignore those few block_starts1, block_ends1 = isoslam.zip_blocks(read1) block_starts2, block_ends2 = isoslam.zip_blocks(read2) - # RETAINED - read1_within_intron = {} - - for chr, start, end, transcript_id, strand in utrons1: - # if starts/ends in intron/(s) - if ( - (start <= read1_start <= end or start <= read1_end <= end) - and start not in block_ends1 - and end not in block_starts1 - and start not in block_ends2 - and end not in block_starts2 - ): - if transcript_id not in read1_within_intron: - read1_within_intron[transcript_id] = [] - read1_within_intron[transcript_id].append((start, end, chr, strand)) - - # if spans an entire intron - intron_length = end - start - if ( - read1_start < start - and read1_end > end - and intron_length < read1_length - and start not in block_ends1 - and end not in block_starts1 - and start not in block_ends2 - and end not in block_starts2 - ): - if transcript_id not in read1_within_intron: - read1_within_intron[transcript_id] = [] - read1_within_intron[transcript_id].append((start, end, chr, strand)) - - # SPLICED - read1_spliced_3UI = {} - - for chr, start, end, transcript_id, strand in utrons1: - if start in block_ends1 and end in block_starts1: - if transcript_id not in read1_spliced_3UI: - read1_spliced_3UI[transcript_id] = [] - read1_spliced_3UI[transcript_id].append((start, end, chr, strand)) - - ## READ 2 - # RETAINED - read2_within_intron = {} - - for chr, start, end, transcript_id, strand in utrons2: - # if starts/ends in intron/(s) - if ( - (start <= read2_start <= end or start <= read2_end <= end) - and start not in block_ends2 - and end not in block_starts2 - and start not in block_ends1 - and end not in block_starts1 - ): - if transcript_id not in read2_within_intron: - read2_within_intron[transcript_id] = [] - read2_within_intron[transcript_id].append((start, end, chr, strand)) - - # if spans an entire intron - intron_length = end - start - if ( - read2_start < start - and read2_end > end - and intron_length < read2_length - and start not in block_ends2 - and end not in block_starts2 - and start not in block_ends1 - and end not in block_starts1 - ): - if transcript_id not in read2_within_intron: - read2_within_intron[transcript_id] = [] - read2_within_intron[transcript_id].append((start, end, chr, strand)) - - # SPLICED - read2_spliced_3UI = {} - - for chr, start, end, transcript_id, strand in utrons2: - if start in block_ends2 and end in block_starts2: - if transcript_id not in read2_spliced_3UI: - read2_spliced_3UI[transcript_id] = [] - read2_spliced_3UI[transcript_id].append((start, end, chr, strand)) + + # Build a dictionary as its cleaner to work with + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + + def print_info(chr, start, end, transcript_id, strand, blocks, text, count=i_total_progress) -> None: + """Print information on progress to aid debugging.""" + print(f"\n{count} =========================== {text=}\n") + print(f"{transcript_id=}") + print(f"{chr=}") + print(f"{start=}") + print(f"{end=}") + print(f"{strand=}") + print(f"{blocks=}") + + # Retain within introns + read1_within_intron = isoslam.filter_within_introns(pair_features, blocks, read="read1") + read2_within_intron = isoslam.filter_within_introns(pair_features, blocks, read="read2") + # Retain spliced + read1_spliced_3UI = isoslam.filter_spliced_utrons(pair_features, blocks, read="read1") + read2_spliced_3UI = isoslam.filter_spliced_utrons(pair_features, blocks, read="read2") all_dicts = [read1_within_intron, read2_within_intron, read1_spliced_3UI, read2_spliced_3UI] - all_empty = all(not contents for contents in all_dicts) - if all_empty: + # Check that there are some retained regions (all dictionaries would be empty if there are none retained and + # not {} evaluates to True, hence wrapping in all()) + if all(not contents for contents in all_dicts): continue first_matched += 1 diff --git a/isoslam/isoslam.py b/isoslam/isoslam.py index 17e9fb3..291b63a 100644 --- a/isoslam/isoslam.py +++ b/isoslam/isoslam.py @@ -196,3 +196,83 @@ def zip_blocks(read: AlignedSegment) -> Iterator[tuple[Any, ...]]: Tuple of two lists of integers the first is start location, the second is the end location. """ return zip(*read.get_blocks()) + + +def filter_within_introns( + pair_features: dict[str, dict[str, Any]], + blocks: dict[str, dict[str, set[int]]], + read: str = "read1", +) -> dict[str, tuple[Any]]: + """ + Filter utrons that are within introns. + + Parameters + ---------- + pair_features : dict[str, dict] + Dictionary of extracted features and utron in both read directions. + blocks : dic[str: dict[str, set]] + Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end. + read : str + Direction of read to filter on, default is ''read1'' but can also use ''read2''. + + Returns + ------- + dict[str, tuple(Any)] + Dictionary of the chromosome, start, end and strand of transcripts that are within introns. + """ + within_intron: dict[str, Any] = {} + for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: + start_end_within_intron = ( + start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end + ) + spans_intron = ( + pair_features[read]["start"] < start + and pair_features[read]["end"] > end + and (end - start) < pair_features[read]["length"] + ) + if ( # pylint: disable=too-many-boolean-expressions + (start_end_within_intron or spans_intron) + # Start should not be in ends and ends should not be in start, can we combine the start and end block + # sets I wonder? + and start not in blocks["read1"]["ends"] + and end not in blocks["read1"]["starts"] + and start not in blocks["read2"]["ends"] + and end not in blocks["read2"]["starts"] + ): + # Why add an empty list and append a tuple? + if transcript_id not in within_intron: + within_intron[transcript_id] = [] + within_intron[transcript_id].append((start, end, chromosome, strand)) + return within_intron + + +def filter_spliced_utrons( + pair_features: dict[str, dict[str, Any]], + blocks: dict[str, dict[str, set[int]]], + read: str = "read1", +) -> dict[str, list[Any]]: + """ + Filter utrons where start is in the block ends or end is in the block start. + + Parameters + ---------- + pair_features : dict[str, dict] + Dictionary of extracted features and utron in both read directions. + blocks : dic[str: dict[str, set]] + Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end. + read : str + Direction of read to filter on, default is ''read1'' but can also use ''read2''. + + Returns + ------- + dict[str, tuple(Any)] + Dictionary of the chromosome, start, end and strand of transcripts that are within introns. + """ + spliced_3ui: dict[str, list[Any]] = {} + for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]: + if start in blocks[read]["ends"] and end in blocks[read]["starts"]: + # Why add an empty list and append a tuple? + if transcript_id not in spliced_3ui: + spliced_3ui[transcript_id] = [] + spliced_3ui[transcript_id].append((start, end, chromosome, strand)) + return spliced_3ui diff --git a/tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_2].out b/tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_0hr1].out similarity index 100% rename from tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_2].out rename to tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_0hr1].out diff --git a/tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_1].out b/tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_no4sU].out similarity index 100% rename from tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_1].out rename to tests/_regtest_outputs/test_all_introns_counts_and_info.test_main[file_no4sU].out diff --git a/tests/conftest.py b/tests/conftest.py index 0625d5d..342f45f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -148,3 +148,187 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe aligned segment to be testing because of this filtering. """ return [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0] + + +@pytest.fixture() +def start_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment: + """Fixture where the start (16061) is within the intron (16011-16161).""" + return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 16061 and x.reference_end == 16717][0] + + +@pytest.fixture() +def end_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment: + """Fixture where the end (24850) is within the intron (24715-24865).""" + return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 18492 and x.reference_end == 24850][0] + + +def _feature_pair( + bam_file: list[AlignedSegment], + read1_start: int, + read2_start: int, + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """Construct feature pairs.""" + aligned_segment1 = [x for x in bam_file if x.reference_start == read1_start][0] + aligned_segment2 = [x for x in bam_file if x.reference_start == read2_start][0] + pair_features = isoslam.extract_features_from_pair( + [ + aligned_segment1, + aligned_segment2, + ] + ) + # ...and get the utron for both reads and add them to the dictionary + _, gene_transcript = extract_strand_transcript + pair_features["read1"]["utron"] = isoslam.extract_utron( + pair_features["read1"], gene_transcript, coordinates=extract_transcript + ) + pair_features["read2"]["utron"] = isoslam.extract_utron( + pair_features["read2"], gene_transcript, coordinates=extract_transcript + ) + # ...then we can get the block_starts/ends + block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment1) + block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment2) + blocks = { + "read1": {"starts": block_starts1, "ends": block_ends1}, + "read2": {"starts": block_starts2, "ends": block_ends2}, + } + return (pair_features, blocks) + + +# Fixtures for within introns +@pytest.fixture() +def feature_pair_within_15967_24715( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 15967 and 24715 augment with utrons and return along with block start/ends. + + Both of these have a start within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=15967, + read2_start=24715, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +@pytest.fixture() +def feature_pair_within_17790_18093( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17790 and 18093 augment with utrons and return along with block start/ends. + + Both of these have a start within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17790, + read2_start=18093, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +@pytest.fixture() +def feature_pair_within_17709_18290( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17709 and 18290 augment with utrons and return along with block start/ends. + + 17709 has a start within ''read1'' and end within ''read2'' whilst 18290 has a start within ''read2'' and an end + within ''read1''. + + Used in test_filter_within_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17709, + read2_start=18290, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +# Fixtures for within and spliced +@pytest.fixture() +def feature_pair_within_14770_17814( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 14770 and 17814 augment with utrons and return along with block start/ends. + + 14770 has a start within ''read1'' and end within ''read2'' whilst 17814 has a start within ''read2'' and an end + within ''read1''. + + Used in both test_filter_within_introns() and test_filter_spliced_utrons() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=14770, + read2_start=17814, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +# Fixtures for spliced matches +@pytest.fixture() +def feature_pair_spliced_17739_17814( + bam_sorted_assigned_file_no4sU: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17739 and 17814 augment with utrons and return along with block start/ends. + + 17739 has an end that matches the end of ''read1'' whilst 17814 has a start that matches the end of ''read1''. + + Used in test_filter_spliced_utrons() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_no4sU, + read1_start=17739, + read2_start=17814, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) + + +@pytest.fixture() +def feature_pair_spliced_17430_18155( + bam_sorted_assigned_file_0hr1: list[AlignedSegment], + extract_transcript: dict[str, list[int | str]], + extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]], +) -> tuple[dict["str", Any], list, list, list, list]: + """ + Feature pair for 17430 and 18155 augment with utrons and return along with block start/ends. + + 17430 has an end that matches ''read1'' start whilst 18155 has an end that matches the start of ''read1'' and an end + within ''read1''. + + Used in test_filter_spliced_introns() + """ + return _feature_pair( + bam_file=bam_sorted_assigned_file_0hr1, + read1_start=17430, + read2_start=18155, + extract_transcript=extract_transcript, + extract_strand_transcript=extract_strand_transcript, + ) diff --git a/tests/test_all_introns_counts_and_info.py b/tests/test_all_introns_counts_and_info.py index ca2f111..a243a77 100644 --- a/tests/test_all_introns_counts_and_info.py +++ b/tests/test_all_introns_counts_and_info.py @@ -21,9 +21,11 @@ ("file_path"), [ pytest.param( - BAM_DIR / "sorted_assigned" / "d0_no4sU_filtered_remapped_sorted.sorted.assigned.bam", id="file 1" + BAM_DIR / "sorted_assigned" / "d0_no4sU_filtered_remapped_sorted.sorted.assigned.bam", id="file no4sU" + ), + pytest.param( + BAM_DIR / "sorted_assigned" / "d0_0hr1_filtered_remapped_sorted.sorted.assigned.bam", id="file 0hr1" ), - pytest.param(BAM_DIR / "sorted_assigned" / "d0_0hr1_filtered_remapped_sorted.sorted.assigned.bam", id="file 2"), ], ) def test_main(file_path: Path, tmp_path: Path, regtest) -> None: @@ -39,3 +41,5 @@ def test_main(file_path: Path, tmp_path: Path, regtest) -> None: # Ideally would like to use syrupy to test snapshots but pd.DataFrame() are not yet supported # https://github.com/syrupy-project/syrupy/issues/887 print(results.to_string(float_format="{:.4e}".format), file=regtest) + # Uncomment to print out debugging information when running tests + # assert False diff --git a/tests/test_isoslam.py b/tests/test_isoslam.py index 3eb8c0b..4b9fdc0 100644 --- a/tests/test_isoslam.py +++ b/tests/test_isoslam.py @@ -392,3 +392,105 @@ def test_zip_blocks( start_blocks, end_blocks = isoslam.zip_blocks(request.getfixturevalue(aligned_segment)) assert start_blocks == expected_start assert end_blocks == expected_end + + +@pytest.mark.parametrize( + ( + "feature_pair", + "read", + "expected", + ), + [ + pytest.param( # type: ignore[misc] + "feature_pair_within_15967_24715", + "read1", + {"ENST00000442898": [(16061, 16717, "9", "-")]}, + id="15967 and 24715 feature pair from no4sU", + ), + pytest.param( + "feature_pair_within_17790_18093", + "read1", + {"ENST00000442898": [(17855, 18027, "9", "-")]}, + id="17790 and 18093 feature pair from 0hr1", + ), + pytest.param( + "feature_pair_within_17709_18290", + "read1", + {"ENST00000442898": [(17479, 17718, "9", "-"), (17855, 18027, "9", "-")]}, + id="17709 and 18290 feature pair from 0hr1", + ), + pytest.param( + "feature_pair_within_14770_17814", + "read1", + {}, + id="14770 and 17814 feature pair from no4sU", + ), + ], +) +def test_filter_within_introns( + feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]], + read: str, + expected: dict[str, tuple[Any]], + request: pytest.FixtureRequest, +) -> None: + """ + Test filtering within introns. + + Tests are very basic and not all conditional statements are covered, in particular instances where transcripts span + the whole region are not covered. This may be challenging as the length is _always_ 150. + """ + # We first extract the required pair_features and block start/ends from the fixture + pair_features, blocks = request.getfixturevalue(feature_pair) + within_introns = isoslam.filter_within_introns( + pair_features, + blocks, + read, + ) + assert within_introns == expected + + +@pytest.mark.parametrize( + ( + "feature_pair", + "read", + "expected", + ), + [ + pytest.param( # type: ignore[misc] + "feature_pair_spliced_17739_17814", + "read1", + {"ENST00000442898": [(17855, 18027, "9", "-")]}, + id="14770 and 17814 for read1 from no4sU", + ), + pytest.param( + "feature_pair_spliced_17430_18155", + "read2", + {"ENST00000442898": [(18174, 18380, "9", "-"), (18492, 24850, "9", "-")]}, + id="17430 and 18155 for from 0hr1", + ), + pytest.param( + "feature_pair_within_14770_17814", + "read2", + {"ENST00000442898": [(17855, 18027, "9", "-")]}, + id="17430 and 18155 for from no4sU", + ), + ], +) +def test_filter_spliced_utrons( + feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]], + read: str, + expected: dict[str, list[Any]], + request: pytest.FixtureRequest, +) -> None: + """Test filtering spliced utrons. + + Tests are very basic and not all conditional statements are covered. + """ + # We first extract the required pair_features and block start/ends from the fixture + pair_features, blocks = request.getfixturevalue(feature_pair) + within_introns = isoslam.filter_spliced_utrons( + pair_features, + blocks, + read, + ) + assert within_introns == expected