Skip to content

Commit

Permalink
tests: Meaningful tests for refactored filtering
Browse files Browse the repository at this point in the history
Identified instances where meaningful results are obtained when filtering for instances where the start/end are within
introns or there is splicing.

Tests are not fully comprehensive yet as not all logic statements are covered, in particular when a transcript spans a
whole region but I'm currently a bit hazy on whether this will always happen as the `query_length` is always
`150` (`read_length` is longer and was the cause of previous errors).
  • Loading branch information
slackline committed Jan 13, 2025
1 parent ce1c18d commit 552d5f3
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 89 deletions.
16 changes: 16 additions & 0 deletions isoslam/all_introns_counts_and_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ def fragment_iterator(read_iterator):
# RETAINED
read1_within_intron = {}

def print_info(chr, start, end, transcript_id, strand, blocks, text, count=i_total_progress) -> None:
print(f"\n{count} =========================== {text=}\n")
print(f"{transcript_id=}")
print(f"{chr=}")
print(f"{start=}")
print(f"{end=}")
print(f"{strand=}")
print(f"\n{blocks}\n")

for chr, start, end, transcript_id, strand in utrons1:
# if starts/ends in intron/(s)
if (
Expand All @@ -174,6 +183,8 @@ def fragment_iterator(read_iterator):
and start not in block_ends2
and end not in block_starts2
):
# if transcript_id == "ENST00000442898":
# print_info(chr, start, end, transcript_id, strand, blocks, text="read1_within_intron")
if transcript_id not in read1_within_intron:
read1_within_intron[transcript_id] = []
read1_within_intron[transcript_id].append((start, end, chr, strand))
Expand All @@ -189,6 +200,8 @@ def fragment_iterator(read_iterator):
and start not in block_ends2
and end not in block_starts2
):
# if transcript_id == "ENST00000442898":
# print_info(chr, start, end, transcript_id, strand, blocks, text="spans_intron")
if transcript_id not in read1_within_intron:
read1_within_intron[transcript_id] = []
read1_within_intron[transcript_id].append((start, end, chr, strand))
Expand All @@ -201,6 +214,8 @@ def fragment_iterator(read_iterator):

for chr, start, end, transcript_id, strand in utrons1:
if start in block_ends1 and end in block_starts1:
# if transcript_id == "ENST00000442898":
# print_info(chr, start, end, transcript_id, strand, blocks, text="spliced")
if transcript_id not in read1_spliced_3UI:
read1_spliced_3UI[transcript_id] = []
read1_spliced_3UI[transcript_id].append((start, end, chr, strand))
Expand All @@ -214,6 +229,7 @@ def fragment_iterator(read_iterator):

for chr, start, end, transcript_id, strand in utrons2:
# if starts/ends in intron/(s)

if (
(start <= read2_start <= end or start <= read2_end <= end)
and start not in block_ends2
Expand Down
20 changes: 3 additions & 17 deletions isoslam/isoslam.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,6 @@ def filter_within_introns(
"""
within_intron: dict[str, Any] = {}
for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]:
# print("\n=========================")
# print(f"{chr=}")
# print(f"{start=}")
# print(f"{end=}")
# print(f"{transcript_id=}")
# print(f"{strand=}")
# Check if starts/ends are within introns
start_end_within_intron = (
start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end
)
Expand All @@ -247,7 +240,7 @@ def filter_within_introns(
and end not in blocks["read2"]["starts"]
):
# Why add an empty list and append a tuple?
# Should we not be getting the keys of within_intron to check transcript_id is not in?
# TODO: Check where there are multiple matches that these are extracted
if transcript_id not in within_intron:
within_intron[transcript_id] = []
within_intron[transcript_id].append((chromosome, start, end, strand))
Expand All @@ -260,7 +253,7 @@ def filter_spliced_utrons(
read: str = "read1",
) -> dict[str, list[Any]]:
"""
Filter utrons that are within spliced 3UI's.
Filter utrons where start is in the block ends or end is in the block start.
Parameters
----------
Expand All @@ -278,16 +271,9 @@ def filter_spliced_utrons(
"""
spliced_3ui: dict[str, list[Any]] = {}
for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]:
# print("\n=========================")
# print(f"{chr=}")
# print(f"{start=}")
# print(f"{end=}")
# print(f"{transcript_id=}")
# print(f"{strand=}")
# if start in block_ends1 and end in block_starts1:
if start in blocks[read]["ends"] and end in blocks[read]["starts"]:
# Why add an empty list and append a tuple?
# Should we not be getting the keys of within_intron to check transcript_id is not in?
# TODO: Check where there are multiple matches that these are extracted
if transcript_id not in spliced_3ui:
spliced_3ui[transcript_id] = []
spliced_3ui[transcript_id].append((chromosome, start, end, strand))
Expand Down
160 changes: 126 additions & 34 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,31 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe


@pytest.fixture()
def feature_pair_14770_17814(
bam_sorted_assigned_file_no4sU: list[AlignedSegment],
def start_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment:
"""Fixture where the start (16061) is within the intron (16011-16161)."""
return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 16061 and x.reference_end == 16717][0]


@pytest.fixture()
def end_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment:
"""Fixture where the end (24850) is within the intron (24715-24865)."""
return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 18492 and x.reference_end == 24850][0]


def _feature_pair(
bam_file: list[AlignedSegment],
read1_start: int,
read2_start: int,
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""Feature pair for 17814 and 14770 augment with utrons and return along with block start/ends."""
aligned_segment_14770 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 14770][0]
aligned_segment_17814 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 17814][0]
"""Construct feature pairs."""
aligned_segment1 = [x for x in bam_file if x.reference_start == read1_start][0]
aligned_segment2 = [x for x in bam_file if x.reference_start == read2_start][0]
pair_features = isoslam.extract_features_from_pair(
[
aligned_segment_14770,
aligned_segment_17814,
aligned_segment1,
aligned_segment2,
]
)
# ...and get the utron for both reads and add them to the dictionary
Expand All @@ -174,45 +187,124 @@ def feature_pair_14770_17814(
pair_features["read2"], gene_transcript, coordinates=extract_transcript
)
# ...then we can get the block_starts/ends
block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_14770)
block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_17814)
block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment1)
block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment2)
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}
# return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2)
return (pair_features, blocks)


