From 791712b0841766bcdf33e69de78510a5ebf5e032 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Sun, 24 Nov 2024 20:50:16 -0800 Subject: [PATCH] Added unit tests --- .../venice/hadoop/PushJobSetting.java | 2 +- .../linkedin/venice/hadoop/VenicePushJob.java | 2 +- .../datawriter/jobs/DataWriterMRJob.java | 4 +- .../datawriter/reduce/VeniceReducer.java | 20 +++++++ .../datawriter/AbstractPartitionWriter.java | 5 +- .../jobs/AbstractDataWriterSparkJob.java | 4 +- .../venice/hadoop/VenicePushJobTest.java | 44 ++++++++++++++ .../datawriter/reduce/TestVeniceReducer.java | 57 +++++++++++++++++++ .../{update => }/CompositeVeniceWriter.java | 6 +- .../writer/CompositeVeniceWriterTest.java | 52 +++++++++++++++++ 10 files changed, 183 insertions(+), 13 deletions(-) rename internal/venice-common/src/main/java/com/linkedin/venice/writer/{update => }/CompositeVeniceWriter.java (95%) create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java index 984a9f2053e..a5e62310d0c 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java @@ -158,5 +158,5 @@ public PushJobSetting() { this.jobStartTimeMs = System.currentTimeMillis(); } - public String viewConfigFlatMap; + public String materializedViewConfigFlatMap; } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index 40a57f5d62e..f74c1ffdd2d 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -1048,7 +1048,7 
@@ private void configureJobPropertiesWithMaterializedViewConfigs() { .filter(vc -> Objects.equals(vc.getValue().getViewClassName(), MaterializedView.class.getCanonicalName())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!viewConfigMap.isEmpty()) { - pushJobSetting.viewConfigFlatMap = ViewUtils.flatViewConfigMapString(viewConfigMap); + pushJobSetting.materializedViewConfigFlatMap = ViewUtils.flatViewConfigMapString(viewConfigMap); } } catch (Exception e) { throw new VeniceException("Failed to configure job properties with view configs"); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java index 172bdc07314..faa6e3968e1 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java @@ -306,8 +306,8 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) { } jobConf.setPartitionerClass(partitionerClass); } - if (pushJobSetting.viewConfigFlatMap != null) { - jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.viewConfigFlatMap); + if (pushJobSetting.materializedViewConfigFlatMap != null) { + jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap); } jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false)); int partitionCount = pushJobSetting.partitionCount; diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java index d600245cdd5..e202ced5d08 100644 --- 
a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java @@ -13,6 +13,8 @@ import com.linkedin.venice.utils.IteratorUtils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.AbstractVeniceWriter; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.BytesWritable; @@ -174,4 +176,22 @@ private long getTotalIncomingDataSizeInBytes(JobConf jobConfig) { protected void setHadoopJobClientProvider(HadoopJobClientProvider hadoopJobClientProvider) { this.hadoopJobClientProvider = hadoopJobClientProvider; } + + // Visible for testing + @Override + protected AbstractVeniceWriter createCompositeVeniceWriter( + VeniceWriterFactory factory, + VeniceWriter mainWriter, + String flatViewConfigMapString, + String topicName, + boolean chunkingEnabled, + boolean rmdChunkingEnabled) { + return super.createCompositeVeniceWriter( + factory, + mainWriter, + flatViewConfigMapString, + topicName, + chunkingEnabled, + rmdChunkingEnabled); + } } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index 80edd0d38d3..87c522e1211 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ -38,12 +38,12 @@ import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.AbstractVeniceWriter; +import 
com.linkedin.venice.writer.CompositeVeniceWriter; import com.linkedin.venice.writer.DeleteMetadata; import com.linkedin.venice.writer.PutMetadata; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; -import com.linkedin.venice.writer.update.CompositeVeniceWriter; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -381,7 +381,8 @@ private AbstractVeniceWriter createBasicVeniceWriter() { } } - AbstractVeniceWriter createCompositeVeniceWriter( + // protected rather than package-private so subclasses in other packages (e.g. VeniceReducer) can expose it for testing + protected AbstractVeniceWriter createCompositeVeniceWriter( VeniceWriterFactory factory, VeniceWriter mainWriter, String flatViewConfigMapString, diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java index adbb7bb575c..4595f746f20 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java @@ -127,8 +127,8 @@ private void setupSparkDataWriterJobFlow(PushJobSetting pushJobSetting) { Properties jobProps = new Properties(); sparkSession.conf().getAll().foreach(entry -> jobProps.setProperty(entry._1, entry._2)); - if (pushJobSetting.viewConfigFlatMap != null) { - jobProps.put(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.viewConfigFlatMap); + if (pushJobSetting.materializedViewConfigFlatMap != null) { + jobProps.put(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap); } JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); Broadcast broadcastProperties = sparkContext.broadcast(jobProps); diff --git
a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index c5161eb82a1..36c367c6525 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -74,6 +74,9 @@ import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.schema.AvroSchemaParseUtils; @@ -83,6 +86,9 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.views.ChangeCaptureView; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.VeniceWriter; import java.util.Collections; import java.util.HashMap; @@ -923,6 +929,44 @@ public void testGetVeniceWriter() { } } + @Test + public void testConfigureWithMaterializedViewConfigs() throws Exception { + Properties properties = getVpjRequiredProperties(); + properties.put(KEY_FIELD_PROP, "id"); + properties.put(VALUE_FIELD_PROP, "name"); + JobStatusQueryResponse response = mockJobStatusQuery(); + ControllerClient client = getClient(); + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), eq(null)); + try (final VenicePushJob vpj = getSpyVenicePushJob(properties, client)) { + skipVPJValidation(vpj); + vpj.run(); + PushJobSetting pushJobSetting = vpj.getPushJobSetting(); + Assert.assertNull(pushJobSetting.materializedViewConfigFlatMap); + } + 
Map viewConfigs = new HashMap<>(); + ViewParameters.Builder builder = new ViewParameters.Builder("testView").setPartitionCount(12) + .setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + viewConfigs.put("testView", new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build())); + viewConfigs + .put("dummyView", new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap())); + client = getClient(storeInfo -> { + storeInfo.setViewConfigs(viewConfigs); + }, true); + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), eq(null)); + try (final VenicePushJob vpj = getSpyVenicePushJob(properties, client)) { + skipVPJValidation(vpj); + vpj.run(); + PushJobSetting pushJobSetting = vpj.getPushJobSetting(); + Assert.assertNotNull(pushJobSetting.materializedViewConfigFlatMap); + Map viewConfigMap = + ViewUtils.parseViewConfigMapString(pushJobSetting.materializedViewConfigFlatMap); + // Ensure only materialized view configs are propagated to the job settings + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertTrue(viewConfigMap.containsKey("testView")); + Assert.assertEquals(viewConfigMap.get("testView").getViewClassName(), MaterializedView.class.getCanonicalName()); + } + } + private JobStatusQueryResponse mockJobStatusQuery() { JobStatusQueryResponse response = new JobStatusQueryResponse(); response.setStatus(ExecutionStatus.COMPLETED.toString()); diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java index 1e2d1e0298a..8d6a6c95b92 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java @@ -18,6 +18,7 @@ 
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.core.JsonProcessingException; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.exceptions.RecordTooLargeException; import com.linkedin.venice.exceptions.VeniceException; @@ -30,20 +31,31 @@ import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter; import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.AbstractVeniceWriter; import com.linkedin.venice.writer.DeleteMetadata; import com.linkedin.venice.writer.PutMetadata; import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; @@ -618,6 +630,51 @@ public void close() throws IOException { Assert.assertThrows(VeniceException.class, () -> reducer.close()); } + @Test + public void testCreateCompositeVeniceWriter() throws JsonProcessingException { 
+ VeniceReducer reducer = new VeniceReducer(); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + VeniceWriter mainWriter = mock(VeniceWriter.class); + Map viewConfigMap = new HashMap<>(); + String view1Name = "view1"; + ViewParameters.Builder builder = new ViewParameters.Builder(view1Name); + builder.setPartitionCount(6); + builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfigImpl viewConfig1 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); + viewConfigMap.put(view1Name, viewConfig1); + String view2Name = "view2"; + builder = new ViewParameters.Builder(view2Name); + builder.setPartitionCount(12); + builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); + viewConfigMap.put(view2Name, viewConfig2); + String flatViewConfigMapString = ViewUtils.flatViewConfigMapString(viewConfigMap); + String topicName = "test_v1"; + reducer.createCompositeVeniceWriter(writerFactory, mainWriter, flatViewConfigMapString, topicName, true, true); + ArgumentCaptor vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class); + verify(writerFactory, times(2)).createVeniceWriter(vwOptionsCaptor.capture()); + Map verifyPartitionToViewsMap = new HashMap<>(); + verifyPartitionToViewsMap.put( + 6, + ViewUtils + .getVeniceView(viewConfig1.getViewClassName(), new Properties(), "test", viewConfig1.getViewParameters())); + verifyPartitionToViewsMap.put( + 12, + ViewUtils + .getVeniceView(viewConfig2.getViewClassName(), new Properties(), "test", viewConfig2.getViewParameters())); + for (VeniceWriterOptions options: vwOptionsCaptor.getAllValues()) { + int partitionCount = options.getPartitionCount(); + Assert.assertTrue(verifyPartitionToViewsMap.containsKey(partitionCount)); + VeniceView veniceView = verifyPartitionToViewsMap.get(partitionCount); + Assert.assertTrue(veniceView 
instanceof MaterializedView); + MaterializedView materializedView = (MaterializedView) veniceView; + Assert.assertEquals( + options.getTopicName(), + materializedView.getTopicNamesAndConfigsForVersion(1).keySet().stream().findAny().get()); + Assert.assertTrue(materializedView.getViewPartitioner() instanceof DefaultVenicePartitioner); + } + } + private Reporter createZeroCountReporterMock() { Reporter mockReporter = mock(Reporter.class); Counters.Counter mockCounters = mock(Counters.Counter.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/update/CompositeVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java similarity index 95% rename from internal/venice-common/src/main/java/com/linkedin/venice/writer/update/CompositeVeniceWriter.java rename to internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java index a2dbcddb963..410fb00d4f7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/update/CompositeVeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java @@ -1,4 +1,4 @@ -package com.linkedin.venice.writer.update; +package com.linkedin.venice.writer; import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; @@ -6,10 +6,6 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; -import com.linkedin.venice.writer.AbstractVeniceWriter; -import com.linkedin.venice.writer.DeleteMetadata; -import com.linkedin.venice.writer.PutMetadata; -import com.linkedin.venice.writer.VeniceWriter; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; diff --git 
a/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java new file mode 100644 index 00000000000..6765220d45e --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java @@ -0,0 +1,52 @@ +package com.linkedin.venice.writer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import java.util.concurrent.CompletableFuture; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class CompositeVeniceWriterTest { + @Test + public void testFlushChecksForLastWriteFuture() { + VeniceWriter mockWriter = mock(VeniceWriter.class); + CompletableFuture mainWriterFuture = new CompletableFuture<>(); + doReturn(mainWriterFuture).when(mockWriter).put(any(), any(), anyInt(), eq(null)); + mainWriterFuture.completeExceptionally(new VeniceException("Expected exception")); + AbstractVeniceWriter compositeVeniceWriter = + new CompositeVeniceWriter("test_v1", mockWriter, new VeniceWriter[0], null); + compositeVeniceWriter.put(new byte[1], new byte[1], 1, null); + VeniceException e = Assert.expectThrows(VeniceException.class, compositeVeniceWriter::flush); + Assert.assertTrue(e.getCause().getMessage().contains("Expected")); + } + + @Test + public void testWritesAreInOrder() throws InterruptedException { + VeniceWriter mockMainWriter = mock(VeniceWriter.class); + CompletableFuture mainWriterFuture = CompletableFuture.completedFuture(null); + 
doReturn(mainWriterFuture).when(mockMainWriter).put(any(), any(), anyInt(), eq(null)); + VeniceWriter mockChildWriter = mock(VeniceWriter.class); + CompletableFuture childWriterFuture = new CompletableFuture<>(); + doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(null)); + VeniceWriter[] childWriters = new VeniceWriter[1]; + childWriters[0] = mockChildWriter; + AbstractVeniceWriter compositeVeniceWriter = + new CompositeVeniceWriter("test_v1", mockMainWriter, childWriters, null); + compositeVeniceWriter.put(new byte[1], new byte[1], 1, null); + verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null)); + Thread.sleep(1000); + verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null)); + childWriterFuture.complete(null); + verify(mockMainWriter, timeout(1000)).put(any(), any(), anyInt(), eq(null)); + } +}