diff --git a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh old mode 100644 new mode 100755 index 1409a9d91..4f8ee1b82 --- a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh +++ b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh @@ -14,7 +14,8 @@ do echo "Process $INPUT" ASSEMBLY=$(basename $(dirname ${INPUT})); SC_NAME=$(basename $(dirname $(dirname ${INPUT}))); - TYPE=$(echo $(basename ${INPUT}) | cut -f 4- -d '_' | awk '{print substr($0,1,length($0)-11)}') + # SPLIT by GCA is to support format that have a taxonomy prefix (release 5+) or the one that do not (release 4-) + TYPE=$(echo $(basename ${INPUT}) | awk -F 'GCA' '{print $2}' |cut -f 3- -d '_' | awk '{print substr($0,1,length($0)-11)}') OUTPUT=tmp_${SC_NAME}_${ASSEMBLY}_${TYPE}.txt if [[ ${INPUT} == *.vcf.gz ]] then @@ -42,5 +43,5 @@ cat $ALL_TMP_OUTPUT | sort \ print ""; }' \ | grep -v '^$' | sort | uniq -c | sort -nr > "$OUTPUT_FILE" -rm $ALL_TMP_OUTPUT +rm -f $ALL_TMP_OUTPUT diff --git a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_release.sh b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_release.sh old mode 100644 new mode 100755 diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 1ef0cc3cd..5333edb8a 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -3,14 +3,19 @@ import glob import os from collections import defaultdict, Counter -from functools import lru_cache +from functools import lru_cache, cached_property +from urllib.parse import urlsplit from ebi_eva_common_pyutils.command_utils import run_command_with_output from ebi_eva_common_pyutils.common_utils import pretty_print +from ebi_eva_common_pyutils.config_utils import get_metadata_creds_for_profile from ebi_eva_common_pyutils.logger import logging_config, AppLogger from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query +from sqlalchemy import Column, String, Integer, BigInteger, ForeignKey, create_engine, URL, MetaData, TEXT, \ + UniqueConstraint, select +from sqlalchemy.orm import declarative_base, relationship, mapped_column, Session logger = logging_config.get_logger(__name__) @@ -36,6 +41,8 @@ def find_link(key_set, dict1, dict2, source_linked_set1=None, source_linked_set2 linked_set2 = source_linked_set2.copy() for key1 in key_set: if key1 in dict1: + # first set should at least contain the query + linked_set1.add(key1) for value1 in dict1.get(key1): linked_set2.add(value1) if value1 in dict2: @@ -89,14 +96,15 @@ def gather_count_for_set_species(release_directory, set_of_species, output_dir): def collect_files_to_count(release_directory, set_of_species): + """Collect all the (final) release files for a set of species""" all_files = [] for species in set_of_species: species_dir = os.path.join(release_directory, species) assembly_directories = glob.glob(os.path.join(species_dir, "GCA_*")) for assembly_dir in assembly_directories: - vcf_pattern = f'*_GCA_*_ids.vcf.gz' + vcf_pattern = f'*GCA_*_ids.vcf.gz' vcf_files = glob.glob(os.path.join(assembly_dir, vcf_pattern)) - txt_pattern = f'*_GCA_*_ids.txt.gz' + txt_pattern = f'*GCA_*_ids.txt.gz' txt_files = glob.glob(os.path.join(assembly_dir, txt_pattern)) # I don't have the taxonomy to add to the bash pattern so remove the file that start with eva_ or dbsnp_ vcf_files = [f for f in vcf_files if 'dbsnp_' not in f and 'eva_' not in f] @@ -107,7 +115,11 @@ def collect_files_to_count(release_directory, set_of_species): return all_files -def calculate_all_logs(release_dir, output_dir, species_directories=None): +def run_calculation_script_for_species(release_dir, output_dir, species_directories=None): + """ + Run the bash script that count the number of RS for all the specified species directories (all if not set) + and return the logs + """ all_assemblies_2_species, all_species_2_assemblies = gather_assemblies_and_species_from_directory(release_dir) all_sets_of_species = set() # Determine the species that needs to be counted together because they share assemblies @@ -141,36 +153,176 @@ def generate_output_tsv(dict_of_counter, output_file, header): ]) + '\n') +Base = declarative_base(metadata=MetaData(schema="eva_stats")) + + +class RSCountCategory(Base): + """ + Table that provide the metadata associated with a number of RS id + """ + __tablename__ = 'release_rs_count_category' + + count_category_id = Column(Integer, primary_key=True, autoincrement=True) + assembly = Column(String) + taxonomy = Column(Integer) + rs_type = Column(String) + release_version = Column(Integer) + rs_count_id = mapped_column(ForeignKey("release_rs_count.id")) + rs_count = relationship("RSCount", back_populates="count_categories") + __table_args__ = ( + UniqueConstraint('assembly', 'taxonomy', 'rs_type', 'release_version', 'rs_count_id', name='uix_1'), + ) + schema = 'eva_stats' + + +class RSCount(Base): + """ + Table that provide the count associated with each category + """ + __tablename__ = 'release_rs_count' + id = Column(Integer, primary_key=True, autoincrement=True) + count = Column(BigInteger) + group_description = Column(TEXT, unique=True) + count_categories = relationship("RSCountCategory", back_populates="rs_count") + schema = 'eva_stats' + + class ReleaseCounter(AppLogger): - def __init__(self, private_config_xml_file, release_version, logs): + def __init__(self, private_config_xml_file, config_profile, release_version, logs): self.private_config_xml_file = private_config_xml_file + self.config_profile = config_profile self.release_version = release_version self.all_counts_grouped = [] - self.parse_logs(logs) + self.parse_count_script_logs(logs) self.add_annotations() @lru_cache def get_taxonomy_and_scientific_name(self, species_folder): + # TODO: Restore this function to only retrieve the taxonomy and scientific name using the taxonomy table in release 6 query = ( f"select distinct c.taxonomy, t.scientific_name " f"from eva_progress_tracker.clustering_release_tracker c " f"join evapro.taxonomy t on c.taxonomy=t.taxonomy_id " - f"where release_version={self.release_version} AND release_folder_name='{species_folder}'" + f"where release_folder_name='{species_folder}'" ) - with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn: + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: results = get_all_results_for_query(db_conn, query) + if len(results) < 1: + # Rely only on the clustering_release_tracker when taxonomy is not available in EVAPRO + query = ( + f"select distinct taxonomy, scientific_name " + f"from eva_progress_tracker.clustering_release_tracker " + f"where release_folder_name='{species_folder}'" + ) + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: + results = get_all_results_for_query(db_conn, query) + if len(results) < 1: + # Support for directory from release 1 + if species_folder.split('_')[-1].isdigit(): + taxonomy = int(species_folder.split('_')[-1]) + query = ( + f"select distinct taxonomy, scientific_name " + f"from eva_progress_tracker.clustering_release_tracker " + f"where taxonomy={taxonomy}" + ) + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: + results = get_all_results_for_query(db_conn, query) if len(results) < 1: logger.warning(f'Failed to get scientific name and taxonomy for {species_folder}') return None, None return results[0][0], results[0][1] + @cached_property + def sqlalchemy_engine(self): + pg_url, pg_user, pg_pass = get_metadata_creds_for_profile(self.config_profile, self.private_config_xml_file) + dbtype, host_url, port_and_db = urlsplit(pg_url).path.split(':') + port, db = port_and_db.split('/') + engine = create_engine(URL.create( + dbtype + '+psycopg2', + username=pg_user, + password=pg_pass, + host=host_url.split('/')[-1], + database=db, + port=port + )) + RSCount.__table__.create(bind=engine, checkfirst=True) + RSCountCategory.__table__.create(bind=engine, checkfirst=True) + return engine + def add_annotations(self): for count_groups in self.all_counts_grouped: for count_dict in count_groups: taxonomy, scientific_name = self.get_taxonomy_and_scientific_name(count_dict['release_folder']) - count_dict['taxonomy'] = taxonomy - count_dict['scientific_name'] = scientific_name + if taxonomy: + count_dict['taxonomy'] = taxonomy + if scientific_name: + count_dict['scientific_name'] = scientific_name + + def count_descriptor(self, count_dict): + """Description for associated with specific taxonomy, assembly and type of RS""" + if 'taxonomy' in count_dict: + return f"{count_dict['taxonomy']},{count_dict['assembly']},{count_dict['idtype']}" + + def group_descriptor(self, count_groups): + """Description for associated with group of taxonomy, assembly and type of RS that have the same RS count""" + group_descriptor_list = [ + self.count_descriptor(count_dict) + for count_dict in count_groups] + if None not in group_descriptor_list: + return '|'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}' + + def write_counts_to_db(self): + """ + For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already. + Warn if the count already exists and are different. + """ + session = Session(self.sqlalchemy_engine) + with session.begin(): + for count_groups in self.all_counts_grouped: + # All the part of the group should have the same count + count_set = set((count_dict['count'] for count_dict in count_groups)) + assert len(count_set) == 1 + count = count_set.pop() + # This is used to uniquely identify the count for this group so that loading the same value twice does + # not result in duplicates + group_description = self.group_descriptor(count_groups) + if not group_description: + # One of the taxonomy annotation is missing, we should not load that count + continue + query = select(RSCount).where(RSCount.group_description == group_description) + result = session.execute(query).fetchone() + if result: + rs_count = result.RSCount + else: + rs_count = RSCount(count=count, group_description=group_description) + session.add(rs_count) + for count_dict in count_groups: + query = select(RSCountCategory).where( + RSCountCategory.assembly == count_dict['assembly'], + RSCountCategory.taxonomy == count_dict['taxonomy'], + RSCountCategory.rs_type == count_dict['idtype'], + RSCountCategory.release_version == self.release_version, + RSCountCategory.rs_count_id == rs_count.id, + ) + result = session.execute(query).fetchone() + if not result: + self.info(f"Create persistence for {count_dict['assembly']}, {count_dict['taxonomy']}, {count_dict['idtype']}, {count_dict['count']}") + rs_category = RSCountCategory( + assembly=count_dict['assembly'], + taxonomy=count_dict['taxonomy'], + rs_type=count_dict['idtype'], + release_version=self.release_version, + rs_count=rs_count + ) + session.add(rs_category) + else: + rs_category = result.RSCountCategory + # Check if we were just loading the same value as a previous run + if rs_category.rs_count.count != count_dict['count']: + self.error(f"{self.count_descriptor(count_dict)} already has a count entry in the table " + f"({rs_category.rs_count.count}) different from the one being loaded " + f"{count_dict['count']}") def get_assembly_counts_from_database(self): """ @@ -186,14 +338,14 @@ def get_assembly_counts_from_database(self): f" FROM {assembly_table_name} " f"WHERE release_version={self.release_version}" ) - with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn: + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: for row in get_all_results_for_query(db_conn, query): assembly = row[0] for index, metric in enumerate(all_metrics): results[assembly][metric] = row[index + 1] return results - def parse_logs(self, all_logs): + def parse_count_script_logs(self, all_logs): for log_file in all_logs: with open(log_file) as open_file: for line in open_file: @@ -204,7 +356,8 @@ def parse_logs(self, all_logs): for annotation in set_of_annotations: assembly, release_folder, idtype = annotation.split('-') all_groups.append( - {'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype} + {'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype, + 'annotation': annotation} ) self.all_counts_grouped.append(all_groups) @@ -256,7 +409,7 @@ def compare_assembly_counts_with_db(self, threshold=0, output_csv=None): header = ('Assembly', 'Metric', 'File', 'DB', 'Diff (file-db)') rows = [] count_per_assembly_from_files = self.generate_per_assembly_counts() - counts_per_assembly_from_db = self.get_counts_assembly_from_database() + counts_per_assembly_from_db = self.get_assembly_counts_from_database() all_asms = set(count_per_assembly_from_files.keys()).union(counts_per_assembly_from_db.keys()) for asm in all_asms: asm_counts_from_files = count_per_assembly_from_files.get(asm, {}) @@ -284,22 +437,32 @@ def main(): help="base directory where all the release was run.", required=True) parser.add_argument("--output-dir", type=str, help="Output directory where all count logs will be created.", required=True) - parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) parser.add_argument("--release-version", type=int, help="current release version", required=True) parser.add_argument("--species-directories", type=str, nargs='+', help="set of directory names to process. It will process all the related one as well. " "Process all if missing") + parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) + parser.add_argument("--config-profile", help="profile to use in the config xml", required=False, + default='production_processing') + args = parser.parse_args() logging_config.add_stdout_handler() + output_dir = os.path.abspath(args.output_dir) + # Move to the output dir to make sure the bash tmp dir are written their + os.chdir(output_dir) logger.info(f'Analyse {args.release_root_path}') - logs = calculate_all_logs(args.release_root_path, args.output_dir, args.species_directories) - counter = ReleaseCounter(args.private_config_xml_file, args.release_version, logs) - counter.detect_inconsistent_types() - generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy') - generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly') - generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly') - counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv')) + log_files = run_calculation_script_for_species(args.release_root_path, output_dir, args.species_directories) + counter = ReleaseCounter(args.private_config_xml_file, + config_profile=args.config_profile, release_version=args.release_version, logs=log_files) + counter.write_counts_to_db() + + # TODO: Other analysis should be performed on the database counts + # counter.detect_inconsistent_types() + # generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy') + # generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly') + # generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly') + # counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv')) if __name__ == "__main__": diff --git a/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py index b57d2fae6..33d3149bd 100644 --- a/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py @@ -6,7 +6,8 @@ def test_find_links(): 'A': ['1', '2'], 'B': ['2', '5'], 'C': ['3', '4'], - 'D': ['5'] + 'D': ['5'], + 'E': [] } d2 = { '1': ['A', 'B'], @@ -19,3 +20,4 @@ def test_find_links(): assert find_link({'B'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'})) assert find_link({'C'}, d1, d2) == (frozenset({'C'}), frozenset({'3', '4'})) assert find_link({'D'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'})) + assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({})) diff --git a/eva-accession-release-automation/requirements.txt b/eva-accession-release-automation/requirements.txt index 2bf2101e8..1546ff9f5 100644 --- a/eva-accession-release-automation/requirements.txt +++ b/eva-accession-release-automation/requirements.txt @@ -4,3 +4,4 @@ pytz==2022.6 pyyaml==5.3.1 requests==2.22.0 retry==0.9.2 +sqlalchemy==2.0.22