From a389e10115d75e9a023cebf63ac28e5e1313bf16 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?=
Date: Wed, 27 Nov 2024 17:16:52 +0000
Subject: [PATCH] storage: Split PhoenixInputSplits into smaller splits. #TASK-6722

---
 .../variant/HadoopVariantStorageOptions.java |  1 +
 .../variant/mr/CustomPhoenixInputFormat.java | 52 +++++++++++++++++--
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java
index 268caaf9253..37331233aad 100644
--- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java
@@ -42,6 +42,7 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption {
     MR_HBASE_SCAN_CACHING("storage.hadoop.mr.scan.caching", 50),
     MR_HBASE_SCAN_MAX_COLUMNS("storage.hadoop.mr.scan.maxColumns", 25000),
     MR_HBASE_SCAN_MAX_FILTERS("storage.hadoop.mr.scan.maxFilters", 2000),
+    MR_HBASE_PHOENIX_SCAN_SPLIT("storage.hadoop.mr.phoenix.scanSplit", 5),
 
     /**
      * MapReduce executor. Could be either 'system' or 'ssh'.
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/CustomPhoenixInputFormat.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/CustomPhoenixInputFormat.java
index 5b280facb0e..b8e34933c95 100644
--- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/CustomPhoenixInputFormat.java
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/CustomPhoenixInputFormat.java
@@ -7,6 +7,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -21,11 +22,16 @@
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.opencb.opencga.storage.hadoop.HBaseCompat;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -39,6 +45,7 @@
  */
 public class CustomPhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable, T> {
     private static final Log LOG = LogFactory.getLog(CustomPhoenixInputFormat.class);
+    private static Logger logger = LoggerFactory.getLogger(CustomPhoenixInputFormat.class);
 
     @Override
     public RecordReader<NullWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context)
@@ -58,6 +65,20 @@ public CloseValueRecordReader(RecordReader recordReader) {
             super(recordReader, v -> v);
         }
 
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            super.initialize(split, context);
+            if (split instanceof PhoenixInputSplit) {
+                PhoenixInputSplit phoenixInputSplit = (PhoenixInputSplit) split;
+                logger.info("Key range : " + phoenixInputSplit.getKeyRange());
+                logger.info("Split: " + phoenixInputSplit.getScans().size() + " scans");
+                int i = 0;
+                for (Scan scan : phoenixInputSplit.getScans()) {
+                    logger.info("[{}] Scan: {}", ++i, scan);
+                }
+            }
+        }
+
         @Override
         public void close() throws IOException {
             V currentValue;
@@ -78,16 +99,41 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
         final Configuration configuration = context.getConfiguration();
         final QueryPlan queryPlan = getQueryPlan(context, configuration);
         final List<KeyRange> allSplits = queryPlan.getSplits();
-        final List<InputSplit> splits = generateSplits(queryPlan, allSplits);
+        final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration);
         return splits;
     }
 
-    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration configuration)
+            throws IOException {
         Preconditions.checkNotNull(qplan);
         Preconditions.checkNotNull(splits);
         final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
         for (List<Scan> scans : qplan.getScans()) {
-            psplits.add(new PhoenixInputSplit(scans));
+            if (scans.size() == 1) {
+                // Split scans into multiple smaller scans
+                int numScans = configuration.getInt(HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.key(),
+                        HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.defaultValue());
+                List<Scan> splitScans = new ArrayList<>(numScans);
+                Scan scan = scans.get(0);
+                byte[] startRow = scan.getStartRow();
+                byte[] stopRow = scan.getStopRow();
+                if (startRow != null && startRow.length != 0 && stopRow != null && stopRow.length != 0) {
+                    byte[][] ranges = Bytes.split(startRow, stopRow, numScans - 1);
+                    for (int i = 1; i < ranges.length; i++) {
+                        Scan splitScan = new Scan(scan);
+                        splitScan.withStartRow(ranges[i - 1]);
+                        splitScan.withStopRow(ranges[i], false);
+                        splitScans.add(splitScan);
+                    }
+                } else {
+                    splitScans.add(scan);
+                }
+                for (Scan splitScan : splitScans) {
+                    psplits.add(new PhoenixInputSplit(Collections.singletonList(splitScan)));
+                }
+            } else {
+                psplits.add(new PhoenixInputSplit(scans));
+            }
         }
         return psplits;
     }
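
A minimal standalone sketch of the splitting technique used in generateSplits() above, assuming only the HBase client API (Scan, Bytes): one bounded scan over [startRow, stopRow) is cut into numScans contiguous sub-scans with Bytes.split, each of which can then back its own PhoenixInputSplit and mapper. The class name ScanSplitSketch, the helper splitScan and the example row keys are hypothetical and only illustrate the technique; they are not part of the patch.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; names and row keys are not from the patch.
public class ScanSplitSketch {

    // Divide a bounded scan into numScans contiguous sub-scans, mirroring the
    // patch: Bytes.split(start, stop, n) returns start, n intermediate keys and
    // stop, i.e. n + 1 sub-ranges.
    public static List<Scan> splitScan(Scan scan, int numScans) throws IOException {
        byte[] startRow = scan.getStartRow();
        byte[] stopRow = scan.getStopRow();
        List<Scan> splitScans = new ArrayList<>(numScans);
        if (startRow.length != 0 && stopRow.length != 0) {
            byte[][] ranges = Bytes.split(startRow, stopRow, numScans - 1);
            for (int i = 1; i < ranges.length; i++) {
                Scan splitScan = new Scan(scan);          // copy filters, caching, attributes
                splitScan.withStartRow(ranges[i - 1]);    // inclusive start of the sub-range
                splitScan.withStopRow(ranges[i], false);  // exclusive stop of the sub-range
                splitScans.add(splitScan);
            }
        } else {
            // Unbounded scans (empty start or stop row) are left as a single scan, as in the patch.
            splitScans.add(scan);
        }
        return splitScans;
    }

    public static void main(String[] args) throws IOException {
        // Illustrative row keys; real variant-table keys look different.
        Scan scan = new Scan()
                .withStartRow(Bytes.toBytes("chr1:000000000"))
                .withStopRow(Bytes.toBytes("chr1:249250621"));
        for (Scan s : splitScan(scan, 5)) {
            System.out.println(Bytes.toStringBinary(s.getStartRow())
                    + " .. " + Bytes.toStringBinary(s.getStopRow()));
        }
    }
}

With the patch's default of storage.hadoop.mr.phoenix.scanSplit = 5, each single-scan PhoenixInputSplit with a bounded key range therefore becomes five smaller splits, increasing the number of mappers that can work on the same key range in parallel.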