EVA-3410 - Persistence of the exploded counts to eva_stats #423

Merged · 13 commits · Mar 18, 2024
5 changes: 3 additions & 2 deletions eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh
100644 → 100755
@@ -14,7 +14,8 @@ do
echo "Process $INPUT"
ASSEMBLY=$(basename $(dirname ${INPUT}));
SC_NAME=$(basename $(dirname $(dirname ${INPUT})));
TYPE=$(echo $(basename ${INPUT}) | cut -f 4- -d '_' | awk '{print substr($0,1,length($0)-11)}')
# Splitting on GCA supports file names that have a taxonomy prefix (release 5+) as well as those that do not (release 4 and earlier)
TYPE=$(echo $(basename ${INPUT}) | awk -F 'GCA' '{print $2}' |cut -f 3- -d '_' | awk '{print substr($0,1,length($0)-11)}')
OUTPUT=tmp_${SC_NAME}_${ASSEMBLY}_${TYPE}.txt
if [[ ${INPUT} == *.vcf.gz ]]
then
@@ -42,5 +43,5 @@ cat $ALL_TMP_OUTPUT | sort \
print ""; }' \
| grep -v '^$' | sort | uniq -c | sort -nr > "$OUTPUT_FILE"

rm $ALL_TMP_OUTPUT
rm -f $ALL_TMP_OUTPUT
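
The new `awk -F 'GCA'` split keeps the TYPE extraction working for both naming schemes. Below is a minimal Python sketch of the same pipeline, using hypothetical file names; it assumes the suffix to strip is always the 11-character `_ids.vcf.gz` or `_ids.txt.gz`.

def extract_rs_type(file_name):
    # Mirror the bash pipeline: split on 'GCA', keep fields 3 onward ('_'-separated), drop the suffix
    after_gca = file_name.split('GCA', 1)[1]       # '_000001405.15_deprecated_ids.vcf.gz'
    from_third_field = after_gca.split('_', 2)[2]  # 'deprecated_ids.vcf.gz'
    return from_third_field[:-11]                  # drop the trailing '_ids.vcf.gz' / '_ids.txt.gz'

# Hypothetical file names covering both formats
assert extract_rs_type('9606_GCA_000001405.15_deprecated_ids.vcf.gz') == 'deprecated'  # release 5+
assert extract_rs_type('GCA_000001405.15_deprecated_ids.vcf.gz') == 'deprecated'       # release 4 and earlier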

Empty file.
@@ -3,14 +3,19 @@
import glob
import os
from collections import defaultdict, Counter
from functools import lru_cache
from functools import lru_cache, cached_property
from urllib.parse import urlsplit

from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.common_utils import pretty_print
from ebi_eva_common_pyutils.config_utils import get_metadata_creds_for_profile
from ebi_eva_common_pyutils.logger import logging_config, AppLogger
from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query

from sqlalchemy import Column, String, Integer, BigInteger, ForeignKey, create_engine, URL, MetaData, TEXT, \
UniqueConstraint, select
from sqlalchemy.orm import declarative_base, relationship, mapped_column, Session

logger = logging_config.get_logger(__name__)

@@ -36,6 +41,8 @@ def find_link(key_set, dict1, dict2, source_linked_set1=None, source_linked_set2
linked_set2 = source_linked_set2.copy()
for key1 in key_set:
if key1 in dict1:
# first set should at least contain the query
linked_set1.add(key1)
for value1 in dict1.get(key1):
linked_set2.add(value1)
if value1 in dict2:
@@ -89,14 +96,15 @@ def gather_count_for_set_species(release_directory, set_of_species, output_dir):


def collect_files_to_count(release_directory, set_of_species):
"""Collect all the (final) release files for a set of species"""
all_files = []
for species in set_of_species:
species_dir = os.path.join(release_directory, species)
assembly_directories = glob.glob(os.path.join(species_dir, "GCA_*"))
for assembly_dir in assembly_directories:
vcf_pattern = f'*_GCA_*_ids.vcf.gz'
vcf_pattern = f'*GCA_*_ids.vcf.gz'
vcf_files = glob.glob(os.path.join(assembly_dir, vcf_pattern))
txt_pattern = f'*_GCA_*_ids.txt.gz'
txt_pattern = f'*GCA_*_ids.txt.gz'
txt_files = glob.glob(os.path.join(assembly_dir, txt_pattern))
# The taxonomy is not available to add to the glob pattern, so remove the files that start with eva_ or dbsnp_
vcf_files = [f for f in vcf_files if 'dbsnp_' not in f and 'eva_' not in f]
@@ -107,7 +115,11 @@ def collect_files_to_count(release_directory, set_of_species):
return all_files
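
The relaxed pattern (`*GCA_*` instead of `*_GCA_*`) also matches release-4 style names that have no taxonomy prefix; the per-source files are then dropped by the `dbsnp_`/`eva_` filter. A small sketch with hypothetical names:

import fnmatch

names = [
    '9606_GCA_000001405.15_current_ids.vcf.gz',   # release 5+: taxonomy prefix
    'GCA_000001405.15_current_ids.vcf.gz',        # release 4-: no prefix, missed by '*_GCA_*'
    'dbsnp_GCA_000001405.15_current_ids.vcf.gz',  # per-source file
]
matched = [n for n in names if fnmatch.fnmatch(n, '*GCA_*_ids.vcf.gz')]  # all three match
final = [n for n in matched if 'dbsnp_' not in n and 'eva_' not in n]    # drops the dbsnp_ file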


def calculate_all_logs(release_dir, output_dir, species_directories=None):
def run_calculation_script_for_species(release_dir, output_dir, species_directories=None):
"""
Run the bash script that counts the number of RS IDs for all the specified species directories (all if not set)
and return the log files
"""
all_assemblies_2_species, all_species_2_assemblies = gather_assemblies_and_species_from_directory(release_dir)
all_sets_of_species = set()
# Determine the species that need to be counted together because they share assemblies
@@ -141,36 +153,176 @@ def generate_output_tsv(dict_of_counter, output_file, header):
]) + '\n')


Base = declarative_base(metadata=MetaData(schema="eva_stats"))


class RSCountCategory(Base):
"""
Table that provides the metadata associated with a number of RS IDs
"""
__tablename__ = 'release_rs_count_category'

count_category_id = Column(Integer, primary_key=True, autoincrement=True)
assembly = Column(String)
taxonomy = Column(Integer)
rs_type = Column(String)
release_version = Column(Integer)
rs_count_id = mapped_column(ForeignKey("release_rs_count.id"))
rs_count = relationship("RSCount", back_populates="count_categories")
__table_args__ = (
UniqueConstraint('assembly', 'taxonomy', 'rs_type', 'release_version', 'rs_count_id', name='uix_1'),
)
schema = 'eva_stats'


class RSCount(Base):
"""
Table that provides the count associated with each category
"""
__tablename__ = 'release_rs_count'
id = Column(Integer, primary_key=True, autoincrement=True)
count = Column(BigInteger)
group_description = Column(TEXT, unique=True)
count_categories = relationship("RSCountCategory", back_populates="rs_count")
schema = 'eva_stats'
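
The two tables form a one-to-many pair: one `RSCount` row holds the shared count and the unique `group_description`, and each `RSCountCategory` row records one (assembly, taxonomy, rs_type, release_version) tuple pointing at it. A usage sketch with made-up values:

# Sketch only: all values are hypothetical
shared = RSCount(count=1000,
                 group_description='9598,GCA_002880755.3,current|9606,GCA_000001405.15,current_release_5')
human = RSCountCategory(assembly='GCA_000001405.15', taxonomy=9606, rs_type='current',
                        release_version=5, rs_count=shared)
chimp = RSCountCategory(assembly='GCA_002880755.3', taxonomy=9598, rs_type='current',
                        release_version=5, rs_count=shared)
# session.add_all([shared, human, chimp]) would persist one count row shared by both categories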


class ReleaseCounter(AppLogger):

def __init__(self, private_config_xml_file, release_version, logs):
def __init__(self, private_config_xml_file, config_profile, release_version, logs):
self.private_config_xml_file = private_config_xml_file
self.config_profile = config_profile
self.release_version = release_version
self.all_counts_grouped = []
self.parse_logs(logs)
self.parse_count_script_logs(logs)
self.add_annotations()

@lru_cache
def get_taxonomy_and_scientific_name(self, species_folder):
# TODO: Restore this function to only retrieve the taxonomy and scientific name using the taxonomy table in release 6
query = (
f"select distinct c.taxonomy, t.scientific_name "
f"from eva_progress_tracker.clustering_release_tracker c "
f"join evapro.taxonomy t on c.taxonomy=t.taxonomy_id "
f"where release_version={self.release_version} AND release_folder_name='{species_folder}'"
f"where release_folder_name='{species_folder}'"
)
with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn:
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
results = get_all_results_for_query(db_conn, query)
if len(results) < 1:
# Rely only on the clustering_release_tracker when taxonomy is not available in EVAPRO
query = (
f"select distinct taxonomy, scientific_name "
f"from eva_progress_tracker.clustering_release_tracker "
f"where release_folder_name='{species_folder}'"
)
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
results = get_all_results_for_query(db_conn, query)
if len(results) < 1:
# Support for directory names from release 1
if species_folder.split('_')[-1].isdigit():
taxonomy = int(species_folder.split('_')[-1])
query = (
f"select distinct taxonomy, scientific_name "
f"from eva_progress_tracker.clustering_release_tracker "
f"where taxonomy={taxonomy}"
)
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
results = get_all_results_for_query(db_conn, query)
if len(results) < 1:
logger.warning(f'Failed to get scientific name and taxonomy for {species_folder}')
return None, None
return results[0][0], results[0][1]

@cached_property
def sqlalchemy_engine(self):
pg_url, pg_user, pg_pass = get_metadata_creds_for_profile(self.config_profile, self.private_config_xml_file)
dbtype, host_url, port_and_db = urlsplit(pg_url).path.split(':')
port, db = port_and_db.split('/')
engine = create_engine(URL.create(
dbtype + '+psycopg2',
username=pg_user,
password=pg_pass,
host=host_url.split('/')[-1],
database=db,
port=port
))
RSCount.__table__.create(bind=engine, checkfirst=True)
RSCountCategory.__table__.create(bind=engine, checkfirst=True)
return engine
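
The URL juggling above assumes the metadata URL is stored in JDBC form; a sketch of the parsing with a hypothetical URL (the exact format in the settings file is an assumption here):

from urllib.parse import urlsplit

pg_url = 'jdbc:postgresql://pg.example.org:5432/evapro'   # hypothetical value
dbtype, host_url, port_and_db = urlsplit(pg_url).path.split(':')
port, db = port_and_db.split('/')
print(dbtype, host_url.split('/')[-1], port, db)          # postgresql pg.example.org 5432 evapro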

def add_annotations(self):
for count_groups in self.all_counts_grouped:
for count_dict in count_groups:
taxonomy, scientific_name = self.get_taxonomy_and_scientific_name(count_dict['release_folder'])
count_dict['taxonomy'] = taxonomy
count_dict['scientific_name'] = scientific_name
if taxonomy:
count_dict['taxonomy'] = taxonomy
if scientific_name:
count_dict['scientific_name'] = scientific_name

def count_descriptor(self, count_dict):
"""Description for associated with specific taxonomy, assembly and type of RS"""
if 'taxonomy' in count_dict:
return f"{count_dict['taxonomy']},{count_dict['assembly']},{count_dict['idtype']}"

def group_descriptor(self, count_groups):
"""Description for associated with group of taxonomy, assembly and type of RS that have the same RS count"""
group_descriptor_list = [
self.count_descriptor(count_dict)
for count_dict in count_groups]
if None not in group_descriptor_list:
return '|'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}'
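
For example, two categories sharing one count in release 5 would produce a key like this (hypothetical values):

descriptors = ['9606,GCA_000001405.15,current', '9598,GCA_002880755.3,current']
group_description = '|'.join(sorted(descriptors)) + '_release_5'
# -> '9598,GCA_002880755.3,current|9606,GCA_000001405.15,current_release_5'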

def write_counts_to_db(self):
"""
For all the counts gathered in self.all_counts_grouped, write them to the database if they do not exist already.
Log an error if a count already exists and differs from the one being loaded.
"""
session = Session(self.sqlalchemy_engine)
with session.begin():
for count_groups in self.all_counts_grouped:
# All the parts of the group should have the same count
count_set = set((count_dict['count'] for count_dict in count_groups))
assert len(count_set) == 1
count = count_set.pop()
# This is used to uniquely identify the count for this group so that loading the same value twice does
# not result in duplicates
group_description = self.group_descriptor(count_groups)
if not group_description:
# One of the taxonomy annotations is missing, so we should not load that count
continue
query = select(RSCount).where(RSCount.group_description == group_description)
result = session.execute(query).fetchone()
if result:
rs_count = result.RSCount
else:
rs_count = RSCount(count=count, group_description=group_description)
session.add(rs_count)
for count_dict in count_groups:
query = select(RSCountCategory).where(
RSCountCategory.assembly == count_dict['assembly'],
RSCountCategory.taxonomy == count_dict['taxonomy'],
RSCountCategory.rs_type == count_dict['idtype'],
RSCountCategory.release_version == self.release_version,
RSCountCategory.rs_count_id == rs_count.id,
)
result = session.execute(query).fetchone()
if not result:
self.info(f"Create persistence for {count_dict['assembly']}, {count_dict['taxonomy']}, {count_dict['idtype']}, {count_dict['count']}")
rs_category = RSCountCategory(
assembly=count_dict['assembly'],
taxonomy=count_dict['taxonomy'],
rs_type=count_dict['idtype'],
release_version=self.release_version,
rs_count=rs_count
)
session.add(rs_category)
else:
rs_category = result.RSCountCategory
# Check if we were just loading the same value as a previous run
if rs_category.rs_count.count != count_dict['count']:
self.error(f"{self.count_descriptor(count_dict)} already has a count entry in the table "
f"({rs_category.rs_count.count}) different from the one being loaded "
f"{count_dict['count']}")

def get_assembly_counts_from_database(self):
"""
@@ -186,14 +338,14 @@ def get_assembly_counts_from_database(self):
f" FROM {assembly_table_name} "
f"WHERE release_version={self.release_version}"
)
with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn:
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
for row in get_all_results_for_query(db_conn, query):
assembly = row[0]
for index, metric in enumerate(all_metrics):
results[assembly][metric] = row[index + 1]
return results

def parse_logs(self, all_logs):
def parse_count_script_logs(self, all_logs):
for log_file in all_logs:
with open(log_file) as open_file:
for line in open_file:
@@ -204,7 +356,8 @@ def parse_logs(self, all_logs):
for annotation in set_of_annotations:
assembly, release_folder, idtype = annotation.split('-')
all_groups.append(
{'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype}
{'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype,
'annotation': annotation}
)
self.all_counts_grouped.append(all_groups)
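
The parser consumes the `sort | uniq -c` output of the bash script. A hedged sketch of how one such line decomposes, assuming the shape shown below (the surrounding parsing code is partly elided from this diff):

# Hypothetical log line: '<count> <annotation>,<annotation>', each annotation being
# '<assembly>-<release_folder>-<idtype>'
line = '1000 GCA_000001405.15-homo_sapiens-current,GCA_002880755.3-pan_troglodytes-current'
count, annotations = line.split(' ', 1)
groups = []
for annotation in annotations.split(','):
    assembly, release_folder, idtype = annotation.split('-')
    groups.append({'count': int(count), 'release_folder': release_folder,
                   'assembly': assembly, 'idtype': idtype, 'annotation': annotation})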

@@ -256,7 +409,7 @@ def compare_assembly_counts_with_db(self, threshold=0, output_csv=None):
header = ('Assembly', 'Metric', 'File', 'DB', 'Diff (file-db)')
rows = []
count_per_assembly_from_files = self.generate_per_assembly_counts()
counts_per_assembly_from_db = self.get_counts_assembly_from_database()
counts_per_assembly_from_db = self.get_assembly_counts_from_database()
all_asms = set(count_per_assembly_from_files.keys()).union(counts_per_assembly_from_db.keys())
for asm in all_asms:
asm_counts_from_files = count_per_assembly_from_files.get(asm, {})
@@ -284,22 +437,32 @@ def main():
help="base directory where all the release was run.", required=True)
parser.add_argument("--output-dir", type=str,
help="Output directory where all count logs will be created.", required=True)
parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True)
parser.add_argument("--release-version", type=int, help="current release version", required=True)
parser.add_argument("--species-directories", type=str, nargs='+',
help="set of directory names to process. It will process all the related one as well. "
"Process all if missing")
parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True)
parser.add_argument("--config-profile", help="profile to use in the config xml", required=False,
default='production_processing')


args = parser.parse_args()
logging_config.add_stdout_handler()
output_dir = os.path.abspath(args.output_dir)
# Move to the output dir to make sure the bash tmp files are written there
os.chdir(output_dir)
logger.info(f'Analyse {args.release_root_path}')
logs = calculate_all_logs(args.release_root_path, args.output_dir, args.species_directories)
counter = ReleaseCounter(args.private_config_xml_file, args.release_version, logs)
counter.detect_inconsistent_types()
generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy')
generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly')
generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly')
counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv'))
log_files = run_calculation_script_for_species(args.release_root_path, output_dir, args.species_directories)
counter = ReleaseCounter(args.private_config_xml_file,
config_profile=args.config_profile, release_version=args.release_version, logs=log_files)
counter.write_counts_to_db()

# TODO: Other analysis should be performed on the database counts
# counter.detect_inconsistent_types()
# generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy')
# generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly')
# generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly')
# counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv'))


if __name__ == "__main__":
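
A possible invocation of the updated entry point (script name and paths are hypothetical):

python gather_release_counts.py --release-root-path /nfs/releases/release_5 \
    --output-dir /nfs/releases/release_5/counts \
    --private-config-xml-file /path/to/eva-maven-settings.xml \
    --release-version 5 --config-profile production_processing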
@@ -6,7 +6,8 @@ def test_find_links():
'A': ['1', '2'],
'B': ['2', '5'],
'C': ['3', '4'],
'D': ['5']
'D': ['5'],
'E': []
}
d2 = {
'1': ['A', 'B'],
@@ -19,3 +20,4 @@ def test_find_links():
assert find_link({'B'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'}))
assert find_link({'C'}, d1, d2) == (frozenset({'C'}), frozenset({'3', '4'}))
assert find_link({'D'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'}))
assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({}))
1 change: 1 addition & 0 deletions eva-accession-release-automation/requirements.txt
@@ -4,3 +4,4 @@ pytz==2022.6
pyyaml==5.3.1
requests==2.22.0
retry==0.9.2
sqlalchemy==2.0.22