Skip to content

Commit

Permalink
refactor(wip): building within_intron and spliced subsets
Browse files Browse the repository at this point in the history
Closes #91
Closes #92

Extracts the duplicated code that builds subsets of reads that are within introns and within spliced_3ui and abstracts
out to functions with tests. Currently the tests fail and I don't think they should so I need to work out why and
perhaps add some additional fixtures to use here.
  • Loading branch information
slackline committed Jan 10, 2025
1 parent 83bb05c commit 743b339
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 1 deletion.
21 changes: 20 additions & 1 deletion isoslam/all_introns_counts_and_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def fragment_iterator(read_iterator):
# Extract features
pair_features = isoslam.extract_features_from_pair(pair)
# DEBUGGING - Get information on features
if i_total_progress in (484):
if i_total_progress == 484:
print(f"{pair_features=}")
# Temporary code sets up variables from the returned dictionary to match those currently used
read1_start = pair_features["read1"]["start"]
Expand Down Expand Up @@ -155,6 +155,13 @@ def fragment_iterator(read_iterator):
## we will just ignore those few
block_starts1, block_ends1 = isoslam.zip_blocks(read1)
block_starts2, block_ends2 = isoslam.zip_blocks(read2)

# Build a dictionary as its cleaner to work with
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}

# RETAINED
read1_within_intron = {}

Expand Down Expand Up @@ -185,6 +192,9 @@ def fragment_iterator(read_iterator):
if transcript_id not in read1_within_intron:
read1_within_intron[transcript_id] = []
read1_within_intron[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read1_within_intron=}")

# SPLICED
read1_spliced_3UI = {}
Expand All @@ -194,6 +204,9 @@ def fragment_iterator(read_iterator):
if transcript_id not in read1_spliced_3UI:
read1_spliced_3UI[transcript_id] = []
read1_spliced_3UI[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read1_spliced_3UI=}")

## READ 2
# RETAINED
Expand Down Expand Up @@ -226,6 +239,9 @@ def fragment_iterator(read_iterator):
if transcript_id not in read2_within_intron:
read2_within_intron[transcript_id] = []
read2_within_intron[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read2_within_intron=}")

# SPLICED
read2_spliced_3UI = {}
Expand All @@ -235,6 +251,9 @@ def fragment_iterator(read_iterator):
if transcript_id not in read2_spliced_3UI:
read2_spliced_3UI[transcript_id] = []
read2_spliced_3UI[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read2_spliced_3UI=}")

all_dicts = [read1_within_intron, read2_within_intron, read1_spliced_3UI, read2_spliced_3UI]
all_empty = all(not contents for contents in all_dicts)
Expand Down
88 changes: 88 additions & 0 deletions isoslam/isoslam.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,91 @@ def zip_blocks(read: AlignedSegment) -> Iterator[tuple[Any, ...]]:
Tuple of two lists of integers the first is start location, the second is the end location.
"""
return zip(*read.get_blocks())


def filter_within_introns(
pair_features: dict[str, dict[str, Any]],
blocks: dict[str, dict[str, set[int]]],
read: str = "read1",
) -> dict[str, tuple[Any]]:
"""
Filter utrons that are within introns.
Parameters
----------
pair_features : dict[str, dict]
Dictionary of extracted features and utron in both read directions.
blocks : dic[str: dict[str, set]]
Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end.
read : str
Direction of read to filter on, default is ''read1'' but can also use ''read2''.
Returns
-------
dict[str, tuple(Any)]
Dictionary of the chromosome, start, end and strand of transcripts that are within introns.
"""
within_intron: dict[str, Any] = {}
for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]:
# print("\n=========================")
# print(f"{chr=}")
# print(f"{start=}")
# print(f"{end=}")
# print(f"{transcript_id=}")
# print(f"{strand=}")
# Check if starts/ends are within introns
if ( # pylint: disable=too-many-boolean-expressions
(start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end)
# Start should not be in ends and ends should not be in start, can we combine the start and end block
# sets I wonder?
and start not in blocks["read1"]["ends"]
and end not in blocks["read1"]["starts"]
and start not in blocks["read2"]["ends"]
and end not in blocks["read2"]["starts"]
):
# Why add an empty list and append a tuple?
# Should we not be getting the keys of within_intron to check transcript_id is not in?
if transcript_id not in within_intron:
within_intron[transcript_id] = []
within_intron[transcript_id].append((chromosome, start, end, strand))
return within_intron


def filter_spliced_utrons(
pair_features: dict[str, dict[str, Any]],
blocks: dict[str, dict[str, set[int]]],
read: str = "read1",
) -> dict[str, list[Any]]:
"""
Filter utrons that are within spliced 3UI's.
Parameters
----------
pair_features : dict[str, dict]
Dictionary of extracted features and utron in both read directions.
blocks : dic[str: dict[str, set]]
Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end.
read : str
Direction of read to filter on, default is ''read1'' but can also use ''read2''.
Returns
-------
dict[str, tuple(Any)]
Dictionary of the chromosome, start, end and strand of transcripts that are within introns.
"""
spliced_3ui: dict[str, list[Any]] = {}
for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]:
# print("\n=========================")
# print(f"{chr=}")
# print(f"{start=}")
# print(f"{end=}")
# print(f"{transcript_id=}")
# print(f"{strand=}")
# if start in block_ends1 and end in block_starts1:
if start in blocks[read]["ends"] and end in blocks[read]["starts"]:
# Why add an empty list and append a tuple?
# Should we not be getting the keys of within_intron to check transcript_id is not in?
if transcript_id not in spliced_3ui:
spliced_3ui[transcript_id] = []
spliced_3ui[transcript_id].append((chromosome, start, end, strand))
return spliced_3ui
68 changes: 68 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,71 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe
aligned segment to be testing because of this filtering.
"""
return [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0]


