Skip to content

Commit

Permalink
Merge pull request #196 from labgem/fix_msa
Browse files Browse the repository at this point in the history
Fix error to write msa
  • Loading branch information
JeanMainguy authored Mar 21, 2024
2 parents 42de260 + 8ad3e6e commit d681080
Showing 1 changed file with 47 additions and 49 deletions.
96 changes: 47 additions & 49 deletions ppanggolin/formats/writeMSA.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
def is_single_copy(family: GeneFamily, dup_margin: float = 0.95) -> bool:
"""
Check if a gene family can be considered 'single copy' or not
:param family: GeneFamily object
:param dup_margin: maximal number of genomes in which the gene family can have multiple members and still be considered a 'single copy' gene family
Expand All @@ -47,55 +47,55 @@ def get_families_to_write(pangenome: Pangenome, partition_filter: str = "core",
Get families corresponding to the given partition
:param pangenome: Partitioned pangenome
:param partition_filter: choice of partition to compute Multiple Sequence Alignement of the gene families
:param partition_filter: choice of partition to compute Multiple Sequence Alignment of the gene families
:param soft_core: Soft core threshold to use
:param dup_margin: maximal number of genomes in which the gene family can have multiple members and still be considered a 'single copy' gene family
:param single_copy: Use "single copy" (defined by dup_margin) gene families only
:return: set of families unique to one partition
"""
families = set()
nb_org = pangenome.number_of_organisms

if partition_filter == "all":
return pangenome.gene_families
if partition_filter in ["persistent", "shell", "cloud"]:
for family in pangenome.gene_families:
if family.named_partition == partition_filter:
if single_copy:
if is_single_copy(family, dup_margin):
families.add(family)
else:
families.add(family)
elif partition_filter in ["core", "accessory", "softcore"]:
if partition_filter == "core":
return set(pangenome.gene_families)
else:
families = set()
nb_org = pangenome.number_of_organisms
if partition_filter in ["persistent", "shell", "cloud"]:
for family in pangenome.gene_families:
if family.number_of_organisms == nb_org:
if family.named_partition == partition_filter:
if single_copy:
if is_single_copy(family, dup_margin):
families.add(family)
else:
families.add(family)
elif partition_filter == "accessory":
for family in pangenome.gene_families:
if family.number_of_organisms < nb_org:
if single_copy:
if is_single_copy(family, dup_margin):
elif partition_filter in ["core", "accessory", "softcore"]:
if partition_filter == "core":
for family in pangenome.gene_families:
if family.number_of_organisms == nb_org:
if single_copy:
if is_single_copy(family, dup_margin):
families.add(family)
else:
families.add(family)
else:
families.add(family)
elif partition_filter == "softcore":
for family in pangenome.gene_families:
if family.number_of_organisms >= nb_org * soft_core:
if single_copy:
if is_single_copy(family, dup_margin):
elif partition_filter == "accessory":
for family in pangenome.gene_families:
if family.number_of_organisms < nb_org:
if single_copy:
if is_single_copy(family, dup_margin):
families.add(family)
else:
families.add(family)
else:
families.add(family)
return families
elif partition_filter == "softcore":
for family in pangenome.gene_families:
if family.number_of_organisms >= nb_org * soft_core:
if single_copy:
if is_single_copy(family, dup_margin):
families.add(family)
else:
families.add(family)
return families


def translate(seq: str, code: Dict[str, str]) -> str:
def translate(seq: str, code: Dict[str, Dict[str, str]]) -> str:
"""translates the given dna sequence with the given translation table
:param seq: given dna sequence
Expand All @@ -120,7 +120,7 @@ def translate(seq: str, code: Dict[str, str]) -> str:
return protein


def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory, code_table: Dict[str, str],
def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory, code_table: Dict[str, Dict[str, str]],
source: str = 'protein', use_gene_id: bool = False) -> Path:
"""Write fasta files for each gene family
Expand Down Expand Up @@ -152,7 +152,7 @@ def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory
elif source == "protein":
f_obj.write(translate(gene.dna, code_table) + "\n")
else:
raise Exception("Unknown sequence source given (expected 'dna' or 'protein')")
raise AssertionError("Unknown sequence source given (expected 'dna' or 'protein')")
f_obj.flush()

return f_name
Expand Down Expand Up @@ -182,8 +182,8 @@ def launch_multi_mafft(args: List[Tuple[Path, Path, str]]):
launch_mafft(*args)


def compute_msa(families: set, output: Path, tmpdir: Path, cpu: int = 1, source: str = "protein",
use_gene_id: bool = False, code: int = 11, disable_bar: bool = False):
def compute_msa(families: Set[GeneFamily], output: Path, tmpdir: Path, cpu: int = 1, source: str = "protein",
use_gene_id: bool = False, code: str = "11", disable_bar: bool = False):
"""
Compute MSA between pangenome gene families
Expand All @@ -201,7 +201,7 @@ def compute_msa(families: set, output: Path, tmpdir: Path, cpu: int = 1, source:
write_total = 0
args = []
logging.getLogger("PPanGGOLiN").info("Preparing input files for MSA...")
code_table = genetic_codes(str(code))
code_table = genetic_codes(code)

for family in tqdm(families, unit="family", disable=disable_bar):
start_write = time.time()
Expand All @@ -210,14 +210,13 @@ def compute_msa(families: set, output: Path, tmpdir: Path, cpu: int = 1, source:
args.append((fname, output, family.name))

logging.getLogger("PPanGGOLiN").info("Computing the MSA ...")
bar = tqdm(range(len(families)), unit="family", disable=disable_bar)
with get_context('fork').Pool(cpu) as p:
for _ in p.imap_unordered(launch_multi_mafft, args):
bar.update()
bar.close()
with tqdm(total=len(families), unit="family", disable=disable_bar) as bar:
for _ in p.imap_unordered(launch_multi_mafft, args):
bar.update()


def write_whole_genome_msa(pangenome: Pangenome, families: set, phylo_name: str, outdir: Path,
def write_whole_genome_msa(pangenome: Pangenome, families: set, phylo_name: Path, outdir: Path,
use_gene_id: bool = False):
"""
Writes a whole genome msa file for additional phylogenetic analysis
Expand Down Expand Up @@ -285,7 +284,7 @@ def write_msa_files(pangenome: Pangenome, output: Path, cpu: int = 1, partition:
:param pangenome: Pangenome object with partition
:param output: Path to output directory
:param cpu: number of available core
:param partition: choice of partition to compute Multiple Sequence Alignement of the gene families
:param partition: choice of partition to compute Multiple Sequence Alignment of the gene families
:param tmpdir: path to temporary directory
:param source: indicates whether to use protein or dna sequences to compute the msa
:param soft_core: Soft core threshold to use
Expand Down Expand Up @@ -314,14 +313,13 @@ def write_msa_files(pangenome: Pangenome, output: Path, cpu: int = 1, partition:

# check that the code is similar than the one used previously, if there is one
if 'translation_table' in pangenome.parameters["cluster"]:
if pangenome.parameters["cluster"]["translation_table"] != translation_table:
if pangenome.parameters["cluster"]["translation_table"] != str(translation_table):
logging.getLogger("PPanGGOLiN").warning("The translation table used during clustering "
f"('{pangenome.parameters['cluster']['translation_table']}') "
f"is different than the one provided now ('{translation_table}')")
code = translation_table

compute_msa(families, outdir, cpu=cpu, tmpdir=tmpdir, source=source, use_gene_id=use_gene_id, code=code,
disable_bar=disable_bar)
compute_msa(families, outdir, cpu=cpu, tmpdir=tmpdir, source=source, use_gene_id=use_gene_id,
code=str(translation_table), disable_bar=disable_bar)
logging.getLogger("PPanGGOLiN").info(f"Done writing all {partition} MSA in: {outdir}")

if phylo:
Expand Down Expand Up @@ -388,7 +386,7 @@ def parser_msa(parser: argparse.ArgumentParser):
"option --dup_margin")
optional.add_argument("--partition", required=False, default="core",
choices=["all", "persistent", "shell", "cloud", "core", "accessory", 'softcore'],
help="compute Multiple Sequence Alignement of the gene families in the given partition")
help="compute Multiple Sequence Alignment of the gene families in the given partition")
optional.add_argument("--source", required=False, default="protein", choices=["dna", "protein"],
help="indicates whether to use protein or dna sequences to compute the msa")
optional.add_argument("--phylo", required=False, action='store_true',
Expand Down

0 comments on commit d681080

Please sign in to comment.