diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java index c3d28d3a2d..08ad1da3ca 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java @@ -31,6 +31,7 @@ import datawave.data.type.OneToManyNormalizerType; import datawave.ingest.config.IngestConfiguration; import datawave.ingest.config.IngestConfigurationFactory; +import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelperImpl; @@ -765,6 +766,12 @@ protected NormalizedContentInterface normalizeFieldValue(NormalizedContentInterf return copy; } + @Override + public void getEventFields(RawRecordContainer value, Multimap fields) { + // default implementation calls legacy method + fields.putAll(this.getEventFields(value)); + } + /** * This is a helper routine that will create and normalize a field out of a base normalized field. * diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java index be12b02467..fde6121ebb 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/IngestHelperInterface.java @@ -45,13 +45,18 @@ default boolean isShardExcluded(String fieldName) { } /** - * Fully parse the raw record and return a map of field names and values. + * @deprecated Use {@link #getEventFields(RawRecordContainer, Multimap)} instead. + */ + @Deprecated + Multimap getEventFields(RawRecordContainer value); + + /** + * Fully parse the raw record and update the provided multimap of field names and values, with a partial update in the event of an exception.
* * @param value - * a {@link RawRecordContainer} - * @return a MultiMap of normalized field values + * @param fields */ - Multimap getEventFields(RawRecordContainer value); + void getEventFields(RawRecordContainer value, Multimap fields); Multimap normalizeMap(Multimap fields); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index edce34ef64..1aa40943e8 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -6,12 +6,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.Stack; @@ -40,12 +38,8 @@ import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelper; import datawave.ingest.data.config.NormalizedContentInterface; -import datawave.ingest.data.config.NormalizedFieldAndValue; import datawave.ingest.data.config.filter.KeyValueFilter; -import datawave.ingest.data.config.ingest.CompositeIngest; -import datawave.ingest.data.config.ingest.FilterIngest; import datawave.ingest.data.config.ingest.IngestHelperInterface; -import datawave.ingest.data.config.ingest.VirtualIngest; import datawave.ingest.input.reader.event.EventErrorSummary; import datawave.ingest.mapreduce.handler.DataTypeHandler; import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; @@ -167,6 +161,8 @@ public class EventMapper extends StatsDE private MetricsService metricsService; private ReusableMetricsLabels metricsLabels; + private FieldHarvester fieldHarvester; + /** * Set up the datatype handlers */ @@ -191,6 +187,9 @@ public void setup(Context context) throws IOException, InterruptedException { // Initialize the Type Registry TypeRegistry.getInstance(context.getConfiguration()); + // FieldHarvester encapsulates the addition of virtual fields, composite fields, LOAD_DATE, etc. 
+ fieldHarvester = new FieldHarvester(context.getConfiguration()); + // load the predicates applied to all types predicates = new HashSet<>(context.getConfiguration().getTrimmedStringCollection(RECORD_PREDICATES)); // always add the discard interval predicates @@ -302,7 +301,7 @@ public void setup(Context context) throws IOException, InterruptedException { * the context * @return the data type handlers */ - private List> loadDataType(String typeStr, Context context) { + private List> loadDataTypeHandlers(String typeStr, Context context) { // Do not load the type twice if (!typeMap.containsKey(typeStr)) { @@ -427,8 +426,8 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt eventMapperTimer.start(); } - // ensure this datatype's handlers etc are loaded such that the predicates and validators are filled as well - List> typeHandlers = loadDataType(value.getDataType().typeName(), context); + // ensure this datatype's handlers etc are loaded such that the dataTypeDiscardIntervalCache and validators are filled as well + List> typeHandlers = loadDataTypeHandlers(value.getDataType().typeName(), context); // This is a little bit fragile, but there is no other way // to get the context on a partitioner, and we are only @@ -469,13 +468,12 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt try { // Load error dataType into typeMap - loadDataType(TypeRegistry.ERROR_PREFIX, context); - + loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context); // purge event errorSummary.purge(contextWriter, context, value, typeMap); // Set the original file value from the event in the error table - Collection origFiles = errorSummary.getEventFields().get(SEQUENCE_FILE_FIELDNAME); + Collection origFiles = errorSummary.getEventFields().get(FieldHarvester.SEQUENCE_FILE_FIELDNAME); if (!origFiles.isEmpty()) { NDC.push(origFiles.iterator().next()); reprocessedNDCPush = true; @@ -510,7 +508,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt // Add the list of handlers with the ALL specified handlers List> handlers = new ArrayList<>(); handlers.addAll(typeHandlers); - handlers.addAll(loadDataType(TypeRegistry.ALL_PREFIX, context)); + handlers.addAll(loadDataTypeHandlers(TypeRegistry.ALL_PREFIX, context)); // Always include any event errors in the counters for (String error : value.getErrors()) { @@ -523,8 +521,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt handlers.clear(); if (!value.ignorableError()) { // since this is not an ignorable error, lets add the error handlers back into the list - handlers.addAll(loadDataType(TypeRegistry.ERROR_PREFIX, context)); - + handlers.addAll(loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context)); getCounter(context, IngestInput.EVENT_FATAL_ERROR).increment(1); getCounter(context, IngestInput.EVENT_FATAL_ERROR.name(), "ValidationError").increment(1); } else { @@ -542,42 +539,12 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt // Rollback anything written for this event contextWriter.rollback(); - // Fail job on constraint violations - if (e instanceof ConstraintChecker.ConstraintViolationException) { - throw ((RuntimeException) e); - } + failJobOnConstraintViolations(e); // ensure they know we are still working on it context.progress(); - // log error - log.error("Runtime exception processing event", e); - - // now lets dump to the errors table - // first set the exception on the event if not a field normalization 
error in which case the fields contain the errors - if (!(e instanceof FieldNormalizationError)) { - value.setAuxData(e); - } - for (DataTypeHandler handler : loadDataType(TypeRegistry.ERROR_PREFIX, context)) { - if (log.isTraceEnabled()) - log.trace("executing handler: " + handler.getClass().getName()); - try { - executeHandler(key, value, fields, handler, context); - context.progress(); - } catch (Exception e2) { - // This is a real bummer, we had a critical exception attempting to throw the event into the error table. - // lets terminate this job - log.error("Failed to process error data handlers for an event", e2); - throw new IOException("Failed to process error data handlers for an event", e2); - } - } - - // now create some counters - getCounter(context, IngestProcess.RUNTIME_EXCEPTION).increment(1); - List exceptions = getExceptionSynopsis(e); - for (String exception : exceptions) { - getCounter(context, IngestProcess.RUNTIME_EXCEPTION.name(), exception).increment(1); - } + handleProcessingError(key, value, context, fields, e); } finally { // Remove ORIG_FILE from NDC that was populated by reprocessing events from the error tables if (reprocessedNDCPush) { @@ -588,10 +555,60 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt context.progress(); } + incrementEventCount(value, context); + + updateMetrics(value, eventMapperTimer, fields); + } + + private void failJobOnConstraintViolations(Exception e) { + if (e instanceof ConstraintChecker.ConstraintViolationException) { + throw ((RuntimeException) e); + } + } + + private void handleProcessingError(K1 key, V1 value, Context context, Multimap fields, Exception e) throws IOException { + log.error("Runtime exception processing event", e); + // first set the exception on the event if not a field normalization error in which case the fields contain the errors + if (!(e instanceof FieldHarvester.FieldNormalizationError)) { + value.setAuxData(e); + } + writeToErrorTables(key, value, context, fields); + incrementExceptionCounters(context, e); + } + + private void writeToErrorTables(K1 key, V1 value, Context context, Multimap fields) throws IOException { + // now lets dump to the errors table + for (DataTypeHandler handler : loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context)) { + if (log.isTraceEnabled()) + log.trace("executing handler: " + handler.getClass().getName()); + try { + executeHandler(key, value, fields, handler, context); + context.progress(); + } catch (Exception e2) { + // This is a real bummer, we had a critical exception attempting to throw the event into the error table. 
+ // lets terminate this job + log.error("Failed to process error data handlers for an event", e2); + throw new IOException("Failed to process error data handlers for an event", e2); + } + } + } + + private void incrementExceptionCounters(Context context, Exception e) { + // now create some counters + getCounter(context, IngestProcess.RUNTIME_EXCEPTION).increment(1); + List exceptions = getExceptionSynopsis(e); + for (String exception : exceptions) { + getCounter(context, IngestProcess.RUNTIME_EXCEPTION.name(), exception).increment(1); + } + } + + private void incrementEventCount(V1 value, Context context) { getCounter(context, IngestOutput.EVENTS_PROCESSED.name(), value.getDataType().typeName().toUpperCase()).increment(1); offset++; + } + private void updateMetrics(V1 value, TraceStopwatch eventMapperTimer, Multimap fields) { if (metricsEnabled && eventMapperTimer != null) { eventMapperTimer.stop(); long timeInEventMapper = eventMapperTimer.elapsed(TimeUnit.MILLISECONDS); @@ -740,26 +757,11 @@ public void processEvent(K1 key, RawRecordContainer value, List entry : getFields(value, handler).entries()) { - // noinspection ThrowableResultOfMethodCallIgnored - if (entry.getValue().getError() != null) { - e = entry.getValue().getError(); - } - fields.put(entry.getKey(), entry.getValue()); - } - if (e != null) { - throw new FieldNormalizationError("Failed getting all fields", e); - } - // Event based metrics - if (metricsEnabled) { - metricsLabels.clear(); - metricsLabels.put("dataType", value.getDataType().typeName()); + // populates fields by parsing value and using IngestHelper + fieldHarvester.extractFields(fields, thisHelper, value, offset, splitStart); - metricsService.collect(Metric.EVENT_COUNT, metricsLabels.get(), fields, 1L); - metricsService.collect(Metric.BYTE_COUNT, metricsLabels.get(), fields, (long) value.getRawData().length); - } + updateMetrics(value, fields); previousHelper = thisHelper; } @@ -775,79 +777,15 @@ public void processEvent(K1 key, RawRecordContainer value, List getFields(RawRecordContainer value, DataTypeHandler handler) throws Exception { - Multimap newFields; - // Parse the event into its field names and field values using the DataTypeHandler's BaseIngestHelper object. - newFields = handler.getHelper(value.getDataType()).getEventFields(value); - - // Also get the virtual fields, if applicable. 
- if (handler.getHelper(value.getDataType()) instanceof VirtualIngest) { - VirtualIngest vHelper = (VirtualIngest) handler.getHelper(value.getDataType()); - Multimap virtualFields = vHelper.getVirtualFields(newFields); - for (Entry v : virtualFields.entries()) - newFields.put(v.getKey(), v.getValue()); - } - // Also get the composite fields, if applicable - if (handler.getHelper(value.getDataType()) instanceof CompositeIngest) { - CompositeIngest vHelper = (CompositeIngest) handler.getHelper(value.getDataType()); - Multimap compositeFields = vHelper.getCompositeFields(newFields); - for (String fieldName : compositeFields.keySet()) { - // if this is an overloaded composite field, we are replacing the existing field data - if (vHelper.isOverloadedCompositeField(fieldName)) - newFields.removeAll(fieldName); - newFields.putAll(fieldName, compositeFields.get(fieldName)); - } - } - - // Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes - long loadDate = now.get(); - NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate)); - // set an indexed field value for use by the date index data type handler - loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate))); - newFields.put(LOAD_DATE_FIELDNAME, loadDateValue); - - String seqFileName = null; - - // place the sequence filename into the event - if (createSequenceFileName) { - seqFileName = NDC.peek(); - - if (trimSequenceFileName) { - seqFileName = StringUtils.substringAfterLast(seqFileName, "/"); - } - - if (null != seqFileName) { - StringBuilder seqFile = new StringBuilder(seqFileName); - - seqFile.append(SRC_FILE_DEL).append(offset); - - if (null != splitStart) { - seqFile.append(SRC_FILE_DEL).append(splitStart); - } - - newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString())); - } - } - - if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) { - newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName())); - } + private void updateMetrics(RawRecordContainer value, Multimap fields) { + // Event based metrics + if (metricsEnabled) { + metricsLabels.clear(); + metricsLabels.put("dataType", value.getDataType().typeName()); - // Also if this helper needs to filter the fields before returning, apply now - if (handler.getHelper(value.getDataType()) instanceof FilterIngest) { - FilterIngest fHelper = (FilterIngest) handler.getHelper(value.getDataType()); - fHelper.filter(newFields); + metricsService.collect(Metric.EVENT_COUNT, metricsLabels.get(), fields, 1L); + metricsService.collect(Metric.BYTE_COUNT, metricsLabels.get(), fields, (long) value.getRawData().length); } - - return newFields; } @SuppressWarnings("unchecked") diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java new file mode 100644 index 0000000000..ceae515400 --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java @@ -0,0 +1,248 @@ +package datawave.ingest.mapreduce; + +import java.util.Date; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import 
com.google.common.collect.Multimap; + +import datawave.data.normalizer.DateNormalizer; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValue; +import datawave.ingest.data.config.ingest.CompositeIngest; +import datawave.ingest.data.config.ingest.FilterIngest; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.VirtualIngest; +import datawave.ingest.time.Now; +import datawave.util.StringUtils; + +/** + * Encapsulates the logic for extracting fields from a record, making use of a provided IngestHelperInterface. Generates virtual fields, composite fields, and + * supplemental fields (like LOAD_DATE, ORIG_FILE, and RAW_FILE). Some logic for handling errors is also included here: extracting salvageable fields if any + * exception occurs, and detecting if there were field errors (indicating a normalization failure). + */ +public class FieldHarvester { + private static final Logger log = Logger.getLogger(FieldHarvester.class); + + private static Now now = Now.getInstance(); + + public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE"; + public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE"; + public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; + public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename"; + public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename"; + public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename"; + + private boolean createSequenceFileName; + private boolean trimSequenceFileName; + private boolean createRawFileName; + private final DateNormalizer dateNormalizer = new DateNormalizer(); + + private static final String SRC_FILE_DEL = "|"; + private Exception originalException; + + public FieldHarvester(Configuration configuration) { + this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true); + this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true); + this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true); + } + + /** + * Updates "fields" with extracted, derived, and automatically generated fields. Will capture any exception along the way and attempt to add salvaged fields + * before rethrowing the exception. + * + * @param fields + * the Multimap to modify with extracted and generated fields + * @param ingestHelper + * interface to use for field extraction + * @param value + * the record from which the fields will be extracted + * @param offset + * record offset within the source file + * @param splitStart + * the splitStart for the record + */ + public void extractFields(Multimap fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset, + String splitStart) throws Exception { + // reset exception-in-extraction tracking + this.originalException = null; + + // "candidateFields" holds the fields that will eventually be added to "fields" + Multimap candidateFields = HashMultimap.create(); + + try { + // parse the record into its candidate field names and values using the IngestHelperInterface.
+ ingestHelper.getEventFields(value, candidateFields); + } catch (Exception exception) { + // delay throwing the exception to attempt salvaging + this.originalException = exception; + } + + try { + // try adding supplemental fields to candidateFields, whether or not they were complete + addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields); + } catch (Exception exception) { + if (null == this.originalException) { + this.originalException = exception; + } else { + // preserve original exception and log the latest exception + log.error("A secondary exception occurred while adding supplemental fields", exception); + } + } + + // add candidateFields to fields, even if there was an error + // identify if any individual fields contain an error + addFieldsAndDetectFieldErrors(fields, candidateFields); + + if (null != this.originalException) { + log.error("Rethrowing original exception after completing field extraction."); + throw originalException; + } + } + + @VisibleForTesting + boolean hasError() { + return null != this.originalException; + } + + @VisibleForTesting + Exception getOriginalException() { + return this.originalException; + } + + private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper, + Multimap fields) { + addVirtualFields(ingestHelper, fields); + addCompositeFields(ingestHelper, fields); + addLoadDateField(fields); + addFileNameFields(value, offset, splitStart, fields); + applyFieldFilters(ingestHelper, fields); + } + + private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also get the virtual fields, if applicable. + if (null != newFields && ingestHelper instanceof VirtualIngest) { + VirtualIngest vHelper = (VirtualIngest) ingestHelper; + Multimap virtualFields = vHelper.getVirtualFields(newFields); + for (Map.Entry v : virtualFields.entries()) + newFields.put(v.getKey(), v.getValue()); + } + } + + private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also get the composite fields, if applicable + if (null != newFields && ingestHelper instanceof CompositeIngest) { + CompositeIngest vHelper = (CompositeIngest) ingestHelper; + Multimap compositeFields = vHelper.getCompositeFields(newFields); + for (String fieldName : compositeFields.keySet()) { + // if this is an overloaded composite field, we are replacing the existing field data + if (vHelper.isOverloadedCompositeField(fieldName)) { + newFields.removeAll(fieldName); + } + newFields.putAll(fieldName, compositeFields.get(fieldName)); + } + } + } + + private void addLoadDateField(Multimap newFields) { + // Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes + long loadDate = now.get(); + NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate)); + // set an indexed field value for use by the date index data type handler + loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate))); + newFields.put(LOAD_DATE_FIELDNAME, loadDateValue); + } + + private void addRawFileField(RawRecordContainer value, Multimap newFields, String seqFileName) { + if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) { + newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName())); + } + } + + private void addOrigFileField(Multimap newFields, long offset, 
String splitStart, String seqFileName) { + if (null != seqFileName) { + StringBuilder seqFile = new StringBuilder(seqFileName); + + seqFile.append(SRC_FILE_DEL).append(offset); + + if (null != splitStart) { + seqFile.append(SRC_FILE_DEL).append(splitStart); + } + + newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString())); + } + } + + private String getSeqFileName() { + String seqFileName; + seqFileName = NDC.peek(); + + if (trimSequenceFileName) { + seqFileName = StringUtils.substringAfterLast(seqFileName, "/"); + } + return seqFileName; + } + + private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap newFields) { + String seqFileName = null; + + if (createSequenceFileName) { + seqFileName = getSeqFileName(); + + // place the sequence filename into the event + addOrigFileField(newFields, offset, splitStart, seqFileName); + } + + addRawFileField(value, newFields, seqFileName); + } + + private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also if this helper needs to filter the fields before returning, apply now + if (ingestHelper instanceof FilterIngest) { + FilterIngest fHelper = (FilterIngest) ingestHelper; + fHelper.filter(newFields); + } + } + + /** + * Adds candidateFields to fields. Inspects each of the candidate fields for field errors. Sets the field harvester's exception field if any + * field errors were found. + */ + private void addFieldsAndDetectFieldErrors(Multimap fields, + Multimap candidateFields) { + if (null == candidateFields) { + return; + } + Throwable fieldError = null; + for (Map.Entry entry : candidateFields.entries()) { + // noinspection ThrowableResultOfMethodCallIgnored + if (null != entry.getValue().getError()) { + fieldError = entry.getValue().getError(); + } + fields.put(entry.getKey(), entry.getValue()); + } + if (null != fieldError) { + if (null == this.originalException) { + this.originalException = new FieldNormalizationError("Failed getting all fields", fieldError); + } else { + // preserve original exception + log.error("A field exception was observed while adding fields", fieldError); + } + } + } + + public static class FieldNormalizationError extends Exception { + + private static final long serialVersionUID = 1L; + + public FieldNormalizationError(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 090e43f407..e59aaa95f5 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -56,7 +56,7 @@ import datawave.ingest.data.config.NormalizedContentInterface; import datawave.ingest.data.config.ingest.IngestHelperInterface; import datawave.ingest.data.normalizer.SimpleGroupFieldNameParser; -import datawave.ingest.mapreduce.EventMapper; +import datawave.ingest.mapreduce.FieldHarvester; import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle; import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition; @@ -808,7 +808,7 @@ private void setupPreconditionEvaluation(Multimap fields) { String loadDateStr;
- Collection loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME); + Collection loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME); if (!loadDates.isEmpty()) { NormalizedContentInterface nci = loadDates.iterator().next(); Date date = new Date(Long.parseLong(nci.getEventFieldValue())); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java index a8f191526b..465ac6ae05 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java @@ -23,6 +23,7 @@ import datawave.ingest.config.RawRecordContainerImpl; import datawave.ingest.data.config.MarkingsHelper; import datawave.util.CompositeTimestamp; +import datawave.util.TypeRegistryTestSetup; public class RawRecordContainerImplTest { @@ -54,8 +55,7 @@ public void setUp() throws Exception { conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName()); conf.set("samplecsv.reader.class", TestCSVReader.class.getName()); conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE"); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); dataType = TypeRegistry.getType("samplecsv"); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java index b03a12a39f..540467d53e 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java @@ -9,8 +9,8 @@ import org.junit.Before; import org.junit.Test; -import datawave.ingest.data.TypeRegistry; import datawave.policy.IngestPolicyEnforcer; +import datawave.util.TypeRegistryTestSetup; public class DataTypeHelperImplTest { @@ -25,8 +25,7 @@ public void setup() { @Test(expected = IllegalArgumentException.class) public void testInvalidConfig() { DataTypeHelperImpl helper = new DataTypeHelperImpl(); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); helper.setup(conf); } @@ -35,9 +34,8 @@ public void testValidConfig() throws Exception { InputStream configStream = getClass().getResourceAsStream("/fake-datatype-config.xml"); Assert.assertNotNull(configStream); conf.addResource(configStream); - Assert.assertThat(conf.get("data.name"), is("fake")); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + Assert.assertEquals("fake", conf.get("data.name")); + TypeRegistryTestSetup.resetTypeRegistry(conf); DataTypeHelperImpl helper = new DataTypeHelperImpl(); helper.setup(conf); @@ -54,8 +52,7 @@ public void testDowncaseFields() throws Exception { Assert.assertNotNull(configStream); conf.addResource(configStream); conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR"); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); DataTypeHelperImpl helper = new DataTypeHelperImpl(); helper.setup(conf); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java index f51e3eec71..1374f9b587 100644 ---
a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java @@ -22,7 +22,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface { private Map _markings; private Throwable _error; - protected NonGroupedInstance() { + public NonGroupedInstance() { _fieldName = "TestNonGroupedInstance"; _indexedFieldName = "TestIndexedField"; @@ -142,7 +142,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface private String _group; private String _subGroup; - protected GroupedInstance() { + public GroupedInstance() { _fieldName = "TestNonGroupedInstance"; _indexedFieldName = "TestIndexedField"; diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java index 4a15f4f824..68999adb19 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java @@ -11,6 +11,7 @@ import datawave.TestBaseIngestHelper; import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelper.Properties; +import datawave.util.TypeRegistryTestSetup; public class FieldNameAliaserNormalizerTest { @@ -21,10 +22,7 @@ public void setup() { conf = new Configuration(); conf.set(Properties.DATA_NAME, "test"); conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName()); - - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); - + TypeRegistryTestSetup.resetTypeRegistry(conf); } @Test(expected = IllegalArgumentException.class) diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java new file mode 100644 index 0000000000..a53f310bc6 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperImpl.java @@ -0,0 +1,254 @@ +package datawave.ingest.data.config.ingest; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Multimap; + +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.config.DataTypeHelperImpl; +import datawave.ingest.data.config.MaskedFieldHelper; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.policy.IngestPolicyEnforcer; + +// For testing only, this implementation fulfills the interface contract with methods that throw UnsupportedOperationException. +// It is extendable when implementations are needed for specific tests.
+public class MinimalistIngestHelperImpl implements IngestHelperInterface { + @Override + public Type getType() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public IngestPolicyEnforcer getPolicyEnforcer() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Configuration conf) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Set getShardExclusions() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShardExcluded(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getEventFields(RawRecordContainer value) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void getEventFields(RawRecordContainer value, Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap normalizeMap(Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap normalize(Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public List> getDataTypes(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public String getNormalizedMaskedValue(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMappings() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public String get(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean getDeleteMode() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean getReplaceMalformedUTF8() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmbeddedHelperMaskedFieldHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public MaskedFieldHelper getEmbeddedHelperAsMaskedFieldHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public DataTypeHelperImpl getEmbeddedHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReverseIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isIndexOnlyField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addShardExclusionField(String 
fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addReverseIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addIndexOnlyField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCompositeField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOverloadedCompositeField(String fieldName) { + return true; + } + + @Override + public boolean isNormalizedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addNormalizedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAliasedIndexField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public HashSet getAliasesForIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDataTypeField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFieldDefinitions() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Map getCompositeFieldSeparators() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isVirtualIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Map getVirtualNameAndIndex(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldHaveBeenIndexed(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldHaveBeenReverseIndexed(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java index c495b0d086..e52b409ed5 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java @@ -13,6 +13,7 @@ import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.NormalizedContentInterface; import datawave.ingest.data.config.NormalizedFieldAndValue; +import datawave.util.TypeRegistryTestSetup; public class AbstractNormalizerTest { public AbstractNormalizer normalizer = new AbstractNormalizer() { @@ -31,8 +32,7 @@ public String convertFieldRegex(String fieldName, String fieldRegex) { public void setUp() { Configuration conf = new Configuration(); conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName()); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); normalizer.setup(TypeRegistry.getType("test"), 
"test", conf); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java new file mode 100644 index 0000000000..7a39fcb1eb --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java @@ -0,0 +1,243 @@ +package datawave.ingest.mapreduce; + +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.easymock.EasyMockRule; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.config.BaseNormalizedContent; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValue; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.MinimalistIngestHelperImpl; +import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.writer.ContextWriter; + +/** + * This test class aims to test only the scenarios in which EventMapper encounters an error, to verify its behavior with the FieldSalvager interface. It's + * modeled after EventMapperTest. + */ +public class EventMapperSalvageFieldsOnErrorTest { + private static final String[] SALVAGED_FIELDS = {"ISBN", "author", "borrower", "dueDate", "libraryBranch"}; + private static final String[] SUPPLEMENTAL_FIELDS = {FieldHarvester.LOAD_DATE_FIELDNAME, FieldHarvester.RAW_FILE_FIELDNAME, + FieldHarvester.SEQUENCE_FILE_FIELDNAME}; + + @Rule + public EasyMockRule easyMockRule = new EasyMockRule(this); + + @Mock + private Mapper.Context mapContext; + + private Configuration conf; + private SimpleRawRecord record; + private EventMapper eventMapper; + + public void setupTest(String dataTypeHandler) throws Exception { + eventMapper = new EventMapper<>(); + + conf = new Configuration(); + conf.setClass(EventMapper.CONTEXT_WRITER_CLASS, TestContextWriter.class, ContextWriter.class); + EventMapperTest.setupMapContextMock(mapContext, conf); + + record = IngestTestSetup.createRecord(dataTypeHandler, conf); + } + + /** + * SalvagingDataTypeHandler provides a FieldSalvager implementation that deserializes rawData as a Map<String, String>, then returns a Multimap + * containing only the SALVAGED_FIELDS that were encountered, skipping all others. 
+ */ + public static class SalvagingDataTypeHandler extends SimpleDataTypeHandler { + @Override + public IngestHelperInterface getHelper(Type datatype) { + MinimalistIngestHelperImpl fakeSalvagingIngestHelper = new MinimalistIngestHelperImpl() { + @Override + public Multimap getEventFields(RawRecordContainer value) { + throw new RuntimeException("Simulated exception while getting event fields for value."); + } + + @Override + public void getEventFields(RawRecordContainer value, Multimap fields) { + try { + fields.putAll(getEventFields(value)); + } catch (Exception exception) { + // salvage fields implementation + byte[] rawData = value.getRawData(); + + try (ByteArrayInputStream bytArrayInputStream = new ByteArrayInputStream(rawData); + ObjectInputStream objectInputStream = new ObjectInputStream(bytArrayInputStream);) { + Map deserializedData = (Map) objectInputStream.readObject(); + for (String fieldToSalvage : SALVAGED_FIELDS) { + String fieldValue = deserializedData.get(fieldToSalvage); + if (null != fieldValue) { + try { + fields.put(fieldToSalvage, new BaseNormalizedContent(fieldToSalvage, fieldValue)); + } catch (Exception fieldException) { + // skip this field and proceed to the next + } + } + } + } catch (IOException | ClassNotFoundException e) { + throw new IllegalStateException(e); + } + throw exception; + } + } + }; + return fakeSalvagingIngestHelper; + } + } + + /** + * NoopSalvagingSimpleDataTypeHandler provides a MinimalistIngestHelperImpl implementation that does nothing to fields. + */ + public static class NoopSalvagingSimpleDataTypeHandler extends SimpleDataTypeHandler { + @Override + public IngestHelperInterface getHelper(Type datatype) { + return new MinimalistIngestHelperImpl() { + @Override + public void getEventFields(RawRecordContainer value, Multimap fields) {} + }; + } + } + + @After + public void checkMock() { + verify(mapContext); + } + + @Test + public void shouldSalvageAllFields() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Add both salvageable and unsalvageable fields as rawData + HashMap fieldValues = createMapOfSalveagableFieldValues(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. 
See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect only the salvageable fields, each exactly once + assertEquals(written.toString(), SALVAGED_FIELDS.length + SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + for (String expectedFieldName : SALVAGED_FIELDS) { + assertTrue(fieldNameOccurrences.toString(), fieldNameOccurrences.containsKey(expectedFieldName)); + assertEquals(1, (int) fieldNameOccurrences.get(expectedFieldName)); + } + verifyContainsSupplementalFields(fieldNameOccurrences); + } + + private void verifyContainsSupplementalFields(Map fieldNameOccurrences) { + for (String expectedFieldName : SUPPLEMENTAL_FIELDS) { + assertTrue(fieldNameOccurrences.toString(), fieldNameOccurrences.containsKey(expectedFieldName)); + assertEquals(1, (int) fieldNameOccurrences.get(expectedFieldName)); + } + } + + @Test + public void shouldTolerateNullSalvagedFieldsMap() throws Exception { + // Use a DataTypeHandler that provides a FieldSalvager that always returns null + setupTest(NoopSalvagingSimpleDataTypeHandler.class.getName()); + + // Create a record with salvageable and unsalvageable fields + HashMap fieldValues = createMapOfSalveagableFieldValues(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to be missing but supplemental fields to appear + assertEquals(written.toString(), SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + verifyContainsSupplementalFields(fieldNameOccurrences); + } + + @Test + public void shouldIgnoreNonSalvagedFields() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Add only unsalvagable fields + HashMap fieldValues = new HashMap<>(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to be missing but supplemental fields to appear + assertEquals(written.toString(), SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + verifyContainsSupplementalFields(fieldNameOccurrences); + } + + @Test + public void shouldTolerateErrorInSalvager() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Set raw data to an invalid format, causing an error in the FieldSalvager implementation + record.setRawData("Not a map".getBytes()); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. 
See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to be missing but supplemental fields to appear + assertEquals(written.toString(), SUPPLEMENTAL_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + verifyContainsSupplementalFields(fieldNameOccurrences); + } + + private void runMapper() throws IOException, InterruptedException { + eventMapper.setup(mapContext); + eventMapper.map(new LongWritable(1), record, mapContext); + eventMapper.cleanup(mapContext); + } + + private HashMap createMapOfSalveagableFieldValues() { + HashMap fieldValues = new HashMap<>(); + fieldValues.put("ISBN", "0-123-00000-1"); + fieldValues.put("dueDate", "20211126"); + fieldValues.put("author", "Henry Hope Reed"); + fieldValues.put("borrower", "Edward Clark Potter"); + fieldValues.put("libraryBranch", "Grand Central"); + return fieldValues; + } + + private void addUnsalvageableFieldsToMap(HashMap fieldValues) { + fieldValues.put("genre", "FICTION"); + fieldValues.put("format", "Hardcover"); + } + + /** + * Extracts field names from persisted data, creating a map containing the number of occurrences per field name. + */ + private Map countPersistedFieldsByName(Multimap entries) { + Map result = new HashMap<>(); + for (BulkIngestKey key : entries.keys()) { + String fieldName = key.getKey().getColumnFamily().toString(); + // create or increment + result.merge(fieldName, 1, Integer::sum); + } + return result; + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java index 39e6046c65..675754aa9e 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java @@ -41,7 +41,9 @@ import datawave.ingest.mapreduce.job.metrics.MetricsConfiguration; import datawave.ingest.mapreduce.job.metrics.TestEventCountMetricsReceiver; import datawave.ingest.mapreduce.job.writer.ContextWriter; +import datawave.util.TypeRegistryTestSetup; +// Also see EventMapperSalvageFieldsOnErrorTest public class EventMapperTest { @Rule @@ -63,12 +65,11 @@ public void setUp() throws Exception { conf = new Configuration(); conf.setClass(EventMapper.CONTEXT_WRITER_CLASS, TestContextWriter.class, ContextWriter.class); - Type type = new Type("file", null, null, new String[] {SimpleDataTypeHandler.class.getName()}, 10, null); - Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, new String[] {SimpleDataTypeHandler.class.getName()}, 20, null); + String[] defaultDataTypeHandlers = {SimpleDataTypeHandler.class.getName()}; - TypeRegistry registry = TypeRegistry.getInstance(conf); - registry.put(type.typeName(), type); - registry.put(errorType.typeName(), errorType); + Type type = new Type("file", null, null, defaultDataTypeHandlers, 10, null); + Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, defaultDataTypeHandlers, 20, null); + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type, errorType); Multimap fields = HashMultimap.create(); fields.put("fileExtension", new BaseNormalizedContent("fileExtension", "gz")); @@ -84,7 +85,7 @@ record = new SimpleRawRecord(); record.setRawData("some data".getBytes()); record.generateId(null); - errorRecord = new SimpleRawRecord(); + errorRecord = IngestTestSetup.createBasicRecord(eventTime, type); 
errorRecord.setRawFileTimestamp(0); errorRecord.setDataType(type); errorRecord.setTimestamp(eventTime); @@ -95,6 +96,10 @@ record = new SimpleRawRecord(); errorRecord.addError("EVENT_DATE_MISSING"); errorRecord.setFatalError(true); + EventMapperTest.setupMapContextMock(mapContext, conf); + } + + static void setupMapContextMock(Mapper.Context mapContext, Configuration conf) throws IOException, InterruptedException { expect(mapContext.getConfiguration()).andReturn(conf).anyTimes(); mapContext.progress(); @@ -408,7 +413,7 @@ private Map.Entry getMetric(Multimap w } private Map.Entry getRawFileName(Multimap written) { - return getFieldEntry(written, EventMapper.RAW_FILE_FIELDNAME.toString()); + return getFieldEntry(written, FieldHarvester.RAW_FILE_FIELDNAME.toString()); } private Map.Entry getFieldEntry(Multimap written, String field) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java new file mode 100644 index 0000000000..7fcc92d9e9 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java @@ -0,0 +1,481 @@ +package datawave.ingest.mapreduce; + +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.NDC; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValueTest; +import datawave.ingest.data.config.ingest.CompositeIngest; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.MinimalistIngestHelperImpl; +import datawave.ingest.data.config.ingest.VirtualIngest; + +public class FieldHarvesterTest { + private static final String SAMPLE_FIELD_NAME = "SAMPLE_FIELD_NAME"; + private static final String COMPOSITE_FIELD = "COMPOSITE_FIELD"; + private static final String VIRTUAL_FIELD = "VIRTUAL_FIELD"; + private static final String SEQ_FILENAME_ONLY = "InputFile.seq"; + private static final String SEQ_FILENAME_AND_DIR = "/input/directory/" + SEQ_FILENAME_ONLY; + private static final int NUM_SUPPLEMENTAL_FIELDS = 3; + private static final String LOAD_DATE = "LOAD_DATE"; + private static final String ORIG_FILE = "ORIG_FILE"; + private static final String RAW_FILE = "RAW_FILE"; + + private final Multimap fields = HashMultimap.create(); + private FieldHarvester fieldHarvester; + private long offset = 0; + private String splitStart = null; + private RawRecordContainer value; + + @Before + public void before() { + value = IngestTestSetup.createBasicRecord(); + value.setRawData(null); // rawData is ignored by below implementations of getEventFields + fieldHarvester = new FieldHarvester(new Configuration()); + NDC.push(SEQ_FILENAME_AND_DIR); + } + + @After + public void after() { + NDC.pop(); + } + + @Test + public void reusableFieldHarvester() throws Exception { + // The first call to extractFields produces an error, adding only supplemental fields + exceptionSwallowingExtractFields(fieldHarvester, fields, null, value, offset, splitStart); + + // Verify error is captured (NullPointerException because null provided as the IngestHelperInterface param) + assertExceptionCaptured(fieldHarvester, 
NullPointerException.class); + assertContainsOnlySupplementalFields(); + + // The second extractFields calls an IngestHelper that doesn't error + // There should be no residue from the prior call (prior errors cleared, prior fields cleared) + // field map with single field and value + fields.clear(); + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + assertNoErrors(fieldHarvester); + + // The third call is just like the first call, throwing an exception + fields.clear(); + exceptionSwallowingExtractFields(fieldHarvester, fields, null, value, offset, splitStart); + + // Verify error is captured (NullPointerException because null provided as IngestHelperInterface) + assertExceptionCaptured(fieldHarvester, NullPointerException.class); + assertContainsOnlySupplementalFields(); + } + + private void exceptionSwallowingExtractFields(FieldHarvester fieldHarvester, Multimap fields, + IngestHelperInterface ingestHelper, RawRecordContainer value, long offset, String splitStart) { + try { + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + } catch (Exception e) { + // expected case + return; + } + Assert.fail("Expected an exception"); + } + + @Test + public void disableSeqFileNameCreation() throws Exception { + // Configuration disables seq file name creation + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.LOAD_SEQUENCE_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(RAW_FILE)); + // excluded due to config + Assert.assertFalse(fields.containsKey(ORIG_FILE)); + Assert.assertEquals(fields.toString(), 3, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void disableTrimmingSeqFileName() throws Exception { + // Configuration disables trimming of seq file name + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.TRIM_SEQUENCE_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + Collection result = fields.get(ORIG_FILE); + NormalizedContentInterface fieldValue = result.iterator().next(); + Assert.assertEquals(SEQ_FILENAME_AND_DIR + "|0", fieldValue.getEventFieldValue()); + Assert.assertEquals(SEQ_FILENAME_AND_DIR + "|0", fieldValue.getIndexedFieldValue()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void enableTrimmingSeqFileName() throws Exception { + // Default configuration enables trimming of seq file name + + // 
field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + Collection result = fields.get(ORIG_FILE); + NormalizedContentInterface fieldValue = result.iterator().next(); + Assert.assertEquals(SEQ_FILENAME_ONLY + "|0", fieldValue.getEventFieldValue()); + Assert.assertEquals(SEQ_FILENAME_ONLY + "|0", fieldValue.getIndexedFieldValue()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void disableRawFileName() throws Exception { + // Configuration disables raw file name creation + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.LOAD_RAW_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(ORIG_FILE)); + // excluded due to config + Assert.assertFalse(fields.containsKey(RAW_FILE)); + Assert.assertEquals(fields.toString(), 3, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void addsVirtualFields() throws Exception { + // Ensure that a virtual field is added + Multimap fields = createOneFieldMultiMap(); + IngestHelperInterface ingestHelper = new BasicWithVirtualFieldsIngestHelper(fields); + + // field map with single field and value + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + + // Verify field returned + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + // Verify field is used for virtual field creation + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 5, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void addsCompositeFields() throws Exception { + // Ensure that a composite field is added + Multimap eventFields = createOneFieldMultiMap(); + IngestHelperInterface ingestHelper = new BasicWithCompositeFieldsIngestHelper(eventFields); + + // field map with single field and value + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + + // Verify field returned + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + // Verify field is used for composite field creation + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + COMPOSITE_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 5, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void supplementsSalvagedFields() { + // cause exception in getEventFields, get salvaged fields and ensure they're used for virtual and composite field creation + Multimap salvagableFields = createOneFieldMultiMap(); + ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(salvagableFields); + + // field map 
with single field and value + exceptionSwallowingExtractFields(fieldHarvester, fields, ingestHelper, value, offset, splitStart); + + // Verify salvaged fields returned + // Verify salvaged fields are used for virtual and composite field creation + // Note that the virtual field is also used by the composite field implementation + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + COMPOSITE_FIELD)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD + COMPOSITE_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 7, fields.size()); + + // Verify an exception was detected (for getEventFields) + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void emptySalvagedFields() { + // cause exception in getEventFields, causing retrieval of an empty multimap of salvaged fields + ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(HashMultimap.create()); + + // field map with empty fields + exceptionSwallowingExtractFields(fieldHarvester, fields, ingestHelper, value, offset, splitStart); + + // salvaged fields were empty, so only supplemental fields remain + assertContainsOnlySupplementalFields(); + + // Verify an exception was detected (for getEventFields) + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void doubleException() { + // exception in both getEventFields implementations + exceptionSwallowingExtractFields(fieldHarvester, fields, new MinimalistIngestHelperImpl(), value, offset, splitStart); + + // Verify it contains expected fields + assertContainsOnlySupplementalFields(); + + // Verify the original exception was captured + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void extractFields() throws Exception { + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + assertNoErrors(fieldHarvester); + } + + @Test + public void erroredFieldExcluded() { + // create a field value that carries a field error + NormalizedFieldAndValueTest.NonGroupedInstance fieldWithError = new NormalizedFieldAndValueTest.NonGroupedInstance(); + fieldWithError.setError(new Exception()); + + // field map containing the errored field + Multimap multiMap = HashMultimap.create(); + multiMap.put(SAMPLE_FIELD_NAME, fieldWithError); + + exceptionSwallowingExtractFields(fieldHarvester, fields, new BasicIngestHelper(multiMap), this.value, offset, splitStart); + + // Verify fields contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + // Verify there was a FieldNormalizationError due to the field error + assertExceptionCaptured(this.fieldHarvester, FieldHarvester.FieldNormalizationError.class); + } + + @Test + public void nullIngestHelper() { + // field map with single field and value + exceptionSwallowingExtractFields(fieldHarvester, fields, null, value, offset, splitStart); + + // Verify it contains 
expected fields + assertContainsOnlySupplementalFields(); + + // Verify it captured the null pointer exception + assertExceptionCaptured(this.fieldHarvester, NullPointerException.class); + } + + private void assertNoErrors(FieldHarvester fieldHarvester) { + Assert.assertFalse("Unexpected exception: " + fieldHarvester.getOriginalException(), fieldHarvester.hasError()); + Assert.assertNull(fieldHarvester.getOriginalException()); + } + + private void assertExceptionCaptured(FieldHarvester fieldHarvester, Class exceptionClass) { + Assert.assertTrue(fieldHarvester.hasError()); + Assert.assertEquals(exceptionClass, fieldHarvester.getOriginalException().getClass()); + } + + private Multimap createOneFieldMultiMap() { + Multimap multiMap = HashMultimap.create(); + multiMap.put(SAMPLE_FIELD_NAME, new NormalizedFieldAndValueTest.NonGroupedInstance()); + return multiMap; + } + + private void assertContainsSupplementalFields(Multimap fields) { + Assert.assertTrue(fields.toString(), fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.toString(), fields.containsKey(ORIG_FILE)); + Assert.assertTrue(fields.toString(), fields.containsKey(RAW_FILE)); + } + + private void assertContainsOnlySupplementalFields() { + assertContainsSupplementalFields(fields); + // and only supplemental + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS, fields.size()); + } + + private static class BasicIngestHelper extends MinimalistIngestHelperImpl { + private final Multimap multiMap; + + public BasicIngestHelper(Multimap multiMap) { + this.multiMap = multiMap; + } + + @Override + public Multimap getEventFields(RawRecordContainer event) { + return multiMap; + } + + @Override + public void getEventFields(RawRecordContainer event, Multimap fields) { + fields.putAll(getEventFields(event)); + } + } + + private static class BasicWithCompositeFieldsIngestHelper extends BasicIngestHelper implements CompositeIngest { + public BasicWithCompositeFieldsIngestHelper(Multimap multiMap) { + super(multiMap); + } + + @Override + public void setCompositeFieldDefinitions(Multimap compositeFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + COMPOSITE_FIELD, entry.getValue()); + } + return compositeFields; + } + } + + private static class BasicWithVirtualFieldsIngestHelper extends BasicIngestHelper implements VirtualIngest { + public BasicWithVirtualFieldsIngestHelper(Multimap multiMap) { + super(multiMap); + } + + @Override + public Map getVirtualFieldDefinitions() { + return null; + } + + @Override + public void setVirtualFieldDefinitions(Map virtualFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultVirtualFieldSeparator() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDefaultVirtualFieldSeparator(String separator) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getVirtualFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + VIRTUAL_FIELD, entry.getValue()); + } + return compositeFields; + } + } + + private static class ErroringSalvagableIngestHelper extends MinimalistIngestHelperImpl implements VirtualIngest, CompositeIngest { + private final Multimap multiMap; + + 
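// Test helper whose inherited one-arg getEventFields throws; the two-arg override salvages the supplied fields before rethrowing + 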
ErroringSalvagableIngestHelper(Multimap multiMap) { + this.multiMap = multiMap; + } + + @Override + public void getEventFields(RawRecordContainer value, Multimap fields) { + try { + fields.putAll(getEventFields(value)); + } catch (Exception e) { + fields.putAll(this.multiMap); + throw e; + } + } + + @Override + public void setCompositeFieldDefinitions(Multimap compositeFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + COMPOSITE_FIELD, entry.getValue()); + } + return compositeFields; + } + + @Override + public Map getVirtualFieldDefinitions() { + return null; + } + + @Override + public void setVirtualFieldDefinitions(Map virtualFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultVirtualFieldSeparator() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDefaultVirtualFieldSeparator(String separator) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getVirtualFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + VIRTUAL_FIELD, entry.getValue()); + } + return compositeFields; + } + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java new file mode 100644 index 0000000000..21382a42dd --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java @@ -0,0 +1,66 @@ +package datawave.ingest.mapreduce; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +import org.apache.hadoop.conf.Configuration; + +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; +import datawave.util.TypeRegistryTestSetup; + +/** + * Test class containing some reusable test code for creating a test Type and test records. 
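+ * setupTypeAndTypeRegistry registers types via {@link TypeRegistryTestSetup#resetTypeRegistryWithTypes} so callers start from a freshly reset TypeRegistry.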
+ */ +public class IngestTestSetup { + + static SimpleRawRecord createRecord(String dataTypeHandler, Configuration conf) { + Type type = setupTypeAndTypeRegistry(dataTypeHandler, conf); + long eventTime = System.currentTimeMillis(); + return createBasicRecord(eventTime, type); + } + + public static RawRecordContainer createBasicRecord() { + Type type = IngestTestSetup.createBasicType(); + long eventTime = System.currentTimeMillis(); + return createBasicRecord(eventTime, type); + } + + static Type setupTypeAndTypeRegistry(String dataTypeHandler, Configuration conf) { + String[] defaultDataTypeHandlers = {dataTypeHandler}; + Type type = new Type("file", null, null, defaultDataTypeHandlers, 10, null); + + String[] errorDataTypeHandlers = {SimpleDataTypeHandler.class.getName()}; + Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, errorDataTypeHandlers, 20, null); + + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type, errorType); + return type; + } + + static Type createBasicType() { + return createBasicType(new String[] {}); + } + + static Type createBasicType(String[] defaultDataTypeHandlers) { + return new Type("file", null, null, defaultDataTypeHandlers, 10, null); + } + + static SimpleRawRecord createBasicRecord(long eventTime, Type type) { + SimpleRawRecord result = new SimpleRawRecord(); + result.setDate(eventTime); + result.setRawFileTimestamp(eventTime); + result.setDataType(type); + result.setRawFileName("/some/filename"); + result.setRawData("some data".getBytes()); + result.generateId(null); + return result; + } + + static byte[] objectToRawBytes(Object map) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectStream = new ObjectOutputStream(outputStream); + objectStream.writeObject(map); + // flush so the buffered object data is fully written before the bytes are captured + objectStream.flush(); + return outputStream.toByteArray(); + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java index 6dfd775866..80a460ad9b 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/SimpleDataTypeHelper.java @@ -44,7 +44,13 @@ public static IngestHelperInterface create() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("getEventFields")) { - return fields; + if (args.length == 1) { + return fields; + } else { + // two-arg getEventFields(value, fields): populate the provided multimap + ((Multimap) args[1]).putAll(fields); + // return value is ignored for void methods + return Void.TYPE; + } } else { throw new UnsupportedOperationException( "Sorry, " + this.getClass() + " does not currently support the " + method.getName() + " method. 
Feel free to implement it!"); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java index ca13486bb7..d0bcfaadbd 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java @@ -31,6 +31,7 @@ import datawave.ingest.table.config.DateIndexTableConfigHelper; import datawave.policy.IngestPolicyEnforcer; import datawave.util.TableName; +import datawave.util.TypeRegistryTestSetup; public class DateIndexDataTypeHandlerTest { @@ -58,8 +59,7 @@ public void setup() throws Exception { conf.set("testdatatype.date.index.type.to.field.map", "ACTIVITY=ACTIVITY_DATE,LOADED=LOAD_DATE"); conf.set("all" + Properties.INGEST_POLICY_ENFORCER_CLASS, IngestPolicyEnforcer.NoOpIngestPolicyEnforcer.class.getName()); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); handler = new DateIndexDataTypeHandler<>(); handler.setup(new TaskAttemptContextImpl(conf, new TaskAttemptID())); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java index cb85541f9b..41ef2c9522 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java @@ -39,7 +39,6 @@ import datawave.ingest.config.RawRecordContainerImpl; import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; -import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.BaseNormalizedContent; import datawave.ingest.data.config.GroupedNormalizedContentInterface; import datawave.ingest.data.config.NormalizedContentInterface; @@ -53,6 +52,7 @@ import datawave.ingest.mapreduce.job.writer.ContextWriter; import datawave.ingest.time.Now; import datawave.metadata.protobuf.EdgeMetadata; +import datawave.util.TypeRegistryTestSetup; import datawave.util.time.DateHelper; public class ProtobufEdgeDeleteModeTest { @@ -119,13 +119,11 @@ public static void tearDown() { @Before public void setup() { - TypeRegistry.reset(); conf = new Configuration(); conf.addResource(ClassLoader.getSystemResource("config/all-config.xml")); conf.addResource(ClassLoader.getSystemResource("config/edge-ingest-config.xml")); conf.addResource(ClassLoader.getSystemResource("config/metadata-config.xml")); - TypeRegistry registry = TypeRegistry.getInstance(conf); - registry.put(type.typeName(), type); + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type); } private RawRecordContainer getEvent(Configuration conf) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java index e87bde0a22..3af84b0205 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java @@ -27,6 +27,7 @@ import 
datawave.data.hash.UID; import datawave.ingest.data.TypeRegistry; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; +import datawave.util.TypeRegistryTestSetup; public class CBMutationOutputFormatterTest { @@ -58,7 +59,6 @@ protected boolean processOutputContains(List output, String message) { public void setup() { testDriverLevel = CBMutationOutputFormatterTest.logger.getLevel(); CBMutationOutputFormatterTest.logger.setLevel(Level.ALL); - TypeRegistry.reset(); } @After @@ -311,6 +311,8 @@ public void testRecordWriterWriteWithUpdatesAndTypes() throws IOException, Inter conf.addResource(url); + TypeRegistryTestSetup.resetTypeRegistry(conf); + TypeRegistry.getInstance(conf); OutputConfigurator.setSimulationMode(AccumuloOutputFormat.class, conf, true); @@ -368,6 +370,8 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTyped() throws IOExcept conf.addResource(url); + TypeRegistryTestSetup.resetTypeRegistry(conf); + TypeRegistry.getInstance(conf); OutputConfigurator.setSimulationMode(AccumuloOutputFormat.class, conf, true); @@ -425,6 +429,8 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithUID() throws I conf.addResource(url); + TypeRegistryTestSetup.resetTypeRegistry(conf); + TypeRegistry.getInstance(conf); OutputConfigurator.setSimulationMode(AccumuloOutputFormat.class, conf, true); @@ -484,6 +490,8 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithBadUID() throw conf.addResource(url); + TypeRegistryTestSetup.resetTypeRegistry(conf); + TypeRegistry.getInstance(conf); OutputConfigurator.setSimulationMode(AccumuloOutputFormat.class, conf, true); @@ -543,6 +551,8 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithoutUID() throw conf.addResource(url); + TypeRegistryTestSetup.resetTypeRegistry(conf); + TypeRegistry.getInstance(conf); OutputConfigurator.setSimulationMode(AccumuloOutputFormat.class, conf, true); diff --git a/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java b/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java new file mode 100644 index 0000000000..ccc6cca26a --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java @@ -0,0 +1,21 @@ +package datawave.util; + +import org.apache.hadoop.conf.Configuration; + +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; + +public class TypeRegistryTestSetup { + public static TypeRegistry resetTypeRegistry(Configuration conf) { + TypeRegistry.reset(); + return TypeRegistry.getInstance(conf); + } + + public static TypeRegistry resetTypeRegistryWithTypes(Configuration conf, Type... types) { + TypeRegistry registry = resetTypeRegistry(conf); + for (Type type : types) { + registry.put(type.typeName(), type); + } + return registry; + } +}