storage: Split PhoenixInputSplits into smaller splits. #TASK-6722
j-coll committed Nov 27, 2024
1 parent f87686e commit a389e10
Showing 2 changed files with 50 additions and 3 deletions.
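The commit adds a storage.hadoop.mr.phoenix.scanSplit option (default 5) and uses it in CustomPhoenixInputFormat to break each single-scan PhoenixInputSplit into that many smaller, contiguous sub-scans, so MapReduce jobs over Phoenix tables get more and smaller map tasks. A minimal usage sketch follows; the option key and default value are taken from the diff below, while the class name and the value 10 are made up for illustration and are not part of this commit:

import org.apache.hadoop.conf.Configuration;

public class ScanSplitOptionExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key and default come from HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT (see diff below);
        // 10 is an arbitrary example value overriding the default of 5.
        conf.setInt("storage.hadoop.mr.phoenix.scanSplit", 10);
        int numScans = conf.getInt("storage.hadoop.mr.phoenix.scanSplit", 5);
        System.out.println("Each single-scan PhoenixInputSplit will be divided into " + numScans + " splits");
    }
}

With the default of 5, a query plan that previously produced one input split per region-aligned scan now produces roughly five times as many, provided the scan has both a start and a stop row.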
HadoopVariantStorageOptions.java
@@ -42,6 +42,7 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption {
     MR_HBASE_SCAN_CACHING("storage.hadoop.mr.scan.caching", 50),
     MR_HBASE_SCAN_MAX_COLUMNS("storage.hadoop.mr.scan.maxColumns", 25000),
     MR_HBASE_SCAN_MAX_FILTERS("storage.hadoop.mr.scan.maxFilters", 2000),
+    MR_HBASE_PHOENIX_SCAN_SPLIT("storage.hadoop.mr.phoenix.scanSplit", 5),
 
     /**
      * MapReduce executor. Could be either 'system' or 'ssh'.
CustomPhoenixInputFormat.java
@@ -7,6 +7,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -21,11 +22,16 @@
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.opencb.opencga.storage.hadoop.HBaseCompat;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -39,6 +45,7 @@
  */
 public class CustomPhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable, T> {
     private static final Log LOG = LogFactory.getLog(CustomPhoenixInputFormat.class);
+    private static Logger logger = LoggerFactory.getLogger(CustomPhoenixInputFormat.class);
 
     @Override
     public RecordReader<NullWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context)
@@ -58,6 +65,20 @@ public CloseValueRecordReader(RecordReader<K, V> recordReader) {
             super(recordReader, v -> v);
         }
 
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            super.initialize(split, context);
+            if (split instanceof PhoenixInputSplit) {
+                PhoenixInputSplit phoenixInputSplit = (PhoenixInputSplit) split;
+                logger.info("Key range : " + phoenixInputSplit.getKeyRange());
+                logger.info("Split: " + phoenixInputSplit.getScans().size() + " scans");
+                int i = 0;
+                for (Scan scan : phoenixInputSplit.getScans()) {
+                    logger.info("[{}] Scan: {}", ++i, scan);
+                }
+            }
+        }
+
         @Override
         public void close() throws IOException {
             V currentValue;
@@ -78,16 +99,41 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
         final Configuration configuration = context.getConfiguration();
         final QueryPlan queryPlan = getQueryPlan(context, configuration);
         final List<KeyRange> allSplits = queryPlan.getSplits();
-        final List<InputSplit> splits = generateSplits(queryPlan, allSplits);
+        final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration);
         return splits;
     }
 
-    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration configuration)
+            throws IOException {
         Preconditions.checkNotNull(qplan);
         Preconditions.checkNotNull(splits);
         final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
         for (List<Scan> scans : qplan.getScans()) {
-            psplits.add(new PhoenixInputSplit(scans));
+            if (scans.size() == 1) {
+                // Split scans into multiple smaller scans
+                int numScans = configuration.getInt(HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.key(),
+                        HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.defaultValue());
+                List<Scan> splitScans = new ArrayList<>(numScans);
+                Scan scan = scans.get(0);
+                byte[] startRow = scan.getStartRow();
+                byte[] stopRow = scan.getStopRow();
+                if (startRow != null && startRow.length != 0 && stopRow != null && stopRow.length != 0) {
+                    byte[][] ranges = Bytes.split(startRow, stopRow, numScans - 1);
+                    for (int i = 1; i < ranges.length; i++) {
+                        Scan splitScan = new Scan(scan);
+                        splitScan.withStartRow(ranges[i - 1]);
+                        splitScan.withStopRow(ranges[i], false);
+                        splitScans.add(splitScan);
+                    }
+                } else {
+                    splitScans.add(scan);
+                }
+                for (Scan splitScan : splitScans) {
+                    psplits.add(new PhoenixInputSplit(Collections.singletonList(splitScan)));
+                }
+            } else {
+                psplits.add(new PhoenixInputSplit(scans));
+            }
         }
         return psplits;
     }
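For reference, a standalone sketch of the range-splitting step used above. HBase's Bytes.split(start, stop, n) returns the two endpoints plus n intermediate keys, so passing numScans - 1 yields exactly numScans contiguous sub-ranges. The row keys and class name below are invented for illustration; only the Bytes.split call and loop shape mirror the committed code:

import org.apache.hadoop.hbase.util.Bytes;

public class PhoenixScanSplitDemo {
    public static void main(String[] args) {
        // Illustrative row keys; in the commit they come from the Phoenix scan's start/stop rows.
        byte[] startRow = Bytes.toBytes("00000000");
        byte[] stopRow = Bytes.toBytes("99999999");
        int numScans = 5; // default of storage.hadoop.mr.phoenix.scanSplit

        // numScans - 1 intermediate boundaries -> numScans sub-ranges covering [startRow, stopRow)
        byte[][] ranges = Bytes.split(startRow, stopRow, numScans - 1);
        for (int i = 1; i < ranges.length; i++) {
            System.out.println("sub-scan " + i + ": ["
                    + Bytes.toStringBinary(ranges[i - 1]) + ", "
                    + Bytes.toStringBinary(ranges[i]) + ")");
        }
    }
}

In the committed code each sub-range becomes its own single-scan PhoenixInputSplit, with the stop row excluded (withStopRow(ranges[i], false)) so consecutive sub-scans neither overlap nor leave gaps.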
