add rgfa interface
glennhickey committed Sep 29, 2023
1 parent 5510755 commit 61dbed5
Showing 2 changed files with 78 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/cactus/cactus_progressive_config.xml
@@ -380,6 +380,7 @@
<!-- odgiDrawOptions: options to odgi draw, used for 2d viz -->
<!-- rindexOptions: options to vg gbwt -r (r-index construction) as used to make haplotype sampling index -->
<!-- haplOptions: options to vg haplotypes as used to make haplotype sampling index -->
<!-- minRGFAFragment: smallest rGFA (rank > 0) cover path to write -->
<graphmap_join
gfaffix="1"
clipNonMinigraph="1"
@@ -392,6 +393,7 @@
odgiDrawOptions="-H 1000"
rindexOptions="-p"
haplOptions="-v 2"
minRGFAFragment="50"
/>
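On the Python side this attribute is read off the graphmap_join element through cactus's findRequiredNode/getOptionalAttrib helpers, falling back to 1 when it is absent (see the clip_vg change further down). A stand-alone sketch of the same lookup using plain xml.etree rather than the cactus helpers, for illustration only:

import xml.etree.ElementTree as ET

# illustration: mirrors getOptionalAttrib(findRequiredNode(..., "graphmap_join"), "minRGFAFragment", typeFn=int, default=1)
config_root = ET.parse('src/cactus/cactus_progressive_config.xml').getroot()
graphmap_join = config_root.find('.//graphmap_join')
min_rgfa_fragment = int(graphmap_join.attrib.get('minRGFAFragment', 1))
print(min_rgfa_fragment)   # 50 with the value shipped above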
<!-- hal2vg options -->
<!-- includeMinigraph: include minigraph node sequences as paths in output (note that cactus-graphmap-join will still remove them by default) -->
86 changes: 76 additions & 10 deletions src/cactus/refmap/cactus_graphmap_join.py
@@ -111,6 +111,8 @@ def graphmap_join_options(parser):
parser.add_argument("--filter", type=int, default=2, help = "Generate a frequency filtered graph (from the clipped graph) by removing any sequence present in fewer than this many sequences. Set to 0 to disable. [default=2]")

parser.add_argument("--gfa", nargs='*', default=None, help = "Produce a GFA for given graph type(s) if specified. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]")

parser.add_argument("--rgfa", nargs='*', default=None, help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]")

parser.add_argument("--gbz", nargs='*', default=None, help = "Generate GBZ/snarls indexes for the given graph type(s) if specified. Valid types are 'full', 'clip' and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. --giraffe will also produce these (and other) indexes")

@@ -174,6 +176,17 @@ def graphmap_join_validate_options(options):
if gfa == 'filter' and not options.filter:
raise RuntimeError('--gfa cannot be set to filter since filtering is disabled')

if options.rgfa == []:
options.rgfa = ['clip'] if options.clip else ['full']
options.rgfa = list(set(options.rgfa)) if options.rgfa else []
for rgfa in options.rgfa:
if rgfa not in ['clip', 'filter', 'full']:
raise RuntimeError('Unrecognized value for --rgfa: {}. Must be one of {{clip, filter, full}}'.format(rgfa))
if rgfa == 'clip' and not options.clip:
raise RuntimeError('--rgfa cannot be set to clip since clipping is disabled')
if rgfa == 'filter' and not options.filter:
raise RuntimeError('--rgfa cannot be set to filter since filtering is disabled')

if options.gbz == []:
options.gbz = ['clip'] if options.clip else ['full']
options.gbz = list(set(options.gbz)) if options.gbz else []
@@ -285,10 +298,10 @@ def graphmap_join_validate_options(options):
raise RuntimeError('Specifying --haplo {} requires also specifying --giraffe {}'.format(haplo, haplo))

# Prevent some useless compute due to default param combos
if options.clip and 'clip' not in options.gfa + options.gbz + options.odgi + options.chrom_vg + options.chrom_og + options.vcf + options.giraffe + options.viz + options.draw\
and 'filter' not in options.gfa + options.gbz + options.odgi + options.chrom_vg + options.chrom_og + options.vcf + options.giraffe + options.viz + options.draw:
if options.clip and 'clip' not in options.gfa + options.rgfa + options.gbz + options.odgi + options.chrom_vg + options.chrom_og + options.vcf + options.giraffe + options.viz + options.draw\
and 'filter' not in options.gfa + options.rgfa + options.gbz + options.odgi + options.chrom_vg + options.chrom_og + options.vcf + options.giraffe + options.viz + options.draw:
options.clip = None
if options.filter and 'filter' not in options.gfa + options.gbz + options.odgi + options.chrom_vg + options.chrom_og + options.vcf + options.giraffe + options.viz + options.draw:
if options.filter and 'filter' not in options.gfa + options.rgfa + options.gbz + options.odgi + options.chrom_vg + options.chrom_og + options.vcf + options.giraffe + options.viz + options.draw:
options.filter = None

return options
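Read together, the two checks above say: gather every graph type requested by any output option, then drop the clip phase when neither clip nor filter is requested anywhere, and drop the filter phase when filter is not requested anywhere. A sketch of that reading, not code from this commit:

def requested_types(options):
    # union of the graph types asked for by any output flag
    return set(options.gfa + options.rgfa + options.gbz + options.odgi +
               options.chrom_vg + options.chrom_og + options.vcf +
               options.giraffe + options.viz + options.draw)

# equivalent to the conditionals above:
#   keep options.clip   only if {'clip', 'filter'} & requested_types(options)
#   keep options.filter only if 'filter' in requested_types(options)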
@@ -459,12 +472,13 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
gfa_root_job = Job()
phase_root_job.addFollowOn(gfa_root_job)
gfa_ids = []
rgfa_ids = []
current_out_dict = None
do_gbz = workflow_phase in options.gbz + options.vcf + options.giraffe
if do_gbz or workflow_phase in options.gfa:
assert len(options.vg) == len(phase_vg_ids) == len(vg_ids)
for vg_path, vg_id, input_vg_id in zip(options.vg, phase_vg_ids, vg_ids):
gfa_job = gfa_root_job.addChildJobFn(vg_to_gfa, options, config, vg_path, vg_id,
gfa_job = gfa_root_job.addChildJobFn(vg_to_gfa, options, config, vg_path, vg_id, rgfa=False,
disk=input_vg_id.size * 10,
memory=max(2**31, input_vg_id.size * 16))
gfa_ids.append(gfa_job.rv())
@@ -477,8 +491,22 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
memory=index_mem)
out_dicts.append(gfa_merge_job.rv())
prev_job = gfa_merge_job
current_out_dict = gfa_merge_job.rv()

# optional rgfa
if workflow_phase in options.rgfa:
assert len(options.vg) == len(phase_vg_ids) == len(vg_ids)
for vg_path, vg_id, input_vg_id in zip(options.vg, phase_vg_ids, vg_ids):
rgfa_job = gfa_root_job.addChildJobFn(vg_to_gfa, options, config, vg_path, vg_id, rgfa=True,
disk=input_vg_id.size * 5)
rgfa_ids.append(rgfa_job.rv())

rgfa_merge_job = gfa_root_job.addFollowOnJobFn(merge_rgfa, options, config, rgfa_ids,
tag=workflow_phase + '.',
cores=max(1, int(options.indexCores / 4)),
disk=sum(f.size for f in vg_ids) * 3)
out_dicts.append(rgfa_merge_job.rv())
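The rGFA wiring follows the same Toil pattern as the GFA block above: one child job per chromosome graph hands back a promise via rv(), and a follow-on job (here merge_rgfa) receives the resolved values once every child has finished. A self-contained sketch of that child/follow-on promise pattern, illustrative only and not taken from cactus:

from toil.common import Toil
from toil.job import Job

def convert_one(job, item):
    # stands in for the per-chromosome vg_to_gfa call
    return item * 2

def merge_all(job, results):
    # stands in for merge_rgfa: by the time this runs, the promises are plain values
    return sum(results)

def root(job, items):
    promises = [job.addChildJobFn(convert_one, i).rv() for i in items]
    return job.addFollowOnJobFn(merge_all, promises).rv()

if __name__ == "__main__":
    opts = Job.Runner.getDefaultOptions("./jobstore")
    with Toil(opts) as toil:
        print(toil.start(Job.wrapJobFn(root, [1, 2, 3])))   # prints 12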

# optional vcf
if workflow_phase in options.vcf:
for vcf_ref in options.vcfReference:
@@ -599,6 +627,13 @@ def clip_vg(job, options, config, vg_path, vg_id, phase):
# todo: do we want to add the minigraph prefix to keep stubs from minigraph? but I don't think it makes stubs....
cmd.append(stub_cmd)

if phase in options.rgfa:
# do the rgfa cover
min_rgfa_fragment = getOptionalAttrib(findRequiredNode(config.xmlRoot, "graphmap_join"), "minRGFAFragment", typeFn=int, default=1)
rgfa_cover_cmd = ['vg', 'paths', '-v', '-', '-R', str(min_rgfa_fragment),
'-Q', options.reference[0], '-t', str(job.cores)]
cmd.append(rgfa_cover_cmd)
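Here cmd is a list of command argument-lists that cactus_call chains into a single pipe, so with the config value of 50 shipped above and a made-up reference sample name the appended stage behaves roughly like one extra pipeline step:

# illustration only, assuming minRGFAFragment=50, a hypothetical reference GRCh38 and 8 cores
rgfa_cover_cmd = ['vg', 'paths', '-v', '-', '-R', '50', '-Q', 'GRCh38', '-t', '8']
# i.e. the clipping pipe gains a trailing stage like:  ... | vg paths -v - -R 50 -Q GRCh38 -t 8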

# enforce chopping
if phase == 'full' and options.chop:
cmd.append(['vg', 'mod', '-X', str(options.chop), '-'])
@@ -721,14 +756,16 @@ def drop_graph_event(job, config, vg_path, full_vg_id):
cactus_call(parameters=['vg', 'paths', '-d', '-S', graph_event, '-x', full_vg_path], outfile=out_path)
return job.fileStore.writeGlobalFile(out_path)

def vg_to_gfa(job, options, config, vg_path, vg_id):
def vg_to_gfa(job, options, config, vg_path, vg_id, rgfa=False):
""" run gfa conversion """
work_dir = job.fileStore.getLocalTempDir()
vg_path = os.path.join(work_dir, os.path.basename(vg_path))
job.fileStore.readGlobalFile(vg_id, vg_path)
out_path = vg_path + '.gfa'

cmd = ['vg', 'convert', '-f', '-Q', options.reference[0], os.path.basename(vg_path), '-B']
cmd = ['vg', 'convert', '-f', os.path.basename(vg_path)]
if rgfa:
cmd += ['-Q', options.reference[0], '-B']

cactus_call(parameters=cmd, outfile=out_path, work_dir=work_dir, job_memory=job.memory)

@@ -749,6 +786,29 @@ def vg_to_og(job, options, config, vg_path, vg_id):
os.path.basename(og_path), '-t', str(job.cores)], work_dir=work_dir, job_memory=job.memory)
return job.fileStore.writeGlobalFile(og_path)

def merge_rgfa(job, options, config, rgfa_ids, tag=''):
""" merge the rgfas, pretty much same as gfas below but doesn't
need to be folded into the indexing job as it's not used for anything downstream """
work_dir = job.fileStore.getLocalTempDir()

merge_rgfa_path = os.path.join(work_dir, '{}merged.rgfa'.format(tag))
with open(merge_rgfa_path, 'w') as merge_rgfa_file:
merge_rgfa_file.write('H\tVN:Z:1.0\n')

# merge the gfas
assert len(options.vg) == len(rgfa_ids)
for i, (vg_path, rgfa_id) in enumerate(zip(options.vg, rgfa_ids)):
rgfa_path = os.path.join(work_dir, os.path.basename(vg_path) + '.rgfa')
job.fileStore.readGlobalFile(rgfa_id, rgfa_path, mutable=True)
# get rid of walk (W), path (P) and header (H) lines
cmd = [['grep', '-v', '^W', rgfa_path], ['grep', '-v', '^P'], ['grep', '-v', '^H']]
cactus_call(parameters=cmd, outfile=merge_rgfa_path, outappend=True)
job.fileStore.deleteGlobalFile(rgfa_id)

cactus_call(parameters=['bgzip', merge_rgfa_path, '--threads', str(job.cores)])

return { '{}rgfa.gz'.format(tag) : job.fileStore.writeGlobalFile(merge_rgfa_path + '.gz') }
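The merged .rgfa therefore carries one header written up front plus the segment and link records from every per-chromosome file, while walk, path and header lines are stripped by the grep pipe. A pure-Python sketch of that per-file filtering, shown only to make the intent explicit:

def keep_rgfa_records(lines):
    # drop walk (W), path (P) and header (H) lines; keep everything else (S, L, ...)
    return [line for line in lines if not line.startswith(('W', 'P', 'H'))]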

def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
""" merge of the gfas, then make gbz / snarls / trans
"""
@@ -786,9 +846,15 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
gbz_path = os.path.join(work_dir, '{}merged.gbz'.format(tag))
cactus_call(parameters=['vg', 'gbwt', '-G', merge_gfa_path, '--gbz-format', '-g', gbz_path], job_memory=job.memory)

# zip the gfa
cactus_call(parameters=['bgzip', merge_gfa_path, '--threads', str(job.cores)])
gfa_path = merge_gfa_path + '.gz'
# zip the gfa and remove any rgfa paths (they will be kept in the GBZ and .rgfa
# but not here as it would be just too confusing)
rgfa_sample_name = '_rGFA_'
gfa_path = merge_gfa_path + '.gz'
cactus_call(parameters=[['grep', '-v', '^W\t{}'.format(rgfa_sample_name), merge_gfa_path],
['bgzip', '--threads', str(job.cores)]],
outfile=gfa_path)
os.remove(merge_gfa_path)
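The grep stage above drops only walk lines whose sample column starts with the rGFA sentinel name, so ordinary haplotype walks still reach the bgzipped GFA. The same test written out in Python, purely as an illustration of what is matched:

def is_rgfa_walk(gfa_line, rgfa_sample_name='_rGFA_'):
    # GFA W lines are tab-separated: W <sample> <haplotype> <contig> <start> <end> <walk>
    fields = gfa_line.split('\t')
    return fields[0] == 'W' and len(fields) > 1 and fields[1].startswith(rgfa_sample_name)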


# make the snarls
if do_gbz:
