Skip to content

Commit

Permalink
hbase: Load into different tables and full expansion of the gVCFs (#11
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthias Haimel committed Jun 30, 2015
1 parent 051af3e commit d48e5d3
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.file.Paths;
Expand Down Expand Up @@ -510,12 +511,13 @@ private void cram2avro(String input, String output, String codecName) throws IOE


private void variant2hbase(String input, String output) throws Exception {
List<String> args = new ArrayList<String>(Arrays.asList(new String[]{"-i",input,"-t","VariantLoadAll"}));
if(StringUtils.isNotBlank(output)){
args.add("-o");
args.add(output);
URI server = null;
// new URI("//who1:60000/VariantExpanded");
if(StringUtils.isNotBlank(output)){
server = new URI(output);
}
int run = Variant2HbaseMR.run(args.toArray(new String[0]), null);

int run = Variant2HbaseMR.run(input,server);
if(run != 0)
throw new IllegalStateException(String.format("Variant 2 HBase finished with %s !", run));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void main(String[] args) {
try(FeatureReader<VariantContext> freader = AbstractFeatureReader.getFeatureReader(
file.getAbsolutePath(),
new FullVcfCodec(),
true);){
false);){
VCFHeader header = (VCFHeader) freader.getHeader();

int gtSize = header.getGenotypeSamples().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package org.opencb.hpg.bigdata.tools.converters.mr;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -45,6 +46,7 @@ public class Variant2HbaseMR extends Mapper<AvroKey<Variant>, NullWritable, Immu

private final static Logger log = LoggerFactory.getLogger(Variant2HbaseMR.class);
private Configuration config;
private boolean expandRegions = false;

public Variant2HbaseMR() {
super();
Expand All @@ -54,6 +56,14 @@ public static Logger getLog() {
return log;
}

public void setExpandRegions(boolean expandRegions) {
this.expandRegions = expandRegions;
}

public boolean isExpandRegions() {
return expandRegions;
}

@Override
protected void setup(
Mapper<AvroKey<Variant>, NullWritable, ImmutableBytesWritable, Put>.Context context)
Expand All @@ -68,23 +78,33 @@ protected void map(AvroKey<Variant> key,NullWritable value,Mapper<AvroKey<Varian

if(isReference(variant)){ // is a variant (not just coverage info)
String refplaceholder = "?"; // TODO require lookup service to expand
Map<CharSequence, List<CharSequence>> info = variant.getInfo();
List<CharSequence> endLst = info.get("END"); // Get End position

if(null == endLst || endLst.isEmpty()){
context.getCounter("VCF","REF_END_EMPTY").increment(1);
return;
}
String endStr = endLst.get(0).toString();
Long endPos = Long.valueOf(endStr);
Long start = variant.getStart();
for(long pos = start; pos < endPos; ++pos){
// For each position -> store
String idStr = HBaseUtils.buildRefernceStorageId(variant.getReferenceName(),pos,refplaceholder);
store(context,variant.getCalls(),idStr);
Long endPos = start + 1;
List<Call> calls = variant.getCalls();
boolean nocall = calls.isEmpty();
if(isExpandRegions()){
context.getCounter("VCF","REG_EXPAND"+(nocall?"_NOCALL":"")).increment(1);
Map<CharSequence, List<CharSequence>> info = variant.getInfo();
List<CharSequence> endLst = info.get("END"); // Get End position

if(null == endLst || endLst.isEmpty()){
// Region of size 1
context.getCounter("VCF","REF_END_EMPTY"+(nocall?"_NOCALL":"")).increment(1);
} else {
String endStr = endLst.get(0).toString();
endPos = Long.valueOf(endStr);
}
}
String counterName = "REG_EXPAND_CNT"+(nocall?"_NOCALL":"");
context.getCounter("VCF",counterName).increment((endPos - start));
if( ! nocall){ // only if calls
for(long pos = start; pos < endPos; ++pos){
// For each position -> store
String idStr = HBaseUtils.buildRefernceStorageId(variant.getReferenceName(),pos,refplaceholder);
store(context,calls,idStr);
}
}
} else {
// if(true){ // all
int altCnt = variant.getAlternateBases().size();
if(altCnt > 1){
context.getCounter("VCF","biallelic_COUNT").increment(1);
Expand Down Expand Up @@ -164,23 +184,37 @@ public Configuration getConf() {
return this.config;
}

public static int run(String[] args,String other) throws Exception{
public static int run(String inputFile, URI uri) throws Exception{
Configuration conf = new Configuration();
String tablename = "test_table";
String inputfile = null;
String output = null;
for(int i = 0; i < args.length; ++i){
if(args[i] == "-t")
tablename = args[++i];
if(args[i] == "-i")
inputfile = args[++i];
if(args[i] == "-o")
output = args[++i];
String inputfile = inputFile;
String server = null;
Integer port = 60000;
String tablename = null;

if(null == uri)
throw new IllegalArgumentException("No Server output specified!");

server = uri.getHost();
if(StringUtils.isBlank(server))
throw new IllegalArgumentException("No Server host name specified in URI: " + uri);

if(uri.getPort() > 0){ // if port is specified
port = uri.getPort();
}

// Extract table name from Path
if(StringUtils.isBlank(uri.getPath()) || StringUtils.equals(uri.getPath().trim(), "/")){
throw new IllegalArgumentException("No Table name specified in URI: " + uri);
}
tablename = uri.getPath();
tablename = tablename.startsWith("/")?tablename.substring(1):tablename; // Remove leading /

String master = String.join(":", server,port.toString());

getLog().info(String.format("Loading data into server '%s' using table '%s' ", master,tablename));

conf.set("hbase.zookeeper.quorum", "who1");
conf.set("hbase.master", "who1:60000");
conf.set("hbase.zookeeper.quorum", server);
conf.set("hbase.master", master);

Job job = Job.getInstance(conf, "Variant2HBase");
job.setJarByClass(Variant2HbaseMR.class);
Expand Down Expand Up @@ -208,8 +242,8 @@ public static int run(String[] args,String other) throws Exception{

@Override
public int run(String[] args) throws Exception {
getLog().info(String.format("Configuration: %s ", getConf()));
setConf(new Configuration());
// getLog().info(String.format("Configuration: %s ", getConf()));
// setConf(new Configuration());
String tablename = "test_table";
String inputfile = null;
String output = null;
Expand Down

0 comments on commit d48e5d3

Please sign in to comment.