diff --git a/ppanggolin/formats/writeMSA.py b/ppanggolin/formats/writeMSA.py index 7f5198d9..e91b0753 100644 --- a/ppanggolin/formats/writeMSA.py +++ b/ppanggolin/formats/writeMSA.py @@ -25,7 +25,7 @@ def is_single_copy(family: GeneFamily, dup_margin: float = 0.95) -> bool: """ Check if a gene family can be considered 'single copy' or not - + :param family: GeneFamily object :param dup_margin: maximal number of genomes in which the gene family can have multiple members and still be considered a 'single copy' gene family @@ -47,55 +47,55 @@ def get_families_to_write(pangenome: Pangenome, partition_filter: str = "core", Get families corresponding to the given partition :param pangenome: Partitioned pangenome - :param partition_filter: choice of partition to compute Multiple Sequence Alignement of the gene families + :param partition_filter: choice of partition to compute Multiple Sequence Alignment of the gene families :param soft_core: Soft core threshold to use :param dup_margin: maximal number of genomes in which the gene family can have multiple members and still be considered a 'single copy' gene family :param single_copy: Use "single copy" (defined by dup_margin) gene families only :return: set of families unique to one partition """ - families = set() - nb_org = pangenome.number_of_organisms - if partition_filter == "all": - return pangenome.gene_families - if partition_filter in ["persistent", "shell", "cloud"]: - for family in pangenome.gene_families: - if family.named_partition == partition_filter: - if single_copy: - if is_single_copy(family, dup_margin): - families.add(family) - else: - families.add(family) - elif partition_filter in ["core", "accessory", "softcore"]: - if partition_filter == "core": + return set(pangenome.gene_families) + else: + families = set() + nb_org = pangenome.number_of_organisms + if partition_filter in ["persistent", "shell", "cloud"]: for family in pangenome.gene_families: - if family.number_of_organisms == nb_org: + if family.named_partition == partition_filter: if single_copy: if is_single_copy(family, dup_margin): families.add(family) else: families.add(family) - elif partition_filter == "accessory": - for family in pangenome.gene_families: - if family.number_of_organisms < nb_org: - if single_copy: - if is_single_copy(family, dup_margin): + elif partition_filter in ["core", "accessory", "softcore"]: + if partition_filter == "core": + for family in pangenome.gene_families: + if family.number_of_organisms == nb_org: + if single_copy: + if is_single_copy(family, dup_margin): + families.add(family) + else: families.add(family) - else: - families.add(family) - elif partition_filter == "softcore": - for family in pangenome.gene_families: - if family.number_of_organisms >= nb_org * soft_core: - if single_copy: - if is_single_copy(family, dup_margin): + elif partition_filter == "accessory": + for family in pangenome.gene_families: + if family.number_of_organisms < nb_org: + if single_copy: + if is_single_copy(family, dup_margin): + families.add(family) + else: families.add(family) - else: - families.add(family) - return families + elif partition_filter == "softcore": + for family in pangenome.gene_families: + if family.number_of_organisms >= nb_org * soft_core: + if single_copy: + if is_single_copy(family, dup_margin): + families.add(family) + else: + families.add(family) + return families -def translate(seq: str, code: Dict[str, str]) -> str: +def translate(seq: str, code: Dict[str, Dict[str, str]]) -> str: """translates the given dna sequence with the given translation table :param seq: given dna sequence @@ -120,7 +120,7 @@ def translate(seq: str, code: Dict[str, str]) -> str: return protein -def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory, code_table: Dict[str, str], +def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory, code_table: Dict[str, Dict[str, str]], source: str = 'protein', use_gene_id: bool = False) -> Path: """Write fasta files for each gene family @@ -152,7 +152,7 @@ def write_fasta_families(family: GeneFamily, tmpdir: tempfile.TemporaryDirectory elif source == "protein": f_obj.write(translate(gene.dna, code_table) + "\n") else: - raise Exception("Unknown sequence source given (expected 'dna' or 'protein')") + raise AssertionError("Unknown sequence source given (expected 'dna' or 'protein')") f_obj.flush() return f_name @@ -182,8 +182,8 @@ def launch_multi_mafft(args: List[Tuple[Path, Path, str]]): launch_mafft(*args) -def compute_msa(families: set, output: Path, tmpdir: Path, cpu: int = 1, source: str = "protein", - use_gene_id: bool = False, code: int = 11, disable_bar: bool = False): +def compute_msa(families: Set[GeneFamily], output: Path, tmpdir: Path, cpu: int = 1, source: str = "protein", + use_gene_id: bool = False, code: str = "11", disable_bar: bool = False): """ Compute MSA between pangenome gene families @@ -201,7 +201,7 @@ def compute_msa(families: set, output: Path, tmpdir: Path, cpu: int = 1, source: write_total = 0 args = [] logging.getLogger("PPanGGOLiN").info("Preparing input files for MSA...") - code_table = genetic_codes(str(code)) + code_table = genetic_codes(code) for family in tqdm(families, unit="family", disable=disable_bar): start_write = time.time() @@ -210,14 +210,13 @@ def compute_msa(families: set, output: Path, tmpdir: Path, cpu: int = 1, source: args.append((fname, output, family.name)) logging.getLogger("PPanGGOLiN").info("Computing the MSA ...") - bar = tqdm(range(len(families)), unit="family", disable=disable_bar) with get_context('fork').Pool(cpu) as p: - for _ in p.imap_unordered(launch_multi_mafft, args): - bar.update() - bar.close() + with tqdm(total=len(families), unit="family", disable=disable_bar) as bar: + for _ in p.imap_unordered(launch_multi_mafft, args): + bar.update() -def write_whole_genome_msa(pangenome: Pangenome, families: set, phylo_name: str, outdir: Path, +def write_whole_genome_msa(pangenome: Pangenome, families: set, phylo_name: Path, outdir: Path, use_gene_id: bool = False): """ Writes a whole genome msa file for additional phylogenetic analysis @@ -285,7 +284,7 @@ def write_msa_files(pangenome: Pangenome, output: Path, cpu: int = 1, partition: :param pangenome: Pangenome object with partition :param output: Path to output directory :param cpu: number of available core - :param partition: choice of partition to compute Multiple Sequence Alignement of the gene families + :param partition: choice of partition to compute Multiple Sequence Alignment of the gene families :param tmpdir: path to temporary directory :param source: indicates whether to use protein or dna sequences to compute the msa :param soft_core: Soft core threshold to use @@ -314,14 +313,13 @@ def write_msa_files(pangenome: Pangenome, output: Path, cpu: int = 1, partition: # check that the code is similar than the one used previously, if there is one if 'translation_table' in pangenome.parameters["cluster"]: - if pangenome.parameters["cluster"]["translation_table"] != translation_table: + if pangenome.parameters["cluster"]["translation_table"] != str(translation_table): logging.getLogger("PPanGGOLiN").warning("The translation table used during clustering " f"('{pangenome.parameters['cluster']['translation_table']}') " f"is different than the one provided now ('{translation_table}')") - code = translation_table - compute_msa(families, outdir, cpu=cpu, tmpdir=tmpdir, source=source, use_gene_id=use_gene_id, code=code, - disable_bar=disable_bar) + compute_msa(families, outdir, cpu=cpu, tmpdir=tmpdir, source=source, use_gene_id=use_gene_id, + code=str(translation_table), disable_bar=disable_bar) logging.getLogger("PPanGGOLiN").info(f"Done writing all {partition} MSA in: {outdir}") if phylo: @@ -388,7 +386,7 @@ def parser_msa(parser: argparse.ArgumentParser): "option --dup_margin") optional.add_argument("--partition", required=False, default="core", choices=["all", "persistent", "shell", "cloud", "core", "accessory", 'softcore'], - help="compute Multiple Sequence Alignement of the gene families in the given partition") + help="compute Multiple Sequence Alignment of the gene families in the given partition") optional.add_argument("--source", required=False, default="protein", choices=["dna", "protein"], help="indicates whether to use protein or dna sequences to compute the msa") optional.add_argument("--phylo", required=False, action='store_true',