Skip to content

Commit

Permalink
Unit test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
xunyin8 committed Dec 2, 2024
1 parent 08c1275 commit 3c59c8c
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
Expand Down Expand Up @@ -54,20 +54,20 @@ public class MaterializedViewWriterTest {
public void testViewParametersBuilder() throws JsonProcessingException {
String viewName = "testMaterializedView";
int partitionCount = 3;
ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
Map<String, String> viewParams = viewParamsBuilder.build();
Assert.assertEquals(viewParams.size(), 1);
Assert.assertEquals(viewParams.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()), viewName);
Assert.assertEquals(viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()), viewName);
viewParamsBuilder.setPartitionCount(partitionCount);
List<String> projectionFields = Arrays.asList("field1", "field2");
viewParamsBuilder.setProjectionFields(projectionFields);
viewParams = viewParamsBuilder.build();
Assert.assertEquals(viewParams.size(), 3);
Assert.assertEquals(
viewParams.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()),
viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()),
String.valueOf(partitionCount));
Assert.assertEquals(
viewParams.get(ViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()),
viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()),
ObjectMapperFactory.getInstance().writeValueAsString(projectionFields));
}

Expand All @@ -81,7 +81,7 @@ public void testBuildWriterOptions() {
Store store = getMockStore(storeName, 1, version);
doReturn(true).when(store).isNearlineProducerCompressionEnabled();
doReturn(3).when(store).getNearlineProducerCountPerWriter();
ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
viewParamsBuilder.setPartitionCount(6);
viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
Map<String, String> viewParamsMap = viewParamsBuilder.build();
Expand All @@ -107,7 +107,7 @@ public void testProcessIngestionHeartbeat() {
doReturn(true).when(version).isChunkingEnabled();
doReturn(true).when(version).isRmdChunkingEnabled();
getMockStore(storeName, 1, version);
ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
viewParamsBuilder.setPartitionCount(6);
viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
Map<String, String> viewParamsMap = viewParamsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
Expand Down Expand Up @@ -944,8 +944,9 @@ public void testConfigureWithMaterializedViewConfigs() throws Exception {
Assert.assertNull(pushJobSetting.materializedViewConfigFlatMap);
}
Map<String, ViewConfig> viewConfigs = new HashMap<>();
ViewParameters.Builder builder = new ViewParameters.Builder("testView").setPartitionCount(12)
.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
MaterializedViewParameters.Builder builder =
new MaterializedViewParameters.Builder("testView").setPartitionCount(12)
.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
viewConfigs.put("testView", new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()));
viewConfigs
.put("dummyView", new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import com.linkedin.venice.hadoop.mapreduce.engine.MapReduceEngineTaskConfigProvider;
import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
Expand Down Expand Up @@ -637,13 +637,13 @@ public void testCreateCompositeVeniceWriter() throws JsonProcessingException {
VeniceWriter<byte[], byte[], byte[]> mainWriter = mock(VeniceWriter.class);
Map<String, ViewConfig> viewConfigMap = new HashMap<>();
String view1Name = "view1";
ViewParameters.Builder builder = new ViewParameters.Builder(view1Name);
MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(view1Name);
builder.setPartitionCount(6);
builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
ViewConfigImpl viewConfig1 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
viewConfigMap.put(view1Name, viewConfig1);
String view2Name = "view2";
builder = new ViewParameters.Builder(view2Name);
builder = new MaterializedViewParameters.Builder(view2Name);
builder.setPartitionCount(12);
builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Objects;


public enum ViewParameters {
public enum MaterializedViewParameters {
/**
* Parameter key used to specify the re-partition view name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import static com.linkedin.venice.views.ViewUtils.PARTITION_COUNT;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.VeniceProperties;
Expand All @@ -29,12 +29,14 @@ public class MaterializedView extends VeniceView {
public MaterializedView(Properties props, String storeName, Map<String, String> viewParameters) {
super(props, storeName, viewParameters);
// Override topic partition count config
viewPartitionCount = Integer.parseInt(viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
viewPartitionCount =
Integer.parseInt(viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
this.props.put(PARTITION_COUNT, viewPartitionCount);
viewPartitioner = Lazy.of(() -> {
String viewPartitionerClass = this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
String viewPartitionerClass =
this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
String viewPartitionerParamsString =
this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name());
this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name());
return PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString);
});
}
Expand All @@ -52,42 +54,45 @@ public VeniceWriterOptions.Builder getWriterOptionsBuilder(String viewTopicName,
@Override
public Map<String, VeniceProperties> getTopicNamesAndConfigsForVersion(int version) {
VeniceProperties properties = new VeniceProperties(props);
String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name());
String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name());
return Collections.singletonMap(
Version.composeKafkaTopic(storeName, version) + VIEW_TOPIC_SEPARATOR + viewName
+ MATERIALIZED_VIEW_TOPIC_SUFFIX,
properties);
}

/**
* {@link ViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view.
* {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level
* {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view.
* {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level
* partitioner config if it's not specified in the view parameters.
* {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional.
* {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional.
*/
@Override
public void validateConfigs(Store store) {
String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name());
String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name());
if (viewName == null) {
throw new VeniceException(String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_NAME.name()));
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()));
}
if (store.getViewConfigs().containsKey(viewName)) {
throw new VeniceException("A view config with the same view name already exist, view name: " + viewName);
}
String viewPartitioner = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
String viewPartitioner = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
if (viewPartitioner == null) {
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()));
String.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()));
}
try {
Class.forName(viewPartitioner);
} catch (ClassNotFoundException e) {
throw new VeniceException("Cannot find partitioner class: " + viewPartitioner);
}
String partitionCountString = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name());
String partitionCountString =
viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name());
if (partitionCountString == null) {
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
String
.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
}
int viewPartitionCount = Integer.parseInt(partitionCountString);
// A materialized view with the exact same partitioner and partition count as the store is not allwoed
Expand All @@ -101,9 +106,9 @@ public void validateConfigs(Store store) {
ViewConfig viewConfig = viewConfigEntries.getValue();
if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) {
String configPartitioner =
viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
int configPartitionCount = Integer
.parseInt(viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
viewConfig.getViewParameters().get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
int configPartitionCount = Integer.parseInt(
viewConfig.getViewParameters().get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
if (configPartitionCount == viewPartitionCount && configPartitioner.equals(viewPartitioner)) {
throw new VeniceException(
"A view with identical view configs already exist, view name: " + viewConfigEntries.getKey());
Expand Down
Loading

0 comments on commit 3c59c8c

Please sign in to comment.