Skip to content

Commit

Permalink
Multithread only BAM reading
Browse files Browse the repository at this point in the history
  • Loading branch information
cjw85 committed Nov 22, 2021
1 parent a487281 commit 005ae8d
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 174 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [v0.4.3]
### Changed
- Only multithread BAM decompression.

## [v0.4.2]
### Added
- Multithreading to `bamstats` for improved throughput.
Expand Down
60 changes: 32 additions & 28 deletions src/bamstats/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <time.h>
#include "htslib/faidx.h"
#include "htslib/sam.h"
#include "htslib/thread_pool.h"

#include "readstats.h"
#include "args.h"
Expand Down Expand Up @@ -55,28 +56,29 @@ int main(int argc, char *argv[]) {

write_header();

htsFile *fps[args.threads];
hts_idx_t *idxs[args.threads];
sam_hdr_t *hdrs[args.threads];
for (size_t i=0; i<args.threads; ++i) {
fps[i] = hts_open(args.bam[0], "rb");
idxs[i] = sam_index_load(fps[i], args.bam[0]);
hdrs[i] = sam_hdr_read(fps[i]);
if (hdrs[i] == 0 || idxs[i] == 0 || fps[i] == 0) {
fprintf(stderr, "Failed to read .bam file '%s'.\n", args.bam[0]);
exit(EXIT_FAILURE);
}
htsFile *fp = hts_open(args.bam[0], "rb");
hts_idx_t *idx = sam_index_load(fp, args.bam[0]);
sam_hdr_t *hdr = sam_hdr_read(fp);
if (hdr == 0 || idx == 0 || fp == 0) {
fprintf(stderr, "Failed to read .bam file '%s'.\n", args.bam[0]);
exit(EXIT_FAILURE);
}

hts_tpool *pool = NULL;
if (args.threads > 1 ) {
pool = hts_tpool_init(args.threads);
hts_set_opt(fp, HTS_OPT_THREAD_POOL, &pool);
}

if (args.region == NULL) {
// process all regions
for (int i=0; i < hdrs[0]->n_targets; ++i) {
const char* chr = sam_hdr_tid2name(hdrs[0], i);
size_t ref_length = (size_t)sam_hdr_tid2len(hdrs[0], i);
process_region(
fps, idxs, hdrs,
args, chr, 0, ref_length, NULL);
for (int i=0; i < hdr->n_targets; ++i) {
const char* chr = sam_hdr_tid2name(hdr, i);
size_t ref_length = (size_t)sam_hdr_tid2len(hdr, i);
process_bams(
fp, idx, hdr,
chr, 0, ref_length, true,
args.read_group, args.tag_name, args.tag_value);
}
} else {
// process given region
Expand All @@ -91,26 +93,28 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "ERROR: Failed to parse region: '%s'.\n", args.region);
exit(EXIT_FAILURE);
}
int tid = sam_hdr_name2tid(hdrs[0], chr);
int tid = sam_hdr_name2tid(hdr, chr);
if (tid < 0) {
fprintf(stderr, "ERROR: Failed to find reference '%s' in BAM header.\n", chr);
exit(EXIT_FAILURE);
}
size_t ref_length = (size_t)sam_hdr_tid2len(hdrs[0], tid);
size_t ref_length = (size_t)sam_hdr_tid2len(hdr, tid);
end = min(end, ref_length);
process_region(
fps, idxs, hdrs,
args, chr, start, end, NULL);
process_bams(
fp, idx, hdr,
chr, start, end, true,
args.read_group, args.tag_name, args.tag_value);
free(chr);
}

for (size_t i=0; i<args.threads; ++i) {
hts_close(fps[i]);
hts_idx_destroy(idxs[i]);
sam_hdr_destroy(hdrs[i]);
}
if (pool)
hts_tpool_destroy(pool);

hts_close(fp);
hts_idx_destroy(idx);
sam_hdr_destroy(hdr);

clock_t end = clock();
fprintf(stderr, "Total time: %fs\n", (double)(end - begin) / CLOCKS_PER_SEC);
fprintf(stderr, "Total CPU time: %fs\n", (double)(end - begin) / CLOCKS_PER_SEC);
exit(EXIT_SUCCESS);
}
127 changes: 1 addition & 126 deletions src/bamstats/readstats.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "htslib/sam.h"
#include "htslib/faidx.h"
#include "thread_pool_internal.h"
#include "htslib/thread_pool.h"

