Skip to content

Commit

Permalink
Make gene translation more flexible by cutting off the end of genes t…
Browse files Browse the repository at this point in the history
…hat are not modulo 3
  • Loading branch information
jpjarnoux committed Apr 10, 2024
1 parent 922073d commit 9aaac23
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions ppanggolin/formats/writeMSA.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get_families_to_write(pangenome: Pangenome, partition_filter: str = "core",
return families


def translate(gene: Gene, code: Dict[str, Dict[str, str]]) -> str:
def translate(gene: Gene, code: Dict[str, Dict[str, str]]) -> Tuple[str, bool]:
"""translates the given dna sequence with the given translation table
:param gene: given gene
Expand All @@ -107,22 +107,25 @@ def translate(gene: Gene, code: Dict[str, Dict[str, str]]) -> str:
# code: https://www.bioinformatics.org/sms/iupac.html
start_table = code["start_table"]
table = code["trans_table"]
if len(gene.dna) % 3 == 0:
protein = start_table[gene.dna[0: 3]]
for i in range(3, len(gene.dna), 3):
codon = gene.dna[i: i + 3]
try:
protein += table[codon]
except KeyError: # codon was not planned for. Probably can't determine it.
protein += 'X' # X is for unknown
else:
logging.getLogger("PPANGGOLIN").error(f"Found gene length is {len(gene.dna)}")
raise IndexError(f"Gene {gene.ID} sequence length modulo 3 was different than 0, which is unexpected.")
return protein
mod = len(gene.dna) % 3
partial = False
if mod != 0:
partial = True
msg = (f"Gene {gene.ID} {'' if gene.local_identifier == '' else 'with local identifier ' + gene.local_identifier}"
f" has a sequence length of {len(gene.dna)} which modulo 3 was different than 0.")
logging.getLogger("PPANGGOLIN").debug(msg)
protein = start_table[gene.dna[0: 3]]
for i in range(3, len(gene.dna) - mod, 3):
codon = gene.dna[i: i + 3]
try:
protein += table[codon]
except KeyError: # codon was not planned for. Probably can't determine it.
protein += 'X' # X is for unknown
return protein, partial


def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory, code_table: Dict[str, Dict[str, str]],
source: str = 'protein', use_gene_id: bool = False) -> Path:
source: str = 'protein', use_gene_id: bool = False) -> Tuple[Path, bool]:
"""Write fasta files for each gene family
:param family: gene family to write
Expand All @@ -143,6 +146,7 @@ def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory
if len(genes) == 1:
single_copy_genes.extend(genes)

partial = False
for gene in single_copy_genes:
if use_gene_id:
f_obj.write('>' + gene.ID + "\n")
Expand All @@ -151,12 +155,14 @@ def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory
if source == "dna":
f_obj.write(gene.dna + '\n')
elif source == "protein":
f_obj.write(translate(gene, code_table) + "\n")
protein, part = translate(gene, code_table)
if not partial:
partial = part
f_obj.write(protein + "\n")
else:
raise AssertionError("Unknown sequence source given (expected 'dna' or 'protein')")
f_obj.flush()

return f_name
return f_name, partial


def launch_mafft(fname: Path, output: Path, fam_name: str):
Expand Down Expand Up @@ -204,12 +210,19 @@ def compute_msa(families: Set[GeneFamily], output: Path, tmpdir: Path, cpu: int
logging.getLogger("PPanGGOLiN").info("Preparing input files for MSA...")
code_table = genetic_codes(code)

partial = False
for family in tqdm(families, unit="family", disable=disable_bar):
start_write = time.time()
fname = write_fasta_families(family, newtmpdir, code_table, source, use_gene_id)
fname, part = write_fasta_families(family, newtmpdir, code_table, source, use_gene_id)
if not partial:
partial = part
write_total = write_total + (time.time() - start_write)
args.append((fname, output, family.name))

if partial:
logging.getLogger("PPanGGOLiN").warning("Partial gene was found during translation. "
"Last nucleotides were removed to translate. "
"Use --verbose 2 to see genes that are partial")
logging.getLogger("PPanGGOLiN").info("Computing the MSA ...")
with get_context('fork').Pool(cpu) as p:
with tqdm(total=len(families), unit="family", disable=disable_bar) as bar:
Expand Down

0 comments on commit 9aaac23

Please sign in to comment.