Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: building within_intron and spliced subsets #134

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion isoslam/all_introns_counts_and_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def fragment_iterator(read_iterator):
# Extract features
pair_features = isoslam.extract_features_from_pair(pair)
# DEBUGGING - Get information on features
if i_total_progress in (484):
if i_total_progress == 484:
print(f"{pair_features=}")
# Temporary code sets up variables from the returned dictionary to match those currently used
read1_start = pair_features["read1"]["start"]
Expand Down Expand Up @@ -155,9 +155,25 @@ def fragment_iterator(read_iterator):
## we will just ignore those few
block_starts1, block_ends1 = isoslam.zip_blocks(read1)
block_starts2, block_ends2 = isoslam.zip_blocks(read2)

# Build a dictionary as its cleaner to work with
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}

# RETAINED
read1_within_intron = {}

def print_info(chr, start, end, transcript_id, strand, blocks, text, count=i_total_progress) -> None:
print(f"\n{count} =========================== {text=}\n")
print(f"{transcript_id=}")
print(f"{chr=}")
print(f"{start=}")
print(f"{end=}")
print(f"{strand=}")
print(f"\n{blocks}\n")

for chr, start, end, transcript_id, strand in utrons1:
# if starts/ends in intron/(s)
if (
Expand All @@ -167,6 +183,8 @@ def fragment_iterator(read_iterator):
and start not in block_ends2
and end not in block_starts2
):
# if transcript_id == "ENST00000442898":
# print_info(chr, start, end, transcript_id, strand, blocks, text="read1_within_intron")
if transcript_id not in read1_within_intron:
read1_within_intron[transcript_id] = []
read1_within_intron[transcript_id].append((start, end, chr, strand))
Expand All @@ -182,25 +200,36 @@ def fragment_iterator(read_iterator):
and start not in block_ends2
and end not in block_starts2
):
# if transcript_id == "ENST00000442898":
# print_info(chr, start, end, transcript_id, strand, blocks, text="spans_intron")
if transcript_id not in read1_within_intron:
read1_within_intron[transcript_id] = []
read1_within_intron[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read1_within_intron=}")

# SPLICED
read1_spliced_3UI = {}

for chr, start, end, transcript_id, strand in utrons1:
if start in block_ends1 and end in block_starts1:
# if transcript_id == "ENST00000442898":
# print_info(chr, start, end, transcript_id, strand, blocks, text="spliced")
if transcript_id not in read1_spliced_3UI:
read1_spliced_3UI[transcript_id] = []
read1_spliced_3UI[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read1_spliced_3UI=}")

## READ 2
# RETAINED
read2_within_intron = {}

for chr, start, end, transcript_id, strand in utrons2:
# if starts/ends in intron/(s)

if (
(start <= read2_start <= end or start <= read2_end <= end)
and start not in block_ends2
Expand All @@ -226,6 +255,9 @@ def fragment_iterator(read_iterator):
if transcript_id not in read2_within_intron:
read2_within_intron[transcript_id] = []
read2_within_intron[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read2_within_intron=}")

# SPLICED
read2_spliced_3UI = {}
Expand All @@ -235,6 +267,9 @@ def fragment_iterator(read_iterator):
if transcript_id not in read2_spliced_3UI:
read2_spliced_3UI[transcript_id] = []
read2_spliced_3UI[transcript_id].append((start, end, chr, strand))
# DEBUGGING - Get information on features
if i_total_progress == 484:
print(f"{read2_spliced_3UI=}")

all_dicts = [read1_within_intron, read2_within_intron, read1_spliced_3UI, read2_spliced_3UI]
all_empty = all(not contents for contents in all_dicts)
Expand Down
82 changes: 82 additions & 0 deletions isoslam/isoslam.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,85 @@ def zip_blocks(read: AlignedSegment) -> Iterator[tuple[Any, ...]]:
Tuple of two lists of integers the first is start location, the second is the end location.
"""
return zip(*read.get_blocks())


def filter_within_introns(
pair_features: dict[str, dict[str, Any]],
blocks: dict[str, dict[str, set[int]]],
read: str = "read1",
) -> dict[str, tuple[Any]]:
"""
Filter utrons that are within introns.

Parameters
----------
pair_features : dict[str, dict]
Dictionary of extracted features and utron in both read directions.
blocks : dic[str: dict[str, set]]
Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end.
read : str
Direction of read to filter on, default is ''read1'' but can also use ''read2''.

Returns
-------
dict[str, tuple(Any)]
Dictionary of the chromosome, start, end and strand of transcripts that are within introns.
"""
within_intron: dict[str, Any] = {}
for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]:
start_end_within_intron = (
start <= pair_features[read]["start"] <= end or start <= pair_features[read]["end"] <= end
)
spans_intron = (
pair_features[read]["start"] < start
and pair_features[read]["end"] > end
and pair_features[read]["length"] < (end - start)
)
if ( # pylint: disable=too-many-boolean-expressions
(start_end_within_intron or spans_intron)
# Start should not be in ends and ends should not be in start, can we combine the start and end block
# sets I wonder?
and start not in blocks["read1"]["ends"]
and end not in blocks["read1"]["starts"]
and start not in blocks["read2"]["ends"]
and end not in blocks["read2"]["starts"]
):
# Why add an empty list and append a tuple?
# TODO: Check where there are multiple matches that these are extracted
if transcript_id not in within_intron:
within_intron[transcript_id] = []
within_intron[transcript_id].append((chromosome, start, end, strand))
return within_intron


def filter_spliced_utrons(
pair_features: dict[str, dict[str, Any]],
blocks: dict[str, dict[str, set[int]]],
read: str = "read1",
) -> dict[str, list[Any]]:
"""
Filter utrons where start is in the block ends or end is in the block start.

Parameters
----------
pair_features : dict[str, dict]
Dictionary of extracted features and utron in both read directions.
blocks : dic[str: dict[str, set]]
Nested dictionary of start and ends for each read. Top level is read, with a dictionary of start and end.
read : str
Direction of read to filter on, default is ''read1'' but can also use ''read2''.

Returns
-------
dict[str, tuple(Any)]
Dictionary of the chromosome, start, end and strand of transcripts that are within introns.
"""
spliced_3ui: dict[str, list[Any]] = {}
for chromosome, start, end, transcript_id, strand in pair_features[read]["utron"]:
if start in blocks[read]["ends"] and end in blocks[read]["starts"]:
# Why add an empty list and append a tuple?
# TODO: Check where there are multiple matches that these are extracted
if transcript_id not in spliced_3ui:
spliced_3ui[transcript_id] = []
spliced_3ui[transcript_id].append((chromosome, start, end, strand))
return spliced_3ui
160 changes: 160 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,163 @@ def aligned_segment_assigned_20906(bam_sorted_assigned_file_0hr1: list[AlignedSe
aligned segment to be testing because of this filtering.
"""
return [x for x in bam_sorted_assigned_file_0hr1 if x.reference_start == 20906][0]


@pytest.fixture()
def start_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment:
"""Fixture where the start (16061) is within the intron (16011-16161)."""
return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 16061 and x.reference_end == 16717][0]