#include "../common.h"
#include "bamiter.h"
Expand Down Expand Up @@ -100,6 +99,7 @@ void process_bams(
htsFile *fp, hts_idx_t *idx, sam_hdr_t *hdr,
const char *chr, int start, int end, bool overlap_start,
const char *read_group, const char tag_name[2], const int tag_value) {
fprintf(stderr, "Processing: %s:%d-%d\n", chr, start, end);

// setup bam reading - reuse our pileup structure, but actually just need iterator
mplp_data* bam = create_bam_iter_data(
Expand Down Expand Up @@ -171,128 +171,3 @@ void process_bams(

return;
}

static int worker_id(hts_tpool *p) {
int i;
pthread_t s = pthread_self();
for (i = 0; i < p->tsize; i++) {
if (pthread_equal(s, p->t[i].tid))
return i;
}
return -1;
}


typedef struct twarg {
arguments_t args;
const char *chr;
int start;
int end;
bool overlap_start;
hts_tpool *pool;
htsFile **fps;
hts_idx_t **idxs;
sam_hdr_t **hdrs;
} twarg;


void *bam_worker(void *arg) {
int *rtn = xalloc(1, sizeof(int), "worker_result");
twarg j = *(twarg *)arg;
int tid = worker_id(j.pool);
if (tid < 0) {
fprintf(stderr, "Could not retrieve thread ID in worker function\n");
rtn[0] = 1; return rtn;
}
process_bams(
j.fps[tid], j.idxs[tid], j.hdrs[tid],
j.chr, j.start, j.end, j.overlap_start,
j.args.read_group, j.args.tag_name, j.args.tag_value);
free(arg);
rtn[0] = 0;
return rtn;
}


/* Process and print a single region using a threadpool
*
* @param fp htsFile pointer
* @param idx hts_idx_t pointer
* @param hdr sam_hdr_t pointer
* @param args program arguments.
* @param chr reference sequence to process.
* @param start reference coordinate to process (0-based).
* @param end reference coordiate to process (exclusive).
* @param ref reference sequence.
*
*/
#ifdef NOTHREADS
int process_region(
htsFile **fps, hts_idx_t **idxs, sam_hdr_t **hdrs,
arguments_t args, const char *chr, int start, int end, char *ref) {
fprintf(stderr, "Processing: %s:%d-%d\n", chr, start, end);
process_bams(
fps[0], idxs[0], hdrs[0],
chr, start, end,
args.read_group, args.tag_name, args.tag_value);
return 0;
}
#else
void process_region(
htsFile **fps, hts_idx_t **idxs, sam_hdr_t **hdrs,
arguments_t args, const char *chr, int start, int end, char *ref) {
//TODO: Implement this properly
fprintf(stderr, "Processing: %s:%d-%d\n", chr, start, end);
hts_tpool *p = hts_tpool_init(args.threads);
hts_tpool_process *q = hts_tpool_process_init(p, 2 * args.threads, 0);
hts_tpool_result *r;
const int width = 100000;

int nregs = 1 + (end - start) / width; float done = 0;
bool overlap_start = true; // process reads overlapping start?
for (int rstart = start; rstart < end; rstart += width) {
twarg *tw_args = xalloc(1, sizeof(*tw_args), "thread worker args"); // freed in worker
tw_args->args = args;
tw_args->chr = chr; tw_args->start = rstart; tw_args->end=min(rstart + width, end);
tw_args->overlap_start = overlap_start;
tw_args->fps = fps; tw_args->hdrs = hdrs; tw_args->idxs = idxs;
tw_args->pool = p;
int blk;
do {
blk = hts_tpool_dispatch2(p, q, bam_worker, tw_args, 1);
if ((r = hts_tpool_next_result(q))) {
int* res = (int*)hts_tpool_result_data(r);
if (*res == 0) {
done++;
fprintf(stderr, "\r%.1f %%", 100*done/nregs);
} else {
fprintf(stderr, "\nPool worker failed with: %d\n", *res);
}
free(res);
hts_tpool_delete_result(r, 0);
}
} while (blk == -1);
overlap_start = false; // all subsequent blocks, don't double count reads
}

// wait for jobs, then collect.
hts_tpool_process_flush(q);
while ((r = hts_tpool_next_result(q))) {
int* res = (int*)hts_tpool_result_data(r);
if (*res == 0) {
done++;
fprintf(stderr, "\r%.1f %%", 100*done/nregs);
} else {
fprintf(stderr, "\nPool worker failed with: %d\n", *res);
}
free(res);
hts_tpool_delete_result(r, 0);
}
fprintf(stderr, "\r100 %% ");
fprintf(stderr, "\n");
// clean up pool
hts_tpool_process_destroy(q);
hts_tpool_destroy(p);
}
#endif

21 changes: 2 additions & 19 deletions src/bamstats/readstats.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef _MODBAMBED_COUNTS_H
#define _MODBAMBED_COUNTS_H
#ifndef _BAMSTATS_STATS_H
#define _BAMSTATS_STATS_H

#include <stdbool.h>
#include "htslib/sam.h"
Expand Down Expand Up @@ -31,21 +31,4 @@ void process_bams(
const char *chr, int start, int end, bool overlap_start,
const char *read_group, const char tag_name[2], const int tag_value);


/* Process and print a single region using a threadpool
*
* @param fp htsFile pointer
* @param idx hts_idx_t pointer
* @param hdr sam_hdr_t pointer
* @param args program arguments.
* @param chr reference sequence to process.
* @param start reference coordinate to process (0-based).
* @param end reference coordiate to process (exclusive).
* @param ref reference sequence.
*
*/
void process_region(
htsFile **fp, hts_idx_t **idx, sam_hdr_t **hdr,
arguments_t args, const char *chr, int start, int end, char *ref);

#endif
2 changes: 1 addition & 1 deletion src/version.h
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

const char *argp_program_version = "0.4.2";
const char *argp_program_version = "0.4.3";

0 comments on commit 005ae8d

Please sign in to comment.