Commit 791712b

Added unit tests

xunyin8 committed Nov 25, 2024
1 parent 6b541a1 commit 791712b

Showing 10 changed files with 183 additions and 13 deletions.
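
In summary: the commit renames PushJobSetting.viewConfigFlatMap to materializedViewConfigFlatMap across the MapReduce and Spark write paths, moves CompositeVeniceWriter from com.linkedin.venice.writer.update to com.linkedin.venice.writer, widens createCompositeVeniceWriter from package-private to protected, and adds unit tests covering materialized view config propagation, composite writer creation, and composite write ordering.
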
@@ -158,5 +158,5 @@ public PushJobSetting() {
this.jobStartTimeMs = System.currentTimeMillis();
}

-public String viewConfigFlatMap;
+public String materializedViewConfigFlatMap;
}
@@ -1048,7 +1048,7 @@ private void configureJobPropertiesWithMaterializedViewConfigs() {
.filter(vc -> Objects.equals(vc.getValue().getViewClassName(), MaterializedView.class.getCanonicalName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!viewConfigMap.isEmpty()) {
-pushJobSetting.viewConfigFlatMap = ViewUtils.flatViewConfigMapString(viewConfigMap);
+pushJobSetting.materializedViewConfigFlatMap = ViewUtils.flatViewConfigMapString(viewConfigMap);
}
} catch (Exception e) {
throw new VeniceException("Failed to configure job properties with view configs");
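
The rename is more than cosmetic: the stream above filters the store's view configs down to entries whose class is MaterializedView before flattening, so the field only ever carries materialized view configs, and the old name viewConfigFlatMap overstated its contents.
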
@@ -306,8 +306,8 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) {
}
jobConf.setPartitionerClass(partitionerClass);
}
-if (pushJobSetting.viewConfigFlatMap != null) {
-jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.viewConfigFlatMap);
+if (pushJobSetting.materializedViewConfigFlatMap != null) {
+jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap);
}
jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false));
int partitionCount = pushJobSetting.partitionCount;
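
For context on the round trip: the value stored under PUSH_JOB_VIEW_CONFIGS here is read back and parsed on the task side. Below is a minimal consumer-side sketch, assuming only the ViewUtils.parseViewConfigMapString helper that the tests in this commit also use; the class and method names (ViewConfigRoundTripSketch, readViewConfigs) are hypothetical, and the actual read site in Venice may differ.

import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.views.ViewUtils;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.mapred.JobConf;

public final class ViewConfigRoundTripSketch {
  // Hypothetical sketch: recover the materialized view configs that
  // setupReducerConf serialized into the JobConf. The config key is passed in
  // rather than imported because its defining class is not shown in this diff.
  public static Map<String, ViewConfig> readViewConfigs(JobConf jobConf, String pushJobViewConfigsKey)
      throws Exception { // parseViewConfigMapString can throw on malformed input
    String flatMap = jobConf.get(pushJobViewConfigsKey);
    if (flatMap == null) {
      return Collections.emptyMap(); // no materialized views configured for this push
    }
    return ViewUtils.parseViewConfigMapString(flatMap); // inverse of flatViewConfigMapString
  }
}
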
@@ -13,6 +13,8 @@
import com.linkedin.venice.utils.IteratorUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.AbstractVeniceWriter;
+import com.linkedin.venice.writer.VeniceWriter;
+import com.linkedin.venice.writer.VeniceWriterFactory;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.BytesWritable;
@@ -174,4 +176,22 @@ private long getTotalIncomingDataSizeInBytes(JobConf jobConfig) {
protected void setHadoopJobClientProvider(HadoopJobClientProvider hadoopJobClientProvider) {
this.hadoopJobClientProvider = hadoopJobClientProvider;
}

+  // Visible for testing
+  @Override
+  protected AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter(
+      VeniceWriterFactory factory,
+      VeniceWriter<byte[], byte[], byte[]> mainWriter,
+      String flatViewConfigMapString,
+      String topicName,
+      boolean chunkingEnabled,
+      boolean rmdChunkingEnabled) {
+    return super.createCompositeVeniceWriter(
+        factory,
+        mainWriter,
+        flatViewConfigMapString,
+        topicName,
+        chunkingEnabled,
+        rmdChunkingEnabled);
+  }
}
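
A note on the no-op override above: createCompositeVeniceWriter is inherited from a superclass in another package, and in Java a protected member inherited across packages is not visible to other classes in the subclass's package. Re-declaring it in VeniceReducer, even as a pure super call, is what lets the reducer test in this commit invoke the method directly.
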
@@ -38,12 +38,12 @@
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.views.ViewUtils;
import com.linkedin.venice.writer.AbstractVeniceWriter;
+import com.linkedin.venice.writer.CompositeVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
-import com.linkedin.venice.writer.update.CompositeVeniceWriter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -381,7 +381,8 @@ private AbstractVeniceWriter<byte[], byte[], byte[]> createBasicVeniceWriter() {
}
}

-AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter(
+// protected and package private for testing purposes
+protected AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter(
VeniceWriterFactory factory,
VeniceWriter<byte[], byte[], byte[]> mainWriter,
String flatViewConfigMapString,
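
This visibility change is the other half of the VeniceReducer override above: the method was package-private, so the reducer subclass in a different package could not call it at all; making it protected exposes it to subclasses, and the override then re-exports it to the reducer's own test package.
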
@@ -127,8 +127,8 @@ private void setupSparkDataWriterJobFlow(PushJobSetting pushJobSetting) {

Properties jobProps = new Properties();
sparkSession.conf().getAll().foreach(entry -> jobProps.setProperty(entry._1, entry._2));
-if (pushJobSetting.viewConfigFlatMap != null) {
-jobProps.put(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.viewConfigFlatMap);
+if (pushJobSetting.materializedViewConfigFlatMap != null) {
+jobProps.put(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap);
}
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Broadcast<Properties> broadcastProperties = sparkContext.broadcast(jobProps);
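
This is the Spark mirror of the MapReduce change above: instead of setting the key on a JobConf, the flat map rides along in the Properties object that is broadcast to every executor, so both engines hand the same serialized view configs to their data writer tasks.
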
@@ -74,6 +74,9 @@
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
+import com.linkedin.venice.meta.ViewConfig;
+import com.linkedin.venice.meta.ViewConfigImpl;
+import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
@@ -83,6 +86,9 @@
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.views.ChangeCaptureView;
+import com.linkedin.venice.views.MaterializedView;
+import com.linkedin.venice.views.ViewUtils;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.Collections;
import java.util.HashMap;
@@ -923,6 +929,44 @@ public void testGetVeniceWriter() {
}
}

+  @Test
+  public void testConfigureWithMaterializedViewConfigs() throws Exception {
+    Properties properties = getVpjRequiredProperties();
+    properties.put(KEY_FIELD_PROP, "id");
+    properties.put(VALUE_FIELD_PROP, "name");
+    JobStatusQueryResponse response = mockJobStatusQuery();
+    ControllerClient client = getClient();
+    doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), eq(null));
+    try (final VenicePushJob vpj = getSpyVenicePushJob(properties, client)) {
+      skipVPJValidation(vpj);
+      vpj.run();
+      PushJobSetting pushJobSetting = vpj.getPushJobSetting();
+      Assert.assertNull(pushJobSetting.materializedViewConfigFlatMap);
+    }
+    Map<String, ViewConfig> viewConfigs = new HashMap<>();
+    ViewParameters.Builder builder = new ViewParameters.Builder("testView").setPartitionCount(12)
+        .setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
+    viewConfigs.put("testView", new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()));
+    viewConfigs
+        .put("dummyView", new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap()));
+    client = getClient(storeInfo -> {
+      storeInfo.setViewConfigs(viewConfigs);
+    }, true);
+    doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), eq(null));
+    try (final VenicePushJob vpj = getSpyVenicePushJob(properties, client)) {
+      skipVPJValidation(vpj);
+      vpj.run();
+      PushJobSetting pushJobSetting = vpj.getPushJobSetting();
+      Assert.assertNotNull(pushJobSetting.materializedViewConfigFlatMap);
+      Map<String, ViewConfig> viewConfigMap =
+          ViewUtils.parseViewConfigMapString(pushJobSetting.materializedViewConfigFlatMap);
+      // Ensure only materialized view configs are propagated to the job settings
+      Assert.assertEquals(viewConfigMap.size(), 1);
+      Assert.assertTrue(viewConfigMap.containsKey("testView"));
+      Assert.assertEquals(viewConfigMap.get("testView").getViewClassName(), MaterializedView.class.getCanonicalName());
+    }
+  }

private JobStatusQueryResponse mockJobStatusQuery() {
JobStatusQueryResponse response = new JobStatusQueryResponse();
response.setStatus(ExecutionStatus.COMPLETED.toString());
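
The new test runs the push twice: first against a store with no view configs, asserting materializedViewConfigFlatMap stays null, then against a store carrying one materialized view and one change-capture view, asserting that exactly the materialized view survives the filtering into the flat map.
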
@@ -18,6 +18,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

+import com.fasterxml.jackson.core.JsonProcessingException;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.RecordTooLargeException;
import com.linkedin.venice.exceptions.VeniceException;
@@ -30,20 +31,31 @@
import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.views.ViewUtils;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
@@ -618,6 +630,51 @@ public void close() throws IOException {
Assert.assertThrows(VeniceException.class, () -> reducer.close());
}

+  @Test
+  public void testCreateCompositeVeniceWriter() throws JsonProcessingException {
+    VeniceReducer reducer = new VeniceReducer();
+    VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class);
+    VeniceWriter<byte[], byte[], byte[]> mainWriter = mock(VeniceWriter.class);
+    Map<String, ViewConfig> viewConfigMap = new HashMap<>();
+    String view1Name = "view1";
+    ViewParameters.Builder builder = new ViewParameters.Builder(view1Name);
+    builder.setPartitionCount(6);
+    builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
+    ViewConfigImpl viewConfig1 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
+    viewConfigMap.put(view1Name, viewConfig1);
+    String view2Name = "view2";
+    builder = new ViewParameters.Builder(view2Name);
+    builder.setPartitionCount(12);
+    builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
+    ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
+    viewConfigMap.put(view2Name, viewConfig2);
+    String flatViewConfigMapString = ViewUtils.flatViewConfigMapString(viewConfigMap);
+    String topicName = "test_v1";
+    reducer.createCompositeVeniceWriter(writerFactory, mainWriter, flatViewConfigMapString, topicName, true, true);
+    ArgumentCaptor<VeniceWriterOptions> vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class);
+    verify(writerFactory, times(2)).createVeniceWriter(vwOptionsCaptor.capture());
+    Map<Integer, VeniceView> verifyPartitionToViewsMap = new HashMap<>();
+    verifyPartitionToViewsMap.put(
+        6,
+        ViewUtils
+            .getVeniceView(viewConfig1.getViewClassName(), new Properties(), "test", viewConfig1.getViewParameters()));
+    verifyPartitionToViewsMap.put(
+        12,
+        ViewUtils
+            .getVeniceView(viewConfig2.getViewClassName(), new Properties(), "test", viewConfig2.getViewParameters()));
+    for (VeniceWriterOptions options: vwOptionsCaptor.getAllValues()) {
+      int partitionCount = options.getPartitionCount();
+      Assert.assertTrue(verifyPartitionToViewsMap.containsKey(partitionCount));
+      VeniceView veniceView = verifyPartitionToViewsMap.get(partitionCount);
+      Assert.assertTrue(veniceView instanceof MaterializedView);
+      MaterializedView materializedView = (MaterializedView) veniceView;
+      Assert.assertEquals(
+          options.getTopicName(),
+          materializedView.getTopicNamesAndConfigsForVersion(1).keySet().stream().findAny().get());
+      Assert.assertTrue(materializedView.getViewPartitioner() instanceof DefaultVenicePartitioner);
+    }
+  }

private Reporter createZeroCountReporterMock() {
Reporter mockReporter = mock(Reporter.class);
Counters.Counter mockCounters = mock(Counters.Counter.class);
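
Worth noting how the reducer test ties captures back to inputs: the two views are given distinct partition counts (6 and 12), so each captured VeniceWriterOptions can be matched unambiguously to its source ViewConfig before the topic name and partitioner are checked against a freshly built MaterializedView.
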
@@ -1,15 +1,11 @@
-package com.linkedin.venice.writer.update;
+package com.linkedin.venice.writer;

import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
-import com.linkedin.venice.writer.AbstractVeniceWriter;
-import com.linkedin.venice.writer.DeleteMetadata;
-import com.linkedin.venice.writer.PutMetadata;
-import com.linkedin.venice.writer.VeniceWriter;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
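
The package move is also what makes the four deleted imports redundant: AbstractVeniceWriter, DeleteMetadata, PutMetadata, and VeniceWriter now sit in the same package as CompositeVeniceWriter itself.
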
@@ -0,0 +1,52 @@
+package com.linkedin.venice.writer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.pubsub.api.PubSubProduceResult;
+import java.util.concurrent.CompletableFuture;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class CompositeVeniceWriterTest {
+  @Test
+  public void testFlushChecksForLastWriteFuture() {
+    VeniceWriter<byte[], byte[], byte[]> mockWriter = mock(VeniceWriter.class);
+    CompletableFuture<PubSubProduceResult> mainWriterFuture = new CompletableFuture<>();
+    doReturn(mainWriterFuture).when(mockWriter).put(any(), any(), anyInt(), eq(null));
+    mainWriterFuture.completeExceptionally(new VeniceException("Expected exception"));
+    AbstractVeniceWriter<byte[], byte[], byte[]> compositeVeniceWriter =
+        new CompositeVeniceWriter("test_v1", mockWriter, new VeniceWriter[0], null);
+    compositeVeniceWriter.put(new byte[1], new byte[1], 1, null);
+    VeniceException e = Assert.expectThrows(VeniceException.class, compositeVeniceWriter::flush);
+    Assert.assertTrue(e.getCause().getMessage().contains("Expected"));
+  }
+
+  @Test
+  public void testWritesAreInOrder() throws InterruptedException {
+    VeniceWriter<byte[], byte[], byte[]> mockMainWriter = mock(VeniceWriter.class);
+    CompletableFuture<PubSubProduceResult> mainWriterFuture = CompletableFuture.completedFuture(null);
+    doReturn(mainWriterFuture).when(mockMainWriter).put(any(), any(), anyInt(), eq(null));
+    VeniceWriter<byte[], byte[], byte[]> mockChildWriter = mock(VeniceWriter.class);
+    CompletableFuture<PubSubProduceResult> childWriterFuture = new CompletableFuture<>();
+    doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(null));
+    VeniceWriter[] childWriters = new VeniceWriter[1];
+    childWriters[0] = mockChildWriter;
+    AbstractVeniceWriter<byte[], byte[], byte[]> compositeVeniceWriter =
+        new CompositeVeniceWriter<byte[], byte[], byte[]>("test_v1", mockMainWriter, childWriters, null);
+    compositeVeniceWriter.put(new byte[1], new byte[1], 1, null);
+    verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
+    Thread.sleep(1000);
+    verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
+    childWriterFuture.complete(null);
+    verify(mockMainWriter, timeout(1000)).put(any(), any(), anyInt(), eq(null));
+  }
+}
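
A closing observation on testWritesAreInOrder: it pins down the contract that the main writer's put must not begin until every child (view) writer's future has completed. Below is a minimal sketch of one way to express that ordering with CompletableFuture chaining; it is an assumed illustration of the contract (the names OrderedWriteSketch and putInOrder are hypothetical), not CompositeVeniceWriter's actual internals.

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public final class OrderedWriteSketch {
  // Chain the main write after all child writes: completing the child futures
  // is what releases the main writer's put.
  public static <R> CompletableFuture<R> putInOrder(
      CompletableFuture<?>[] childWrites,
      Supplier<CompletableFuture<R>> mainWrite) {
    return CompletableFuture.allOf(childWrites).thenCompose(ignored -> mainWrite.get());
  }
}

Under such a scheme the test's observations fall out directly: while childWriterFuture is incomplete the main put is never issued (the two never() verifications), and completing it lets the chained stage fire within the verify timeout.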
