From 45df65f82b1c01d4d429248f81538af75ed63faa Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Thu, 24 Feb 2022 21:37:47 +0000 Subject: [PATCH 1/8] Fix #1438 - optionally gather fields after ingest exception --- .../ingest/mapreduce/EventMapper.java | 249 ++++------ .../ingest/mapreduce/FieldHarvester.java | 255 ++++++++++ .../ingest/mapreduce/FieldSalvager.java | 24 + .../edge/ProtobufEdgeDataTypeHandler.java | 4 +- .../data/RawRecordContainerImplTest.java | 4 +- .../data/config/DataTypeHelperImplTest.java | 12 +- .../config/NormalizedFieldAndValueTest.java | 4 +- .../FieldNameAliaserNormalizerTest.java | 6 +- .../MinimalistIngestHelperInterfaceImpl.java | 246 ++++++++++ .../normalizer/AbstractNormalizerTest.java | 4 +- .../EventMapperSalvageFieldsOnErrorTest.java | 233 +++++++++ .../ingest/mapreduce/EventMapperTest.java | 32 +- .../ingest/mapreduce/FieldHarvesterTest.java | 464 ++++++++++++++++++ .../ingest/mapreduce/IngestTestSetup.java | 65 +++ .../DateIndexDataTypeHandlerTest.java | 4 +- .../edge/ProtobufEdgeDeleteModeTest.java | 6 +- .../job/CBMutationOutputFormatterTest.java | 14 +- .../datawave/util/TypeRegistryTestSetup.java | 20 + 18 files changed, 1433 insertions(+), 213 deletions(-) create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index 6ccf80c8fa2..ba37b2e6538 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -3,18 +3,13 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import datawave.data.normalizer.DateNormalizer; import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelper; import datawave.ingest.data.config.NormalizedContentInterface; -import datawave.ingest.data.config.NormalizedFieldAndValue; import datawave.ingest.data.config.filter.KeyValueFilter; -import datawave.ingest.data.config.ingest.CompositeIngest; -import datawave.ingest.data.config.ingest.FilterIngest; import datawave.ingest.data.config.ingest.IngestHelperInterface; -import datawave.ingest.data.config.ingest.VirtualIngest; import datawave.ingest.input.reader.event.EventErrorSummary; import datawave.ingest.mapreduce.handler.DataTypeHandler; import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; @@ -56,12 +51,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; 
import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.Stack; @@ -92,9 +85,6 @@ * output value */ public class EventMapper extends StatsDEnabledMapper { - - private static final String SRC_FILE_DEL = "|"; - private static final Logger log = Logger.getLogger(EventMapper.class); /** @@ -107,24 +97,6 @@ public class EventMapper extends StatsDE public static final String CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS = "ingest.event.mapper.context.writer.output.table.counters"; public static final String FILE_NAME_COUNTERS = "ingest.event.mapper.file.name.counters"; - protected boolean createSequenceFileName = true; - - protected boolean trimSequenceFileName = true; - - protected boolean createRawFileName = true; - - public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE"; - - public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE"; - - public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename"; - - public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename"; - - public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; - - public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename"; - public static final String ID_FILTER_FSTS = "ingest.event.mapper.id.filter.fsts"; protected Map>> typeMap = new HashMap<>(); @@ -142,8 +114,6 @@ public class EventMapper extends StatsDE private StandaloneStatusReporter reporter = new StandaloneStatusReporter(); - private DateNormalizer dateNormalizer = new DateNormalizer(); - private ContextWriter contextWriter = null; protected long offset = 0; @@ -160,6 +130,8 @@ public class EventMapper extends StatsDE private MetricsService metricsService; private ReusableMetricsLabels metricsLabels; + private FieldHarvester fieldHarvester; + /** * Set up the datatype handlers */ @@ -186,12 +158,8 @@ public void setup(Context context) throws IOException, InterruptedException { interval = context.getConfiguration().getLong(DISCARD_INTERVAL, 0l); - // default to true, but it can be disabled - createSequenceFileName = context.getConfiguration().getBoolean(LOAD_SEQUENCE_FILE_NAME, true); - - trimSequenceFileName = context.getConfiguration().getBoolean(TRIM_SEQUENCE_FILE_NAME, true); - - createRawFileName = context.getConfiguration().getBoolean(LOAD_RAW_FILE_NAME, true); + // FieldHarvester encapsulates the addition of virtual fields, composite fields, LOAD_DATE, etc. 
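+        // (The LOAD_SEQUENCE_FILE_NAME, TRIM_SEQUENCE_FILE_NAME and LOAD_RAW_FILE_NAME settings that were
+        // read here now live on FieldHarvester and are read by its constructor.)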
+ fieldHarvester = new FieldHarvester(context.getConfiguration()); Class> firstFilter = null; @@ -287,7 +255,7 @@ public void setup(Context context) throws IOException, InterruptedException { * * @return the data type handlers */ - private List> loadDataType(String typeStr, Context context) { + private List> loadDataTypeHandlers(String typeStr, Context context) { // Do not load the type twice if (!typeMap.containsKey(typeStr)) { @@ -388,7 +356,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt } // ensure this datatype's handlers etc are loaded such that the dataTypeDiscardIntervalCache and validators are filled as well - List> typeHandlers = loadDataType(value.getDataType().typeName(), context); + List> typeHandlers = loadDataTypeHandlers(value.getDataType().typeName(), context); // This is a little bit fragile, but there is no other way // to get the context on a partitioner, and we are only @@ -432,13 +400,13 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt try { // Load error dataType into typeMap - loadDataType(TypeRegistry.ERROR_PREFIX, context); + loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context); // purge event errorSummary.purge(contextWriter, context, value, typeMap); // Set the original file value from the event in the error table - Collection origFiles = errorSummary.getEventFields().get(SEQUENCE_FILE_FIELDNAME); + Collection origFiles = errorSummary.getEventFields().get(FieldHarvester.SEQUENCE_FILE_FIELDNAME); if (!origFiles.isEmpty()) { NDC.push(origFiles.iterator().next()); reprocessedNDCPush = true; @@ -468,7 +436,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt // Add the list of handlers with the ALL specified handlers List> handlers = new ArrayList<>(); handlers.addAll(typeHandlers); - handlers.addAll(loadDataType(TypeRegistry.ALL_PREFIX, context)); + handlers.addAll(loadDataTypeHandlers(TypeRegistry.ALL_PREFIX, context)); // Always include any event errors in the counters for (String error : value.getErrors()) { @@ -481,7 +449,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt handlers.clear(); if (!value.ignorableError()) { // since this is not an ignorable error, lets add the error handlers back into the list - handlers.addAll(loadDataType(TypeRegistry.ERROR_PREFIX, context)); + handlers.addAll(loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context)); getCounter(context, IngestInput.EVENT_FATAL_ERROR).increment(1); getCounter(context, IngestInput.EVENT_FATAL_ERROR.name(), "ValidationError").increment(1); @@ -500,42 +468,12 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt // Rollback anything written for this event contextWriter.rollback(); - // Fail job on constraint violations - if (e instanceof ConstraintChecker.ConstraintViolationException) { - throw ((RuntimeException) e); - } + failJobOnConstraintViolations(e); // ensure they know we are still working on it context.progress(); - // log error - log.error("Runtime exception processing event", e); - - // now lets dump to the errors table - // first set the exception on the event if not a field normalization error in which case the fields contain the errors - if (!(e instanceof FieldNormalizationError)) { - value.setAuxData(e); - } - for (DataTypeHandler handler : loadDataType(TypeRegistry.ERROR_PREFIX, context)) { - if (log.isTraceEnabled()) - log.trace("executing handler: " + handler.getClass().getName()); - try { - 
executeHandler(key, value, fields, handler, context); - context.progress(); - } catch (Exception e2) { - // This is a real bummer, we had a critical exception attempting to throw the event into the error table. - // lets terminate this job - log.error("Failed to process error data handlers for an event", e2); - throw new IOException("Failed to process error data handlers for an event", e2); - } - } - - // now create some counters - getCounter(context, IngestProcess.RUNTIME_EXCEPTION).increment(1); - List exceptions = getExceptionSynopsis(e); - for (String exception : exceptions) { - getCounter(context, IngestProcess.RUNTIME_EXCEPTION.name(), exception).increment(1); - } + handleProcessingError(key, value, context, fields, e); } finally { // Remove ORIG_FILE from NDC that was populated by reprocessing events from the error tables if (reprocessedNDCPush) { @@ -546,10 +484,60 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt context.progress(); } + incrementEventCount(value, context); + + updateMetrics(value, eventMapperTimer, fields); + } + + private void failJobOnConstraintViolations(Exception e) { + if (e instanceof ConstraintChecker.ConstraintViolationException) { + throw ((RuntimeException) e); + } + } + + private void handleProcessingError(K1 key, V1 value, Context context, Multimap fields, Exception e) throws IOException { + log.error("Runtime exception processing event", e); + // first set the exception on the event if not a field normalization error in which case the fields contain the errors + if (!(e instanceof FieldHarvester.FieldNormalizationError)) { + value.setAuxData(e); + } + writeToErrorTables(key, value, context, fields); + incrementExceptionCounters(context, e); + } + + private void writeToErrorTables(K1 key, V1 value, Context context, Multimap fields) throws IOException { + // now lets dump to the errors table + for (DataTypeHandler handler : loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context)) { + if (log.isTraceEnabled()) + log.trace("executing handler: " + handler.getClass().getName()); + try { + executeHandler(key, value, fields, handler, context); + context.progress(); + } catch (Exception e2) { + // This is a real bummer, we had a critical exception attempting to throw the event into the error table. 
+ // lets terminate this job + log.error("Failed to process error data handlers for an event", e2); + throw new IOException("Failed to process error data handlers for an event", e2); + } + } + } + + private void incrementExceptionCounters(Context context, Exception e) { + // now create some counters + getCounter(context, IngestProcess.RUNTIME_EXCEPTION).increment(1); + List exceptions = getExceptionSynopsis(e); + for (String exception : exceptions) { + getCounter(context, IngestProcess.RUNTIME_EXCEPTION.name(), exception).increment(1); + } + } + + private void incrementEventCount(V1 value, Context context) { getCounter(context, IngestOutput.EVENTS_PROCESSED.name(), value.getDataType().typeName().toUpperCase()).increment(1); offset++; - + } + + private void updateMetrics(V1 value, TraceStopwatch eventMapperTimer, Multimap fields) { if (metricsEnabled && eventMapperTimer != null) { eventMapperTimer.stop(); long timeInEventMapper = eventMapperTimer.elapsed(TimeUnit.MILLISECONDS); @@ -695,27 +683,15 @@ public void processEvent(K1 key, RawRecordContainer value, List entry : getFields(value, handler).entries()) { - // noinspection ThrowableResultOfMethodCallIgnored - if (entry.getValue().getError() != null) { - e = entry.getValue().getError(); - } - fields.put(entry.getKey(), entry.getValue()); - } - if (e != null) { - throw new FieldNormalizationError("Failed getting all fields", e); - } - // Event based metrics - if (metricsEnabled) { - metricsLabels.clear(); - metricsLabels.put("dataType", value.getDataType().typeName()); - - metricsService.collect(Metric.EVENT_COUNT, metricsLabels.get(), fields, 1L); - metricsService.collect(Metric.BYTE_COUNT, metricsLabels.get(), fields, (long) value.getRawData().length); + // populates fields by parsing value and using IngestHelper + fieldHarvester.extractFields(fields, thisHelper, value, offset, splitStart); + if (fieldHarvester.hasError()) { + throw new Exception(fieldHarvester.getException()); } + updateMetrics(value, fields); + previousHelper = thisHelper; } @@ -730,79 +706,32 @@ public void processEvent(K1 key, RawRecordContainer value, List fields) { + // Event based metrics + if (metricsEnabled) { + metricsLabels.clear(); + metricsLabels.put("dataType", value.getDataType().typeName()); + + metricsService.collect(Metric.EVENT_COUNT, metricsLabels.get(), fields, 1L); + metricsService.collect(Metric.BYTE_COUNT, metricsLabels.get(), fields, (long) value.getRawData().length); } } + /** + * Deprecated. Use #fieldHarvester.extractFields() + */ + @Deprecated + // After eliminating this method, expand fieldHarvester.extractFields by eliminating faultTolerantGetEventFields and addSupplementalFields public Multimap getFields(RawRecordContainer value, DataTypeHandler handler) throws Exception { - Multimap newFields; - // Parse the event into its field names and field values using the DataTypeHandler's BaseIngestHelper object. - newFields = handler.getHelper(value.getDataType()).getEventFields(value); - - // Also get the virtual fields, if applicable. 
- if (handler.getHelper(value.getDataType()) instanceof VirtualIngest) { - VirtualIngest vHelper = (VirtualIngest) handler.getHelper(value.getDataType()); - Multimap virtualFields = vHelper.getVirtualFields(newFields); - for (Entry v : virtualFields.entries()) - newFields.put(v.getKey(), v.getValue()); - } - // Also get the composite fields, if applicable - if (handler.getHelper(value.getDataType()) instanceof CompositeIngest) { - CompositeIngest vHelper = (CompositeIngest) handler.getHelper(value.getDataType()); - Multimap compositeFields = vHelper.getCompositeFields(newFields); - for (String fieldName : compositeFields.keySet()) { - // if this is an overloaded composite field, we are replacing the existing field data - if (vHelper.isOverloadedCompositeField(fieldName)) - newFields.removeAll(fieldName); - newFields.putAll(fieldName, compositeFields.get(fieldName)); - } - } - - // Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes - long loadDate = now.get(); - NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate)); - // set an indexed field value for use by the date index data type handler - loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate))); - newFields.put(LOAD_DATE_FIELDNAME, loadDateValue); - - String seqFileName = null; - - // place the sequence filename into the event - if (createSequenceFileName) { - seqFileName = NDC.peek(); - - if (trimSequenceFileName) { - seqFileName = StringUtils.substringAfterLast(seqFileName, "/"); - } - - if (null != seqFileName) { - StringBuilder seqFile = new StringBuilder(seqFileName); - - seqFile.append(SRC_FILE_DEL).append(offset); - - if (null != splitStart) { - seqFile.append(SRC_FILE_DEL).append(splitStart); - } - - newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString())); - } + Multimap fields = fieldHarvester.faultTolerantGetEventFields(value, handler.getHelper(value.getDataType())); + if (fieldHarvester.hasError()) { + throw new Exception(fieldHarvester.getException()); } - - if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) { - newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName())); - } - - // Also if this helper needs to filter the fields before returning, apply now - if (handler.getHelper(value.getDataType()) instanceof FilterIngest) { - FilterIngest fHelper = (FilterIngest) handler.getHelper(value.getDataType()); - fHelper.filter(newFields); + fieldHarvester.addSupplementalFields(value, offset, splitStart, handler.getHelper(value.getDataType()), fields); + if (fieldHarvester.hasError()) { + throw new Exception(fieldHarvester.getException()); } - - return newFields; + return fields; } @SuppressWarnings("unchecked") diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java new file mode 100644 index 00000000000..8893fbeb880 --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java @@ -0,0 +1,255 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import datawave.data.normalizer.DateNormalizer; +import datawave.ingest.data.RawRecordContainer; +import 
datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValue; +import datawave.ingest.data.config.ingest.CompositeIngest; +import datawave.ingest.data.config.ingest.FilterIngest; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.VirtualIngest; +import datawave.ingest.time.Now; +import datawave.util.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import java.util.Date; +import java.util.Map; + +public class FieldHarvester { + private static final Logger log = Logger.getLogger(FieldHarvester.class); + + private static Now now = Now.getInstance(); + + public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE"; + public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE"; + public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename"; + public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename"; + public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; + public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename"; + + private boolean createSequenceFileName; + private boolean trimSequenceFileName; + private boolean createRawFileName; + private final DateNormalizer dateNormalizer = new DateNormalizer(); + + private static final String SRC_FILE_DEL = "|"; + private Exception exception; + + public FieldHarvester(Configuration configuration) { + this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true); + this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true); + this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true); + } + + public boolean hasError() { + return null != this.exception; + } + + public Exception getException() { + return this.exception; + } + + /** + * Updates "fields" with extracted, derived, and automatically generated fields. + * + * @param fields + * the Multimap to modify with extracted and generated fields + * @param ingestHelper + * interface to use for field extraction + * @param value + * the record from which the fields will be extracted + * @param offset + * record offset within the source file + * @param splitStart + * the splitStart for the record + */ + public void extractFields(Multimap fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset, + String splitStart) { + // reset exception-in-extraction tracking + this.exception = null; + + // "candidateFields" holds the fields that may eventually be added to "fields" + Multimap candidateFields = null; + try { + // get salvaged fields if getEventFields throws exception + candidateFields = faultTolerantGetEventFields(value, ingestHelper); + + // try adding supplemental fields to candidateFields, whether or not there was an exception + addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields); + } catch (Exception exception) { + this.exception = exception; + } finally { + // Add each "candidateFields" field value to "fields" as long as the field value is without error + addErrorFreeFields(fields, candidateFields); + } + } + + /** + * Calls IngestHelper.getEventFields with value. If an exception is thrown, captures it and attempts to salvage fields from the value. 
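+     *
+     * Salvaging only occurs when the helper also implements FieldSalvager. As an illustrative sketch only
+     * (the class and field names below are hypothetical, not part of this patch), such a helper might
+     * recover just the metadata it can produce without parsing the raw payload:
+     *
+     * <pre>{@code
+     * public abstract class SalvagingHelper extends ContentBaseIngestHelper implements FieldSalvager {
+     *     @Override
+     *     public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer value) {
+     *         Multimap<String,NormalizedContentInterface> fields = HashMultimap.create();
+     *         // file-level metadata remains recoverable even when payload parsing throws
+     *         fields.put("FILE_NAME", new NormalizedFieldAndValue("FILE_NAME", value.getRawFileName()));
+     *         return fields; // empty, never null, when nothing is recoverable
+     *     }
+     * }
+     * }</pre>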
+ */ + // package method visibility for EventMapper.getFields only + @Deprecated + Multimap faultTolerantGetEventFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { + try { + // Parse the event into its candidate field names and values using the IngestHelperInterface. + return ingestHelper.getEventFields(value); + } catch (Exception exception) { + // delay throwing the exception + this.exception = exception; + return attemptToSalvageFields(value, ingestHelper); + } + } + + // todo test case where salvage fields are empty + // package method visibility for EventMapper.getFields only + void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper, + Multimap fields) { + addVirtualFields(ingestHelper, fields); + addCompositeFields(ingestHelper, fields); + addLoadDateField(fields); + addFileNameFields(value, offset, splitStart, fields); + applyFieldFilters(ingestHelper, fields); + } + + /* + * Populate the "fields" method parameter with any candidateFields that do not have an Error + */ + private void addErrorFreeFields(Multimap fields, Multimap candidateFields) { + if (null == candidateFields) { + return; + } + Throwable fieldError = null; + for (Map.Entry entry : candidateFields.entries()) { + // noinspection ThrowableResultOfMethodCallIgnored + if (entry.getValue().getError() != null) { + fieldError = entry.getValue().getError(); + } + fields.put(entry.getKey(), entry.getValue()); + } + if (fieldError != null) { + this.exception = new FieldNormalizationError("Failed getting all fields", fieldError); + } + } + + private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also get the virtual fields, if applicable. + if (ingestHelper instanceof VirtualIngest) { + VirtualIngest vHelper = (VirtualIngest) ingestHelper; + Multimap virtualFields = vHelper.getVirtualFields(newFields); + for (Map.Entry v : virtualFields.entries()) + newFields.put(v.getKey(), v.getValue()); + } + } + + private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also get the composite fields, if applicable + if (ingestHelper instanceof CompositeIngest) { + CompositeIngest vHelper = (CompositeIngest) ingestHelper; + Multimap compositeFields = vHelper.getCompositeFields(newFields); + for (String fieldName : compositeFields.keySet()) { + // if this is an overloaded composite field, we are replacing the existing field data + if (vHelper.isOverloadedCompositeField(fieldName)) { + newFields.removeAll(fieldName); + } + newFields.putAll(fieldName, compositeFields.get(fieldName)); + } + } + } + + private void addLoadDateField(Multimap newFields) { + // Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes + long loadDate = now.get(); + NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate)); + // set an indexed field value for use by the date index data type handler + loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate))); + newFields.put(LOAD_DATE_FIELDNAME, loadDateValue); + } + + private void addRawFileField(RawRecordContainer value, Multimap newFields, String seqFileName) { + if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) { + newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName())); + } + } + + private void addOrigFileField(Multimap 
newFields, long offset, String splitStart, String seqFileName) { + if (null != seqFileName) { + StringBuilder seqFile = new StringBuilder(seqFileName); + + seqFile.append(SRC_FILE_DEL).append(offset); + + if (null != splitStart) { + seqFile.append(SRC_FILE_DEL).append(splitStart); + } + + newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString())); + } + } + + private String getSeqFileName() { + String seqFileName; + seqFileName = NDC.peek(); + + if (trimSequenceFileName) { + seqFileName = StringUtils.substringAfterLast(seqFileName, "/"); + } + return seqFileName; + } + + private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap newFields) { + String seqFileName = null; + + if (createSequenceFileName) { + seqFileName = getSeqFileName(); + + // place the sequence filename into the event + addOrigFileField(newFields, offset, splitStart, seqFileName); + } + + addRawFileField(value, newFields, seqFileName); + } + + private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also if this helper needs to filter the fields before returning, apply now + if (ingestHelper instanceof FilterIngest) { + FilterIngest fHelper = (FilterIngest) ingestHelper; + fHelper.filter(newFields); + } + } + + /** + * If IngestHelper implements FieldSalvager, get the salvageable fields from value. Otherwise, return an empty Multimap. + */ + private Multimap attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { + // If this helper is able, attempt to salvage a subset of the fields + if (null != ingestHelper && ingestHelper instanceof FieldSalvager) { + FieldSalvager salvager = (FieldSalvager) ingestHelper; + try { + Multimap salvagedFields = salvager.getSalvageableEventFields(value); + if (null != salvagedFields) { + return salvagedFields; + } + } catch (Exception salvagerException) { + // Do not overwrite the original exception + if (null == this.exception) { + this.exception = new IllegalStateException("Unexpected state (FieldExpander.exception should be non-null if salvaging", salvagerException); + } else { + // allow original exception (this.exception) to be thrown by caller + log.error("Even salvager threw an exception", salvagerException); + } + } + } + return HashMultimap.create(); + } + + public static class FieldNormalizationError extends Exception { + private static final long serialVersionUID = 1L; + + public FieldNormalizationError(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java new file mode 100644 index 00000000000..0f6d07a2e2c --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java @@ -0,0 +1,24 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.config.NormalizedContentInterface; + +/** + * This optional interface is intended to complement the IngestHelperInterface interface's handling of errors that occur within ingest jobs. + * + * One use case is when IngestHelperInterface's getEventFields throws an exception. The getEventFields method will not return a Multimap of field values + * (because it instead threw an exception). 
Prior to FieldSalvager, this meant that the error tables would not have information on any of the + * RawRecordContainer's field values. + * + * FieldSalvager implementations can attempt to provide a subset of the field values, so that the error tables can have more helpful information about the + * failed record, perhaps aiding troubleshooting efforts. An implementation could return only those field names that are relatively well-structured and + * predictably formatted, very unlikely to cause exceptions while processing. + */ +public interface FieldSalvager { + /** + * @param rawRecordContainer + * @return Multimap containing subset of field values, possibly empty but not null + */ + Multimap getSalvageableEventFields(RawRecordContainer rawRecordContainer); +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 050d12772dc..6cdafac59c5 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -20,7 +20,7 @@ import datawave.ingest.data.config.GroupedNormalizedContentInterface; import datawave.ingest.data.config.NormalizedContentInterface; import datawave.ingest.data.config.ingest.IngestHelperInterface; -import datawave.ingest.mapreduce.EventMapper; +import datawave.ingest.mapreduce.FieldHarvester; import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle; import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition; @@ -500,7 +500,7 @@ public long process(KEYIN key, RawRecordContainer event, Multimap loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME); + Collection loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME); if (!loadDates.isEmpty()) { NormalizedContentInterface nci = loadDates.iterator().next(); Date date = new Date(Long.parseLong(nci.getEventFieldValue())); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java index dde9b6dcb7e..865d52b9297 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java @@ -19,6 +19,7 @@ import datawave.ingest.config.RawRecordContainerImpl; import datawave.ingest.data.config.MarkingsHelper; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Before; import org.junit.Test; @@ -53,8 +54,7 @@ public void setUp() throws Exception { conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName()); conf.set("samplecsv.reader.class", TestCSVReader.class.getName()); conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE"); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); dataType = TypeRegistry.getType("samplecsv"); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java index c116d4182d8..2a7b325dc50 100644 --- 
a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java @@ -1,8 +1,7 @@ package datawave.ingest.data.config; -import datawave.ingest.data.TypeRegistry; - import datawave.policy.IngestPolicyEnforcer; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Before; @@ -25,8 +24,7 @@ public void setup() { @Test(expected = IllegalArgumentException.class) public void testInvalidConfig() { DataTypeHelperImpl helper = new DataTypeHelperImpl(); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); helper.setup(conf); } @@ -36,8 +34,7 @@ public void testValidConfig() throws Exception { Assert.assertNotNull(configStream); conf.addResource(configStream); Assert.assertThat(conf.get("data.name"), is("fake")); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); DataTypeHelperImpl helper = new DataTypeHelperImpl(); helper.setup(conf); @@ -54,8 +51,7 @@ public void testDowncaseFields() throws Exception { Assert.assertNotNull(configStream); conf.addResource(configStream); conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR"); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); DataTypeHelperImpl helper = new DataTypeHelperImpl(); helper.setup(conf); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java index 82b3a3385e9..5ebef6b0992 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java @@ -21,7 +21,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface { private Map _markings; private Throwable _error; - protected NonGroupedInstance() { + public NonGroupedInstance() { _fieldName = "TestNonGroupedInstance"; _indexedFieldName = "TestIndexedField"; @@ -141,7 +141,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface private String _group; private String _subGroup; - protected GroupedInstance() { + public GroupedInstance() { _fieldName = "TestNonGroupedInstance"; _indexedFieldName = "TestIndexedField"; diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java index b34937d2c88..245e7486519 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java @@ -7,6 +7,7 @@ import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelper.Properties; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Before; @@ -21,10 +22,7 @@ public void setup() { conf = new Configuration(); conf.set(Properties.DATA_NAME, "test"); conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName()); - - 
TypeRegistry.reset(); - TypeRegistry.getInstance(conf); - + TypeRegistryTestSetup.resetTypeRegistry(conf); } @Test(expected = IllegalArgumentException.class) diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java new file mode 100644 index 00000000000..3966bb8c9a4 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java @@ -0,0 +1,246 @@ +package datawave.ingest.data.config.ingest; + +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.config.DataTypeHelperImpl; +import datawave.ingest.data.config.MaskedFieldHelper; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.policy.IngestPolicyEnforcer; +import org.apache.hadoop.conf.Configuration; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +// For testing only, this implementation fulfills the interface contract with methods that throw UnsupportedOperationException. +// It is is extendable when implementations are needed for specific tests. +public class MinimalistIngestHelperInterfaceImpl implements IngestHelperInterface { + @Override + public Type getType() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public IngestPolicyEnforcer getPolicyEnforcer() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Configuration conf) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Set getShardExclusions() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShardExcluded(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getEventFields(RawRecordContainer value) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap normalizeMap(Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap normalize(Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public List> getDataTypes(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public String getNormalizedMaskedValue(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMappings() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public String get(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean getDeleteMode() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean getReplaceMalformedUTF8() { + // override this method, as needed + throw new 
UnsupportedOperationException(); + } + + @Override + public boolean isEmbeddedHelperMaskedFieldHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public MaskedFieldHelper getEmbeddedHelperAsMaskedFieldHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public DataTypeHelperImpl getEmbeddedHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReverseIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isIndexOnlyField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addShardExclusionField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addReverseIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addIndexOnlyField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCompositeField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOverloadedCompositeField(String fieldName) { + return true; + } + + @Override + public boolean isNormalizedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addNormalizedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAliasedIndexField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public HashSet getAliasesForIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDataTypeField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFieldDefinitions() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Map getCompositeFieldSeparators() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isVirtualIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Map getVirtualNameAndIndex(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldHaveBeenIndexed(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldHaveBeenReverseIndexed(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); 
+ } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java index acc39e28839..ec8e5f12b5f 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java @@ -7,6 +7,7 @@ import datawave.data.normalizer.NormalizationException; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Before; @@ -32,8 +33,7 @@ public String convertFieldRegex(String fieldName, String fieldRegex) { public void setUp() { Configuration conf = new Configuration(); conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName()); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); normalizer.setup(TypeRegistry.getType("test"), "test", conf); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java new file mode 100644 index 00000000000..bc09d9eb244 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java @@ -0,0 +1,233 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.config.BaseNormalizedContent; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.ingest.ContentBaseIngestHelper; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.writer.ContextWriter; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.easymock.EasyMockRule; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This test class aims to test only the scenarios in which EventMapper encounters an error, to verify its behavior with the FieldSalvager interface. It's + * modeled after EventMapperTest. 
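+ * Each test installs a SimpleDataTypeHandler variant whose IngestHelper always throws from getEventFields, then
+ * verifies which salvaged fields, if any, the error handlers write out.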
+ */
+public class EventMapperSalvageFieldsOnErrorTest {
+    private static final String[] SALVAGED_FIELDS = {"ISBN", "author", "borrower", "dueDate", "libraryBranch"};
+
+    @Rule
+    public EasyMockRule easyMockRule = new EasyMockRule(this);
+
+    @Mock
+    private Mapper<LongWritable,RawRecordContainer,BulkIngestKey,Value>.Context mapContext;
+
+    private Configuration conf;
+    private SimpleRawRecord record;
+    private EventMapper<LongWritable,RawRecordContainer,BulkIngestKey,Value> eventMapper;
+
+    public void setupTest(String dataTypeHandler) throws Exception {
+        eventMapper = new EventMapper<>();
+
+        conf = new Configuration();
+        conf.setClass(EventMapper.CONTEXT_WRITER_CLASS, TestContextWriter.class, ContextWriter.class);
+        EventMapperTest.setupMapContextMock(mapContext, conf);
+
+        record = IngestTestSetup.createRecord(dataTypeHandler, conf);
+    }
+
+    /**
+     * FakeSalvagingIngestHelper:
+     * - always throws an exception when getEventFields is called, to ensure the error handling code path is reached within EventMapper.
+     * - allows for anonymous inline helper creation.
+     */
+    public static abstract class FakeSalvagingIngestHelper extends ContentBaseIngestHelper implements FieldSalvager {
+        @Override
+        public Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value) {
+            throw new RuntimeException("Simulated exception while getting event fields for value.");
+        }
+    }
+
+    /**
+     * SalvagingDataTypeHandler provides a FieldSalvager implementation that deserializes rawData as a Map<String, String>, then returns a Multimap
+     * containing only the SALVAGED_FIELDS that were encountered, skipping all others.
+     */
+    public static class SalvagingDataTypeHandler extends SimpleDataTypeHandler {
+        @Override
+        public IngestHelperInterface getHelper(Type datatype) {
+            FakeSalvagingIngestHelper fakeSalvagingIngestHelper = new FakeSalvagingIngestHelper() {
+                @Override
+                public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer value) {
+                    HashMultimap<String,NormalizedContentInterface> fields = HashMultimap.create();
+                    byte[] rawData = value.getRawData();
+
+                    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(rawData);
+                                    ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
+                        Map<String,String> deserializedData = (Map<String,String>) objectInputStream.readObject();
+                        for (String fieldToSalvage : SALVAGED_FIELDS) {
+                            String fieldValue = deserializedData.get(fieldToSalvage);
+                            if (null != fieldValue) {
+                                try {
+                                    fields.put(fieldToSalvage, new BaseNormalizedContent(fieldToSalvage, fieldValue));
+                                } catch (Exception fieldException) {
+                                    // skip this field and proceed to the next
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        return fields;
+                    }
+                    return fields;
+                }
+
+                @Override
+                public Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value) {
+                    throw new RuntimeException("Simulated exception while getting event fields for value.");
+                }
+            };
+            return fakeSalvagingIngestHelper;
+        }
+    }
+
+    /**
+     * NullSalvagingSimpleDataTypeHandler provides a FieldSalvager implementation that always returns null.
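+     * Used to verify that field harvesting tolerates a salvager that violates FieldSalvager's "not null" contract.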
+ */ + public static class NullSalvagingSimpleDataTypeHandler extends SimpleDataTypeHandler { + @Override + public IngestHelperInterface getHelper(Type datatype) { + return new FakeSalvagingIngestHelper() { + @Override + public Multimap getSalvageableEventFields(RawRecordContainer value) { + return null; + } + }; + } + } + + @After + public void checkMock() { + verify(mapContext); + } + + @Test + public void shouldSalvageAllFields() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Add both salvageable and unsalvageable fields as rawData + HashMap fieldValues = createMapOfSalveagableFieldValues(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect only the salvageable fields, each exactly once + assertEquals(written.toString(), SALVAGED_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + for (String expectedFieldName : SALVAGED_FIELDS) { + assertTrue(fieldNameOccurrences.toString(), fieldNameOccurrences.containsKey(expectedFieldName)); + assertEquals(1, (int) fieldNameOccurrences.get(expectedFieldName)); + } + } + + @Test + public void shouldTolerateNullSalvagedFieldsMap() throws Exception { + // Use a DataTypeHandler that provides a FieldSalvager that always returns null + setupTest(NullSalvagingSimpleDataTypeHandler.class.getName()); + + // Create a record with salvageable and unsalvageable fields + HashMap fieldValues = createMapOfSalveagableFieldValues(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect nothing to be salvaged + assertEquals(written.toString(), 0, written.size()); + } + + @Test + public void shouldIgnoreNonSalvagedFields() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Add only unsalvagable fields + HashMap fieldValues = new HashMap<>(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to occur once + assertEquals(written.toString(), 0, written.size()); + } + + @Test + public void shouldTolerateErrorInSalvager() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Set raw data to an invalid format, causing an error in the FieldSalvager implementation + record.setRawData("Not a map".getBytes()); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. 
See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to occur once + assertEquals(written.toString(), 0, written.size()); + } + + private void runMapper() throws IOException, InterruptedException { + eventMapper.setup(mapContext); + eventMapper.map(new LongWritable(1), record, mapContext); + eventMapper.cleanup(mapContext); + } + + private HashMap createMapOfSalveagableFieldValues() { + HashMap fieldValues = new HashMap<>(); + fieldValues.put("ISBN", "0-123-00000-1"); + fieldValues.put("dueDate", "20211126"); + fieldValues.put("author", "Henry Hope Reed"); + fieldValues.put("borrower", "Edward Clark Potter"); + fieldValues.put("libraryBranch", "Grand Central"); + return fieldValues; + } + + private void addUnsalvageableFieldsToMap(HashMap fieldValues) { + fieldValues.put("genre", "FICTION"); + fieldValues.put("format", "Hardcover"); + } + + /** + * Extracts field names from persisted data, creating a map containing the number of occurrences per field name. + */ + private Map countPersistedFieldsByName(Multimap entries) { + Map result = new HashMap<>(); + for (BulkIngestKey key : entries.keys()) { + String fieldName = key.getKey().getColumnFamily().toString(); + // create or increment + result.merge(fieldName, 1, Integer::sum); + } + return result; + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java index a8dfc3d1586..bbed1a6e19a 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java @@ -12,6 +12,7 @@ import datawave.ingest.mapreduce.job.metrics.MetricsConfiguration; import datawave.ingest.mapreduce.job.metrics.TestEventCountMetricsReceiver; import datawave.ingest.mapreduce.job.writer.ContextWriter; +import datawave.util.TypeRegistryTestSetup; import org.apache.accumulo.core.data.Value; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.Map; +// Also see EventMapperSalvageFieldsOnErrorTest public class EventMapperTest { @Rule @@ -51,12 +53,11 @@ public void setUp() throws Exception { conf = new Configuration(); conf.setClass(EventMapper.CONTEXT_WRITER_CLASS, TestContextWriter.class, ContextWriter.class); - Type type = new Type("file", null, null, new String[] {SimpleDataTypeHandler.class.getName()}, 10, null); - Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, new String[] {SimpleDataTypeHandler.class.getName()}, 20, null); + String[] defaultDataTypeHandlers = {SimpleDataTypeHandler.class.getName()}; - TypeRegistry registry = TypeRegistry.getInstance(conf); - registry.put(type.typeName(), type); - registry.put(errorType.typeName(), errorType); + Type type = new Type("file", null, null, defaultDataTypeHandlers, 10, null); + Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, defaultDataTypeHandlers, 20, null); + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type, errorType); Multimap fields = HashMultimap.create(); fields.put("fileExtension", new BaseNormalizedContent("fileExtension", "gz")); @@ -64,25 +65,18 @@ public void setUp() throws Exception { SimpleDataTypeHelper.registerFields(fields); - record = new SimpleRawRecord(); - record.setRawFileTimestamp(eventTime); - record.setDataType(type); - 
record.setDate(eventTime); - record.setRawFileName("/some/filename"); - record.setRawData("some data".getBytes()); - record.generateId(null); + record = IngestTestSetup.createBasicRecord(eventTime, type); - errorRecord = new SimpleRawRecord(); + errorRecord = IngestTestSetup.createBasicRecord(eventTime, type); errorRecord.setRawFileTimestamp(0); - errorRecord.setDataType(type); - errorRecord.setDate(eventTime); - errorRecord.setRawFileName("/some/filename"); - errorRecord.setRawData("some data".getBytes()); - errorRecord.generateId(null); errorRecord.setRawFileName(""); errorRecord.addError("EVENT_DATE_MISSING"); errorRecord.setFatalError(true); + EventMapperTest.setupMapContextMock(mapContext, conf); + } + + static void setupMapContextMock(Mapper.Context mapContext, Configuration conf) throws IOException, InterruptedException { expect(mapContext.getConfiguration()).andReturn(conf).anyTimes(); mapContext.progress(); @@ -209,7 +203,7 @@ private Map.Entry getMetric(Multimap w } private Map.Entry getRawFileName(Multimap written) { - return getFieldEntry(written, EventMapper.RAW_FILE_FIELDNAME.toString()); + return getFieldEntry(written, FieldHarvester.RAW_FILE_FIELDNAME.toString()); } private Map.Entry getFieldEntry(Multimap written, String field) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java new file mode 100644 index 00000000000..0b65036a5bc --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java @@ -0,0 +1,464 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValueTest; +import datawave.ingest.data.config.ingest.CompositeIngest; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.MinimalistIngestHelperInterfaceImpl; +import datawave.ingest.data.config.ingest.VirtualIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.NDC; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; + +public class FieldHarvesterTest { + private static final String SAMPLE_FIELD_NAME = "SAMPLE_FIELD_NAME"; + private static final String COMPOSITE_FIELD = "COMPOSITE_FIELD"; + private static final String VIRTUAL_FIELD = "VIRTUAL_FIELD"; + private static final String SEQ_FILENAME_ONLY = "InputFile.seq"; + private static final String SEQ_FILENAME_AND_DIR = "/input/directory/" + SEQ_FILENAME_ONLY; + private static final int NUM_SUPPLEMENTAL_FIELDS = 3; + private static final String LOAD_DATE = "LOAD_DATE"; + private static final String ORIG_FILE = "ORIG_FILE"; + private static final String RAW_FILE = "RAW_FILE"; + + private final Multimap fields = HashMultimap.create(); + private FieldHarvester fieldHarvester; + private long offset = 0; + private String splitStart = null; + private RawRecordContainer value; + + @Before + public void before() { + value = IngestTestSetup.createBasicRecord(); + value.setRawData(null); // rawData is ignored by below implementations of getEventFields + fieldHarvester = new FieldHarvester(new Configuration()); + NDC.push(SEQ_FILENAME_AND_DIR); + } + + @After + public void 
after() { + NDC.pop(); + } + + @Test + public void reusableFieldHarvester() { + // The first call to extractFields produces an error, adding only supplemental fields + fieldHarvester.extractFields(fields, null, value, offset, splitStart); + + // Verify error is captured (NullPointerException because null provided as the IngestHelperInterface param) + assertExceptionCaptured(fieldHarvester, NullPointerException.class); + assertContainsOnlySupplementalFields(); + + // The second extractFields calls an IngestHelper that doesn't error + // There should be no residue from the prior call (prior errors cleared, prior fields cleared) + // field map with single field and value + fields.clear(); + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + assertNoErrors(fieldHarvester); + + // The third call is just like the first call + fields.clear(); + fieldHarvester.extractFields(fields, null, value, offset, splitStart); + + // Verify error is captured (NullPointerException because null provided as IngestHelperInterface) + assertExceptionCaptured(fieldHarvester, NullPointerException.class); + assertContainsOnlySupplementalFields(); + } + + @Test + public void disableSeqFileNameCreation() { + // Configuration disables seq file name creation + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.LOAD_SEQUENCE_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(RAW_FILE)); + // excluded due to config + Assert.assertFalse(fields.containsKey(ORIG_FILE)); + Assert.assertEquals(fields.toString(), 3, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void disableTrimmingSeqFileName() { + // Configuration disables trimming of seq file name + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.TRIM_SEQUENCE_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + Collection result = fields.get(ORIG_FILE); + NormalizedContentInterface fieldValue = result.iterator().next(); + Assert.assertEquals(SEQ_FILENAME_AND_DIR + "|0", fieldValue.getEventFieldValue()); + Assert.assertEquals(SEQ_FILENAME_AND_DIR + "|0", fieldValue.getIndexedFieldValue()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void enableTrimmingSeqFileName() { + // Default configuration enables trimming of seq file name + + // field map with single field and value + fieldHarvester.extractFields(fields, new 
BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + Collection result = fields.get(ORIG_FILE); + NormalizedContentInterface fieldValue = result.iterator().next(); + Assert.assertEquals(SEQ_FILENAME_ONLY + "|0", fieldValue.getEventFieldValue()); + Assert.assertEquals(SEQ_FILENAME_ONLY + "|0", fieldValue.getIndexedFieldValue()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void disableRawFileName() { + // Configuration disables raw file name creation + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.LOAD_RAW_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(ORIG_FILE)); + // excluded due to config + Assert.assertFalse(fields.containsKey(RAW_FILE)); + Assert.assertEquals(fields.toString(), 3, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void addsVirtualFields() { + // Ensure that a virtual field is added + Multimap fields = createOneFieldMultiMap(); + IngestHelperInterface ingestHelper = new BasicWithVirtualFieldsIngestHelper(fields); + + // field map with single field and value + fieldHarvester.extractFields(this.fields, ingestHelper, value, offset, splitStart); + + // Verify field returned + Assert.assertTrue(this.fields.containsKey(SAMPLE_FIELD_NAME)); + // Verify field is used for virtual field creation + Assert.assertTrue(this.fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(this.fields.toString(), 5, this.fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void addsCompositeFields() { + // cause exception in getEventFields, get salvaged fields and ensure they're used for composite field creation + Multimap salvagableFields = createOneFieldMultiMap(); + IngestHelperInterface ingestHelper = new BasicWithCompositeFieldsIngestHelper(salvagableFields); + + // field map with single field and value + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + + // Verify field returned + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + // Verify field is used for composite + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + COMPOSITE_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 5, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void supplementsSalvagedFields() { + // cause exception in getEventFields, get salvaged fields and ensure they're used for virtual and composite + Multimap salvagableFields = createOneFieldMultiMap(); + ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(salvagableFields); + + // field map with single field and value + fieldHarvester.extractFields(fields, ingestHelper, value, offset, 
splitStart); + + // Verify salvaged fields returned + // Verify salvaged fields are used for virtual, composite + // Note: the virtual field is also used by the composite field implementation + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + COMPOSITE_FIELD)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD + COMPOSITE_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 7, fields.size()); + + // Verify an exception was detected (for getEventFields) + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void emptySalvagedFields() { + // cause exception in getEventFields, causing retrieval of empty multimap of salvaged fields + ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(HashMultimap.create()); + + // field map with empty fields + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + + // empty salvaged fields + assertContainsOnlySupplementalFields(); + + // Verify an exception was detected (for getEventFields) + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void doubleException() { + // exception in getEventFields and in salvager + fieldHarvester.extractFields(fields, new DoubleErrorIngestHelper(), value, offset, splitStart); + + // Verify it contains expected fields + assertContainsOnlySupplementalFields(); + + // Verify the original exception was captured + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void extractFields() { + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + assertNoErrors(fieldHarvester); + } + + @Test + public void erroredFieldExcluded() { + // create a field containing a field error + NormalizedFieldAndValueTest.NonGroupedInstance fieldWithError = new NormalizedFieldAndValueTest.NonGroupedInstance(); + fieldWithError.setError(new Exception()); + + // field map contains a field containing a field error + Multimap multiMap = HashMultimap.create(); + multiMap.put(SAMPLE_FIELD_NAME, fieldWithError); + + fieldHarvester.extractFields(fields, new BasicIngestHelper(multiMap), this.value, offset, splitStart); + + // Verify fields contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + // Verify there was a FieldNormalizationError due to the field error + assertExceptionCaptured(this.fieldHarvester, FieldHarvester.FieldNormalizationError.class); + } + + @Test + public void nullIngestHelper() { + // field map with single field and value + fieldHarvester.extractFields(fields, null, value, offset, splitStart); + + // Verify it contains expected fields + assertContainsOnlySupplementalFields(); + + // Verify it captured the null pointer exception + assertExceptionCaptured(this.fieldHarvester, NullPointerException.class); + } + + private void
assertNoErrors(FieldHarvester fieldHarvester) { + Assert.assertFalse("Unexpected exception: " + fieldHarvester.getException(), fieldHarvester.hasError()); + Assert.assertNull(fieldHarvester.getException()); + } + + private void assertExceptionCaptured(FieldHarvester fieldHarvester, Class exceptionClass) { + Assert.assertTrue(fieldHarvester.hasError()); + Assert.assertEquals(exceptionClass, fieldHarvester.getException().getClass()); + } + + private Multimap createOneFieldMultiMap() { + Multimap multiMap = HashMultimap.create(); + multiMap.put(SAMPLE_FIELD_NAME, new NormalizedFieldAndValueTest.NonGroupedInstance()); + return multiMap; + } + + private void assertContainsSupplementalFields(Multimap fields) { + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(ORIG_FILE)); + Assert.assertTrue(fields.containsKey(RAW_FILE)); + } + + private void assertContainsOnlySupplementalFields() { + assertContainsSupplementalFields(fields); + // and only supplemental + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS, fields.size()); + } + + private static class DoubleErrorIngestHelper extends MinimalistIngestHelperInterfaceImpl implements FieldSalvager { + @Override + public Multimap getSalvageableEventFields(RawRecordContainer rawRecordContainer) { + throw new RuntimeException(); + } + } + + private static class BasicIngestHelper extends MinimalistIngestHelperInterfaceImpl { + private final Multimap multiMap; + + public BasicIngestHelper(Multimap multiMap) { + this.multiMap = multiMap; + } + + public Multimap getEventFields(RawRecordContainer event) { + return multiMap; + } + } + + private static class BasicWithCompositeFieldsIngestHelper extends BasicIngestHelper implements CompositeIngest { + public BasicWithCompositeFieldsIngestHelper(Multimap multiMap) { + super(multiMap); + } + + @Override + public void setCompositeFieldDefinitions(Multimap compositeFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + COMPOSITE_FIELD, entry.getValue()); + } + return compositeFields; + } + } + + private static class BasicWithVirtualFieldsIngestHelper extends BasicIngestHelper implements VirtualIngest { + public BasicWithVirtualFieldsIngestHelper(Multimap multiMap) { + super(multiMap); + } + + @Override + public Map getVirtualFieldDefinitions() { + return null; + } + + @Override + public void setVirtualFieldDefinitions(Map virtualFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultVirtualFieldSeparator() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDefaultVirtualFieldSeparator(String separator) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getVirtualFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + VIRTUAL_FIELD, entry.getValue()); + } + return compositeFields; + } + } + + private static class ErroringSalvagableIngestHelper extends MinimalistIngestHelperInterfaceImpl implements VirtualIngest, CompositeIngest, FieldSalvager { + private final Multimap multiMap; + + ErroringSalvagableIngestHelper(Multimap multiMap) { + this.multiMap = multiMap; + } + + @Override + public Multimap 
getSalvageableEventFields(RawRecordContainer rawRecordContainer) { + return this.multiMap; + } + + @Override + public void setCompositeFieldDefinitions(Multimap compositeFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + COMPOSITE_FIELD, entry.getValue()); + } + return compositeFields; + } + + @Override + public Map getVirtualFieldDefinitions() { + return null; + } + + @Override + public void setVirtualFieldDefinitions(Map virtualFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultVirtualFieldSeparator() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDefaultVirtualFieldSeparator(String separator) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getVirtualFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + VIRTUAL_FIELD, entry.getValue()); + } + return compositeFields; + } + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java new file mode 100644 index 00000000000..f2392cdc0dc --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java @@ -0,0 +1,65 @@ +package datawave.ingest.mapreduce; + +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; +import datawave.util.TypeRegistryTestSetup; +import org.apache.hadoop.conf.Configuration; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +/** + * Test class containing some reusable test code for creating a test Type and test records. 
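+ *
+ * A minimal usage sketch (illustrative only; SimpleDataTypeHandler is the test handler used elsewhere in this module):
+ *
+ * <pre>
+ * Configuration conf = new Configuration();
+ * SimpleRawRecord record = IngestTestSetup.createRecord(SimpleDataTypeHandler.class.getName(), conf);
+ * </pre>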
+ */ +public class IngestTestSetup { + + static SimpleRawRecord createRecord(String dataTypeHandler, Configuration conf) { + Type type = setupTypeAndTypeRegistry(dataTypeHandler, conf); + long eventTime = System.currentTimeMillis(); + return createBasicRecord(eventTime, type); + } + + public static RawRecordContainer createBasicRecord() { + Type type = IngestTestSetup.createBasicType(); + long eventTime = System.currentTimeMillis(); + return createBasicRecord(eventTime, type); + } + + static Type setupTypeAndTypeRegistry(String dataTypeHandler, Configuration conf) { + String[] defaultDataTypeHandlers = {dataTypeHandler}; + Type type = new Type("file", null, null, defaultDataTypeHandlers, 10, null); + + String[] errorDataTypeHandlers = {SimpleDataTypeHandler.class.getName()}; + Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, errorDataTypeHandlers, 20, null); + + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type, errorType); + return type; + } + + static Type createBasicType() { + return createBasicType(new String[] {}); + } + + static Type createBasicType(String[] defaultDataTypeHandlers) { + return new Type("file", null, null, defaultDataTypeHandlers, 10, null); + } + + static SimpleRawRecord createBasicRecord(long eventTime, Type type) { + SimpleRawRecord result = new SimpleRawRecord(); + result.setDate(eventTime); + result.setRawFileTimestamp(eventTime); + result.setDataType(type); + result.setRawFileName("/some/filename"); + result.setRawData("some data".getBytes()); + result.generateId(null); + return result; + } + + static byte[] objectToRawBytes(Object map) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + new ObjectOutputStream(outputStream).writeObject(map); + return outputStream.toByteArray(); + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java index 524ff956f08..d522237f6ef 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java @@ -16,6 +16,7 @@ import datawave.ingest.table.config.DateIndexTableConfigHelper; import datawave.policy.IngestPolicyEnforcer; +import datawave.util.TypeRegistryTestSetup; import datawave.util.TableName; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -58,8 +59,7 @@ public void setup() throws Exception { conf.set("testdatatype.date.index.type.to.field.map", "ACTIVITY=ACTIVITY_DATE,LOADED=LOAD_DATE"); conf.set("all" + Properties.INGEST_POLICY_ENFORCER_CLASS, IngestPolicyEnforcer.NoOpIngestPolicyEnforcer.class.getName()); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); handler = new DateIndexDataTypeHandler<>(); handler.setup(new TaskAttemptContextImpl(conf, new TaskAttemptID())); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java index c70791f0fe0..81bedcc9829 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java +++ 
b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java @@ -8,7 +8,6 @@ import datawave.ingest.config.RawRecordContainerImpl; import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; -import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.BaseNormalizedContent; import datawave.ingest.data.config.GroupedNormalizedContentInterface; import datawave.ingest.data.config.NormalizedContentInterface; @@ -26,6 +25,7 @@ import datawave.ingest.test.StandaloneTaskAttemptContext; import datawave.ingest.time.Now; import datawave.metadata.protobuf.EdgeMetadata; +import datawave.util.TypeRegistryTestSetup; import datawave.util.TableName; import datawave.util.time.DateHelper; import org.apache.accumulo.core.data.Key; @@ -124,13 +124,11 @@ public static void tearDown() { @Before public void setup() { - TypeRegistry.reset(); conf = new Configuration(); conf.addResource(ClassLoader.getSystemResource("config/all-config.xml")); conf.addResource(ClassLoader.getSystemResource("config/edge-ingest-config.xml")); conf.addResource(ClassLoader.getSystemResource("config/metadata-config.xml")); - TypeRegistry registry = TypeRegistry.getInstance(conf); - registry.put(type.typeName(), type); + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type); } private RawRecordContainer getEvent(Configuration conf) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java index 90e1c7b2bfe..a0aadbb90ea 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java @@ -1,9 +1,9 @@ package datawave.ingest.mapreduce.job; import datawave.common.test.logging.CommonTestAppender; -import datawave.ingest.data.TypeRegistry; import datawave.data.hash.UID; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; +import datawave.util.TypeRegistryTestSetup; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; @@ -170,8 +170,6 @@ public void setup() { uutLogger.addAppender(uutAppender); Logger.getLogger(AccumuloOutputFormat.class).addAppender(uutAppender); - - TypeRegistry.reset(); } @After @@ -513,7 +511,7 @@ public void testRecordWriterWriteWithUpdatesAndTypes() throws IOException, Inter conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -573,7 +571,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTyped() throws IOExcept conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -633,7 +631,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithUID() throws I conf.addResource(url); - 
TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -695,7 +693,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithBadUID() throw conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -757,7 +755,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithoutUID() throw conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); diff --git a/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java b/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java new file mode 100644 index 00000000000..1533402ce0f --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java @@ -0,0 +1,20 @@ +package datawave.util; + +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; +import org.apache.hadoop.conf.Configuration; + +public class TypeRegistryTestSetup { + public static TypeRegistry resetTypeRegistry(Configuration conf) { + TypeRegistry.reset(); + return TypeRegistry.getInstance(conf); + } + + public static TypeRegistry resetTypeRegistryWithTypes(Configuration conf, Type... 
types) { + TypeRegistry registry = resetTypeRegistry(conf); + for (Type type : types) { + registry.put(type.typeName(), type); + } + return registry; + } +} From 863842839349f9398350983f492613db7accd9ba Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Mon, 28 Feb 2022 17:21:31 +0000 Subject: [PATCH 2/8] Fix #1438: remove getFields method after researching extensions --- .../datawave/ingest/mapreduce/EventMapper.java | 17 ----------------- .../ingest/mapreduce/FieldHarvester.java | 12 ++++-------- 2 files changed, 4 insertions(+), 25 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index ba37b2e6538..d2db4d4e5cb 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -717,23 +717,6 @@ private void updateMetrics(RawRecordContainer value, Multimap getFields(RawRecordContainer value, DataTypeHandler handler) throws Exception { - Multimap fields = fieldHarvester.faultTolerantGetEventFields(value, handler.getHelper(value.getDataType())); - if (fieldHarvester.hasError()) { - throw new Exception(fieldHarvester.getException()); - } - fieldHarvester.addSupplementalFields(value, offset, splitStart, handler.getHelper(value.getDataType()), fields); - if (fieldHarvester.hasError()) { - throw new Exception(fieldHarvester.getException()); - } - return fields; - } - @SuppressWarnings("unchecked") public void executeHandler(K1 key, RawRecordContainer event, Multimap fields, DataTypeHandler handler, Context context) throws Exception { diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java index 8893fbeb880..46dde6ec478 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java @@ -26,9 +26,9 @@ public class FieldHarvester { public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE"; public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE"; + public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename"; public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename"; - public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename"; private boolean createSequenceFileName; @@ -83,7 +83,7 @@ public void extractFields(Multimap fields, In } catch (Exception exception) { this.exception = exception; } finally { - // Add each "candidateFields" field value to "fields" as long as the field value is without error + // Add each "candidateFields" entry to "fields" as long as the field value is without error addErrorFreeFields(fields, candidateFields); } } @@ -91,9 +91,7 @@ public void extractFields(Multimap fields, In /** * Calls IngestHelper.getEventFields with value. If an exception is thrown, captures it and attempts to salvage fields from the value. 
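 * (When the helper also implements FieldSalvager, the salvaged fields come from FieldSalvager.getSalvageableEventFields; otherwise an empty multimap is used.)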
*/ - // package method visibility for EventMapper.getFields only - @Deprecated - Multimap faultTolerantGetEventFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { + private Multimap faultTolerantGetEventFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { try { // Parse the event into its candidate field names and values using the IngestHelperInterface. return ingestHelper.getEventFields(value); @@ -104,9 +102,7 @@ Multimap faultTolerantGetEventFields(RawRecor } } - // todo test case where salvage fields are empty - // package method visibility for EventMapper.getFields only - void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper, + private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper, Multimap fields) { addVirtualFields(ingestHelper, fields); addCompositeFields(ingestHelper, fields); From 5c7359de6227f17f4f1d373a327958849d3ad395 Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Mon, 28 Feb 2022 18:31:31 +0000 Subject: [PATCH 3/8] Fix #1438: consolidate FieldHarvester methods and adjust tests --- .../ingest/mapreduce/EventMapper.java | 3 - .../ingest/mapreduce/FieldHarvester.java | 163 ++++++++++-------- .../ingest/mapreduce/FieldHarvesterTest.java | 49 ++++-- 3 files changed, 121 insertions(+), 94 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index d2db4d4e5cb..9b2db49a0cd 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -686,9 +686,6 @@ public void processEvent(K1 key, RawRecordContainer value, List fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset, - String splitStart) { + String splitStart) throws Exception { // reset exception-in-extraction tracking - this.exception = null; + this.originalException = null; + + // "candidateFields" holds the fields that will eventually be added to "fields" + Multimap candidateFields; - // "candidateFields" holds the fields that may eventually be added to "fields" - Multimap candidateFields = null; try { - // get salvaged fields if getEventFields throws exception - candidateFields = faultTolerantGetEventFields(value, ingestHelper); - - // try adding supplemental fields to candidateFields, whether or not there was an exception + // parse the record into its candidate field names and values using the IngestHelperInterface. 
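+ // (if getEventFields throws, the catch below keeps the exception and falls back to any fields the salvager can recover)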
+ candidateFields = ingestHelper.getEventFields(value); + } catch (Exception exception) { + // delay throwing the exception to attempt salvaging + this.originalException = exception; + candidateFields = attemptToSalvageFields(value, ingestHelper); + } + + try { + // try adding supplemental fields to candidateFields, whether or not they were salvaged addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields); } catch (Exception exception) { - this.exception = exception; - } finally { - // Add each "candidateFields" entry to "fields" as long as the field value is without error - addErrorFreeFields(fields, candidateFields); + if (null == this.originalException) { + this.originalException = exception; + } else { + // preserve original exception and log the latest exception + log.error(exception); + } } + + // add candidateFields to fields, even if there was an error + // identify if any individual fields contain an error + addFieldsAndDetectFieldErrors(fields, candidateFields); + + if (null != this.originalException) { + throw new Exception("An exception was encountered during field harvesting", originalException); + } + } + + @VisibleForTesting + boolean hasError() { + return null != this.originalException; + } + + @VisibleForTesting + Exception getOriginalException() { + return this.originalException; } /** - * Calls IngestHelper.getEventFields with value. If an exception is thrown, captures it and attempts to salvage fields from the value. + * If IngestHelper implements FieldSalvager, get the salvageable fields from value. Otherwise, return an empty Multimap. */ - private Multimap faultTolerantGetEventFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { - try { - // Parse the event into its candidate field names and values using the IngestHelperInterface. 
- return ingestHelper.getEventFields(value); - } catch (Exception exception) { - // delay throwing the exception - this.exception = exception; - return attemptToSalvageFields(value, ingestHelper); + private Multimap attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { + // If this helper is able, attempt to salvage a subset of the fields + if (null != ingestHelper && ingestHelper instanceof FieldSalvager) { + FieldSalvager salvager = (FieldSalvager) ingestHelper; + try { + Multimap salvagedFields = salvager.getSalvageableEventFields(value); + if (null != salvagedFields) { + return salvagedFields; + } + } catch (Exception salvagerException) { + // Do not overwrite the original exception + if (null == this.originalException) { + this.originalException = new IllegalStateException("Unexpected state (FieldExpander.exception should be non-null if salvaging", + salvagerException); + } else { + // allow original exception (this.exception) to be thrown by caller + log.error("Even salvager threw an exception", salvagerException); + } + } } + return HashMultimap.create(); } private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper, @@ -111,29 +148,9 @@ private void addSupplementalFields(RawRecordContainer value, long offset, String applyFieldFilters(ingestHelper, fields); } - /* - * Populate the "fields" method parameter with any candidateFields that do not have an Error - */ - private void addErrorFreeFields(Multimap fields, Multimap candidateFields) { - if (null == candidateFields) { - return; - } - Throwable fieldError = null; - for (Map.Entry entry : candidateFields.entries()) { - // noinspection ThrowableResultOfMethodCallIgnored - if (entry.getValue().getError() != null) { - fieldError = entry.getValue().getError(); - } - fields.put(entry.getKey(), entry.getValue()); - } - if (fieldError != null) { - this.exception = new FieldNormalizationError("Failed getting all fields", fieldError); - } - } - private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap newFields) { // Also get the virtual fields, if applicable. 
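 // (a VirtualIngest helper derives additional fields from combinations of the fields gathered so far)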
- if (ingestHelper instanceof VirtualIngest) { + if (null != newFields && ingestHelper instanceof VirtualIngest) { VirtualIngest vHelper = (VirtualIngest) ingestHelper; Multimap virtualFields = vHelper.getVirtualFields(newFields); for (Map.Entry v : virtualFields.entries()) @@ -143,7 +160,7 @@ private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap newFields) { // Also get the composite fields, if applicable - if (ingestHelper instanceof CompositeIngest) { + if (null != newFields && ingestHelper instanceof CompositeIngest) { CompositeIngest vHelper = (CompositeIngest) ingestHelper; Multimap compositeFields = vHelper.getCompositeFields(newFields); for (String fieldName : compositeFields.keySet()) { @@ -217,31 +234,33 @@ private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { - // If this helper is able, attempt to salvage a subset of the fields - if (null != ingestHelper && ingestHelper instanceof FieldSalvager) { - FieldSalvager salvager = (FieldSalvager) ingestHelper; - try { - Multimap salvagedFields = salvager.getSalvageableEventFields(value); - if (null != salvagedFields) { - return salvagedFields; - } - } catch (Exception salvagerException) { - // Do not overwrite the original exception - if (null == this.exception) { - this.exception = new IllegalStateException("Unexpected state (FieldExpander.exception should be non-null if salvaging", salvagerException); - } else { - // allow original exception (this.exception) to be thrown by caller - log.error("Even salvager threw an exception", salvagerException); - } + private void addFieldsAndDetectFieldErrors(Multimap fields, Multimap candidateFields) { + if (null == candidateFields) { + return; + } + Throwable fieldError = null; + for (Map.Entry entry : candidateFields.entries()) { + // noinspection ThrowableResultOfMethodCallIgnored + if (entry.getValue().getError() != null) { + fieldError = entry.getValue().getError(); + } + fields.put(entry.getKey(), entry.getValue()); + } + if (fieldError != null) { + if (null == this.originalException) { + this.originalException = new FieldNormalizationError("Failed getting all fields", fieldError); + } else { + // preserve original exception + log.error(originalException); } } - return HashMultimap.create(); } public static class FieldNormalizationError extends Exception { + private static final long serialVersionUID = 1L; public FieldNormalizationError(String message, Throwable cause) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java index 0b65036a5bc..0e92fa8a893 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java @@ -50,9 +50,9 @@ public void after() { } @Test - public void reusableFieldHarvester() { + public void reusableFieldHarvester() throws Exception { // The first call to extractFields produces an error, adding only supplemental fields - fieldHarvester.extractFields(fields, null, value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, null, value, offset, splitStart); // Verify error is captured (NullPointerException because null provided as the IngestHelperInterface param) assertExceptionCaptured(fieldHarvester, NullPointerException.class); @@ -70,17 +70,28 @@ 
public void reusableFieldHarvester() { Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); assertNoErrors(fieldHarvester); - // The third call is just like the first call + // The third call is just like the first call, throwing an exception fields.clear(); - fieldHarvester.extractFields(fields, null, value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, null, value, offset, splitStart); // Verify error is captured (NullPointerException because null provided as IngestHelperInterface) assertExceptionCaptured(fieldHarvester, NullPointerException.class); assertContainsOnlySupplementalFields(); } + private void exceptionSwallowingExtractFields(FieldHarvester fieldHarvester, Multimap fields, + IngestHelperInterface ingestHelper, RawRecordContainer value, long offset, String splitStart) { + try { + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + } catch (Exception e) { + // expected case + return; + } + Assert.fail("Expected an exception"); + } + @Test - public void disableSeqFileNameCreation() { + public void disableSeqFileNameCreation() throws Exception { // Configuration disables seq file name creation Configuration config = new Configuration(); config.setBoolean(FieldHarvester.LOAD_SEQUENCE_FILE_NAME, false); @@ -102,7 +113,7 @@ public void disableSeqFileNameCreation() { } @Test - public void disableTrimmingSeqFileName() { + public void disableTrimmingSeqFileName() throws Exception { // Configuration disables trimming of seq file name Configuration config = new Configuration(); config.setBoolean(FieldHarvester.TRIM_SEQUENCE_FILE_NAME, false); @@ -126,7 +137,7 @@ public void disableTrimmingSeqFileName() { } @Test - public void enableTrimmingSeqFileName() { + public void enableTrimmingSeqFileName() throws Exception { // Default configuration enables trimming of seq file name // field map with single field and value @@ -147,7 +158,7 @@ public void enableTrimmingSeqFileName() { } @Test - public void disableRawFileName() { + public void disableRawFileName() throws Exception { // Configuration disables raw file name creation Configuration config = new Configuration(); config.setBoolean(FieldHarvester.LOAD_RAW_FILE_NAME, false); @@ -169,7 +180,7 @@ public void disableRawFileName() { } @Test - public void addsVirtualFields() { + public void addsVirtualFields() throws Exception { // Ensure that a virtual field is added Multimap fields = createOneFieldMultiMap(); IngestHelperInterface ingestHelper = new BasicWithVirtualFieldsIngestHelper(fields); @@ -189,7 +200,7 @@ public void addsVirtualFields() { } @Test - public void addsCompositeFields() { + public void addsCompositeFields() throws Exception { // cause exception in getEventFields, get salvaged fields and ensure they're used for composite field creation Multimap salvagableFields = createOneFieldMultiMap(); IngestHelperInterface ingestHelper = new BasicWithCompositeFieldsIngestHelper(salvagableFields); @@ -215,7 +226,7 @@ public void supplementsSalvagedFields() { ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(salvagableFields); // field map with single field and value - fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, ingestHelper, value, offset, splitStart); // Verify salvaged fields returned // Verify salvaged fields are used for virtual, composite @@ -237,7 +248,7 @@ public void emptySalvagedFields() { 
ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(HashMultimap.create()); // field map with empty fields - fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, ingestHelper, value, offset, splitStart); // empty salvaged fields assertContainsOnlySupplementalFields(); @@ -249,7 +260,7 @@ public void emptySalvagedFields() { @Test public void doubleException() { // exception in getEventFields and in salvager - fieldHarvester.extractFields(fields, new DoubleErrorIngestHelper(), value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, new DoubleErrorIngestHelper(), value, offset, splitStart); // Verify it contains expected fields assertContainsOnlySupplementalFields(); @@ -259,7 +270,7 @@ public void doubleException() { } @Test - public void extractFields() { + public void extractFields() throws Exception { // field map with single field and value fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); @@ -281,7 +292,7 @@ public void erroredFieldExcluded() { Multimap multiMap = HashMultimap.create(); multiMap.put(SAMPLE_FIELD_NAME, fieldWithError); - fieldHarvester.extractFields(fields, new BasicIngestHelper(multiMap), this.value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, new BasicIngestHelper(multiMap), this.value, offset, splitStart); // Verify fields contains expected fields Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); @@ -295,7 +306,7 @@ public void erroredFieldExcluded() { @Test public void nullIngestHelper() { // field map with single field and value - fieldHarvester.extractFields(fields, null, value, offset, splitStart); + exceptionSwallowingExtractFields(fieldHarvester, fields, null, value, offset, splitStart); // Verify it contains expected fields assertContainsOnlySupplementalFields(); @@ -305,13 +316,13 @@ public void nullIngestHelper() { } private void assertNoErrors(FieldHarvester fieldHarvester) { - Assert.assertFalse("Unexpected exception: " + fieldHarvester.getException(), fieldHarvester.hasError()); - Assert.assertNull(fieldHarvester.getException()); + Assert.assertFalse("Unexpected exception: " + fieldHarvester.getOriginalException(), fieldHarvester.hasError()); + Assert.assertNull(fieldHarvester.getOriginalException()); } private void assertExceptionCaptured(FieldHarvester fieldHarvester, Class exceptionClass) { Assert.assertTrue(fieldHarvester.hasError()); - Assert.assertEquals(exceptionClass, fieldHarvester.getException().getClass()); + Assert.assertEquals(exceptionClass, fieldHarvester.getOriginalException().getClass()); } private Multimap createOneFieldMultiMap() { From 539b3a2eb5da89a735da2413d7d0b62252fa0a07 Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Mon, 28 Feb 2022 18:41:50 +0000 Subject: [PATCH 4/8] Fix #1438: preserve original exception instead of wrapping it --- .../java/datawave/ingest/mapreduce/FieldHarvester.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java index ce3dc719b41..f6dcec824fd 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java @@ -99,7 +99,8 @@ public void 
extractFields(Multimap fields, In addFieldsAndDetectFieldErrors(fields, candidateFields); if (null != this.originalException) { - throw new Exception("An exception was encountered during field harvesting", originalException); + log.error("Rethrowing original exception after completing field extraction."); + throw originalException; } } @@ -244,12 +245,12 @@ private void addFieldsAndDetectFieldErrors(Multimap entry : candidateFields.entries()) { // noinspection ThrowableResultOfMethodCallIgnored - if (entry.getValue().getError() != null) { + if (null != entry.getValue().getError()) { fieldError = entry.getValue().getError(); } fields.put(entry.getKey(), entry.getValue()); } - if (fieldError != null) { + if (null != fieldError) { From 9f093718cfdf709b48725a0e1f1ceb7c1ca091f2 Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Mon, 28 Feb 2022 21:27:40 +0000 Subject: [PATCH 5/8] Fix #1438: adjust Mapper test to reflect supplemental fields addition to error records --- ...l.java => MinimalistIngestHelperImpl.java} | 2 +- .../EventMapperSalvageFieldsOnErrorTest.java | 43 ++++++++++++------- .../ingest/mapreduce/FieldHarvesterTest.java | 8 ++-- 3 files changed, 32 insertions(+), 21 deletions(-) rename warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/{MinimalistIngestHelperInterfaceImpl.java => MinimalistIngestHelperImpl.java} (98%) diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java index 3966bb8c9a4..7080a97c5d5 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java @@ -16,7 +16,7 @@ // For testing only, this implementation fulfills the interface contract with methods that throw UnsupportedOperationException. // It is extendable when implementations are needed for specific tests.
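 //
 // Illustrative extension sketch (hypothetical helper, in the spirit of those in FieldHarvesterTest):
 //
 //   class OneFieldIngestHelper extends MinimalistIngestHelperImpl {
 //       @Override
 //       public Multimap<String, NormalizedContentInterface> getEventFields(RawRecordContainer event) {
 //           Multimap<String, NormalizedContentInterface> fields = HashMultimap.create();
 //           fields.put("SAMPLE_FIELD_NAME", new BaseNormalizedContent("SAMPLE_FIELD_NAME", "sample value"));
 //           return fields;
 //       }
 //   }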
-public class MinimalistIngestHelperInterfaceImpl implements IngestHelperInterface { +public class MinimalistIngestHelperImpl implements IngestHelperInterface { @Override public Type getType() { // override this method, as needed diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java index bc09d9eb244..a04e8cb71af 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java @@ -6,8 +6,8 @@ import datawave.ingest.data.Type; import datawave.ingest.data.config.BaseNormalizedContent; import datawave.ingest.data.config.NormalizedContentInterface; -import datawave.ingest.data.config.ingest.ContentBaseIngestHelper; import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.MinimalistIngestHelperImpl; import datawave.ingest.mapreduce.job.BulkIngestKey; import datawave.ingest.mapreduce.job.writer.ContextWriter; import org.apache.accumulo.core.data.Value; @@ -36,6 +36,8 @@ */ public class EventMapperSalvageFieldsOnErrorTest { private static final String[] SALVAGED_FIELDS = {"ISBN", "author", "borrower", "dueDate", "libraryBranch"}; + private static final String[] SUPPLEMENTAL_FIELDS = {FieldHarvester.LOAD_DATE_FIELDNAME, FieldHarvester.RAW_FILE_FIELDNAME, + FieldHarvester.SEQUENCE_FILE_FIELDNAME}; @Rule public EasyMockRule easyMockRule = new EasyMockRule(this); @@ -58,15 +60,10 @@ record = IngestTestSetup.createRecord(dataTypeHandler, conf); } /** - * FakeSalvagingIngestHelper: - always throws an exception when getEventFields is called, to ensure error handling code path is reached within EventMapper. - * - allows for anonymous inline helper creation. + * FakeSalvagingIngestHelper implements FieldSalvager to allow anonymous test classes to implement that interface. By extending + MinimalistIngestHelperImpl, it always throws an exception when getEventFields is called.
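+ *
+ * Illustrative anonymous-subclass sketch (the field name and value are hypothetical):
+ *
+ * <pre>
+ * FakeSalvagingIngestHelper helper = new FakeSalvagingIngestHelper() {
+ *     public Multimap<String, NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer value) {
+ *         Multimap<String, NormalizedContentInterface> salvaged = HashMultimap.create();
+ *         salvaged.put("ISBN", new BaseNormalizedContent("ISBN", "0-123-00000-1"));
+ *         return salvaged;
+ *     }
+ * };
+ * </pre>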
*/ - public static abstract class FakeSalvagingIngestHelper extends ContentBaseIngestHelper implements FieldSalvager { - @Override - public Multimap getEventFields(RawRecordContainer value) { - throw new RuntimeException("Simulated exception while getting event fields for value."); - } - } + public static abstract class FakeSalvagingIngestHelper extends MinimalistIngestHelperImpl implements FieldSalvager {} /** * SalvagingDataTypeHandler provides a FieldSalvager implementation that deserializes rawData as a Map<String, String>, then returns a Multimap @@ -142,12 +139,20 @@ public void shouldSalvageAllFields() throws Exception { Multimap written = TestContextWriter.getWritten(); // Expect only the salvageable fields, each exactly once - assertEquals(written.toString(), SALVAGED_FIELDS.length, written.size()); + assertEquals(written.toString(), SALVAGED_FIELDS.length + SUPPLEMENTAL_FIELDS.length, written.size()); Map fieldNameOccurrences = countPersistedFieldsByName(written); for (String expectedFieldName : SALVAGED_FIELDS) { assertTrue(fieldNameOccurrences.toString(), fieldNameOccurrences.containsKey(expectedFieldName)); assertEquals(1, (int) fieldNameOccurrences.get(expectedFieldName)); } + verifyContainsSupplementalFields(fieldNameOccurrences); + } + + private void verifyContainsSupplementalFields(Map fieldNameOccurrences) { + for (String expectedFieldName : SUPPLEMENTAL_FIELDS) { + assertTrue(fieldNameOccurrences.toString(), fieldNameOccurrences.containsKey(expectedFieldName)); + assertEquals(1, (int) fieldNameOccurrences.get(expectedFieldName)); + } } @Test @@ -163,8 +168,10 @@ public void shouldTolerateNullSalvagedFieldsMap() throws Exception { runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields Multimap written = TestContextWriter.getWritten(); - // Expect nothing to be salvaged - assertEquals(written.toString(), 0, written.size()); + // Expect all of the salvageable fields to be missing but supplemental fields to appear + assertEquals(written.toString(), SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + verifyContainsSupplementalFields(fieldNameOccurrences); } @Test @@ -179,8 +186,10 @@ public void shouldIgnoreNonSalvagedFields() throws Exception { runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields Multimap written = TestContextWriter.getWritten(); - // Expect all of the salvageable fields to occur once - assertEquals(written.toString(), 0, written.size()); + // Expect all of the salvageable fields to be missing but supplemental fields to appear + assertEquals(written.toString(), SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + verifyContainsSupplementalFields(fieldNameOccurrences); } @Test @@ -193,8 +202,10 @@ public void shouldTolerateErrorInSalvager() throws Exception { runMapper(); // will throw error, calling ErrorDataTypeHandler. 
See FakeSalvagingIngestHelper.getEventFields Multimap written = TestContextWriter.getWritten(); - // Expect all of the salvageable fields to occur once - assertEquals(written.toString(), 0, written.size()); + // Expect all of the salvageable fields to be missing but supplemental fields to appear + assertEquals(written.toString(), SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + verifyContainsSupplementalFields(fieldNameOccurrences); } private void runMapper() throws IOException, InterruptedException { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java index 0e92fa8a893..27d09732dc6 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java @@ -7,7 +7,7 @@ import datawave.ingest.data.config.NormalizedFieldAndValueTest; import datawave.ingest.data.config.ingest.CompositeIngest; import datawave.ingest.data.config.ingest.IngestHelperInterface; -import datawave.ingest.data.config.ingest.MinimalistIngestHelperInterfaceImpl; +import datawave.ingest.data.config.ingest.MinimalistIngestHelperImpl; import datawave.ingest.data.config.ingest.VirtualIngest; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.NDC; @@ -343,14 +343,14 @@ private void assertContainsOnlySupplementalFields() { Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS, fields.size()); } - private static class DoubleErrorIngestHelper extends MinimalistIngestHelperInterfaceImpl implements FieldSalvager { + private static class DoubleErrorIngestHelper extends MinimalistIngestHelperImpl implements FieldSalvager { @Override public Multimap getSalvageableEventFields(RawRecordContainer rawRecordContainer) { throw new RuntimeException(); } } - private static class BasicIngestHelper extends MinimalistIngestHelperInterfaceImpl { + private static class BasicIngestHelper extends MinimalistIngestHelperImpl { private final Multimap multiMap; public BasicIngestHelper(Multimap multiMap) { @@ -417,7 +417,7 @@ public Multimap getVirtualFields(Multimap multiMap; ErroringSalvagableIngestHelper(Multimap multiMap) { From a4015f97267b1122a2115372c97999ebeb9cbf43 Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Mon, 14 Mar 2022 14:00:17 +0000 Subject: [PATCH 6/8] Fix #1438: initial PR feedback --- .../main/java/datawave/ingest/mapreduce/FieldHarvester.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java index f6dcec824fd..87ec51806db 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java @@ -53,6 +53,7 @@ public FieldHarvester(Configuration configuration) { /** * Updates "fields" with extracted, derived, and automatically generated fields. Will capture + * exception along the way and attempt to add salvaged fields before rethrowing the exception. 
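+ *
+ * Caller sketch (illustrative; this mirrors how the tests and EventMapper invoke it):
+ *
+ * <pre>
+ * Multimap<String, NormalizedContentInterface> fields = HashMultimap.create();
+ * try {
+ *     fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart);
+ * } catch (Exception e) {
+ *     // "fields" still holds any salvaged and supplemental fields for error handling
+ * }
+ * </pre>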
* * @param fields * the Multimap to modify with extracted and generated fields @@ -90,7 +91,7 @@ public void extractFields(Multimap fields, In this.originalException = exception; } else { // preserve original exception and log the latest exception - log.error(exception); + log.error("A secondary exception occurred while adding supplemental fields", exception); } } @@ -255,7 +256,7 @@ private void addFieldsAndDetectFieldErrors(Multimap Date: Mon, 14 Mar 2022 21:21:19 +0000 Subject: [PATCH 7/8] Fix #1438: change interface as per PR feedback --- .../data/config/ingest/BaseIngestHelper.java | 7 ++ .../config/ingest/IngestHelperInterface.java | 14 ++-- .../ingest/mapreduce/FieldHarvester.java | 37 ++-------- .../ingest/mapreduce/FieldSalvager.java | 24 ------- .../ingest/MinimalistIngestHelperImpl.java | 6 ++ .../EventMapperSalvageFieldsOnErrorTest.java | 69 +++++++++---------- .../ingest/mapreduce/FieldHarvesterTest.java | 42 ++++++----- .../mapreduce/SimpleDataTypeHelper.java | 8 ++- 8 files changed, 91 insertions(+), 116 deletions(-) delete mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java index e6043702b75..12268a64360 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java @@ -11,6 +11,7 @@ import datawave.data.type.OneToManyNormalizerType; import datawave.ingest.config.IngestConfiguration; import datawave.ingest.config.IngestConfigurationFactory; +import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelperImpl; @@ -697,6 +698,12 @@ protected NormalizedContentInterface normalizeFieldValue(NormalizedContentInterf return copy; } + @Override + public void getEventFields(RawRecordContainer value, Multimap fields) { + // default implementation calls legacy method + fields.putAll(this.getEventFields(value)); + } + /** * This is a helper routine that will create a normalize a field out of a base normalized field. * diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java index b0839cdd5dd..13054af7538 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java @@ -43,13 +43,19 @@ default boolean isShardExcluded(String fieldName) { } /** - * Fully parse the raw record and return a map of field names and values. - * - * @param value - * @return + * Deprecated. Use #getEventFields(value, fields) */ + @Deprecated Multimap getEventFields(RawRecordContainer value); + /** + * Fully parse the raw record and update the provided multimap of field names and values, with a partial update in the event of an exception. 
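+ *
+ * Implementations are expected to update "fields" incrementally, so that entries gathered before an exception are preserved. BaseIngestHelper provides an implementation that bridges to the legacy single-argument method.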
+     *
+     * @param value
+     * @param fields
+     */
+    void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields);
+
     Multimap<String,NormalizedContentInterface> normalizeMap(Multimap<String,NormalizedContentInterface> fields);

     Multimap<String,NormalizedContentInterface> normalize(Multimap<String,NormalizedContentInterface> fields);

diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java
index 87ec51806db..6677c7bcb98 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java
@@ -52,8 +52,8 @@ public FieldHarvester(Configuration configuration) {
     }

     /**
-     * Updates "fields" with extracted, derived, and automatically generated fields. Will capture
-     * any exception along the way and attempt to add salvaged fields before rethrowing the exception.
+     * Updates "fields" with extracted, derived, and automatically generated fields. Will capture any exception along the way and attempt to add salvaged
+     * fields before rethrowing the exception.
      *
      * @param fields
      *            the Multimap to modify with extracted and generated fields
@@ -72,19 +72,18 @@ public void extractFields(Multimap<String,NormalizedContentInterface> fields, In
         this.originalException = null;

         // "candidateFields" holds the fields that will eventually be added to "fields"
-        Multimap<String,NormalizedContentInterface> candidateFields;
+        Multimap<String,NormalizedContentInterface> candidateFields = HashMultimap.create();
         try {
             // parse the record into its candidate field names and values using the IngestHelperInterface.
-            candidateFields = ingestHelper.getEventFields(value);
+            ingestHelper.getEventFields(value, candidateFields);
         } catch (Exception exception) {
             // delay throwing the exception to attempt salvaging
             this.originalException = exception;
-            candidateFields = attemptToSalvageFields(value, ingestHelper);
         }

         try {
-            // try adding supplemental fields to candidateFields, whether or not they were salvaged
+            // try adding supplemental fields to candidateFields, whether or not they are complete
             addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields);
         } catch (Exception exception) {
             if (null == this.originalException) {
@@ -115,32 +114,6 @@ Exception getOriginalException() {
         return this.originalException;
     }

-    /**
-     * If IngestHelper implements FieldSalvager, get the salvageable fields from value. Otherwise, return an empty Multimap.
-     */
-    private Multimap<String,NormalizedContentInterface> attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) {
-        // If this helper is able, attempt to salvage a subset of the fields
-        if (null != ingestHelper && ingestHelper instanceof FieldSalvager) {
-            FieldSalvager salvager = (FieldSalvager) ingestHelper;
-            try {
-                Multimap<String,NormalizedContentInterface> salvagedFields = salvager.getSalvageableEventFields(value);
-                if (null != salvagedFields) {
-                    return salvagedFields;
-                }
-            } catch (Exception salvagerException) {
-                // Do not overwrite the original exception
-                if (null == this.originalException) {
-                    this.originalException = new IllegalStateException("Unexpected state (FieldExpander.exception should be non-null if salvaging",
-                                    salvagerException);
-                } else {
-                    // allow original exception (this.exception) to be thrown by caller
-                    log.error("Even salvager threw an exception", salvagerException);
-                }
-            }
-        }
-        return HashMultimap.create();
-    }
-
     private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper,
                     Multimap<String,NormalizedContentInterface> fields) {
         addVirtualFields(ingestHelper, fields);

diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java
deleted file mode 100644
index 0f6d07a2e2c..00000000000
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package datawave.ingest.mapreduce;
-
-import com.google.common.collect.Multimap;
-import datawave.ingest.data.RawRecordContainer;
-import datawave.ingest.data.config.NormalizedContentInterface;
-
-/**
- * This optional interface is intended to complement the IngestHelperInterface interface's handling of errors that occur within ingest jobs.
- *
- * One use case is when IngestHelperInterface's getEventFields throws an exception. The getEventFields method will not return a Multimap of field values
- * (because it instead threw an exception). Prior to FieldSalvager, this meant that the error tables would not have information on any of the
- * RawRecordContainer's field values.
- *
- * FieldSalvager implementations can attempt to provide a subset of the field values, so that the error tables can have more helpful information about the
- * failed record, perhaps aiding troubleshooting efforts. An implementation could return only those field names that are relatively well-structured and
- * predictably formatted, very unlikely to cause exceptions while processing.
- */
-public interface FieldSalvager {
-    /**
-     * @param rawRecordContainer
-     * @return Multimap containing subset of field values, possibly empty but not null
-     */
-    Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer rawRecordContainer);
-}
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java
index 7080a97c5d5..65fa3065210 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java
@@ -53,6 +53,12 @@ public Multimap<String,NormalizedContentInterface> getEventFields(RawRecordConta
         throw new UnsupportedOperationException();
     }

+    @Override
+    public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {
+        // override this method, as needed
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public Multimap<String,NormalizedContentInterface> normalizeMap(Multimap<String,NormalizedContentInterface> fields) {
         // override this method, as needed
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java
index a04e8cb71af..8378bc1057b 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java
@@ -6,6 +6,7 @@
 import datawave.ingest.data.Type;
 import datawave.ingest.data.config.BaseNormalizedContent;
 import datawave.ingest.data.config.NormalizedContentInterface;
+import datawave.ingest.data.config.NormalizedFieldAndValue;
 import datawave.ingest.data.config.ingest.IngestHelperInterface;
 import datawave.ingest.data.config.ingest.MinimalistIngestHelperImpl;
 import datawave.ingest.mapreduce.job.BulkIngestKey;
@@ -59,12 +60,6 @@ public void setupTest(String dataTypeHandler) throws Exception {
         record = IngestTestSetup.createRecord(dataTypeHandler, conf);
     }

-    /**
-     * FakeSalvagingIngestHelper implements FieldSalvager to allow anonymous test classes to implement that interface. By extending
-     * MinimalistIngestHelperInterfaceImpl, it always throws an exception when getEventFields is called.
-     */
-    public static abstract class FakeSalvagingIngestHelper extends MinimalistIngestHelperImpl implements FieldSalvager {}
-
     /**
-     * SalvagingDataTypeHandler provides a FieldSalvager implementation that deserializes rawData as a Map<String, String>, then returns a Multimap
-     * containing only the SALVAGED_FIELDS that were encountered, skipping all others.
+     * SalvagingDataTypeHandler's helper deserializes rawData as a Map<String, String> and, when the primary parse fails, repopulates the fields multimap
+     * with only the SALVAGED_FIELDS that were encountered, skipping all others.
@@ -72,34 +67,38 @@ public static class SalvagingDataTypeHandler extends SimpleDataTypeHandler {
         @Override
         public IngestHelperInterface getHelper(Type datatype) {
-            FakeSalvagingIngestHelper fakeSalvagingIngestHelper = new FakeSalvagingIngestHelper() {
+            MinimalistIngestHelperImpl fakeSalvagingIngestHelper = new MinimalistIngestHelperImpl() {
+                @Override
+                public Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value) {
+                    throw new RuntimeException("Simulated exception while getting event fields for value.");
+                }
+
                 @Override
-                public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer value) {
-                    HashMultimap<String,NormalizedContentInterface> fields = HashMultimap.create();
-                    byte[] rawData = value.getRawData();
-
-                    try (ByteArrayInputStream bytArrayInputStream = new ByteArrayInputStream(rawData);
-                                    ObjectInputStream objectInputStream = new ObjectInputStream(bytArrayInputStream);) {
-                        Map<String,String> deserializedData = (Map<String,String>) objectInputStream.readObject();
-                        for (String fieldToSalvage : SALVAGED_FIELDS) {
-                            String fieldValue = deserializedData.get(fieldToSalvage);
-                            if (null != fieldValue) {
-                                try {
-                                    fields.put(fieldToSalvage, new BaseNormalizedContent(fieldToSalvage, fieldValue));
-                                } catch (Exception fieldException) {
-                                    // skip this field and proceed to the next
+                public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {
+                    try {
+                        fields.putAll(getEventFields(value));
+                    } catch (Exception exception) {
+                        // attempt to salvage a subset of the fields before rethrowing
+                        byte[] rawData = value.getRawData();
+
+                        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(rawData);
+                                        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);) {
+                            Map<String,String> deserializedData = (Map<String,String>) objectInputStream.readObject();
+                            for (String fieldToSalvage : SALVAGED_FIELDS) {
+                                String fieldValue = deserializedData.get(fieldToSalvage);
+                                if (null != fieldValue) {
+                                    try {
+                                        fields.put(fieldToSalvage, new BaseNormalizedContent(fieldToSalvage, fieldValue));
+                                    } catch (Exception fieldException) {
+                                        // skip this field and proceed to the next
+                                    }
                                 }
                             }
+                        } catch (IOException | ClassNotFoundException e) {
+                            throw new IllegalStateException(e);
                         }
-                    } catch (Exception e) {
-                        return fields;
+                        throw exception;
                     }
-                    return fields;
-                }
-
-                @Override
-                public Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value) {
-                    throw new RuntimeException("Simulated exception while getting event fields for value.");
                 }
             };
             return fakeSalvagingIngestHelper;
     }

     /**
-     * NullSalvagingSimpleDataTypeHandler provides a FieldSalvager implementation that always returns null.
+     * NoopSalvagingSimpleDataTypeHandler provides a MinimalistIngestHelperImpl implementation that does nothing to fields.
     */
-    public static class NullSalvagingSimpleDataTypeHandler extends SimpleDataTypeHandler {
+    public static class NoopSalvagingSimpleDataTypeHandler extends SimpleDataTypeHandler {
         @Override
         public IngestHelperInterface getHelper(Type datatype) {
-            return new FakeSalvagingIngestHelper() {
+            return new MinimalistIngestHelperImpl() {
                 @Override
-                public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer value) {
-                    return null;
-                }
+                public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {}
             };
         }
     }
@@ -158,7 +155,7 @@ private void verifyContainsSupplementalFields(Map<String,Integer> fieldNameOccur
     @Test
     public void shouldTolerateNullSalvagedFieldsMap() throws Exception {
-        // Use a DataTypeHandler that provides a FieldSalvager that always returns null
-        setupTest(NullSalvagingSimpleDataTypeHandler.class.getName());
+        // Use a DataTypeHandler whose helper contributes no fields at all
+        setupTest(NoopSalvagingSimpleDataTypeHandler.class.getName());

         // Create a record with salvageable and unsalvageable fields
         HashMap<String,String> fieldValues = createMapOfSalveagableFieldValues();
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java
index 27d09732dc6..d1d2301b768 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java
@@ -186,14 +186,14 @@ public void addsVirtualFields() throws Exception {
         IngestHelperInterface ingestHelper = new BasicWithVirtualFieldsIngestHelper(fields);

         // field map with single field and value
-        fieldHarvester.extractFields(this.fields, ingestHelper, value, offset, splitStart);
+        fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart);

         // Verify field returned
-        Assert.assertTrue(this.fields.containsKey(SAMPLE_FIELD_NAME));
+        Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME));
         // Verify field is used for virtual field creation
-        Assert.assertTrue(this.fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD));
+        Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD));
         assertContainsSupplementalFields(fields);
-        Assert.assertEquals(this.fields.toString(), 5, this.fields.size());
+        Assert.assertEquals(fields.toString(), 5, fields.size());

         // Verify there was no exception
         assertNoErrors(fieldHarvester);
@@ -259,8 +259,8 @@ public void emptySalvagedFields() {
     @Test
     public void doubleException() {
-        // exception in getEventFields and in salvager
-        exceptionSwallowingExtractFields(fieldHarvester, fields, new DoubleErrorIngestHelper(), value, offset, splitStart);
+        // exception in both getEventFields implementations
+        exceptionSwallowingExtractFields(fieldHarvester, fields, new MinimalistIngestHelperImpl(), value, offset, splitStart);

         // Verify it contains expected fields
         assertContainsOnlySupplementalFields();
@@ -332,9 +332,9 @@ private Multimap<String,NormalizedContentInterface> createOneFieldMultiMap() {
     }

     private void assertContainsSupplementalFields(Multimap<String,NormalizedContentInterface> fields) {
-        Assert.assertTrue(fields.containsKey(LOAD_DATE));
-        Assert.assertTrue(fields.containsKey(ORIG_FILE));
-        Assert.assertTrue(fields.containsKey(RAW_FILE));
+        Assert.assertTrue(fields.toString(), fields.containsKey(LOAD_DATE));
+        Assert.assertTrue(fields.toString(), fields.containsKey(ORIG_FILE));
+        Assert.assertTrue(fields.toString(), fields.containsKey(RAW_FILE));
     }

@@ -343,13 +343,6 @@ private void assertContainsOnlySupplementalFields() {
         Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS, fields.size());
     }

-    private static class DoubleErrorIngestHelper extends MinimalistIngestHelperImpl implements FieldSalvager {
-        @Override
-        public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer rawRecordContainer) {
-            throw new RuntimeException();
-        }
-    }
-
     private static class BasicIngestHelper extends MinimalistIngestHelperImpl {
         private final Multimap<String,NormalizedContentInterface> multiMap;

         public BasicIngestHelper(Multimap<String,NormalizedContentInterface> multiMap) {
@@ -357,9 +350,15 @@ public BasicIngestHelper(Multimap<String,NormalizedContentInterface> multiMap) {
             this.multiMap = multiMap;
         }

+        @Override
         public Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer event) {
             return multiMap;
         }
+
+        @Override
+        public void getEventFields(RawRecordContainer event, Multimap<String,NormalizedContentInterface> fields) {
+            fields.putAll(getEventFields(event));
+        }
     }

     private static class BasicWithCompositeFieldsIngestHelper extends BasicIngestHelper implements CompositeIngest {
@@ -417,7 +416,7 @@ public Multimap<String,NormalizedContentInterface> getVirtualFields(Multimap<St

-    private static class ErroringSalvagableIngestHelper extends MinimalistIngestHelperImpl implements FieldSalvager {
+    private static class ErroringSalvagableIngestHelper extends MinimalistIngestHelperImpl {
         private final Multimap<String,NormalizedContentInterface> multiMap;

         ErroringSalvagableIngestHelper(Multimap<String,NormalizedContentInterface> multiMap) {
@@ -425,8 +424,13 @@ private static class ErroringSalvagableIngestHelp
         }

         @Override
-        public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer rawRecordContainer) {
-            return this.multiMap;
+        public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {
+            try {
+                fields.putAll(getEventFields(value));
+            } catch (Exception e) {
+                fields.putAll(this.multiMap);
+                throw e;
+            }
         }

         @Override
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java
index 4e14d46d9bd..3e035d5b3fd 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java
@@ -43,7 +43,13 @@ public static IngestHelperInterface create() {
             @Override
             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                 if (method.getName().equals("getEventFields")) {
-                    return fields;
+                    if (args.length == 1) {
+                        return fields;
+                    } else {
+                        // see getEventFields(value, fields)
+                        ((Multimap<String,NormalizedContentInterface>) args[1]).putAll(fields);
+                        return Void.TYPE;
+                    }
                 } else {
                     throw new UnsupportedOperationException(
                                     "Sorry, " + this.getClass() + " does not currently support the " + method.getName() + " method. Feel free to implement it!");

From 1d38126e2c307fecc7949dd06e8b0a9a82d06557 Mon Sep 17 00:00:00 2001
From: GitHub Actions
Date: Wed, 4 Dec 2024 19:55:08 +0000
Subject: [PATCH 8/8] GitHub Actions: Fix Formatting

---
 .../java/datawave/ingest/data/RawRecordContainerImplTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java
index 7e01f765078..465ac6ae052 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java
@@ -22,8 +22,8 @@
 import datawave.ingest.config.IngestConfigurationFactory;
 import datawave.ingest.config.RawRecordContainerImpl;
 import datawave.ingest.data.config.MarkingsHelper;
-import datawave.util.TypeRegistryTestSetup;
 import datawave.util.CompositeTimestamp;
+import datawave.util.TypeRegistryTestSetup;

 public class RawRecordContainerImplTest {
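
Taken together, the series replaces the standalone FieldSalvager hook with a second getEventFields overload that populates a caller-supplied multimap, so any fields added before a parsing failure remain available to FieldHarvester and the error tables. The sketch below illustrates one way a helper can honor that contract; it is not part of the patch, the class name and salvage source are hypothetical, and the only contract assumed is the two-argument signature added to IngestHelperInterface in PATCH 7/8: populate "fields" incrementally, then rethrow.

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;
    import datawave.ingest.data.RawRecordContainer;
    import datawave.ingest.data.config.NormalizedContentInterface;
    import datawave.ingest.data.config.ingest.MinimalistIngestHelperImpl;

    // Hypothetical helper demonstrating the new contract: on failure, leave
    // whatever could be salvaged in the caller's multimap, then rethrow so
    // EventMapper still routes the record to the error tables.
    public class SalvageFriendlyIngestHelper extends MinimalistIngestHelperImpl {
        // hypothetical stand-in for fields known to parse reliably
        private final Multimap<String,NormalizedContentInterface> salvageable = HashMultimap.create();

        @Override
        public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {
            try {
                // full parse; may throw on a malformed record
                fields.putAll(getEventFields(value));
            } catch (Exception e) {
                // the partial update survives the rethrow, mirroring
                // ErroringSalvagableIngestHelper in FieldHarvesterTest
                fields.putAll(salvageable);
                throw e;
            }
        }
    }

This is the same pattern FieldHarvester.extractFields now relies on: the caller keeps whatever landed in the multimap before the exception, and the supplemental fields (LOAD_DATE, ORIG_FILE, RAW_FILE) are still added to that partial result.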