diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index d47cd3ebd5c..83994b8fb87 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -22,9 +22,9 @@ import com.linkedin.venice.kafka.protocol.ProducerMetadata; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; @@ -54,20 +54,20 @@ public class MaterializedViewWriterTest { public void testViewParametersBuilder() throws JsonProcessingException { String viewName = "testMaterializedView"; int partitionCount = 3; - ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName); + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); Map viewParams = viewParamsBuilder.build(); Assert.assertEquals(viewParams.size(), 1); - Assert.assertEquals(viewParams.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()), viewName); + Assert.assertEquals(viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()), viewName); viewParamsBuilder.setPartitionCount(partitionCount); List projectionFields = Arrays.asList("field1", "field2"); viewParamsBuilder.setProjectionFields(projectionFields); viewParams = viewParamsBuilder.build(); Assert.assertEquals(viewParams.size(), 3); Assert.assertEquals( - viewParams.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()), + viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()), String.valueOf(partitionCount)); Assert.assertEquals( - viewParams.get(ViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()), + viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()), ObjectMapperFactory.getInstance().writeValueAsString(projectionFields)); } @@ -81,7 +81,7 @@ public void testBuildWriterOptions() { Store store = getMockStore(storeName, 1, version); doReturn(true).when(store).isNearlineProducerCompressionEnabled(); doReturn(3).when(store).getNearlineProducerCountPerWriter(); - ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName); + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); viewParamsBuilder.setPartitionCount(6); viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); Map viewParamsMap = viewParamsBuilder.build(); @@ -107,7 +107,7 @@ public void testProcessIngestionHeartbeat() { doReturn(true).when(version).isChunkingEnabled(); doReturn(true).when(version).isRmdChunkingEnabled(); getMockStore(storeName, 1, version); - ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName); + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); viewParamsBuilder.setPartitionCount(6); viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); Map viewParamsMap = viewParamsBuilder.build(); diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 36c367c6525..f47ad21d229 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -71,12 +71,12 @@ import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.HybridStoreConfigImpl; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.schema.AvroSchemaParseUtils; @@ -944,8 +944,9 @@ public void testConfigureWithMaterializedViewConfigs() throws Exception { Assert.assertNull(pushJobSetting.materializedViewConfigFlatMap); } Map viewConfigs = new HashMap<>(); - ViewParameters.Builder builder = new ViewParameters.Builder("testView").setPartitionCount(12) - .setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + MaterializedViewParameters.Builder builder = + new MaterializedViewParameters.Builder("testView").setPartitionCount(12) + .setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); viewConfigs.put("testView", new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build())); viewConfigs .put("dummyView", new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap())); diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java index 8d6a6c95b92..46f4c98abe0 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java @@ -30,10 +30,10 @@ import com.linkedin.venice.hadoop.mapreduce.engine.MapReduceEngineTaskConfigProvider; import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter; import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl; import com.linkedin.venice.pubsub.api.PubSubProduceResult; @@ -637,13 +637,13 @@ public void testCreateCompositeVeniceWriter() throws JsonProcessingException { VeniceWriter mainWriter = mock(VeniceWriter.class); Map viewConfigMap = new HashMap<>(); String view1Name = "view1"; - ViewParameters.Builder builder = new ViewParameters.Builder(view1Name); + MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(view1Name); builder.setPartitionCount(6); builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); ViewConfigImpl viewConfig1 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); viewConfigMap.put(view1Name, viewConfig1); String view2Name = "view2"; - builder = new ViewParameters.Builder(view2Name); + builder = new MaterializedViewParameters.Builder(view2Name); builder.setPartitionCount(12); builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java similarity index 99% rename from internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java rename to internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java index b4afa6af163..ce9d27204f5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/MaterializedViewParameters.java @@ -10,7 +10,7 @@ import java.util.Objects; -public enum ViewParameters { +public enum MaterializedViewParameters { /** * Parameter key used to specify the re-partition view name. */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java index 09960bf5c01..67e9b584204 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java @@ -3,10 +3,10 @@ import static com.linkedin.venice.views.ViewUtils.PARTITION_COUNT; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.VenicePartitioner; import com.linkedin.venice.utils.PartitionUtils; import com.linkedin.venice.utils.VeniceProperties; @@ -29,12 +29,14 @@ public class MaterializedView extends VeniceView { public MaterializedView(Properties props, String storeName, Map viewParameters) { super(props, storeName, viewParameters); // Override topic partition count config - viewPartitionCount = Integer.parseInt(viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + viewPartitionCount = + Integer.parseInt(viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); this.props.put(PARTITION_COUNT, viewPartitionCount); viewPartitioner = Lazy.of(() -> { - String viewPartitionerClass = this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitionerClass = + this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); String viewPartitionerParamsString = - this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); + this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); return PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString); }); } @@ -52,7 +54,7 @@ public VeniceWriterOptions.Builder getWriterOptionsBuilder(String viewTopicName, @Override public Map getTopicNamesAndConfigsForVersion(int version) { VeniceProperties properties = new VeniceProperties(props); - String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()); + String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()); return Collections.singletonMap( Version.composeKafkaTopic(storeName, version) + VIEW_TOPIC_SEPARATOR + viewName + MATERIALIZED_VIEW_TOPIC_SUFFIX, @@ -60,34 +62,37 @@ public Map getTopicNamesAndConfigsForVersion(int versi } /** - * {@link ViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view. - * {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level + * {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view. + * {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level * partitioner config if it's not specified in the view parameters. - * {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional. + * {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional. */ @Override public void validateConfigs(Store store) { - String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()); + String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()); if (viewName == null) { - throw new VeniceException(String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_NAME.name())); + throw new VeniceException( + String.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name())); } if (store.getViewConfigs().containsKey(viewName)) { throw new VeniceException("A view config with the same view name already exist, view name: " + viewName); } - String viewPartitioner = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitioner = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); if (viewPartitioner == null) { throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())); + String.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())); } try { Class.forName(viewPartitioner); } catch (ClassNotFoundException e) { throw new VeniceException("Cannot find partitioner class: " + viewPartitioner); } - String partitionCountString = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()); + String partitionCountString = + viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()); if (partitionCountString == null) { throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + String + .format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); } int viewPartitionCount = Integer.parseInt(partitionCountString); // A materialized view with the exact same partitioner and partition count as the store is not allwoed @@ -101,9 +106,9 @@ public void validateConfigs(Store store) { ViewConfig viewConfig = viewConfigEntries.getValue(); if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) { String configPartitioner = - viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); - int configPartitionCount = Integer - .parseInt(viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + viewConfig.getViewParameters().get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); + int configPartitionCount = Integer.parseInt( + viewConfig.getViewParameters().get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); if (configPartitionCount == viewPartitionCount && configPartitioner.equals(viewPartitioner)) { throw new VeniceException( "A view with identical view configs already exist, view name: " + viewConfigEntries.getKey()); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java index f831ea0a398..387887a9df7 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java @@ -1,18 +1,23 @@ package com.linkedin.venice.views; +import static com.linkedin.venice.partitioner.ConstantVenicePartitioner.CONSTANT_PARTITION; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,28 +34,30 @@ public void testValidateConfigs() { Store testStore = getMockStore("test-store", 12); // Fail due to missing view name assertThrows(() -> new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore)); - viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), "test-view"); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name(), "test-view"); // Fail due to missing partition count assertThrows(() -> new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore)); - viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); // Fail due to same partitioner and partition count assertThrows(() -> new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore)); - viewParams - .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); + viewParams.put( + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), + ConstantVenicePartitioner.class.getCanonicalName()); // Pass, same partition count but different partitioner new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore); - viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); // Pass, same partitioner but different partition count new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore); viewParams.put( - ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName() + "DNE"); // Fail due to invalid partitioner class assertThrows(() -> new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore)); - viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); - viewParams - .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); + viewParams.put( + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), + ConstantVenicePartitioner.class.getCanonicalName()); String newStoreName = "test-store-existing-config"; Store storeWithExistingViews = getMockStore(newStoreName, 12); ViewConfig viewConfig = mock(ViewConfig.class); @@ -59,16 +66,19 @@ public void testValidateConfigs() { assertThrows( () -> new MaterializedView(properties, newStoreName, viewParams).validateConfigs(storeWithExistingViews)); Map existingViewConfigParams = new HashMap<>(); + existingViewConfigParams.put( + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), + ConstantVenicePartitioner.class.getCanonicalName()); existingViewConfigParams - .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); - existingViewConfigParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12)); + .put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12)); doReturn(existingViewConfigParams).when(viewConfig).getViewParameters(); doReturn(MaterializedView.class.getCanonicalName()).when(viewConfig).getViewClassName(); doReturn(Collections.singletonMap("old-view", viewConfig)).when(storeWithExistingViews).getViewConfigs(); // Fail due to existing identical view config assertThrows( () -> new MaterializedView(properties, newStoreName, viewParams).validateConfigs(storeWithExistingViews)); - existingViewConfigParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36)); + existingViewConfigParams + .put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36)); // Pass, same partitioner but different partition count new MaterializedView(properties, storeName, viewParams).validateConfigs(testStore); } @@ -79,8 +89,8 @@ public void testRePartitionViewTopicProcessing() { Map viewParams = new HashMap<>(); int version = 8; String rePartitionViewName = "test-view"; - viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); - viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); MaterializedView materializedView = new MaterializedView(new Properties(), storeName, viewParams); Map rePartitionViewTopicMap = materializedView.getTopicNamesAndConfigsForVersion(version); assertEquals(rePartitionViewTopicMap.size(), 1); @@ -93,6 +103,62 @@ public void testRePartitionViewTopicProcessing() { } } + @Test + public void testViewConfigMapFlattening() throws JsonProcessingException { + String viewName = "testView1"; + String viewName2 = "testView2"; + MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(viewName); + builder.setPartitionCount(3).setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); + MaterializedViewParameters.Builder builder2 = new MaterializedViewParameters.Builder(viewName2); + builder2.setPartitionCount(6) + .setPartitioner(ConstantVenicePartitioner.class.getCanonicalName()) + .setPartitionerParams(Collections.singletonMap(CONSTANT_PARTITION, String.valueOf(0))); + ViewConfig viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder2.build()); + Map storeViewConfigMap = new HashMap<>(); + storeViewConfigMap.put(viewName, viewConfig); + storeViewConfigMap.put(viewName2, viewConfig2); + // Flatten the config map to a string, parse it back and ensure it can still be used to create the correct views + String flattenString = ViewUtils.flatViewConfigMapString(storeViewConfigMap); + Map parsedViewConfigMap = ViewUtils.parseViewConfigMapString(flattenString); + ViewConfig parsedViewConfig = parsedViewConfigMap.get(viewName); + ViewConfig parsedViewConfig2 = parsedViewConfigMap.get(viewName2); + VeniceView view = ViewUtils.getVeniceView( + parsedViewConfig.getViewClassName(), + new Properties(), + "testStore", + parsedViewConfig.getViewParameters()); + VeniceView view2 = ViewUtils.getVeniceView( + parsedViewConfig2.getViewClassName(), + new Properties(), + "testStore", + parsedViewConfig2.getViewParameters()); + assertTrue(view instanceof MaterializedView); + assertEquals(((MaterializedView) view).getViewPartitionCount(), 3); + assertTrue(((MaterializedView) view).getViewPartitioner() instanceof DefaultVenicePartitioner); + assertTrue(view2 instanceof MaterializedView); + assertEquals(((MaterializedView) view2).getViewPartitionCount(), 6); + assertTrue(((MaterializedView) view2).getViewPartitioner() instanceof ConstantVenicePartitioner); + } + + @Test + public void testGetWriterOptionsBuilder() { + MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder("testView"); + builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()).setPartitionCount(3); + VeniceView view = ViewUtils + .getVeniceView(MaterializedView.class.getCanonicalName(), new Properties(), "testStore", builder.build()); + Version version = mock(Version.class); + doReturn(true).when(version).isRmdChunkingEnabled(); + doReturn(true).when(version).isChunkingEnabled(); + String viewTopic = "dummyViewTopic"; + VeniceWriterOptions options = view.getWriterOptionsBuilder(viewTopic, version).build(); + assertEquals(options.getTopicName(), viewTopic); + assertEquals(options.getPartitionCount().intValue(), 3); + assertTrue(options.getPartitioner() instanceof DefaultVenicePartitioner); + assertTrue(options.isChunkingEnabled()); + assertTrue(options.isRmdChunkingEnabled()); + } + private Store getMockStore(String storeName, int partitionCount) { Store testStore = mock(Store.class); // We can remove this requirement from VeniceView into ChangeCaptureView once we refactor the ingestion path to diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index a710c19e4b0..d304dadd34a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -17,9 +17,9 @@ import com.linkedin.venice.integration.utils.VeniceControllerWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; @@ -105,7 +105,8 @@ public void testLFIngestionWithMaterializedView() throws IOException { try (ControllerClient controllerClient = IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { String testViewName = "MaterializedViewTest"; - ViewParameters.Builder viewParamBuilder = new ViewParameters.Builder(testViewName).setPartitionCount(6); + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(6); UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) .setViewClassName(MaterializedView.class.getCanonicalName()) .setViewClassParams(viewParamBuilder.build()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index ba613eafee3..c9bc076ffa7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -183,6 +183,7 @@ import com.linkedin.venice.meta.ETLStoreConfig; import com.linkedin.venice.meta.HybridStoreConfig; import com.linkedin.venice.meta.Instance; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.RegionPushDetails; @@ -197,7 +198,6 @@ import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.persona.StoragePersona; import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -2836,14 +2836,15 @@ private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig vi String.format("Materialized View name cannot contain version separator: %s", VERSION_SEPARATOR)); } Map viewParams = viewConfig.getViewParameters(); - ViewParameters.Builder decoratedViewParamBuilder = new ViewParameters.Builder(viewName, viewParams); - if (!viewParams.containsKey(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())) { + MaterializedViewParameters.Builder decoratedViewParamBuilder = + new MaterializedViewParameters.Builder(viewName, viewParams); + if (!viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())) { decoratedViewParamBuilder.setPartitioner(store.getPartitionerConfig().getPartitionerClass()); if (!store.getPartitionerConfig().getPartitionerParams().isEmpty()) { decoratedViewParamBuilder.setPartitionerParams(store.getPartitionerConfig().getPartitionerParams()); } } - if (!viewParams.containsKey(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { + if (!viewParams.containsKey(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { decoratedViewParamBuilder.setPartitionCount(store.getPartitionCount()); } viewConfig.setViewParameters(decoratedViewParamBuilder.build()); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index ffc2db73a97..f469593b04f 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -58,6 +58,7 @@ import com.linkedin.venice.meta.BufferReplayPolicy; import com.linkedin.venice.meta.DataReplicationPolicy; import com.linkedin.venice.meta.HybridStoreConfigImpl; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.ReadStrategy; @@ -70,7 +71,6 @@ import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.InvalidKeySchemaPartitioner; @@ -2000,7 +2000,7 @@ public void testSetRePartitionViewConfig() { String viewString = String.format( rePartitionViewConfigString, MaterializedView.class.getCanonicalName(), - ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), rePartitionViewPartitionCount); // Invalid re-partition view name @@ -2017,13 +2017,14 @@ public void testSetRePartitionViewConfig() { Assert.assertTrue(updateStore.getViews().containsKey(rePartitionViewName)); Map rePartitionViewParameters = updateStore.getViews().get(rePartitionViewName).viewParameters; - Assert.assertNotNull(rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name())); + Assert.assertNotNull(rePartitionViewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name())); Assert.assertEquals( - rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), + rePartitionViewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), rePartitionViewName); Assert.assertEquals( Integer.parseInt( - rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), + rePartitionViewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()) + .toString()), rePartitionViewPartitionCount); } @@ -2058,8 +2059,9 @@ public void testInsertMaterializedViewConfig() { String rePartitionViewName = "rePartitionViewA"; int rePartitionViewPartitionCount = 10; Map viewClassParams = new HashMap<>(); - viewClassParams - .put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(rePartitionViewPartitionCount)); + viewClassParams.put( + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), + Integer.toString(rePartitionViewPartitionCount)); // Invalid re-partition view name Assert.assertThrows( @@ -2083,13 +2085,14 @@ public void testInsertMaterializedViewConfig() { Assert.assertTrue(updateStore.getViews().containsKey(rePartitionViewName)); Map rePartitionViewParameters = updateStore.getViews().get(rePartitionViewName).viewParameters; - Assert.assertNotNull(rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name())); + Assert.assertNotNull(rePartitionViewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name())); Assert.assertEquals( - rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), + rePartitionViewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), rePartitionViewName); Assert.assertEquals( Integer.parseInt( - rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), + rePartitionViewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()) + .toString()), rePartitionViewPartitionCount); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java index 8e3abaaec3b..43904d3a056 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java @@ -36,6 +36,7 @@ import com.linkedin.venice.meta.DataReplicationPolicy; import com.linkedin.venice.meta.HybridStoreConfigImpl; import com.linkedin.venice.meta.Instance; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.Partition; import com.linkedin.venice.meta.PartitionAssignment; @@ -48,7 +49,6 @@ import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.TestUtils; @@ -1106,10 +1106,12 @@ public void testEOPReceivedProcedures() { Map viewParams = new HashMap<>(); String viewName = "testView"; int viewPartitionCount = 10; - viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), viewName); - viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(viewPartitionCount)); + viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name(), viewName); viewParams - .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), DefaultVenicePartitioner.class.getCanonicalName()); + .put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(viewPartitionCount)); + viewParams.put( + MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), + DefaultVenicePartitioner.class.getCanonicalName()); ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParams); viewConfigMap.put(viewName, viewConfig); String topic = getTopic();