From 95563190d3f6022ee4e8d1ba12e06a22aa7e4e42 Mon Sep 17 00:00:00 2001 From: foster33 <84727868+foster33@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:47:23 -0500 Subject: [PATCH] Fix erroneous LastUpdated timestamps appearing in data dictionary (#2489) * Remove code that makes incorrect timestamps appear in the Data Dictionary * Add unit test to validate incorrect timestamps do not appear. --------- Co-authored-by: palindrome <31748527+hlgp@users.noreply.github.com> --- .../BulkIngestKeyAggregatingReducer.java | 9 ---- .../BulkIngestKeyAggregatingReducerTest.java | 47 +++++++++++++------ 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java index 1eed6e5f53b..922431869c6 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java @@ -182,15 +182,6 @@ public void doReduce(BulkIngestKey key, Iterable values, TaskInputOutputC } ctx.getCounter(IngestOutput.TIMESTAMP_DUPLICATE).increment(duplicates); } else { - /** - * Aggregator values if ts < 0, it is a by product of the ts deduper (combiner) - * - */ - ts = outKey.getKey().getTimestamp(); - - if (usingCombiner && (ts < 0)) { - outKey.getKey().setTimestamp(-1 * ts * MILLISPERDAY); - } Iterator valueItr = values.iterator(); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducerTest.java index b66faa8c54f..116338c896d 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducerTest.java @@ -1,13 +1,6 @@ package datawave.ingest.mapreduce.job.reduce; -import static datawave.ingest.config.TableConfigCache.ACCUMULO_CONFIG_CACHE_PATH_PROPERTY; -import static datawave.ingest.config.TableConfigCache.DEFAULT_ACCUMULO_CONFIG_CACHE_PATH; -import static datawave.ingest.data.config.ingest.AccumuloHelper.INSTANCE_NAME; -import static datawave.ingest.data.config.ingest.AccumuloHelper.PASSWORD; -import static datawave.ingest.data.config.ingest.AccumuloHelper.USERNAME; -import static datawave.ingest.data.config.ingest.AccumuloHelper.ZOOKEEPERS; import static datawave.ingest.mapreduce.job.TableConfigurationUtil.ITERATOR_CLASS_MARKER; -import static datawave.ingest.mapreduce.job.reduce.AggregatingReducer.INGEST_VALUE_DEDUP_AGGREGATION_KEY; import static datawave.ingest.mapreduce.job.reduce.AggregatingReducer.MILLISPERDAY; import static datawave.ingest.mapreduce.job.reduce.AggregatingReducer.USE_AGGREGATOR_PROPERTY; import static datawave.ingest.mapreduce.job.reduce.BulkIngestKeyAggregatingReducer.CONTEXT_WRITER_CLASS; @@ -19,7 +12,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -32,7 +24,6 @@ import org.apache.accumulo.core.iterators.Combiner; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.commons.math3.analysis.function.Pow; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; @@ -41,13 +32,10 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.counters.GenericCounter; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; -import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.api.easymock.PowerMock; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -56,9 +44,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; -import datawave.ingest.config.TableConfigCache; -import datawave.ingest.data.config.ConfigurationHelper; -import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.ingest.mapreduce.job.BulkIngestKey; import datawave.ingest.mapreduce.job.TableConfigurationUtil; import datawave.ingest.mapreduce.job.writer.BulkContextWriter; @@ -88,6 +73,7 @@ public class BulkIngestKeyAggregatingReducerTest { private Counter tab2Counter; private Counter tab3Counter; private Counter combinerCounter; + private Counter negativeTimestampCounter; private Counter dupCounter; private int expectedDuplicateKey; @@ -98,6 +84,7 @@ public class BulkIngestKeyAggregatingReducerTest { private int expectedTab2Counter; private int expectedTab3Counter; private int expectedCombinerCounter; + private int expectedNegativeTimestampCounter; private int expectedDupCounter; private TaskID taskID; @@ -121,6 +108,7 @@ public void setup() throws Exception { tab2Counter = (Counter) new GenericCounter(); tab3Counter = (Counter) new GenericCounter(); combinerCounter = (Counter) new GenericCounter(); + negativeTimestampCounter = (Counter) new GenericCounter(); dupCounter = (Counter) new GenericCounter(); expectedDuplicateKey = 0; @@ -131,6 +119,7 @@ public void setup() throws Exception { expectedTab2Counter = 0; expectedTab3Counter = 0; expectedCombinerCounter = 0; + expectedNegativeTimestampCounter = 0; expectedDupCounter = 0; conf = (Configuration) PowerMockito.mock(Configuration.class); @@ -255,6 +244,7 @@ private void checkCounterValues() { assertEquals(expectedTab2Counter, tab2Counter.getValue()); assertEquals(expectedTab3Counter, tab3Counter.getValue()); assertEquals(expectedCombinerCounter, combinerCounter.getValue()); + assertEquals(expectedNegativeTimestampCounter, negativeTimestampCounter.getValue()); } @Test @@ -554,6 +544,29 @@ public void testUsingCombinerWithVerbosePartitioningCounters() throws Exception assertEquals(expected, output); } + @Test + public void testUsingCombinerWithNegativeTimestamps() throws Exception { + setupUsingCombiner(); + reducer.setup(conf); + + performDoReduce("table1", "r1", 4, -3 * MILLISPERDAY + MILLISPERDAY / 2, ExpectedValueType.COMBINED_VALUES); + performDoReduce("table1", "r2", 3, 3 * MILLISPERDAY + MILLISPERDAY / 3, ExpectedValueType.COMBINED_VALUES); + performDoReduce("table1", "r3", 1, -3 * MILLISPERDAY, ExpectedValueType.COMBINED_VALUES); + performDoReduce("table2", "r1", 2, -2 * MILLISPERDAY + MILLISPERDAY, ExpectedValueType.FIRST_VALUE); + performDoReduce("table2", "r2", 0, -2 * MILLISPERDAY + MILLISPERDAY, ExpectedValueType.ALL_VALUES); + performDoReduce("table2", "r3", 3, -2 * MILLISPERDAY + MILLISPERDAY / 3, ExpectedValueType.FIRST_VALUE); + performDoReduce("table3", "r1", 3, -4 * MILLISPERDAY + MILLISPERDAY / 3, ExpectedValueType.COMBINED_VALUES); + performDoReduce("table3", "r2", 0, -4 * MILLISPERDAY, ExpectedValueType.COMBINED_VALUES); + performDoReduce("table1", "r1", 4, 4 * MILLISPERDAY + MILLISPERDAY / 2, ExpectedValueType.COMBINED_VALUES); + performDoReduce("table1", "r2", 3, 2 * MILLISPERDAY + MILLISPERDAY, ExpectedValueType.COMBINED_VALUES); + + expectedDuplicateKey = 2; + expectedCombinerCounter = 7; + expectedNegativeTimestampCounter = 7; + checkCounterValues(); + assertEquals(expected, output); + } + private void performDoReduce(String table, String row, int numberOfValues) throws Exception { performDoReduce(table, row, numberOfValues, 1L, ExpectedValueType.FIRST_VALUE); } @@ -586,6 +599,10 @@ private void performDoReduce(String table, String row, int numberOfValues, long } reducer.doReduce(bulkIngestKey, values, context); + + if (bulkIngestKey.getKey().getTimestamp() < 0) { + negativeTimestampCounter.increment(1); + } } public static Value combineValues(Iterator iter) {