@pytest.fixture()
def feature_pair_14770_17814(
bam_sorted_assigned_file_no4sU: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""Feature pair for 17814 and 14770 augment with utrons and return along with block start/ends."""
aligned_segment_14770 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 14770][0]
aligned_segment_17814 = [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 17814][0]
pair_features = isoslam.extract_features_from_pair(
[
aligned_segment_14770,
aligned_segment_17814,
]
)
# ...and get the utron for both reads and add them to the dictionary
_, gene_transcript = extract_strand_transcript
pair_features["read1"]["utron"] = isoslam.extract_utron(
pair_features["read1"], gene_transcript, coordinates=extract_transcript
)
pair_features["read2"]["utron"] = isoslam.extract_utron(
pair_features["read2"], gene_transcript, coordinates=extract_transcript
)
# ...then we can get the block_starts/ends
block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_14770)
block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_17814)
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}
# return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2)
return (pair_features, blocks)


@pytest.fixture()
def feature_pair_20906_21051(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""Feature pair for 21051 and 20906 augment with utrons and return along with block start/ends."""
aligned_segment_20906 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0]
aligned_segment_21051 = [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 21051][0]
pair_features = isoslam.extract_features_from_pair(
[
aligned_segment_20906,
aligned_segment_21051,
]
)
# ...and get the utron for both reads and add them to the dictionary
_, gene_transcript = extract_strand_transcript
pair_features["read1"]["utron"] = isoslam.extract_utron(
pair_features["read1"], gene_transcript, coordinates=extract_transcript
)
pair_features["read2"]["utron"] = isoslam.extract_utron(
pair_features["read2"], gene_transcript, coordinates=extract_transcript
)
# ...then we can get the block_starts/ends
block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment_20906)
block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment_21051)
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}
# return (pair_features, block_starts1, block_ends1, block_starts2, block_ends2)
return (pair_features, blocks)
102 changes: 102 additions & 0 deletions tests/test_isoslam.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,105 @@ def test_zip_blocks(
start_blocks, end_blocks = isoslam.zip_blocks(request.getfixturevalue(aligned_segment))
assert start_blocks == expected_start
assert end_blocks == expected_end


# Need to identify aligned segments that are within introns
@pytest.mark.parametrize(
(
"feature_pair",
"read",
"expected",
),
[
pytest.param( # type: ignore[misc]
"feature_pair_14770_17814",
"read1",
{},
id="14770 and 17814 for read1 - Assigned to MSTRG.63147",
),
pytest.param(
"feature_pair_14770_17814",
"read2",
{},
id="14770 and 17814 for read2 - Assigned to MSTRG.63147",
),
pytest.param(
"feature_pair_20906_21051",
"read1",
{},
id="20906 and 21051 for read1 - Unassigned",
),
pytest.param(
"feature_pair_20906_21051",
"read2",
{},
id="20906 and 21051 for read2 - Unassigned",
),
],
)
def test_filter_within_introns(
feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]],
read: str,
expected: dict[str, tuple[Any]],
request: pytest.FixtureRequest,
) -> None:
"""Test filtering within introns."""
# We first extract the required pair_features and block start/ends
pair_features, blocks = request.getfixturevalue(feature_pair)
within_introns = isoslam.filter_within_introns(
pair_features,
blocks,
read,
)
assert within_introns == expected


