From 0bc165e4d2228decb61dad90ff35748ee405b7f2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Joaqu=C3=ADn=20T=C3=A1rraga=20Gim=C3=A9nez?=
Date: Mon, 23 Jan 2017 09:39:17 +0100
Subject: [PATCH] tools: RvTests tests, #125

---
 .../app/cli/local/LocalCliOptionsParser.java  |  27 ++
 .../app/cli/local/VariantCommandExecutor.java |  22 +-
 .../app/cli/local/VariantRvTestsCLITest.java  |  43 +++
 .../hpg/bigdata/core/lib/ParentDataset.java   |   1 +
 .../variant/analysis/RvTestsAnalysis.java     | 148 +++++++++
 .../tools/variant/analysis/ChiSquareTest.java |   1 -
 .../tools/variant/analysis/MLTest.java        | 314 +++++++++++++++---
 7 files changed, 515 insertions(+), 41 deletions(-)
 create mode 100644 hpg-bigdata-app/src/test/java/org/opencb/hpg/bigdata/app/cli/local/VariantRvTestsCLITest.java
 create mode 100644 hpg-bigdata-tools/src/main/java/org/opencb/hpg/bigdata/tools/variant/analysis/RvTestsAnalysis.java

diff --git a/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/LocalCliOptionsParser.java b/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/LocalCliOptionsParser.java
index df5e4676..69f90397 100644
--- a/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/LocalCliOptionsParser.java
+++ b/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/LocalCliOptionsParser.java
@@ -82,6 +82,7 @@ public LocalCliOptionsParser() {
         variantSubCommands.addCommand("view", variantCommandOptions.viewVariantCommandOptions);
         variantSubCommands.addCommand("query", variantCommandOptions.queryVariantCommandOptions);
         variantSubCommands.addCommand("metadata", variantCommandOptions.metadataVariantCommandOptions);
+        variantSubCommands.addCommand("rvtests", variantCommandOptions.rvtestsVariantCommandOptions);
     }
 
     public void parse(String[] args) throws ParameterException {
@@ -439,6 +440,7 @@ public class VariantCommandOptions extends CommandOptions {
         ViewVariantCommandOptions viewVariantCommandOptions;
         QueryVariantCommandOptions queryVariantCommandOptions;
         MetadataVariantCommandOptions metadataVariantCommandOptions;
+        RvTestsVariantCommandOptions rvtestsVariantCommandOptions;
 
         public VariantCommandOptions() {
             this.convertVariantCommandOptions = new ConvertVariantCommandOptions();
@@ -446,6 +448,7 @@ public VariantCommandOptions() {
             this.viewVariantCommandOptions = new ViewVariantCommandOptions();
             this.queryVariantCommandOptions = new QueryVariantCommandOptions();
             this.metadataVariantCommandOptions = new MetadataVariantCommandOptions();
+            this.rvtestsVariantCommandOptions = new RvTestsVariantCommandOptions();
         }
     }
 
@@ -740,6 +743,30 @@ class MetadataVariantCommandOptions {
         public boolean summary = false;
     }
 
+    @Parameters(commandNames = {"rvtests"}, commandDescription = "Execute the 'rvtests' program.")
+    class RvTestsVariantCommandOptions {
+
+        @ParametersDelegate
+        public CommonCommandOptions commonOptions = commonCommandOptions;
+
+        @Parameter(names = {"-i", "--input"}, description = "Input file name (in Avro/Parquet file format).",
+                required = true, arity = 1)
+        public String inFilename;
+
+        @Parameter(names = {"-o", "--output"}, description = "Output directory name to save the rvtests results.",
+                required = true, arity = 1)
+        public String outDirname;
+
+        @Parameter(names = {"--dataset"}, description = "Target dataset.",
+                arity = 1)
+        public String datasetId = null;
+
+        @Parameter(names = {"-c", "--config"}, description = "Configuration file name containing the rvtests parameters.",
+                required = true, arity = 1)
+        public String confFilename;
+    }
+
+
     private void printMainUsage() {
         // TODO This is a nasty hack. By some unknown reason JCommander only prints the description from first command
         Map<String, String> commandDescription = new HashMap<>();
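The new `rvtests` subcommand above follows the same JCommander pattern as the existing `convert`/`query`/`metadata` subcommands. For reference, a minimal, self-contained sketch of how these annotations parse (class name and sample argument values are illustrative, not part of the patch):

```java
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

public class RvTestsCliSketch {

    // Mirrors the required/optional options added in LocalCliOptionsParser.
    @Parameters(commandNames = {"rvtests"}, commandDescription = "Execute the 'rvtests' program.")
    static class RvTestsOptions {
        @Parameter(names = {"-i", "--input"}, required = true, arity = 1)
        String inFilename;

        @Parameter(names = {"-o", "--output"}, required = true, arity = 1)
        String outDirname;

        @Parameter(names = {"-c", "--config"}, required = true, arity = 1)
        String confFilename;

        @Parameter(names = {"--dataset"}, arity = 1)
        String datasetId = null;
    }

    public static void main(String[] args) {
        RvTestsOptions options = new RvTestsOptions();
        JCommander jc = new JCommander();
        jc.addCommand(options);   // command name comes from @Parameters
        jc.parse("rvtests", "-i", "example.vcf.avro", "-o", "out", "-c", "skat.params");
        System.out.println(jc.getParsedCommand() + " -> " + options.inFilename);
    }
}
```

Omitting any of the three `required = true` options makes `parse()` throw a `ParameterException`, which is the behavior the CLI relies on.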
diff --git a/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/VariantCommandExecutor.java b/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/VariantCommandExecutor.java
index d07e5f02..d5f0411a 100644
--- a/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/VariantCommandExecutor.java
+++ b/hpg-bigdata-app/src/main/java/org/opencb/hpg/bigdata/app/cli/local/VariantCommandExecutor.java
@@ -44,6 +44,7 @@
 import org.opencb.hpg.bigdata.core.lib.SparkConfCreator;
 import org.opencb.hpg.bigdata.core.lib.VariantDataset;
 import org.opencb.hpg.bigdata.core.parquet.VariantParquetConverter;
+import org.opencb.hpg.bigdata.tools.variant.analysis.RvTestsAnalysis;
 
 import java.io.*;
 import java.nio.file.Files;
@@ -96,11 +97,17 @@ public void execute() throws Exception {
                 query();
                 break;
             case "metadata":
-                init(variantCommandOptions.queryVariantCommandOptions.commonOptions.logLevel,
-                        variantCommandOptions.queryVariantCommandOptions.commonOptions.verbose,
-                        variantCommandOptions.queryVariantCommandOptions.commonOptions.conf);
+                init(variantCommandOptions.metadataVariantCommandOptions.commonOptions.logLevel,
+                        variantCommandOptions.metadataVariantCommandOptions.commonOptions.verbose,
+                        variantCommandOptions.metadataVariantCommandOptions.commonOptions.conf);
                 metadata();
                 break;
+            case "rvtests":
+                init(variantCommandOptions.rvtestsVariantCommandOptions.commonOptions.logLevel,
+                        variantCommandOptions.rvtestsVariantCommandOptions.commonOptions.verbose,
+                        variantCommandOptions.rvtestsVariantCommandOptions.commonOptions.conf);
+                rvtests();
+                break;
             default:
                 break;
         }
@@ -842,4 +849,13 @@ public void metadata() throws Exception {
             System.out.println("Error: metafile does not exist, " + metaFile.getAbsolutePath());
         }
     }
+
+
+    public void rvtests() throws Exception {
+        RvTestsAnalysis rvtests = new RvTestsAnalysis(variantCommandOptions.rvtestsVariantCommandOptions.inFilename,
+                variantCommandOptions.rvtestsVariantCommandOptions.outDirname,
+                variantCommandOptions.rvtestsVariantCommandOptions.confFilename);
+
+        rvtests.run(variantCommandOptions.rvtestsVariantCommandOptions.datasetId);
+    }
 }
diff --git a/hpg-bigdata-app/src/test/java/org/opencb/hpg/bigdata/app/cli/local/VariantRvTestsCLITest.java b/hpg-bigdata-app/src/test/java/org/opencb/hpg/bigdata/app/cli/local/VariantRvTestsCLITest.java
new file mode 100644
index 00000000..0aa58b9c
--- /dev/null
+++ b/hpg-bigdata-app/src/test/java/org/opencb/hpg/bigdata/app/cli/local/VariantRvTestsCLITest.java
@@ -0,0 +1,43 @@
+package org.opencb.hpg.bigdata.app.cli.local;
+
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Created by joaquin on 1/19/17.
+ */
+public class VariantRvTestsCLITest {
+    Path inPath;
+    Path outPath;
+    Path confPath;
+
+    private void init() throws URISyntaxException {
+        inPath = Paths.get("/home/jtarraga/data/vcf/skat/example.vcf.avro");
+        outPath = Paths.get("/home/jtarraga/data/vcf/skat/out");
+        confPath = Paths.get("/home/jtarraga/data/vcf/skat/skat.params");
+    }
+
+    @Test
+    public void skat() {
+
+        try {
+            init();
+
+            StringBuilder commandLine = new StringBuilder();
+            commandLine.append(" variant rvtests");
+            commandLine.append(" --log-level ERROR");
+            commandLine.append(" -i ").append(inPath);
+            commandLine.append(" -o ").append(outPath);
+            commandLine.append(" -c ").append(confPath);
+            commandLine.append(" --dataset noname");
+
+            VariantQueryCLITest.execute(commandLine.toString());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}
diff --git a/hpg-bigdata-core/src/main/java/org/opencb/hpg/bigdata/core/lib/ParentDataset.java b/hpg-bigdata-core/src/main/java/org/opencb/hpg/bigdata/core/lib/ParentDataset.java
index 0cdcd324..bf607e1f 100644
--- a/hpg-bigdata-core/src/main/java/org/opencb/hpg/bigdata/core/lib/ParentDataset.java
+++ b/hpg-bigdata-core/src/main/java/org/opencb/hpg/bigdata/core/lib/ParentDataset.java
@@ -23,6 +23,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.StorageLevel;
 import org.opencb.biodata.models.core.Region;
+import org.opencb.biodata.models.variant.avro.VariantAvro;
 import org.opencb.commons.datastore.core.Query;
 import org.opencb.commons.datastore.core.QueryOptions;
 import scala.Symbol;
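RvTestsAnalysis (the next file) reads its `--config` file with `java.util.Properties` and currently consumes only the `setFile` key, whose value points at a region list with one name/region pair per line. A sketch of loading such a file; the sample key values and the set-file line format shown in the comment are assumptions based on how `run()` parses them, not something the patch spells out:

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class RvTestsConfSketch {
    public static void main(String[] args) throws Exception {
        // Assumed skat.params content (only "setFile" is read by RvTestsAnalysis.run();
        // any other rvtests parameters would simply be echoed):
        //
        //   setFile=/path/to/regions.set
        //
        // where each set-file line looks like: "geneA\t1:16000000-16066000"
        Properties prop = new Properties();
        try (InputStream in = new FileInputStream(args.length > 0 ? args[0] : "skat.params")) {
            prop.load(in);
        }
        System.out.println("setFile = " + prop.getProperty("setFile"));
    }
}
```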
diff --git a/hpg-bigdata-tools/src/main/java/org/opencb/hpg/bigdata/tools/variant/analysis/RvTestsAnalysis.java b/hpg-bigdata-tools/src/main/java/org/opencb/hpg/bigdata/tools/variant/analysis/RvTestsAnalysis.java
new file mode 100644
index 00000000..1e4878b1
--- /dev/null
+++ b/hpg-bigdata-tools/src/main/java/org/opencb/hpg/bigdata/tools/variant/analysis/RvTestsAnalysis.java
@@ -0,0 +1,148 @@
+package org.opencb.hpg.bigdata.tools.variant.analysis;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.opencb.biodata.formats.pedigree.PedigreeManager;
+import org.opencb.biodata.models.core.Region;
+import org.opencb.biodata.models.variant.VariantMetadataManager;
+import org.opencb.biodata.models.variant.avro.VariantAvro;
+import org.opencb.commons.utils.FileUtils;
+import org.opencb.hpg.bigdata.core.lib.SparkConfCreator;
+import org.opencb.hpg.bigdata.core.lib.VariantDataset;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by joaquin on 1/19/17.
+ */
+public class RvTestsAnalysis {
+    private String inFilename;
+    private String outDirname;
+    private String confFilename;
+
+    private final String RVTEST_BIN = "/home/jtarraga/softs/rvtests/executable/rvtest";
+    private final String BGZIP_BIN = "/home/joaquin/softs/htslib/bgzip";
+    private final String TABIX_BIN = "/home/joaquin/softs/htslib/tabix";
+
+    public RvTestsAnalysis(String inFilename, String outDirname, String confFilename) {
+        this.inFilename = inFilename;
+        this.outDirname = outDirname;
+        this.confFilename = confFilename;
+    }
+
+// ./build/bin/hpg-bigdata-local2.sh variant rvtests -i ~/data/vcf/skat/example.vcf.avro -o ~/data/vcf/skat/out --dataset noname -c ~/data/vcf/skat/skat.params
+
+    public void run(String dataset) throws Exception {
+        // create spark session
+        SparkConf sparkConf = SparkConfCreator.getConf("variant rvtests", "local", 1, true);
+        SparkSession sparkSession = new SparkSession(new SparkContext(sparkConf));
+
+        // load dataset
+        VariantDataset vd = new VariantDataset(sparkSession);
+        vd.load(inFilename);
+        vd.createOrReplaceTempView("vcf");
+
+        // load rvtests parameters
+        Properties prop = new Properties();
+        InputStream confStream = new FileInputStream(confFilename);
+        prop.load(confStream);
+        confStream.close();
+
+        for (Object key: prop.keySet()) {
+            System.out.println((String) key + " = " + (String) prop.get(key));
+        }
+
+        // create temporary directory
+        File tmpDir = new File(outDirname + "/tmp");
+        tmpDir.mkdir();
+
+        // create temporary file for --pheno
+        File phenoFile = new File(tmpDir.getAbsolutePath() + "/pheno");
+        VariantMetadataManager metadataManager = new VariantMetadataManager();
+        metadataManager.load(inFilename + ".meta.json");
+        new PedigreeManager().save(metadataManager.getPedigree(dataset), phenoFile.toPath());
+
+        // loop for regions
+        String line;
+        BufferedReader reader = FileUtils.newBufferedReader(Paths.get(prop.getProperty("setFile")));
+        int i = 0;
+        StringBuilder cmdline = new StringBuilder();
+        while ((line = reader.readLine()) != null) {
+            String[] fields = line.split("[\t ]");
+            System.out.println(fields[0]);
+            String regionName = fields[0];
+            Region region = new Region(fields[1]);
+
+            // create temporary files for --inVcf and --setFile
+            File setFile = new File(tmpDir.getAbsolutePath() + "/setFile." + i);
+            BufferedWriter writer = FileUtils.newBufferedWriter(setFile.toPath());
+            writer.write(fields[0] + "\t" + fields[1] + "\n");
+            writer.close();
+
+            // create temporary vcf file for the region variants
+            VariantDataset ds = (VariantDataset) vd.regionFilter(region);
+            Dataset<VariantAvro> variantDS = ds.as(Encoders.bean(VariantAvro.class));
+
+            List<Row> rows = ds.collectAsList();
+            for (Row row: rows) {
+                // TODO: write this row as a VCF record into the vcfFile created below
+                System.out.println(row);
+            }
+            File vcfFile = new File(tmpDir.getAbsolutePath() + "/variants." + i + ".vcf");
+
+            // compress vcf to bgz
+            cmdline.setLength(0);
+            cmdline.append(this.BGZIP_BIN).append(" ").append(vcfFile.getAbsolutePath());
+            execute(cmdline.toString());
+
+            // and create tabix index
+            cmdline.setLength(0);
+            cmdline.append(this.TABIX_BIN).append(" -p vcf ").append(vcfFile.getAbsolutePath()).append(".gz");
+            execute(cmdline.toString());
+
+            // rvtests command line
+            cmdline.setLength(0);
+            cmdline.append(this.RVTEST_BIN).append(" --kernel skat --pheno ").append(phenoFile.getAbsolutePath())
+                    .append(" --inVcf ").append(vcfFile.getAbsolutePath()).append(".gz")
+                    .append(" --setFile ").append(setFile.getAbsolutePath())
+                    .append(" --out ").append(tmpDir.getAbsolutePath()).append("/out.").append(i);
+            execute(cmdline.toString());
+
+            i++;
+        }
+        reader.close();
+    }
+
+
+    private void execute(String cmdline) {
+        try {
+            System.out.println("Executing: " + cmdline);
+            Process p = Runtime.getRuntime().exec(cmdline);
+
+            BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+
+            // read the output from the command
+            String s;
+            System.out.println("Here is the standard output of the command:\n");
+            while ((s = stdInput.readLine()) != null) {
+                System.out.println(s);
+            }
+
+            // read any errors from the attempted command
+            System.out.println("Here is the standard error of the command (if any):\n");
+            while ((s = stdError.readLine()) != null) {
+                System.out.println(s);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
}
\ No newline at end of file
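`execute()` above drains stdout to completion before touching stderr; with chatty tools such as rvtest, a full stderr pipe can block the child process. A possible alternative (a sketch, not part of the patch; `ShellRunner` is a hypothetical name) merges the two streams via ProcessBuilder and surfaces the exit code:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

final class ShellRunner {
    // Hypothetical replacement for RvTestsAnalysis.execute(): stderr is merged into
    // stdout, so a single reader loop cannot deadlock on a full pipe, and the caller
    // can check the exit status of bgzip/tabix/rvtest.
    static int run(String... cmd) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd)
                .redirectErrorStream(true)
                .start();
        try (BufferedReader out = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = out.readLine()) != null) {
                System.out.println(line);
            }
        }
        return p.waitFor();
    }
}
```

Unlike `Runtime.exec(String)`, which tokenizes the command on whitespace, `ProcessBuilder` takes an explicit argument vector, so paths containing spaces survive intact.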
diff --git a/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/ChiSquareTest.java b/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/ChiSquareTest.java
index 8c539fcc..2d5c0027 100644
--- a/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/ChiSquareTest.java
+++ b/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/ChiSquareTest.java
@@ -87,6 +87,5 @@ public void test() {
         trainingSummary.residuals().show();
         System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
         System.out.println("r2: " + trainingSummary.r2());
-
     }
 }
\ No newline at end of file
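MLTest below keeps the mllib statistics imports (Matrices, Statistics, ChiSqTestResult). For reference, a chi-square independence test on a contingency matrix is a purely local computation; this sketch uses made-up counts:

```java
import org.apache.spark.mllib.linalg.Matrices;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.mllib.stat.test.ChiSqTestResult;

public class ChiSqSketch {
    public static void main(String[] args) {
        // 2x2 contingency table in column-major order, e.g. genotype counts in
        // cases vs. controls (values invented for illustration).
        Matrix counts = Matrices.dense(2, 2, new double[]{30.0, 10.0, 15.0, 25.0});
        ChiSqTestResult result = Statistics.chiSqTest(counts);
        System.out.println("statistic = " + result.statistic());
        System.out.println("pValue    = " + result.pValue());
        System.out.println("df        = " + result.degreesOfFreedom());
    }
}
```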
diff --git a/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/MLTest.java b/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/MLTest.java
index 776b520c..5fa35eb5 100644
--- a/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/MLTest.java
+++ b/hpg-bigdata-tools/src/test/java/org/opencb/hpg/bigdata/tools/variant/analysis/MLTest.java
@@ -2,48 +2,32 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.ml.classification.LogisticRegression;
-import org.apache.spark.ml.classification.LogisticRegressionModel;
-import org.apache.spark.ml.linalg.VectorUDT;
-import org.apache.spark.ml.linalg.Vectors;
-import org.apache.spark.ml.regression.LinearRegression;
-import org.apache.spark.ml.regression.LinearRegressionModel;
 import org.apache.spark.mllib.linalg.Matrices;
 import org.apache.spark.mllib.linalg.Matrix;
 import org.apache.spark.mllib.stat.Statistics;
 import org.apache.spark.mllib.stat.test.ChiSqTestResult;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.StreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.Test;
-import org.opencb.biodata.models.core.pedigree.Individual;
 import org.opencb.biodata.models.core.pedigree.Pedigree;
 import org.opencb.biodata.models.variant.VariantMetadataManager;
 import org.opencb.hpg.bigdata.core.lib.SparkConfCreator;
 import org.opencb.hpg.bigdata.core.lib.VariantDataset;
 import scala.Serializable;
-import scala.collection.mutable.*;
-import scala.collection.mutable.Map;
-import scala.collection.mutable.Queue;
+import scala.Tuple2;
 import scala.collection.mutable.StringBuilder;
+import scala.collection.mutable.WrappedArray;
 
 import java.io.File;
 import java.io.PrintWriter;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.*;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.spark.sql.functions.desc;
@@ -64,25 +48,281 @@ public class MLTest implements Serializable {
 //
 //    }
 
-//    @Test
+
+    @Test
     public void streaming() throws Exception {
 
         // it doesn't matter what we set to spark's home directory
         SparkConf sparkConf = SparkConfCreator.getConf("AlignmentDatasetTest", "local", 1, true, "");
         System.out.println("sparkConf = " + sparkConf.toDebugString());
 
-        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
-        SparkSession sparkSession = new SparkSession(ssc.sparkContext().sc());
-//        SparkSession sparkSession = new SparkSession(new SparkContext(sparkConf));
+        SparkSession sparkSession = new SparkSession(new SparkContext(sparkConf));
 
         VariantDataset vd = new VariantDataset(sparkSession);
-        Path inputPath = Paths.get("/tmp/test.vcf.avro");
+//        Path inputPath = Paths.get("/tmp/test.vcf.avro");
//        Path inputPath = Paths.get("/media/data100/jtarraga/data/spark/100.variants.avro");
-        //Path inputPath = Paths.get(getClass().getResource("/100.variants.avro").toURI());
+//        Path inputPath = Paths.get(getClass().getResource("/100.variants.avro").toURI());
+        Path inputPath = Paths.get("/home/jtarraga/appl/hpg-bigdata/hpg-bigdata-core/src/test/resources/100.variants.avro");
         System.out.println(">>>> opening file " + inputPath);
         vd.load(inputPath.toString());
         vd.createOrReplaceTempView("vcf");
 
+        Dataset<Row> rows = vd.sqlContext().sql("SELECT chromosome, start, end FROM vcf");
+
+        List<Row> rowList = rows.collectAsList();
+
+        List<Integer> list = rows.toJavaRDD().groupBy(new Function<Row, String>() {
+            @Override
+            public String call(Row row) throws Exception {
+                String key;
+                int start = row.getInt(1);
+                if (start < 16066000) {
+                    key = "1";
+                } else if (start < 16067000) {
+                    key = "2";
+                } else if (start < 16069000) {
+                    key = "3";
+                } else {
+                    key = "4";
+                }
+                return key;
+            }
+        }).map(new Function<Tuple2<String, Iterable<Row>>, Integer>() {
+            @Override
+            public Integer call(Tuple2<String, Iterable<Row>> keyValue) throws Exception {
+                System.out.println("key = " + keyValue._1());
+                int i = 0;
+                PrintWriter writer = new PrintWriter(new File("/tmp/key-" + keyValue._1()));
+                java.util.Iterator<Row> iterator = keyValue._2().iterator();
+                while (iterator.hasNext()) {
+                    Row row = iterator.next();
+                    System.out.println("\t\t" + row.get(0) + "\t" + row.get(1));
+                    writer.println(row.get(0) + "\t" + row.get(1));
+                    i++;
+                }
+                writer.close();
+                return i;
+            }
+        }).collect();
+        list.forEach(i -> System.out.println(i));
+    }
+
+//    @Test
+    public void streaming00() throws Exception {
+
+        // it doesn't matter what we set to spark's home directory
+        SparkConf sparkConf = SparkConfCreator.getConf("AlignmentDatasetTest", "local", 1, true, "");
+        System.out.println("sparkConf = " + sparkConf.toDebugString());
+
+//        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
+//        SparkSession sparkSession = new SparkSession(ssc.sparkContext().sc());
+        SparkSession sparkSession = new SparkSession(new SparkContext(sparkConf));
+        VariantDataset vd = new VariantDataset(sparkSession);
+//        Path inputPath = Paths.get("/tmp/test.vcf.avro");
+//        Path inputPath = Paths.get("/media/data100/jtarraga/data/spark/100.variants.avro");
+//        Path inputPath = Paths.get(getClass().getResource("/100.variants.avro").toURI());
+        Path inputPath = Paths.get("/home/jtarraga/appl/hpg-bigdata/hpg-bigdata-core/src/test/resources/100.variants.avro");
+        System.out.println(">>>> opening file " + inputPath);
+        vd.load(inputPath.toString());
+        vd.createOrReplaceTempView("vcf");
+
+        Dataset<Row> rows = vd.sqlContext().sql("SELECT chromosome, start, end FROM vcf");
+
+        List<Row> rowList = rows.collectAsList();
+
+
+//        rows.grou
+//        JavaPairRDD<String, Iterable<Row>> pairs = rows.toJavaRDD().groupBy(new );
+        //JavaPairRDD<String, Iterable<Row>>
+
+        List<Integer> list = rows.toJavaRDD().groupBy(new Function<Row, String>() {
+            @Override
+            public String call(Row row) throws Exception {
+                String key;
+                int start = row.getInt(1);
+                if (start < 16066000) {
+                    key = "1";
+                } else if (start < 16067000) {
+                    key = "2";
+                } else if (start < 16069000) {
+                    key = "3";
+                } else {
+                    key = "4";
+                }
+                return key;
+            }
+        }).map(new Function<Tuple2<String, Iterable<Row>>, Integer>() {
+            @Override
+            public Integer call(Tuple2<String, Iterable<Row>> keyValue) throws Exception {
+                System.out.println("key = " + keyValue._1());
+                int i = 0;
+                PrintWriter writer = new PrintWriter(new File("/tmp/key-" + keyValue._1()));
+                java.util.Iterator<Row> iterator = keyValue._2().iterator();
+                while (iterator.hasNext()) {
+                    Row row = iterator.next();
+                    System.out.println("\t\t" + row.get(0) + "\t" + row.get(1));
+                    writer.println(row.get(0) + "\t" + row.get(1));
+                    i++;
+                }
+                writer.close();
+//                keyValue._2().forEach(row -> {
+//                    i++;
+//                    System.out.println("\t\t" + row.get(0) + "\t" + row.get(1))
+//                });
+                return i;
+            }
+        }).collect();
+        list.forEach(i -> System.out.println(i));
+/*
+        KeyValueGroupedDataset groups = rows.groupByKey(new MapFunction() {
+            @Override
+            public String call(Row row) throws Exception {
+                String key;
+                int start = row.getInt(1);
+                if (start < 16066000) {
+                    key = "1";
+                } else if (start < 16067000) {
+                    key = "2";
+                } else if (start < 16069000) {
+                    key = "3";
+                } else {
+                    key = "4";
+                }
+                return key;
+            }
+        }, Encoders.STRING());
+
+//        groups.keys().foreach(k -> System.out.println(k));
+/*
+        groups.mapGroups(new Function2<String, Iterator<Row>, Object>() {
+            @Override
+            public Object apply(String s, Iterator<Row> rowIterator) {
+                return null;
+            }
+/*
+        reduce(new ReduceFunction() {
+            @Override
+            public Row call(Row row, Row t1) throws Exception {
+                return null;
+            }
+        });
+
+        JavaPairRDD keyValues = rows.toJavaRDD().groupBy(new Function() {
+            @Override
+            public Object call(Row row) throws Exception {
+                return null;
+            }
+        });
+
+        JavaPairRDD groupMap = productSaleMap.groupBy(new Function() {
+            @Override
+            public Object call(ProductSale productSale) throws Exception {
+                c.setTime(productSale.getSale().getPurchaseDate());
+                return c.get(Calendar.YEAR);
+            }
+        });
+
+
+        JavaPairRDD totalSaleData = groupMap.mapValues(new Function() {
+            @Override
+            public Long call(Iterable productSales) throws Exception {
+                Long sumData = 0L;
+                for (ProductSale productSale : productSales) {
+                    sumData = sumData + (productSale.getProduct().getPrice() * productSale.getSale().getItemPurchased());
+                }
+                return sumData;
+            }
+        });
+
+
+        JavaPairRDD<String, Row> pairs = rows.toJavaRDD().mapToPair(new PairFunction<Row, String, Row>() {
+            @Override
+            public Tuple2<String, Row> call(Row row) throws Exception {
+                String key = "";
+                int start = row.getInt(1);
+                if (start < 16066000) {
+                    key = "1";
+                } else if (start < 16067000) {
+                    key = "2";
+                } else if (start < 16069000) {
+                    key = "3";
+                } else {
+                    key = "4";
+                }
+
+                return new Tuple2<>(key, row);
+            }
+        }).reduceByKey()reduce(new Function2<Tuple2<String, Row>, Tuple2<String, Row>, Tuple2<String, Row>>() {
+            @Override
+            public Tuple2<String, Row> call(Tuple2<String, Row> stringRowTuple2, Tuple2<String, Row> stringRowTuple22) throws Exception {
+                return null;
+            }
+        });
+
+        rows.toJavaRDD((
+
+        JavaPairRDD rddX =
+                x.mapToPair(e -> new Tuple2(e, 1));
+
+        // New JavaPairRDD
+        JavaPairRDD rddY = rddX.reduceByKey(reduceSumFunc);
+
+        //Print tuples
+        for(Tuple2 element : rddY.collect()){
+            System.out.println("("+element._1+", "+element._2+")");
+        }
+    }
+}
+
+        rows.toJavaRDD().key
+        rows.mreduce(new ReduceFunction() {
+            @Override
+            public Row call(Row row, Row t1) throws Exception {
+                return null;
+            }
+        });
+
+/*
+        List<Dataset<Row>> buffer = new ArrayList<>();
+
+
+
+        //Dataset<Dataset<Row>> datasets;
+
+        //Dataset dataset;
+        //dataset.gr
+
+
+        Thread thread = new Thread() {
+            @Override public void run() {
+                System.out.println("------>>>>> Starting thread");
+                for (int i = 0; i < 5; i++) {
+                    Dataset ds = vd.sqlContext().sql("SELECT chromosome, start, end FROM vcf");
+                    //System.out.println("=================> Stream, dataset " + i + " with " + ds.count() + " rows");
+                    System.out.println("------------------------>>>>> adding new dataset");
+                    buffer.add(ds);
+                }
+                System.out.println("------>>>>> Ending thread");
+            }
+        };
+
+        thread.start();
+
+        while (thread.isAlive() || buffer.size() > 0) {
+            System.out.println("Still alive or buffer size " + buffer.size() + "...");
+            if (buffer.size() > 0) {
+                Dataset ds = buffer.remove(0);
+                List list = ds.collectAsList();
+                for (int i = 0; i < list.size(); i++) {
+                    System.out.println(i + "\t" + list.get(i).get(0) + "\t" + list.get(i).get(1) + "\t" + list.get(i).get(2));
+                }
+            }
+            Thread.sleep(1000);
+        }
+
+
+
+        //StreamingExamples.setStreamingLogLevels();
@@ -92,16 +332,16 @@ public void streaming() throws Exception {
 
         // Create an input stream with the custom receiver on target ip:port and count the
         // words in input stream of \n delimited text (eg. generated by 'nc')
-        JavaReceiverInputDStream<Dataset<Row>> datasets = ssc.receiverStream(
-                new JavaCustomReceiver(vd.sqlContext()));
+//        JavaReceiverInputDStream<Dataset<Row>> datasets = ssc.receiverStream(
+//                new JavaCustomReceiver(vd.sqlContext()));
 
-        JavaDStream<String> words = datasets.map(new Function<Dataset<Row>, String>() {
-            @Override
-            public String call(Dataset<Row> dataset) throws Exception {
-                return "toto";
-            }
-        });
-        words.print();
+        // JavaDStream<String> words = datasets.map(new Function<Dataset<Row>, String>() {
+//            @Override
+//            public String call(Dataset<Row> dataset) throws Exception {
+//                return "toto";
+//            }
+//        });
+//        words.print();
 
 /*
         JavaDStream words = lines.flatMap(new FlatMapFunction() {
             @Override
@@ -123,8 +363,8 @@ public Integer call(Integer i1, Integer i2) {
         });
         wordCounts.print();
 
-*/        ssc.start();
-        ssc.awaitTermination();
+*///        ssc.start();
+        // ssc.awaitTermination();
     }
 
 //    @Test
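The commented-out fragments in streaming00() circle around the typed Dataset API that the live code reimplements through `toJavaRDD().groupBy()`. For comparison, a sketch of the same start-position binning done with `groupByKey`/`mapGroups` (class and method names here are illustrative, not from the patch):

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.Row;

public class GroupByKeySketch {

    // Same binning as MLTest.streaming(), but on the typed Dataset API:
    // no round-trip through toJavaRDD()/groupBy().
    static Dataset<Integer> countByBin(Dataset<Row> rows) {
        KeyValueGroupedDataset<String, Row> groups = rows.groupByKey(
                (MapFunction<Row, String>) row -> {
                    int start = row.getInt(1);
                    if (start < 16066000) {
                        return "1";
                    } else if (start < 16067000) {
                        return "2";
                    } else if (start < 16069000) {
                        return "3";
                    }
                    return "4";
                }, Encoders.STRING());

        // Count the rows in each bin, mirroring the Integer returned per key above.
        return groups.mapGroups(
                (MapGroupsFunction<String, Row, Integer>) (key, it) -> {
                    int n = 0;
                    while (it.hasNext()) {
                        it.next();
                        n++;
                    }
                    return n;
                }, Encoders.INT());
    }
}
```

The lambda casts to `MapFunction`/`MapGroupsFunction` are the usual way to disambiguate the overloads of `groupByKey` and `mapGroups` in the Spark 2.x Java API.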