Skip to content

Commit

Permalink
storage: Fix AIOOBE SampleVariantStatsDriver #TASK-6722
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Nov 29, 2024
1 parent 9f326d9 commit 627e56a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,26 +125,27 @@ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRan
Preconditions.checkNotNull(qplan);
Preconditions.checkNotNull(splits);
final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
int undividedSplits = 0;
int numScanSplit = configuration.getInt(HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.key(),
HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.defaultValue());
for (List<Scan> scans : qplan.getScans()) {
if (scans.size() == 1) {
// Split scans into multiple smaller scans
int numScans = configuration.getInt(HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.key(),
HadoopVariantStorageOptions.MR_HBASE_PHOENIX_SCAN_SPLIT.defaultValue());
List<Scan> splitScans = new ArrayList<>(numScans);
List<Scan> splitScans = new ArrayList<>(numScanSplit);
Scan scan = scans.get(0);
byte[] startRow = scan.getStartRow();
if (startRow == null || startRow.length == 0) {
startRow = Bytes.toBytesBinary("1\\x00\\x00\\x00\\x00\\x00");
logger.info("Scan with empty startRow. Set default start. "
+ "[" + Bytes.toStringBinary(startRow) + "-" + Bytes.toStringBinary(scan.getStopRow()) + ")");
+ "[" + Bytes.toStringBinary(startRow) + " - " + Bytes.toStringBinary(scan.getStopRow()) + ")");
}
byte[] stopRow = scan.getStopRow();
if (stopRow == null || stopRow.length == 0) {
stopRow = Bytes.toBytesBinary("Z\\x00\\x00\\x00\\x00\\x00");
logger.info("Scan with empty stopRow. Set default stop. "
+ "[" + Bytes.toStringBinary(startRow) + "-" + Bytes.toStringBinary(stopRow) + ")");
+ "[" + Bytes.toStringBinary(startRow) + " - " + Bytes.toStringBinary(stopRow) + ")");
}
byte[][] ranges = Bytes.split(startRow, stopRow, numScans - 1);
byte[][] ranges = Bytes.split(startRow, stopRow, numScanSplit - 1);
for (int i = 1; i < ranges.length; i++) {
Scan splitScan = new Scan(scan);
splitScan.withStartRow(ranges[i - 1]);
Expand All @@ -156,8 +157,14 @@ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRan
}
} else {
psplits.add(new PhoenixInputSplit(scans));
undividedSplits++;
}
}
logger.info("Subdivided " + qplan.getScans().size() + " splits into " + psplits.size() + " splits. "
+ "Intended sub-splits per split: " + numScanSplit);
if (undividedSplits > 0) {
logger.info("There are " + undividedSplits + " splits that were not subdivided.");
}
return psplits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public static class SampleVariantStatsMapper extends VariantRowMapper<IntWritabl
private final Map<Integer, List<Integer>> fileToSampleIds = new HashMap<>();
private DistributedSampleVariantStatsCalculator calculator;
private final HBaseToVariantAnnotationConverter annotationConverter = new HBaseToVariantAnnotationConverter();
private int[] sampleIdsPosition;
private Map<Integer, Integer> sampleIdsPosition;
private int sampleDataDpIdx;
private int fileDataDpIdx;
private Predicate<VariantRow.FileColumn> fileDataFilter;
Expand All @@ -393,7 +393,6 @@ protected void setup(Context context) throws IOException, InterruptedException {
studyId = context.getConfiguration().getInt(STUDY_ID, -1);
samples = context.getConfiguration().getInts(SAMPLE_IDS);
includeSamples = context.getConfiguration().getInts(INCLUDE_SAMPLE_IDS);
sampleIdsPosition = new int[IntStream.of(includeSamples).max().orElse(0) + 1];

String fileDataQuery = context.getConfiguration().get(VariantQueryParam.FILE_DATA.key());
String sampleDataQuery = context.getConfiguration().get(VariantQueryParam.SAMPLE_DATA.key());
Expand All @@ -404,9 +403,9 @@ protected void setup(Context context) throws IOException, InterruptedException {
sampleDataDpIdx = fixedFormat.indexOf(VCFConstants.DEPTH_KEY);
fileDataDpIdx = fileAttributes.indexOf(VCFConstants.DEPTH_KEY);

Arrays.fill(sampleIdsPosition, -1);
sampleIdsPosition = new HashMap<>(includeSamples.length);
for (int i = 0; i < includeSamples.length; i++) {
sampleIdsPosition[includeSamples[i]] = i;
sampleIdsPosition.put(includeSamples[i], i);
}

Pedigree pedigree = readPedigree(context.getConfiguration());
Expand All @@ -424,11 +423,20 @@ private List<Integer> getSamplesFromFileId(int fileId) {
id -> {
ArrayList<Integer> sampleIds = new ArrayList<>(vsm.getFileMetadata(studyId, id).getSamples());
// Discard unused samples
sampleIds.removeIf(s -> sampleIdsPosition.length <= s || sampleIdsPosition[s] < 0);
sampleIds.removeIf(s -> !sampleIdsPosition.containsKey(s));
return sampleIds;
});
}

private int getSamplePosition(Integer sampleId) {
Integer samplePosition = sampleIdsPosition.get(sampleId);
if (samplePosition == null) {
throw new IllegalStateException("Sample " + sampleId + " not found in includeSamples "
+ Arrays.toString(includeSamples));
}
return samplePosition;
}

@Override
protected void map(Object key, VariantRow row, Context context) throws IOException, InterruptedException {
VariantAnnotation[] annotation = new VariantAnnotation[1];
Expand All @@ -443,9 +451,10 @@ protected void map(Object key, VariantRow row, Context context) throws IOExcepti

Variant variant = row.walker().onSample(sampleCell -> {
int sampleId = sampleCell.getSampleId();
int samplePosition = getSamplePosition(sampleId);
if (!sampleDataFilter.test(sampleCell)) {
// Invalidate sample
invalidSamples[sampleIdsPosition[sampleId]] = true;
invalidSamples[samplePosition] = true;
return;
}

Expand All @@ -454,43 +463,45 @@ protected void map(Object key, VariantRow row, Context context) throws IOExcepti
if (gt == null || gt.isEmpty()) {
// This is a really weird situation, most likely due to errors in the input files
logger.error("Empty genotype at sample " + sampleId + " in variant " + row.getVariant());
gts.set(sampleIdsPosition[sampleId], GenotypeClass.NA_GT_VALUE);
gts.set(samplePosition, GenotypeClass.NA_GT_VALUE);
} else if (gt.equals(GenotypeClass.UNKNOWN_GENOTYPE)) {
// skip unknown genotypes
context.getCounter(COUNTER_GROUP_NAME, "unknownGt").increment(1);
} else {
gts.set(sampleIdsPosition[sampleId], gt);
gts.set(samplePosition, gt);
}

if (sampleDataDpIdx > 0) {
String dp = sampleCell.getSampleData(sampleDataDpIdx);
// Do not set invalid values
if (StringUtils.isNumeric(dp)) {
dps.set(sampleIdsPosition[sampleId], dp);
dps.set(samplePosition, dp);
}
}
}).onFile(fileCell -> {
int fileId = fileCell.getFileId();

if (fileDataFilter.test(fileCell)) {
for (Integer sampleId : getSamplesFromFileId(fileId)) {
filters.set(sampleIdsPosition[sampleId], fileCell.getFilter());
quals.set(sampleIdsPosition[sampleId], fileCell.getQualString());
int samplePosition = getSamplePosition(sampleId);
filters.set(samplePosition, fileCell.getFilter());
quals.set(samplePosition, fileCell.getQualString());
if (fileDataDpIdx > 0) {
String dp = fileCell.getFileData(fileDataDpIdx);
// Do not set invalid values
if (StringUtils.isNumeric(dp)) {
// Prioritize DP value from FORMAT. Do not overwrite if present.
if (StringUtils.isEmpty(dps.get(sampleIdsPosition[sampleId]))) {
dps.set(sampleIdsPosition[sampleId], dp);
if (StringUtils.isEmpty(dps.get(samplePosition))) {
dps.set(samplePosition, dp);
}
}
}
}
} else {
// Invalidate samples from this file
for (Integer sampleId : getSamplesFromFileId(fileId)) {
invalidSamples[sampleIdsPosition[sampleId]] = true;
int samplePosition = getSamplePosition(sampleId);
invalidSamples[samplePosition] = true;
}
}
}).onVariantAnnotation(variantAnnotationColumn -> {
Expand Down

0 comments on commit 627e56a

Please sign in to comment.