tools: add some tests for accessing datasets, streaming...
jtarraga committed Jan 13, 2017
1 parent d9f47f0 commit ecba314
Showing 4 changed files with 549 additions and 100 deletions.
ChiSquareAnalysis.java
@@ -0,0 +1,135 @@
package org.opencb.hpg.bigdata.tools.variant.analysis;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.mllib.linalg.Matrices;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.mllib.stat.test.ChiSqTestResult;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.opencb.biodata.models.core.pedigree.Individual;
import org.opencb.biodata.models.core.pedigree.Pedigree;
import org.opencb.hpg.bigdata.tools.variant.analysis.statistics.MultipleTestCorrection;
import scala.collection.mutable.WrappedArray;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Created by jtarraga on 11/01/17.
 */
public class ChiSquareAnalysis {

    public static void run(Dataset<Row> data, Pedigree pedigree) {
        data.show();

        System.out.println(pedigree);

        // Collect the phenotype (affected/unaffected) of each individual, in pedigree order
        List<Individual.Phenotype> phenotype = new ArrayList<>(pedigree.getIndividuals().size());
        for (Individual individual : pedigree.getIndividuals().values()) {
            phenotype.add(individual.getPhenotype());
        }

        System.out.println(phenotype);

        // For each variant, build a 2x2 allele-by-phenotype contingency table and run a
        // chi-square test of independence on it
        Encoder<ChiSquareAnalysisResult> resultEncoder = Encoders.bean(ChiSquareAnalysisResult.class);
        Dataset<ChiSquareAnalysisResult> resultDS = data.map(new MapFunction<Row, ChiSquareAnalysisResult>() {
            @Override
            public ChiSquareAnalysisResult call(Row r) throws Exception {

                // Column-major counts: {unaffected/ref, unaffected/alt, affected/ref, affected/alt}
                double[] counters = new double[]{0, 0, 0, 0};

                System.out.print("Variant at " + r.get(1) + ":" + r.get(2) + " -> ");
                for (int i = 0; i < phenotype.size(); i++) {
                    final int length = 2;
                    // Genotype of the i-th sample, e.g. "0/1" or "0|1"
                    String gt = ((WrappedArray) r.getList(5).get(i)).head().toString();
                    System.out.print(gt + "\t");

                    String[] fields = gt.split("[|/]");
                    if (fields.length == length) {
                        if (phenotype.get(i) != Individual.Phenotype.MISSING) {
                            for (int j = 0; j < length; j++) {
                                switch (fields[j]) {
                                    case "0":
                                        counters[phenotype.get(i) == Individual.Phenotype.UNAFFECTED ? 0 : 2]++;
                                        break;
                                    case "1":
                                        counters[phenotype.get(i) == Individual.Phenotype.UNAFFECTED ? 1 : 3]++;
                                        break;
                                    default:
                                        break;
                                }
                            }
                        }
                    }
                }
                System.out.print("\tmatrix: ");
                for (int i = 0; i < counters.length; i++) {
                    System.out.print(counters[i] + "\t");
                }
                System.out.println();

                // Matrices.dense is column-major: rows = allele (ref/alt), columns = phenotype status
                Matrix mat = Matrices.dense(2, 2, counters);
                ChiSqTestResult independenceTestResult = Statistics.chiSqTest(mat);

                ChiSquareAnalysisResult res = new ChiSquareAnalysisResult();
                res.setStatistic(independenceTestResult.statistic());
                res.setpValue(independenceTestResult.pValue());
                res.setDegreesOfFreedom(independenceTestResult.degreesOfFreedom());
                res.setMethod(independenceTestResult.method());
                res.setAdjPValue(0.0);
                return res;
            }
        }, resultEncoder);
        resultDS.show(false);

        // Collect the p-values on the driver to apply the multiple-testing correction
        double[] pValues = new double[(int) resultDS.count()];

        int i = 0;
        Iterator<Row> it = resultDS.toDF().javaRDD().toLocalIterator();
        while (it.hasNext()) {
            Row row = it.next();
            pValues[i++] = row.getDouble(row.fieldIndex("pValue"));
        }

        System.out.print("pValues = ");
        for (i = 0; i < pValues.length; i++) {
            System.out.print(pValues[i] + "\t");
        }
        System.out.println();

        MultipleTestCorrection multipleTestCorrection = new MultipleTestCorrection();
        double[] adjPValues = multipleTestCorrection.fdrCorrection(pValues);

        System.out.print("adj. pValues = ");
        for (i = 0; i < adjPValues.length; i++) {
            System.out.print(adjPValues[i] + "\t");
        }
        System.out.println();

        //resultDS.map

        //resultDS.coalesce(1).write().format("json").save("/tmp/test.vcf.chi.square");

//        it = resultDS.toDF().javaRDD().toLocalIterator();
//        while (it.hasNext()) {
//            Row row = it.next();
//            row.getDouble(row.fieldIndex("pValue"));
//        }

//        resultDS.show(false);

//        System.out.println("column pValue");
//        System.out.println(resultDS.col("pValue"));

        //val mkString = udf((a: Seq[Double]) => a.mkString(", "))
        //df.withColumn("coordinates_string", mkString($"coordinates"))

//        resultDS.withColumn("adj. pValue", new Column(adj));
    }
}
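
For reference, the statistical core above can be exercised on its own: below is a minimal, self-contained sketch (class name and sample counts invented for illustration) of the same Spark MLlib chi-square independence test on a hand-built 2x2 allele-by-phenotype table, followed by a Benjamini-Hochberg adjustment of the kind MultipleTestCorrection.fdrCorrection presumably applies. Statistics.chiSqTest on a local Matrix runs entirely on the driver, so no SparkContext is needed here.

import org.apache.spark.mllib.linalg.Matrices;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.mllib.stat.test.ChiSqTestResult;

import java.util.Arrays;

public class ChiSquareSketch {
    public static void main(String[] args) {
        // Column-major 2x2 table: {unaffected/ref, unaffected/alt, affected/ref, affected/alt}
        Matrix mat = Matrices.dense(2, 2, new double[]{40, 10, 25, 25});
        ChiSqTestResult result = Statistics.chiSqTest(mat);
        System.out.println("statistic = " + result.statistic() + ", p = " + result.pValue());

        // Benjamini-Hochberg adjustment of a small p-value vector
        double[] p = {0.001, 0.02, 0.03, 0.4};
        System.out.println("adjusted = " + Arrays.toString(bhAdjust(p)));
    }

    // Benjamini-Hochberg: sort p ascending, adj(k) = min over j >= k of p(j) * n / rank(j)
    static double[] bhAdjust(double[] p) {
        int n = p.length;
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) {
            order[i] = i;
        }
        Arrays.sort(order, (a, b) -> Double.compare(p[a], p[b]));
        double[] adj = new double[n];
        double min = 1.0;
        for (int k = n - 1; k >= 0; k--) {
            min = Math.min(min, p[order[k]] * n / (k + 1));
            adj[order[k]] = min;
        }
        return adj;
    }
}
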
ChiSquareAnalysisResult.java
@@ -0,0 +1,54 @@
package org.opencb.hpg.bigdata.tools.variant.analysis;

import java.io.Serializable;

/**
 * Created by jtarraga on 11/01/17.
 */
public class ChiSquareAnalysisResult implements Serializable {
    private double statistic;
    private double pValue;
    private double adjPValue;
    private int degreesOfFreedom;
    private String method;

    public double getAdjPValue() {
        return adjPValue;
    }

    public void setAdjPValue(double adjPValue) {
        this.adjPValue = adjPValue;
    }

    public int getDegreesOfFreedom() {
        return degreesOfFreedom;
    }

    public void setDegreesOfFreedom(int degreesOfFreedom) {
        this.degreesOfFreedom = degreesOfFreedom;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public double getpValue() {
        return pValue;
    }

    public void setpValue(double pValue) {
        this.pValue = pValue;
    }

    public double getStatistic() {
        return statistic;
    }

    public void setStatistic(double statistic) {
        this.statistic = statistic;
    }
}
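
ChiSquareAnalysisResult is a plain bean because Spark's Encoders.bean derives the dataset schema reflectively from a public no-arg constructor plus getter/setter pairs (hence the slightly odd getpValue/setpValue names, which map to a pValue column). A minimal sketch of the encoder in isolation, with a local SparkSession assumed purely for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class BeanEncoderSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("bean-encoder-sketch").getOrCreate();

        ChiSquareAnalysisResult r = new ChiSquareAnalysisResult();
        r.setStatistic(7.81);
        r.setpValue(0.005);
        r.setDegreesOfFreedom(1);
        r.setMethod("pearson");
        r.setAdjPValue(0.0);

        // The encoder derives the columns (statistic, pValue, ...) from the bean's properties
        Dataset<ChiSquareAnalysisResult> ds = spark.createDataset(
                Collections.singletonList(r), Encoders.bean(ChiSquareAnalysisResult.class));
        ds.show(false);
        spark.stop();
    }
}
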
JavaCustomReceiver.java
@@ -0,0 +1,67 @@
package org.opencb.hpg.bigdata.tools.variant.analysis;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

/**
 * Created by jtarraga on 13/01/17.
 */
public class JavaCustomReceiver extends Receiver<Dataset<Row>> {

    // ============= Receiver code that emits query results as a stream ==============

    SQLContext sqlContext;

    public JavaCustomReceiver(SQLContext sqlContext) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.sqlContext = sqlContext;
    }

    @Override
    public void onStart() {
        // Start the thread that produces the data
        new Thread() {
            @Override public void run() {
                receive();
            }
        }.start();
    }

    @Override
    public void onStop() {
        // Nothing to do here: the producer thread runs a fixed number of
        // iterations and then exits on its own
    }

    /**
     * Run a query repeatedly and store each result as a stream element.
     */
    private void receive() {
        for (int i = 0; i < 5; i++) {
            // The query assumes a table/view exposing 'chromosome' and 'start'
            // has already been registered in this SQLContext
            Dataset<Row> rows = sqlContext.sql("SELECT chromosome, start");
            System.out.println("=================> Stream, dataset " + i + " with " + rows.count() + " rows");
            store(rows);
        }
        // End the test run abruptly once five batches have been stored
        System.exit(0);
    }
}
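
The driver side that would consume this receiver is not part of the commit; a sketch of the usual wiring through JavaStreamingContext.receiverStream (master, batch interval, and class name are assumptions) could look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ReceiverDriverSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("receiver-sketch");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc.sc());
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));

        // Each stream element is a whole Dataset<Row> produced by the receiver above
        JavaDStream<Dataset<Row>> datasets = jssc.receiverStream(new JavaCustomReceiver(sqlContext));
        datasets.foreachRDD(rdd ->
                rdd.collect().forEach(ds -> System.out.println("batch rows: " + ds.count())));

        jssc.start();
        jssc.awaitTermination();
    }
}

Note that store(rows) hands whole Dataset objects to Spark's block manager; a Dataset is a driver-side handle rather than serializable data, so this is an experiment with the streaming API rather than a production pattern, which matches the exploratory nature of the commit.
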
