Fix #1438 - optionally gather fields after ingest exception #1439

Open
wants to merge 47 commits into
base: integration
Changes from 17 commits
Commits
45df65f
Fix #1438 - optionally gather fields after ingest exception
Feb 24, 2022
8638428
Fix #1438: remove getFields method after researching extensions
Feb 28, 2022
5c7359d
Fix #1438: consolidate FieldHarvester methods and adjust tests
Feb 28, 2022
539b3a2
Fix #1438: preserve original exception instead of wrapping it
Feb 28, 2022
9f09371
Fix #1438: adjust Mapper test to reflect supplemental fields addition…
Feb 28, 2022
a4015f9
Fix #1438: initial PR feedback
Mar 14, 2022
e17f5b3
Fix #1438: change interface as per PR feedback
Mar 14, 2022
d0389ab
Branch was auto-updated on change of target.
github-actions[bot] Apr 22, 2022
70d7d1d
Branch was auto-updated on change of target.
github-actions[bot] Apr 22, 2022
342c896
Branch was auto-updated on change of target.
github-actions[bot] Apr 22, 2022
bf987e3
Branch was auto-updated on change of target.
github-actions[bot] Apr 28, 2022
64cd680
Branch was auto-updated on change of target.
github-actions[bot] Apr 28, 2022
1e1fe86
Branch was auto-updated on change of target.
datawave-bot-builder Apr 28, 2022
ea08a5b
Branch was auto-updated on change of target.
datawave-bot-builder May 2, 2022
09931d7
Branch was auto-updated on change of target.
datawave-bot-builder May 3, 2022
d426b49
Branch was auto-updated on change of target.
datawave-bot-builder May 3, 2022
cabe5f3
Branch was auto-updated on change of target.
datawave-bot-builder May 3, 2022
d02de12
Branch was auto-updated on change of target.
datawave-bot-builder May 10, 2022
99d7027
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
6be4d92
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
edff00d
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
3c76aa1
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
c37c3b0
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
3a4f54b
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
94bac43
Updating for current integration
mineralntl Nov 13, 2024
88b6595
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 13, 2024
dca9a10
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 18, 2024
7c6561d
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 20, 2024
044ccb1
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 20, 2024
54bd847
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 22, 2024
e38ae66
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 25, 2024
068ab7a
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 27, 2024
94c6b7f
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 4, 2024
1d38126
GitHub Actions: Fix Formatting
Dec 4, 2024
06fea78
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 4, 2024
090c411
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 6, 2024
6c99563
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 6, 2024
ce5887a
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 6, 2024
d0db7fc
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 9, 2024
ac12bf0
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 11, 2024
5ad3e96
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 12, 2024
b69628a
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 16, 2024
5b57f9c
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 18, 2024
751d71d
Merge branch 'integration' into task/resilient-error-handling
mineralntl Dec 31, 2024
d3676fa
Merge branch 'integration' into task/resilient-error-handling
mineralntl Jan 8, 2025
4c9e36a
Merge branch 'integration' into task/resilient-error-handling
hlgp Jan 9, 2025
68427e2
Merge branch 'integration' into task/resilient-error-handling
mineralntl Jan 10, 2025
@@ -11,6 +11,7 @@
import datawave.data.type.OneToManyNormalizerType;
import datawave.ingest.config.IngestConfiguration;
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.data.config.DataTypeHelperImpl;
@@ -697,6 +698,12 @@ protected NormalizedContentInterface normalizeFieldValue(NormalizedContentInterf
return copy;
}

@Override
public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {
// default implementation calls legacy method
fields.putAll(this.getEventFields(value));
}

/**
* This is a helper routine that will create a normalized field out of a base normalized field.
*
@@ -43,13 +43,19 @@ default boolean isShardExcluded(String fieldName) {
}

/**
- * Fully parse the raw record and return a map of field names and values.
- *
- * @param value
- * @return
+ * Deprecated. Use #getEventFields(value, fields)
*/
+ @Deprecated
Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value);

+ /**
+ * Fully parse the raw record and update the provided multimap of field names and values, with a partial update in the event of an exception.
+ *
+ * @param value
+ * @param fields
+ */
+ void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields);
+
Multimap<String,NormalizedContentInterface> normalizeMap(Multimap<String,NormalizedContentInterface> fields);

Multimap<String,NormalizedContentInterface> normalize(Multimap<String,String> fields);
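The two-argument getEventFields contract above (caller supplies the map, so fields parsed before a failure survive) can be sketched as follows. This is a minimal illustration under stated assumptions, not DataWave code: a Map<String, List<String>> stands in for Guava's Multimap, a String stands in for RawRecordContainer, and the Helper, KeyValueHelper, and salvagedFieldCount names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in types only: Map<String, List<String>> replaces Multimap and
// String replaces RawRecordContainer. None of these names come from DataWave.
class PartialFieldsSketch {

    interface Helper {
        // Legacy style: all-or-nothing, every field is lost if parsing throws.
        Map<String, List<String>> getEventFields(String record);

        // New style: the caller owns the map. This default simply delegates
        // to the legacy method, so it gives no partial results on failure.
        default void getEventFields(String record, Map<String, List<String>> fields) {
            getEventFields(record).forEach((k, v) -> fields.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
        }
    }

    // Hypothetical helper that parses comma-separated "NAME=value" pairs
    // and writes each pair into the caller's map as soon as it is parsed.
    static class KeyValueHelper implements Helper {
        @Override
        public Map<String, List<String>> getEventFields(String record) {
            Map<String, List<String>> fields = new HashMap<>();
            getEventFields(record, fields);
            return fields;
        }

        @Override
        public void getEventFields(String record, Map<String, List<String>> fields) {
            for (String pair : record.split(",")) {
                int eq = pair.indexOf('=');
                if (eq < 0) {
                    throw new IllegalArgumentException("Malformed pair: " + pair);
                }
                fields.computeIfAbsent(pair.substring(0, eq), k -> new ArrayList<>()).add(pair.substring(eq + 1));
            }
        }
    }

    // Counts the field names still available to the caller, even when parsing failed.
    static int salvagedFieldCount(String record) {
        Map<String, List<String>> fields = new HashMap<>();
        try {
            new KeyValueHelper().getEventFields(record, fields);
        } catch (IllegalArgumentException e) {
            // fields keeps whatever was parsed before the failure
        }
        return fields.size();
    }

    public static void main(String[] args) {
        // A and B are parsed before "broken" throws; C is never reached.
        System.out.println(salvagedFieldCount("A=1,B=2,broken,C=3"));
    }
}
```

A helper that only inherits the delegating default would still lose everything on failure; partial results appear only when the two-argument method is overridden to write into the supplied map incrementally.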

Large diffs are not rendered by default.

@@ -0,0 +1,245 @@
package datawave.ingest.mapreduce;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.data.normalizer.DateNormalizer;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.NormalizedFieldAndValue;
import datawave.ingest.data.config.ingest.CompositeIngest;
import datawave.ingest.data.config.ingest.FilterIngest;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.data.config.ingest.VirtualIngest;
import datawave.ingest.time.Now;
import datawave.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;

import java.util.Date;
import java.util.Map;

/**
* Encapsulates the logic for extracting fields from a record, making use of a provided IngestHelperInterface. Generates virtual fields, composite fields, and
* supplemental fields (like LOAD_DATE, ORIG_FILE, and RAW_FILE). Some logic for handling errors is also included here: extracting salvageable fields if any
* exception occurs, and detecting if there were field errors (indicating a normalization failure).
*/
public class FieldHarvester {
private static final Logger log = Logger.getLogger(FieldHarvester.class);

private static Now now = Now.getInstance();

public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE";
public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE";
public static final String RAW_FILE_FIELDNAME = "RAW_FILE";
public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename";
public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename";
public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename";

private boolean createSequenceFileName;
private boolean trimSequenceFileName;
private boolean createRawFileName;
private final DateNormalizer dateNormalizer = new DateNormalizer();

private static final String SRC_FILE_DEL = "|";
private Exception originalException;

public FieldHarvester(Configuration configuration) {
this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true);
this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true);
this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true);
}

/**
* Updates "fields" with extracted, derived, and automatically generated fields. Captures any exception raised along the way and attempts to add salvaged
* fields before rethrowing the original exception.
*
* @param fields
* the Multimap to modify with extracted and generated fields
* @param ingestHelper
* interface to use for field extraction
* @param value
* the record from which the fields will be extracted
* @param offset
* record offset within the source file
* @param splitStart
* the splitStart for the record
*/
public void extractFields(Multimap<String,NormalizedContentInterface> fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset,
String splitStart) throws Exception {
// reset exception-in-extraction tracking
this.originalException = null;

// "candidateFields" holds the fields that will eventually be added to "fields"
Multimap<String,NormalizedContentInterface> candidateFields = HashMultimap.create();

try {
// parse the record into its candidate field names and values using the IngestHelperInterface.
ingestHelper.getEventFields(value, candidateFields);
} catch (Exception exception) {
// delay throwing the exception to attempt salvaging
this.originalException = exception;
}

try {
// try adding supplemental fields to candidateFields, whether or not they were complete
addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields);
} catch (Exception exception) {
if (null == this.originalException) {
this.originalException = exception;
} else {
// preserve original exception and log the latest exception
log.error("A secondary exception occurred while adding supplemental fields", exception);
}
}

// add candidateFields to fields, even if there was an error
// identify if any individual fields contain an error
addFieldsAndDetectFieldErrors(fields, candidateFields);

if (null != this.originalException) {
log.error("Rethrowing original exception after completing field extraction.");
throw originalException;
}
}

@VisibleForTesting
boolean hasError() {
return null != this.originalException;
}

@VisibleForTesting
Exception getOriginalException() {
return this.originalException;
}

private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper,
Multimap<String,NormalizedContentInterface> fields) {
addVirtualFields(ingestHelper, fields);
addCompositeFields(ingestHelper, fields);
addLoadDateField(fields);
addFileNameFields(value, offset, splitStart, fields);
applyFieldFilters(ingestHelper, fields);
}

private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the virtual fields, if applicable.
if (null != newFields && ingestHelper instanceof VirtualIngest) {
VirtualIngest vHelper = (VirtualIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> virtualFields = vHelper.getVirtualFields(newFields);
for (Map.Entry<String,NormalizedContentInterface> v : virtualFields.entries())
newFields.put(v.getKey(), v.getValue());
}
}

private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the composite fields, if applicable
if (null != newFields && ingestHelper instanceof CompositeIngest) {
CompositeIngest vHelper = (CompositeIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> compositeFields = vHelper.getCompositeFields(newFields);
for (String fieldName : compositeFields.keySet()) {
// if this is an overloaded composite field, we are replacing the existing field data
if (vHelper.isOverloadedCompositeField(fieldName)) {
newFields.removeAll(fieldName);
}
newFields.putAll(fieldName, compositeFields.get(fieldName));
}
}
}

private void addLoadDateField(Multimap<String,NormalizedContentInterface> newFields) {
// Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes
long loadDate = now.get();
NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate));
// set an indexed field value for use by the date index data type handler
loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate)));
newFields.put(LOAD_DATE_FIELDNAME, loadDateValue);
}

private void addRawFileField(RawRecordContainer value, Multimap<String,NormalizedContentInterface> newFields, String seqFileName) {
if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) {
newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName()));
}
}

private void addOrigFileField(Multimap<String,NormalizedContentInterface> newFields, long offset, String splitStart, String seqFileName) {
if (null != seqFileName) {
StringBuilder seqFile = new StringBuilder(seqFileName);

seqFile.append(SRC_FILE_DEL).append(offset);

if (null != splitStart) {
seqFile.append(SRC_FILE_DEL).append(splitStart);
}

newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString()));
}
}

private String getSeqFileName() {
String seqFileName;
seqFileName = NDC.peek();

if (trimSequenceFileName) {
seqFileName = StringUtils.substringAfterLast(seqFileName, "/");
}
return seqFileName;
}

private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap<String,NormalizedContentInterface> newFields) {
String seqFileName = null;

if (createSequenceFileName) {
seqFileName = getSeqFileName();

// place the sequence filename into the event
addOrigFileField(newFields, offset, splitStart, seqFileName);
}

addRawFileField(value, newFields, seqFileName);
}

private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also if this helper needs to filter the fields before returning, apply now
if (ingestHelper instanceof FilterIngest) {
FilterIngest fHelper = (FilterIngest) ingestHelper;
fHelper.filter(newFields);
}
}

/**
* Adds candidateFields to fields, inspecting each candidate field for field errors. Sets the field harvester's original exception if any field errors were
* found and no earlier exception was captured.
*/
private void addFieldsAndDetectFieldErrors(Multimap<String,NormalizedContentInterface> fields, Multimap<String,NormalizedContentInterface> candidateFields) {
if (null == candidateFields) {
return;
}
Throwable fieldError = null;
for (Map.Entry<String,NormalizedContentInterface> entry : candidateFields.entries()) {
// noinspection ThrowableResultOfMethodCallIgnored
if (null != entry.getValue().getError()) {
fieldError = entry.getValue().getError();
}
fields.put(entry.getKey(), entry.getValue());
}
if (null != fieldError) {
if (null == this.originalException) {
this.originalException = new FieldNormalizationError("Failed getting all fields", fieldError);
} else {
// preserve original exception
log.error("A field exception was observed while adding fields", fieldError);
}
}
}

public static class FieldNormalizationError extends Exception {

private static final long serialVersionUID = 1L;

public FieldNormalizationError(String message, Throwable cause) {
super(message, cause);
}
}
}
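The capture-then-rethrow flow in extractFields (remember the first exception, keep going, log later failures, rethrow the first one at the end) can be sketched in isolation. This is a simplified stand-in rather than the class above: it generalizes the two fixed try blocks into a loop over steps, uses a List<String> in place of the Multimap, and the Step, harvest, and demo names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the capture-then-rethrow flow. The step names are
// hypothetical and a List<String> replaces the Multimap of fields.
class DeferredRethrowSketch {

    interface Step {
        void run(List<String> fields) throws Exception;
    }

    // Runs every step, keeps the first exception, logs later ones,
    // and rethrows the first only after all steps have been attempted.
    static void harvest(List<String> fields, Step... steps) throws Exception {
        Exception original = null;
        for (Step step : steps) {
            try {
                step.run(fields);
            } catch (Exception e) {
                if (original == null) {
                    original = e;
                } else {
                    System.err.println("Secondary failure: " + e.getMessage());
                }
            }
        }
        if (original != null) {
            throw original;
        }
    }

    // Shows what a caller still holds after extraction fails part-way through.
    static List<String> demo() {
        List<String> fields = new ArrayList<>();
        try {
            harvest(fields,
                    f -> {
                        f.add("NAME");
                        throw new IllegalStateException("bad record");
                    },
                    f -> f.add("LOAD_DATE"));
        } catch (Exception e) {
            // The caller sees the original exception and the salvaged fields together.
            fields.add("ERROR:" + e.getMessage());
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the fields collection is passed in rather than returned, a caller such as an error handler can still record whatever was salvaged when the exception finally surfaces.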
@@ -20,7 +20,7 @@
import datawave.ingest.data.config.GroupedNormalizedContentInterface;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
- import datawave.ingest.mapreduce.EventMapper;
+ import datawave.ingest.mapreduce.FieldHarvester;
import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition;
@@ -500,7 +500,7 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
}

// Get the load date of the event from the fields map
- Collection<NormalizedContentInterface> loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME);
+ Collection<NormalizedContentInterface> loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME);
if (!loadDates.isEmpty()) {
NormalizedContentInterface nci = loadDates.iterator().next();
Date date = new Date(Long.parseLong(nci.getEventFieldValue()));
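The handler above reads LOAD_DATE back with Long.parseLong, which matches how FieldHarvester.addLoadDateField writes it with Long.toString. A minimal sketch of that round trip, assuming only the epoch-milliseconds-as-string convention visible in the diff (the class and method names here are hypothetical):

```java
import java.util.Date;

// Illustrates the LOAD_DATE convention seen in the diff: epoch milliseconds
// stored as a decimal string. Class and method names are hypothetical.
class LoadDateRoundTrip {

    // Writer side: mirrors Long.toString(loadDate) in addLoadDateField.
    static String encode(long epochMillis) {
        return Long.toString(epochMillis);
    }

    // Reader side: mirrors new Date(Long.parseLong(...)) in the handler.
    static Date decode(String eventFieldValue) {
        return new Date(Long.parseLong(eventFieldValue));
    }

    public static void main(String[] args) {
        long millis = 1645660800000L;
        System.out.println(decode(encode(millis)).getTime() == millis);
    }
}
```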
@@ -19,6 +19,7 @@
import datawave.ingest.config.RawRecordContainerImpl;
import datawave.ingest.data.config.MarkingsHelper;

import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Test;
@@ -53,8 +54,7 @@ public void setUp() throws Exception {
conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName());
conf.set("samplecsv.reader.class", TestCSVReader.class.getName());
conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE");
- TypeRegistry.reset();
- TypeRegistry.getInstance(conf);
+ TypeRegistryTestSetup.resetTypeRegistry(conf);
dataType = TypeRegistry.getType("samplecsv");
}
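The diff does not show TypeRegistryTestSetup itself. Judging only from the two calls it replaces, it plausibly wraps a reset followed by a fresh getInstance. The sketch below illustrates that idea with a stand-in singleton; Registry, resetRegistry, and the use of java.util.Properties in place of a Hadoop Configuration are all assumptions, not the actual helper.

```java
import java.util.Properties;

// Stand-in for a configuration-backed singleton and a test helper that
// rebuilds it. All names are hypothetical; this is not the DataWave helper.
class RegistryResetSketch {

    static final class Registry {
        private static Registry instance;
        private final Properties conf;

        private Registry(Properties conf) {
            this.conf = conf;
        }

        // Builds the singleton on first use and ignores conf afterwards.
        static synchronized Registry getInstance(Properties conf) {
            if (instance == null) {
                instance = new Registry(conf);
            }
            return instance;
        }

        static synchronized void reset() {
            instance = null;
        }

        String get(String key) {
            return conf.getProperty(key);
        }
    }

    // One call instead of two, so a test cannot forget the reset.
    static Registry resetRegistry(Properties conf) {
        Registry.reset();
        return Registry.getInstance(conf);
    }

    static String demo() {
        Properties first = new Properties();
        first.setProperty("data.name", "first");
        Properties second = new Properties();
        second.setProperty("data.name", "second");

        resetRegistry(first);
        // Without a reset, the singleton built from "first" is returned again.
        String stale = Registry.getInstance(second).get("data.name");
        String fresh = resetRegistry(second).get("data.name");
        return stale + "," + fresh;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```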

@@ -1,8 +1,7 @@
package datawave.ingest.data.config;

import datawave.ingest.data.TypeRegistry;

import datawave.policy.IngestPolicyEnforcer;
import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
@@ -25,8 +24,7 @@ public void setup() {
@Test(expected = IllegalArgumentException.class)
public void testInvalidConfig() {
DataTypeHelperImpl helper = new DataTypeHelperImpl();
- TypeRegistry.reset();
- TypeRegistry.getInstance(conf);
+ TypeRegistryTestSetup.resetTypeRegistry(conf);
helper.setup(conf);
}

@@ -36,8 +34,7 @@ public void testValidConfig() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
Assert.assertThat(conf.get("data.name"), is("fake"));
- TypeRegistry.reset();
- TypeRegistry.getInstance(conf);
+ TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -54,8 +51,7 @@ public void testDowncaseFields() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR");
- TypeRegistry.reset();
- TypeRegistry.getInstance(conf);
+ TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -21,7 +21,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface {
private Map<String,String> _markings;
private Throwable _error;

- protected NonGroupedInstance() {
+ public NonGroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";
@@ -141,7 +141,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface
private String _group;
private String _subGroup;

- protected GroupedInstance() {
+ public GroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";