# Not getting the same as from debugging all_introns_counts_and_info.py
@pytest.mark.parametrize(
(
"feature_pair",
"read",
"expected",
),
[
pytest.param( # type: ignore[misc]
"feature_pair_14770_17814",
"read1",
{"ENST00000442898": [(18174, 18380, "9", "-")]},
id="14770 and 17814 for read1 - Assigned to MSTRG.63147",
),
pytest.param(
"feature_pair_14770_17814",
"read2",
{"ENST00000442898": [(17855, 18027, "9", "-")]},
id="14770 and 17814 for read2 - Assigned to MSTRG.63147",
),
pytest.param(
"feature_pair_20906_21051",
"read1",
{},
id="20906 and 21051 for read1 - Unassigned",
),
pytest.param(
"feature_pair_20906_21051",
"read2",
{},
id="20906 and 21051 for read2 - Unassigned",
),
],
)
def test_filter_spliced_utrons(
feature_pair: tuple[dict["str", Any], list[int], list[int], list[int], list[int]],
read: str,
expected: dict[str, list[Any]],
request: pytest.FixtureRequest,
) -> None:
"""Test filtering within introns."""
# We first extract the required pair_features and block start/ends
pair_features, blocks = request.getfixturevalue(feature_pair)
within_introns = isoslam.filter_spliced_utrons(
pair_features,
blocks,
read,
)
assert within_introns == expected

Check failure on line 496 in tests/test_isoslam.py

View workflow job for this annotation

GitHub Actions / Ex1 (3.10, ubuntu-latest)

test_filter_spliced_utrons[14770 and 17814 for read1 - Assigned to MSTRG.63147] AssertionError: assert {} == {'ENST0000044...0, '9', '-')]} Right contains 1 more item: {'ENST00000442898': [(18174, 18380, '9', '-')]} Full diff: + {} - { - 'ENST00000442898': [ - ( - 18174, - 18380, - '9', - '-', - ), - ], - }

Check failure on line 496 in tests/test_isoslam.py

View workflow job for this annotation

GitHub Actions / Ex1 (3.11, ubuntu-latest)

test_filter_spliced_utrons[14770 and 17814 for read1 - Assigned to MSTRG.63147] AssertionError: assert {} == {'ENST0000044...0, '9', '-')]} Right contains 1 more item: {'ENST00000442898': [(18174, 18380, '9', '-')]} Full diff: + {} - { - 'ENST00000442898': [ - ( - 18174, - 18380, - '9', - '-', - ), - ], - }

Check failure on line 496 in tests/test_isoslam.py

View workflow job for this annotation

GitHub Actions / Ex1 (3.12, ubuntu-latest)

test_filter_spliced_utrons[14770 and 17814 for read1 - Assigned to MSTRG.63147] AssertionError: assert {} == {'ENST0000044...0, '9', '-')]} Right contains 1 more item: {'ENST00000442898': [(18174, 18380, '9', '-')]} Full diff: + {} - { - 'ENST00000442898': [ - ( - 18174, - 18380, - '9', - '-', - ), - ], - }

Check failure on line 496 in tests/test_isoslam.py

View workflow job for this annotation

GitHub Actions / Ex1 (3.10, macos-latest)

test_filter_spliced_utrons[14770 and 17814 for read1 - Assigned to MSTRG.63147] AssertionError: assert {} == {'ENST0000044...0, '9', '-')]} Right contains 1 more item: {'ENST00000442898': [(18174, 18380, '9', '-')]} Full diff: + {} - { - 'ENST00000442898': [ - ( - 18174, - 18380, - '9', - '-', - ), - ], - }

Check failure on line 496 in tests/test_isoslam.py

View workflow job for this annotation

GitHub Actions / Ex1 (3.11, macos-latest)

test_filter_spliced_utrons[14770 and 17814 for read1 - Assigned to MSTRG.63147] AssertionError: assert {} == {'ENST0000044...0, '9', '-')]} Right contains 1 more item: {'ENST00000442898': [(18174, 18380, '9', '-')]} Full diff: + {} - { - 'ENST00000442898': [ - ( - 18174, - 18380, - '9', - '-', - ), - ], - }

Check failure on line 496 in tests/test_isoslam.py

View workflow job for this annotation

GitHub Actions / Ex1 (3.12, macos-latest)

test_filter_spliced_utrons[14770 and 17814 for read1 - Assigned to MSTRG.63147] AssertionError: assert {} == {'ENST0000044...0, '9', '-')]} Right contains 1 more item: {'ENST00000442898': [(18174, 18380, '9', '-')]} Full diff: + {} - { - 'ENST00000442898': [ - ( - 18174, - 18380, - '9', - '-', - ), - ], - }

0 comments on commit 743b339

Please sign in to comment.