# Fixtures for within introns
@pytest.fixture()
def feature_pair_within_15967_24715(
bam_sorted_assigned_file_no4sU: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 15967 and 24715 augment with utrons and return along with block start/ends.
Both of these have a start within ''read1''.
Used in test_filter_within_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_no4sU,
read1_start=15967,
read2_start=24715,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)


@pytest.fixture()
def feature_pair_20906_21051(
def feature_pair_within_17790_18093(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""Feature pair for 21051 and 20906 augment with utrons and return along with block start/ends."""
aligned_segment_20906 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0]
aligned_segment_21051 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 21051][0]
pair_features = isoslam.extract_features_from_pair(
[
aligned_segment_20906,
aligned_segment_21051,
]
"""
Feature pair for 17790 and 18093 augment with utrons and return along with block start/ends.
Both of these have a start within ''read1''.
Used in test_filter_within_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_0hr1,
read1_start=17790,
read2_start=18093,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)
# ...and get the utron for both reads and add them to the dictionary
_, gene_transcript = extract_strand_transcript
pair_features["read1"]["utron"] = isoslam.extract_utron(
pair_features["read1"], gene_transcript, coordinates=extract_transcript


@pytest.fixture()
def feature_pair_within_17709_18290(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17709 and 18290 augment with utrons and return along with block start/ends.
17709 has a start within ''read1'' and end within ''read2'' whilst 18290 has a start within ''read2'' and an end
within ''read1''.
Used in test_filter_within_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_0hr1,
read1_start=17709,
read2_start=18290,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)
pair_features["read2"]["utron"] = isoslam.extract_utron(
pair_features["read2"], gene_transcript, coordinates=extract_transcript


# Fixtures for spliced matches
@pytest.fixture()
def feature_pair_spliced_17739_17814(
bam_sorted_assigned_file_no4sU: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17739 and 17814 augment with utrons and return along with block start/ends.
17739 has an end that matches the end of ''read1'' whilst 17814 has a start that matches the end of ''read1''.
Used in test_filter_spliced_utrons()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_no4sU,
read1_start=17739,
read2_start=17814,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)


@pytest.fixture()
def feature_pair_spliced_17430_18155(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17430 and 18155 augment with utrons and return along with block start/ends.
17430 has an end that matches ''read1'' start whilst 18155 has an end that matches the start of ''read1'' and an end
within ''read1''.
Used in test_filter_spliced_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_0hr1,
read1_start=17430,
read2_start=18155,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)
# ...then we can get the block_starts/ends
block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_20906)
block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_21051)
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}
# return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2)
return (pair_features, blocks)
2 changes: 2 additions & 0 deletions tests/test_all_introns_counts_and_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ def test_main(file_path: Path, tmp_path: Path, regtest) -> None:
# Ideally would like to use syrupy to test snapshots but pd.DataFrame() are not yet supported
# https://github.com/syrupy-project/syrupy/issues/887
print(results.to_string(float_format="{:.4e}".format), file=regtest)
# Uncomment to print out debugging information when running tests
# assert False
Loading

0 comments on commit 552d5f3

Please sign in to comment.