From 005ae8d42628a6b3b99382aecaa231f3906fa620 Mon Sep 17 00:00:00 2001 From: Chris Wright Date: Mon, 22 Nov 2021 12:39:26 +0000 Subject: [PATCH] Multithread only BAM reading --- CHANGELOG.md | 5 ++ src/bamstats/main.c | 60 +++++++++--------- src/bamstats/readstats.c | 127 +-------------------------------------- src/bamstats/readstats.h | 21 +------ src/version.h | 2 +- 5 files changed, 41 insertions(+), 174 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eaa97c..98d4311 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [v0.4.3] +### Changed +- Only multithread BAM decompression. + ## [v0.4.2] ### Added - Multithreading to `bamstats` for improved throughput. diff --git a/src/bamstats/main.c b/src/bamstats/main.c index bda252e..f66d630 100644 --- a/src/bamstats/main.c +++ b/src/bamstats/main.c @@ -8,6 +8,7 @@ #include #include "htslib/faidx.h" #include "htslib/sam.h" +#include "htslib/thread_pool.h" #include "readstats.h" #include "args.h" @@ -55,28 +56,29 @@ int main(int argc, char *argv[]) { write_header(); - htsFile *fps[args.threads]; - hts_idx_t *idxs[args.threads]; - sam_hdr_t *hdrs[args.threads]; - for (size_t i=0; i 1 ) { + pool = hts_tpool_init(args.threads); + hts_set_opt(fp, HTS_OPT_THREAD_POOL, &pool); + } if (args.region == NULL) { // process all regions - for (int i=0; i < hdrs[0]->n_targets; ++i) { - const char* chr = sam_hdr_tid2name(hdrs[0], i); - size_t ref_length = (size_t)sam_hdr_tid2len(hdrs[0], i); - process_region( - fps, idxs, hdrs, - args, chr, 0, ref_length, NULL); + for (int i=0; i < hdr->n_targets; ++i) { + const char* chr = sam_hdr_tid2name(hdr, i); + size_t ref_length = (size_t)sam_hdr_tid2len(hdr, i); + process_bams( + fp, idx, hdr, + chr, 0, ref_length, true, + args.read_group, args.tag_name, args.tag_value); } } else { // process given region @@ -91,26 +93,28 @@ int main(int argc, char *argv[]) { fprintf(stderr, "ERROR: Failed to parse region: '%s'.\n", args.region); exit(EXIT_FAILURE); } - int tid = sam_hdr_name2tid(hdrs[0], chr); + int tid = sam_hdr_name2tid(hdr, chr); if (tid < 0) { fprintf(stderr, "ERROR: Failed to find reference '%s' in BAM header.\n", chr); exit(EXIT_FAILURE); } - size_t ref_length = (size_t)sam_hdr_tid2len(hdrs[0], tid); + size_t ref_length = (size_t)sam_hdr_tid2len(hdr, tid); end = min(end, ref_length); - process_region( - fps, idxs, hdrs, - args, chr, start, end, NULL); + process_bams( + fp, idx, hdr, + chr, start, end, true, + args.read_group, args.tag_name, args.tag_value); free(chr); } - for (size_t i=0; itsize; i++) { - if (pthread_equal(s, p->t[i].tid)) - return i; - } - return -1; -} - - -typedef struct twarg { - arguments_t args; - const char *chr; - int start; - int end; - bool overlap_start; - hts_tpool *pool; - htsFile **fps; - hts_idx_t **idxs; - sam_hdr_t **hdrs; -} twarg; - - -void *bam_worker(void *arg) { - int *rtn = xalloc(1, sizeof(int), "worker_result"); - twarg j = *(twarg *)arg; - int tid = worker_id(j.pool); - if (tid < 0) { - fprintf(stderr, "Could not retrieve thread ID in worker function\n"); - rtn[0] = 1; return rtn; - } - process_bams( - j.fps[tid], j.idxs[tid], j.hdrs[tid], - j.chr, j.start, j.end, j.overlap_start, - j.args.read_group, j.args.tag_name, j.args.tag_value); - free(arg); - rtn[0] = 0; - return rtn; -} - - -/* Process and print a single region using a threadpool - * - * @param fp htsFile pointer - * @param idx hts_idx_t pointer - * @param hdr sam_hdr_t pointer - * @param args program arguments. - * @param chr reference sequence to process. - * @param start reference coordinate to process (0-based). - * @param end reference coordiate to process (exclusive). - * @param ref reference sequence. - * - */ -#ifdef NOTHREADS -int process_region( - htsFile **fps, hts_idx_t **idxs, sam_hdr_t **hdrs, - arguments_t args, const char *chr, int start, int end, char *ref) { - fprintf(stderr, "Processing: %s:%d-%d\n", chr, start, end); - process_bams( - fps[0], idxs[0], hdrs[0], - chr, start, end, - args.read_group, args.tag_name, args.tag_value); - return 0; -} -#else -void process_region( - htsFile **fps, hts_idx_t **idxs, sam_hdr_t **hdrs, - arguments_t args, const char *chr, int start, int end, char *ref) { - //TODO: Implement this properly - fprintf(stderr, "Processing: %s:%d-%d\n", chr, start, end); - hts_tpool *p = hts_tpool_init(args.threads); - hts_tpool_process *q = hts_tpool_process_init(p, 2 * args.threads, 0); - hts_tpool_result *r; - const int width = 100000; - - int nregs = 1 + (end - start) / width; float done = 0; - bool overlap_start = true; // process reads overlapping start? - for (int rstart = start; rstart < end; rstart += width) { - twarg *tw_args = xalloc(1, sizeof(*tw_args), "thread worker args"); // freed in worker - tw_args->args = args; - tw_args->chr = chr; tw_args->start = rstart; tw_args->end=min(rstart + width, end); - tw_args->overlap_start = overlap_start; - tw_args->fps = fps; tw_args->hdrs = hdrs; tw_args->idxs = idxs; - tw_args->pool = p; - int blk; - do { - blk = hts_tpool_dispatch2(p, q, bam_worker, tw_args, 1); - if ((r = hts_tpool_next_result(q))) { - int* res = (int*)hts_tpool_result_data(r); - if (*res == 0) { - done++; - fprintf(stderr, "\r%.1f %%", 100*done/nregs); - } else { - fprintf(stderr, "\nPool worker failed with: %d\n", *res); - } - free(res); - hts_tpool_delete_result(r, 0); - } - } while (blk == -1); - overlap_start = false; // all subsequent blocks, don't double count reads - } - - // wait for jobs, then collect. - hts_tpool_process_flush(q); - while ((r = hts_tpool_next_result(q))) { - int* res = (int*)hts_tpool_result_data(r); - if (*res == 0) { - done++; - fprintf(stderr, "\r%.1f %%", 100*done/nregs); - } else { - fprintf(stderr, "\nPool worker failed with: %d\n", *res); - } - free(res); - hts_tpool_delete_result(r, 0); - } - fprintf(stderr, "\r100 %% "); - fprintf(stderr, "\n"); - // clean up pool - hts_tpool_process_destroy(q); - hts_tpool_destroy(p); -} -#endif - diff --git a/src/bamstats/readstats.h b/src/bamstats/readstats.h index 9a32ac0..61183e1 100644 --- a/src/bamstats/readstats.h +++ b/src/bamstats/readstats.h @@ -1,5 +1,5 @@ -#ifndef _MODBAMBED_COUNTS_H -#define _MODBAMBED_COUNTS_H +#ifndef _BAMSTATS_STATS_H +#define _BAMSTATS_STATS_H #include #include "htslib/sam.h" @@ -31,21 +31,4 @@ void process_bams( const char *chr, int start, int end, bool overlap_start, const char *read_group, const char tag_name[2], const int tag_value); - -/* Process and print a single region using a threadpool - * - * @param fp htsFile pointer - * @param idx hts_idx_t pointer - * @param hdr sam_hdr_t pointer - * @param args program arguments. - * @param chr reference sequence to process. - * @param start reference coordinate to process (0-based). - * @param end reference coordiate to process (exclusive). - * @param ref reference sequence. - * - */ -void process_region( - htsFile **fp, hts_idx_t **idx, sam_hdr_t **hdr, - arguments_t args, const char *chr, int start, int end, char *ref); - #endif diff --git a/src/version.h b/src/version.h index acb7fe7..92d96d3 100644 --- a/src/version.h +++ b/src/version.h @@ -1,2 +1,2 @@ -const char *argp_program_version = "0.4.2"; +const char *argp_program_version = "0.4.3";