@pytest.fixture()
def end_within_intron(bam_sorted_assigned_file_no4sU: list[AlignedSegment]) -> AlignedSegment:
"""Fixture where the end (24850) is within the intron (24715-24865)."""
return [x for x in bam_sorted_assigned_file_no4sU if x.reference_start == 18492 and x.reference_end == 24850][0]


def _feature_pair(
bam_file: list[AlignedSegment],
read1_start: int,
read2_start: int,
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""Construct feature pairs."""
aligned_segment1 = [x for x in bam_file if x.reference_start == read1_start][0]
aligned_segment2 = [x for x in bam_file if x.reference_start == read2_start][0]
pair_features = isoslam.extract_features_from_pair(
[
aligned_segment1,
aligned_segment2,
]
)
# ...and get the utron for both reads and add them to the dictionary
_, gene_transcript = extract_strand_transcript
pair_features["read1"]["utron"] = isoslam.extract_utron(
pair_features["read1"], gene_transcript, coordinates=extract_transcript
)
pair_features["read2"]["utron"] = isoslam.extract_utron(
pair_features["read2"], gene_transcript, coordinates=extract_transcript
)
# ...then we can get the block_starts/ends
block_starts1, block_ends1 = isoslam.zip_blocks(aligned_segment1)
block_starts2, block_ends2 = isoslam.zip_blocks(aligned_segment2)
blocks = {
"read1": {"starts": block_starts1, "ends": block_ends1},
"read2": {"starts": block_starts2, "ends": block_ends2},
}
return (pair_features, blocks)


# Fixtures for within introns
@pytest.fixture()
def feature_pair_within_15967_24715(
bam_sorted_assigned_file_no4sU: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 15967 and 24715 augment with utrons and return along with block start/ends.

Both of these have a start within ''read1''.

Used in test_filter_within_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_no4sU,
read1_start=15967,
read2_start=24715,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)


@pytest.fixture()
def feature_pair_within_17790_18093(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17790 and 18093 augment with utrons and return along with block start/ends.

Both of these have a start within ''read1''.

Used in test_filter_within_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_0hr1,
read1_start=17790,
read2_start=18093,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)


@pytest.fixture()
def feature_pair_within_17709_18290(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17709 and 18290 augment with utrons and return along with block start/ends.

17709 has a start within ''read1'' and end within ''read2'' whilst 18290 has a start within ''read2'' and an end
within ''read1''.

Used in test_filter_within_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_0hr1,
read1_start=17709,
read2_start=18290,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)


# Fixtures for spliced matches
@pytest.fixture()
def feature_pair_spliced_17739_17814(
bam_sorted_assigned_file_no4sU: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17739 and 17814 augment with utrons and return along with block start/ends.

17739 has an end that matches the end of ''read1'' whilst 17814 has a start that matches the end of ''read1''.

Used in test_filter_spliced_utrons()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_no4sU,
read1_start=17739,
read2_start=17814,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)


@pytest.fixture()
def feature_pair_spliced_17430_18155(
bam_sorted_assigned_file_0hr1: list[AlignedSegment],
extract_transcript: dict[str, list[int | str]],
extract_strand_transcript: tuple[dict[str, list[Any]], dict[str, str]],
) -> tuple[dict["str", Any], list, list, list, list]:
"""
Feature pair for 17430 and 18155 augment with utrons and return along with block start/ends.

17430 has an end that matches ''read1'' start whilst 18155 has an end that matches the start of ''read1'' and an end
within ''read1''.

Used in test_filter_spliced_introns()
"""
return _feature_pair(
bam_file=bam_sorted_assigned_file_0hr1,
read1_start=17430,
read2_start=18155,
extract_transcript=extract_transcript,
extract_strand_transcript=extract_strand_transcript,
)
2 changes: 2 additions & 0 deletions tests/test_all_introns_counts_and_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ def test_main(file_path: Path, tmp_path: Path, regtest) -> None:
# Ideally would like to use syrupy to test snapshots but pd.DataFrame() are not yet supported
# https://github.com/syrupy-project/syrupy/issues/887
print(results.to_string(float_format="{:.4e}".format), file=regtest)
# Uncomment to print out debugging information when running tests
# assert False
Loading
Loading