From 198c3ae23aa42d65158e9de29ac939c2b3f77178 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Fri, 1 Nov 2024 13:49:12 -0700 Subject: [PATCH] [server][controller] Add MaterializedViewWriter and support view writers in L/F 1. View writers will be invoked in L/F SIT too instead of only in A/A SIT. We rely on view config validation to ensure views that do require A/A are only added to stores with A/A enabled. 2. This PR only includes creation of materialized view topics, writing of data records and control messages to the materialized view topics in server and controller. - Materialized view topics are created during version creation time along with other view topics. - SOP is sent during view topic creation time with same chunking and compression configs as the store version. - EOP is sent when servers have reported EOP in every partition. - Incremental push control messages SOIP and EOIP are not propagated to the view topic for now because the end to end incremental push tracking story for view topics is not clear yet. Store owners will likely just disable the requirement to wait for view consumers to fully ingest the incremental push. - Ingestion heartbeats will be propagated in a broadcast manner. See implementation for details. - Version swap for CDC users will be implemented in a separate PR to keep this PR somewhat short for review. 3. TODO: one pending issue to be resolved is that during processing of batch records in the native replication source fabric, where we consume local VT, a leader transfer could result in missing records in the materialized view topic. This is because we don't do any global checkpointing across leader and followers when consuming local VT. --- .../ActiveActiveStoreIngestionTask.java | 50 ++-- .../LeaderFollowerStoreIngestionTask.java | 145 ++++++++++- .../kafka/consumer/StoreIngestionTask.java | 15 +- .../store/view/ChangeCaptureViewWriter.java | 16 +- .../store/view/MaterializedViewWriter.java | 237 ++++++++++++++++++ .../davinci/store/view/VeniceViewWriter.java | 46 +++- .../view/ChangeCaptureViewWriterTest.java | 83 ++++-- .../venice/meta/ViewParameterKeys.java | 20 -- .../linkedin/venice/meta/ViewParameters.java | 80 ++++++ .../venice/pushmonitor/OfflinePushStatus.java | 7 +- .../linkedin/venice/utils/PartitionUtils.java | 16 ++ .../venice/views/ChangeCaptureView.java | 6 +- .../venice/views/MaterializedView.java | 60 +++-- .../com/linkedin/venice/views/VeniceView.java | 7 +- .../linkedin/venice/writer/VeniceWriter.java | 19 ++ .../pushmonitor/OfflinePushStatusTest.java | 2 +- ...iewTest.java => TestMaterializedView.java} | 37 ++- .../venice/endToEnd/MaterializedViewTest.java | 151 +++++++++++ .../linkedin/venice/view/TestViewWriter.java | 14 ++ .../HelixVeniceClusterResources.java | 3 +- .../venice/controller/VeniceHelixAdmin.java | 71 ++++-- .../controller/VeniceParentHelixAdmin.java | 23 +- .../pushmonitor/AbstractPushMonitor.java | 102 ++++++-- .../PartitionStatusBasedPushMonitor.java | 7 +- .../pushmonitor/PushMonitorDelegator.java | 7 +- .../TestVeniceParentHelixAdmin.java | 21 +- .../pushmonitor/AbstractPushMonitorTest.java | 90 ++++++- .../PartitionStatusBasedPushMonitorTest.java | 6 +- 28 files changed, 1121 insertions(+), 220 deletions(-) create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java delete mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java create mode 100644 
internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java rename internal/venice-common/src/test/java/com/linkedin/venice/views/{MaterializedViewTest.java => TestMaterializedView.java} (77%) create mode 100644 internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 1ceaa95b653..582591544c8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -638,7 +638,7 @@ protected void processMessageAndMaybeProduceToKafka( // call in this context much less obtrusive, however, it implies that all views can only work for AA stores // Write to views - if (this.viewWriters.size() > 0) { + if (!this.viewWriters.isEmpty()) { /** * The ordering guarantees we want is the following: * @@ -647,24 +647,9 @@ protected void processMessageAndMaybeProduceToKafka( * producer (but not necessarily acked). */ long preprocessingTime = System.currentTimeMillis(); - CompletableFuture currentVersionTopicWrite = new CompletableFuture(); - CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; - int index = 0; - // The first future is for the previous write to VT - viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); - ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); - int oldValueSchemaId = - oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); - for (VeniceViewWriter writer: viewWriters.values()) { - viewWriterFutures[index++] = writer.processRecord( - mergeConflictResult.getNewValue(), - oldValueBB, - keyBytes, - versionNumber, - mergeConflictResult.getValueSchemaId(), - oldValueSchemaId, - mergeConflictResult.getRmdRecord()); - } + CompletableFuture currentVersionTopicWrite = new CompletableFuture(); + CompletableFuture[] viewWriterFutures = + processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper, null); CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); if (exception == null) { @@ -1512,6 +1497,33 @@ public boolean isReadyToServeAnnouncedWithRTLag() { return false; } + @Override + protected CompletableFuture[] processViewWriters( + PartitionConsumptionState partitionConsumptionState, + byte[] keyBytes, + MergeConflictResultWrapper mergeConflictResultWrapper, + WriteComputeResultWrapper writeComputeResultWrapper) { + CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; + MergeConflictResult mergeConflictResult = mergeConflictResultWrapper.getMergeConflictResult(); + int index = 0; + // The first future is for the previous write to VT + viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); + ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); + int oldValueSchemaId = + oldValueBB == null ? 
-1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); + for (VeniceViewWriter writer: viewWriters.values()) { + viewWriterFutures[index++] = writer.processRecord( + mergeConflictResult.getNewValue(), + oldValueBB, + keyBytes, + versionNumber, + mergeConflictResult.getValueSchemaId(), + oldValueSchemaId, + mergeConflictResult.getRmdRecord()); + } + return viewWriterFutures; + } + Runnable buildRepairTask( String sourceKafkaUrl, PubSubTopicPartition sourceTopicPartition, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 9e0415d8755..3ff2bd51dd5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -9,6 +9,7 @@ import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT; +import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER; import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; @@ -30,6 +31,7 @@ import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.view.ChangeCaptureViewWriter; +import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.KafkaDataIntegrityValidator; import com.linkedin.davinci.validation.PartitionTracker; @@ -55,6 +57,7 @@ import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; @@ -196,6 +199,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { protected final Map viewWriters; protected final boolean hasChangeCaptureView; + protected final boolean hasMaterializedView; + protected final boolean hasVersionCompleted; protected final AvroStoreDeserializerCache storeDeserializerCache; @@ -326,17 +331,25 @@ public LeaderFollowerStoreIngestionTask( version.getNumber(), schemaRepository.getKeySchema(store.getName()).getSchema()); boolean tmpValueForHasChangeCaptureViewWriter = false; + boolean tmpValueForHasMaterializedViewWriter = false; for (Map.Entry viewWriter: viewWriters.entrySet()) { if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) { tmpValueForHasChangeCaptureViewWriter = true; + } else if (viewWriter.getValue() instanceof MaterializedViewWriter) { + tmpValueForHasMaterializedViewWriter = true; + } + if (tmpValueForHasChangeCaptureViewWriter && tmpValueForHasMaterializedViewWriter) { break; } } hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter; + hasMaterializedView = tmpValueForHasMaterializedViewWriter; } 
else { viewWriters = Collections.emptyMap(); hasChangeCaptureView = false; + hasMaterializedView = false; } + hasVersionCompleted = version.getStatus().equals(VersionStatus.ONLINE); this.storeDeserializerCache = new AvroStoreDeserializerCache( builder.getSchemaRepo(), getStoreName(), @@ -2389,7 +2402,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState); // UPDATE message is only expected in LEADER which must be produced to kafka. MessageType msgType = MessageType.valueOf(kafkaValue); - if (msgType == MessageType.UPDATE && !produceToLocalKafka) { + if (msgType == UPDATE && !produceToLocalKafka) { throw new VeniceMessageException( ingestionTaskName + " hasProducedToKafka: Received UPDATE message in non-leader for: " + consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset()); @@ -2423,6 +2436,39 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( partitionConsumptionState.getVeniceWriterLazyRef().ifPresent(vw -> vw.flush()); partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime); } + /** + * For materialized views we still need to produce to the view topic when we are consuming batch data from the + * local VT for the very first time for a new version. Once the version has completed, any new SIT will have its + * {@link hasVersionCompleted} set to true to prevent duplicate processing of batch records by the view writers. + * Duplicate processing is still possible in the case when the leader replica crashes before the version becomes + * ONLINE, but it should be harmless since eventually it will be correct. + * TODO: We need to introduce additional checkpointing and rewind for L/F transition during batch processing to + * prevent data loss in the view topic for the following scenario: + * 1. VT contains 100 batch records between SOP and EOP. + * 2. The initial leader consumed up to the 50th record while followers consumed up to the 80th. + * 3. The initial leader crashes and one of the followers becomes the new leader and starts processing and writing + * to the view topic from the 80th record until ingestion is completed and the version becomes ONLINE. + * 4. The view topic will have a gap in the batch data between the 50th and the 80th record.
+ */ + if (hasMaterializedView && shouldViewWritersProcessBatchRecords(partitionConsumptionState) + && msgType == MessageType.PUT) { + WriteComputeResultWrapper writeComputeResultWrapper = + new WriteComputeResultWrapper((Put) kafkaValue.payloadUnion, null, false); + long preprocessingTime = System.currentTimeMillis(); + CompletableFuture currentVersionTopicWrite = new CompletableFuture(); + CompletableFuture[] viewWriterFutures = + processViewWriters(partitionConsumptionState, kafkaKey.getKey(), null, writeComputeResultWrapper); + CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { + hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); + if (exception == null) { + currentVersionTopicWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + currentVersionTopicWrite.completeExceptionally(veniceException); + } + }); + } return DelegateConsumerRecordResult.QUEUED_TO_DRAINER; } @@ -3321,9 +3367,60 @@ protected void processMessageAndMaybeProduceToKafka( beforeProcessingRecordTimestampNs, beforeProcessingBatchRecordsTimestampMs).getWriteComputeResultWrapper(); } + if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) { + return; + } + // Write to views + if (viewWriters != null && !viewWriters.isEmpty()) { + long preprocessingTime = System.currentTimeMillis(); + CompletableFuture currentVersionTopicWrite = new CompletableFuture(); + CompletableFuture[] viewWriterFutures = + processViewWriters(partitionConsumptionState, keyBytes, null, writeComputeResultWrapper); + CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { + hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); + if (exception == null) { + produceToLocalKafkaHelper( + consumerRecord, + partitionConsumptionState, + writeComputeResultWrapper, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingRecordTimestampNs); + currentVersionTopicWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + currentVersionTopicWrite.completeExceptionally(veniceException); + } + }); + partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite); + } else { + produceToLocalKafkaHelper( + consumerRecord, + partitionConsumptionState, + writeComputeResultWrapper, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingRecordTimestampNs); + } + } - Put newPut = writeComputeResultWrapper.getNewPut(); + private void produceToLocalKafkaHelper( + PubSubMessage consumerRecord, + PartitionConsumptionState partitionConsumptionState, + WriteComputeResultWrapper writeComputeResultWrapper, + int partition, + String kafkaUrl, + int kafkaClusterId, + long beforeProcessingRecordTimestampNs) { + KafkaKey kafkaKey = consumerRecord.getKey(); + KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); + byte[] keyBytes = kafkaKey.getKey(); + MessageType msgType = MessageType.valueOf(kafkaValue.messageType); LeaderProducedRecordContext leaderProducedRecordContext; + Put newPut = writeComputeResultWrapper.getNewPut(); switch (msgType) { case PUT: leaderProducedRecordContext = @@ -3377,10 +3474,6 @@ protected void processMessageAndMaybeProduceToKafka( break; case UPDATE: - if 
(writeComputeResultWrapper.isSkipProduce()) { - return; - } - leaderProducedRecordContext = LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, newPut); BiConsumer produceFunction = @@ -3594,15 +3687,22 @@ protected long measureRTOffsetLagForSingleRegion( } @Override - protected void processVersionSwapMessage( + protected void processControlMessageForViews( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) { // Iterate through list of views for the store and process the control message. for (VeniceViewWriter viewWriter: viewWriters.values()) { - // TODO: at some point, we should do this on more or all control messages potentially as we add more view types - viewWriter.processControlMessage(controlMessage, partition, partitionConsumptionState, this.versionNumber); + viewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + controlMessage, + partition, + partitionConsumptionState, + this.versionNumber); } } @@ -3861,6 +3961,33 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio } } + protected CompletableFuture[] processViewWriters( + PartitionConsumptionState partitionConsumptionState, + byte[] keyBytes, + MergeConflictResultWrapper mergeConflictResultWrapper, + WriteComputeResultWrapper writeComputeResultWrapper) { + CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; + int index = 0; + // The first future is for the previous write to VT + viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); + for (VeniceViewWriter writer: viewWriters.values()) { + viewWriterFutures[index++] = writer.processRecord( + writeComputeResultWrapper.getNewPut().putValue, + keyBytes, + versionNumber, + writeComputeResultWrapper.getNewPut().schemaId); + } + return viewWriterFutures; + } + + private boolean shouldViewWritersProcessBatchRecords(PartitionConsumptionState partitionConsumptionState) { + if (hasVersionCompleted) { + return false; + } + return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LEADER) + || Objects.equals(partitionConsumptionState.getLeaderFollowerState(), IN_TRANSITION_FROM_STANDBY_TO_LEADER); + } + /** * Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp} * such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 1e544f62f32..6270ee5e3b0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2976,10 +2976,12 @@ protected void processEndOfIncrementalPush( } /** - * This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in - * {@link ActiveActiveStoreIngestionTask} + * This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation in + * {@link LeaderFollowerStoreIngestionTask} */ - protected void processVersionSwapMessage( + protected void processControlMessageForViews( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, 
int partition, PartitionConsumptionState partitionConsumptionState) { @@ -3001,6 +3003,7 @@ protected boolean processTopicSwitch( * offset is stale and is not updated until the very end */ private boolean processControlMessage( + KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, @@ -3034,6 +3037,7 @@ private boolean processControlMessage( break; case START_OF_SEGMENT: case END_OF_SEGMENT: + case VERSION_SWAP: /** * Nothing to do here as all the processing is being done in {@link StoreIngestionTask#delegateConsumerRecord(ConsumerRecord, int, String)}. */ @@ -3048,13 +3052,11 @@ private boolean processControlMessage( checkReadyToServeAfterProcess = processTopicSwitch(controlMessage, partition, offset, partitionConsumptionState); break; - case VERSION_SWAP: - processVersionSwapMessage(controlMessage, partition, partitionConsumptionState); - break; default: throw new UnsupportedMessageTypeException( "Unrecognized Control message type " + controlMessage.controlMessageType); } + processControlMessageForViews(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState); return checkReadyToServeAfterProcess; } @@ -3172,6 +3174,7 @@ private int internalProcessConsumerRecord( ? (ControlMessage) kafkaValue.payloadUnion : (ControlMessage) leaderProducedRecordContext.getValueUnion()); checkReadyToServeAfterProcess = processControlMessage( + kafkaKey, kafkaValue, controlMessage, consumerRecord.getTopicPartition().getPartitionNumber(), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index 3bbba39b2cc..8cfe6a211fe 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -8,8 +8,10 @@ import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent; import com.linkedin.venice.client.change.capture.protocol.ValueBytes; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; @@ -81,8 +83,20 @@ public CompletableFuture processRecord( return veniceWriter.put(key, recordChangeEvent, 1); } + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId) { + // No op + return CompletableFuture.completedFuture(null); + } + @Override public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState, @@ -176,7 +190,7 @@ VeniceWriterOptions buildWriterOptions(int version) { } configBuilder.setChunkingEnabled(storeVersionConfig.isChunkingEnabled()); - return configBuilder.build(); + return setProducerOptimizations(configBuilder).build(); } synchronized private void initializeVeniceWriter(int version) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java new file mode 100644 index 00000000000..96e5a7ba249 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -0,0 +1,237 @@ +package com.linkedin.davinci.store.view; + +import com.linkedin.davinci.config.VeniceConfigLoader; +import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.utils.RedundantExceptionFilter; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.writer.LeaderCompleteState; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class MaterializedViewWriter extends VeniceViewWriter { + private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory; + private final MaterializedView internalView; + private final ReentrantLock broadcastHBLock = new ReentrantLock(); + private final Map partitionToHeartbeatTimestampMap = new HashMap<>(); + private final Clock clock; + private VeniceWriter veniceWriter; + private String materializedViewTopicName; + private long lastHBBroadcastTimestamp; + + /** + * These configs can be exposed as view parameters or server configs if needed. + */ + private static final long DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + private static final long DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD = TimeUnit.MINUTES.toMillis(5); + private static final int DEFAULT_PARTITION_TO_ALWAYS_BROADCAST = 0; + private static final Logger LOGGER = LogManager.getLogger(MaterializedViewWriter.class); + private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = + RedundantExceptionFilter.getRedundantExceptionFilter(); + + public MaterializedViewWriter( + VeniceConfigLoader props, + Store store, + Schema keySchema, + Map extraViewParameters, + Clock clock) { + super(props, store, keySchema, extraViewParameters); + pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory(); + internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters); + this.clock = clock; + } + + public MaterializedViewWriter( + VeniceConfigLoader props, + Store store, + Schema keySchema, + Map extraViewParameters) {
this(props, store, keySchema, extraViewParameters, Clock.systemUTC()); + } + + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + ByteBuffer oldValue, + byte[] key, + int version, + int newValueSchemaId, + int oldValueSchemaId, + GenericRecord replicationMetadataRecord) { + return processRecord(newValue, key, version, newValueSchemaId); + } + + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId) { + if (veniceWriter == null) { + initializeVeniceWriter(version); + } + return veniceWriter.put(key, newValue.array(), newValueSchemaId); + } + + @Override + public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, + ControlMessage controlMessage, + int partition, + PartitionConsumptionState partitionConsumptionState, + int version) { + final ControlMessageType type = ControlMessageType.valueOf(controlMessage); + // Ignore other control messages for materialized view. + if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) { + maybePropagateHeartbeatLowWatermarkToViewTopic( + partition, + partitionConsumptionState, + kafkaMessageEnvelope.producerMetadata.messageTimestamp, + version); + } + } + + @Override + public String getWriterClassName() { + return internalView.getWriterClassName(); + } + + // package private for testing purposes. + VeniceWriterOptions buildWriterOptions(int version) { + // We need to change this and have a map of writers if one materialized view will have many topics. + materializedViewTopicName = + internalView.getTopicNamesAndConfigsForVersion(version).keySet().stream().findAny().get(); + VeniceWriterOptions.Builder configBuilder = new VeniceWriterOptions.Builder(materializedViewTopicName); + Version storeVersionConfig = store.getVersionOrThrow(version); + configBuilder.setPartitionCount(internalView.getViewPartitionCount()); + configBuilder.setChunkingEnabled(storeVersionConfig.isChunkingEnabled()); + configBuilder.setRmdChunkingEnabled(storeVersionConfig.isRmdChunkingEnabled()); + configBuilder.setPartitioner(internalView.getViewPartitioner()); + return setProducerOptimizations(configBuilder).build(); + } + + synchronized private void initializeVeniceWriter(int version) { + if (veniceWriter == null) { + veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null) + .createVeniceWriter(buildWriterOptions(version)); + } + } + + /** + * View topic's partitioner and partition count could be different from the VT. In order to ensure we are capturing + * all potential lag in the VT ingestion from the view topic, we will broadcast the low watermark observed from every + * VT leader to all partitions of the view topic. To reduce the heartbeat spam we can use a strategy as follows: + * 1. Leader of partition 0 always broadcasts its low watermark timestamp to all view topic partitions. + * 2. Leader of other partitions will only broadcast its heartbeat low watermark timestamp if it's sufficiently + * stale. This is configurable but by default it could be >= 5 minutes. This is because broadcasting redundant + * up-to-date heartbeat in view topic is not meaningful when the main goal here is just to identify if there + * are any lagging partitions or the largest lag amongst all VT partitions. Since lag in any VT partition could + * result in lag in one or more view topic partitions. + * 3. 
This broadcasting heartbeat mechanism will only provide lag info to view topic consumers if the corresponding + * VT consumption is not stuck. e.g. if one VT partition is stuck we won't be able to detect such issue from the + * view topic heartbeats because VT partitions that are not stuck will be broadcasting heartbeats. Due to this + * reason we can also clear and rebuild the partition to timestamp map to simplify the maintenance logic. + */ + private void maybePropagateHeartbeatLowWatermarkToViewTopic( + int partition, + PartitionConsumptionState partitionConsumptionState, + long heartbeatTimestamp, + int version) { + boolean propagate = false; + long oldestHeartbeatTimestamp; + broadcastHBLock.lock(); + try { + partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp); + long now = clock.millis(); + if (now > lastHBBroadcastTimestamp + DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS + && !partitionToHeartbeatTimestampMap.isEmpty()) { + oldestHeartbeatTimestamp = Collections.min(partitionToHeartbeatTimestampMap.values()); + if (partition == DEFAULT_PARTITION_TO_ALWAYS_BROADCAST + || now - oldestHeartbeatTimestamp > DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD) { + propagate = true; + lastHBBroadcastTimestamp = now; + } + partitionToHeartbeatTimestampMap.clear(); + } + } finally { + broadcastHBLock.unlock(); + } + if (propagate) { + if (veniceWriter == null) { + initializeVeniceWriter(version); + } + LeaderCompleteState leaderCompleteState = + LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported()); + Set failedPartitions = VeniceConcurrentHashMap.newKeySet(); + Set> heartbeatFutures = VeniceConcurrentHashMap.newKeySet(); + AtomicReference completionException = new AtomicReference<>(); + for (int p = 0; p < internalView.getViewPartitionCount(); p++) { + // Due to the intertwined partition mapping, the actual LeaderMetadataWrapper is meaningless for materialized + // view consumers. Similarly, we will propagate the LeaderCompleteState, but it will only guarantee that at + // least + // one partition leader has completed. 
+ final int viewPartitionNumber = p; + CompletableFuture heartBeatFuture = veniceWriter.sendHeartbeat( + materializedViewTopicName, + viewPartitionNumber, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + true, + leaderCompleteState, + heartbeatTimestamp); + heartBeatFuture.whenComplete((ignore, throwable) -> { + if (throwable != null) { + completionException.set(new CompletionException(throwable)); + failedPartitions.add(String.valueOf(viewPartitionNumber)); + } + }); + heartbeatFutures.add(heartBeatFuture); + } + if (!heartbeatFutures.isEmpty()) { + CompletableFuture.allOf(heartbeatFutures.toArray(new CompletableFuture[0])) + .whenCompleteAsync((ignore, throwable) -> { + if (!failedPartitions.isEmpty()) { + int failedCount = failedPartitions.size(); + String logMessage = String.format( + "Broadcast materialized view heartbeat for %d partitions of topic %s: %d succeeded, %d failed for partitions %s", + heartbeatFutures.size(), + materializedViewTopicName, + heartbeatFutures.size() - failedCount, + failedCount, + String.join(",", failedPartitions)); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) { + LOGGER.error(logMessage, completionException.get()); + } + } + }); + } + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 64824b19e71..3a02a435a7d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -3,9 +3,12 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -45,36 +48,65 @@ public VeniceViewWriter( * @param oldValueSchemaId the schemaId of the old record * @param replicationMetadataRecord the associated RMD for the incoming record. */ - public CompletableFuture processRecord( + public abstract CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, int version, int newValueSchemaId, int oldValueSchemaId, - GenericRecord replicationMetadataRecord) { - return CompletableFuture.completedFuture(null); - } + GenericRecord replicationMetadataRecord); + + /** + * To be called as a given ingestion task consumes each record. This is called prior to writing to a + * VT or to persistent storage. + * + * @param newValue the incoming fully specified value which hasn't yet been committed to Venice + * @param key the key of the record that designates newValue and oldValue + * @param version the version of the store taking this record + * @param newValueSchemaId the schemaId of the incoming record + */ + public abstract CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId); /** * Called when the server encounters a control message. 
There isn't (today) a strict ordering * on if the rest of the server alters its state completely or not based on the incoming control message * relative to the given view. * - * TODO: Today this is only invoked for VERSION_SWAP control message, but we - * may in the future call this method for all control messages so that certain - * view types can act accordingly. + * Different view types may be interested in different control messages and act differently. The corresponding + * view writer should implement this method accordingly. + * + * @param kafkaKey the corresponding kafka key of this control message + * @param kafkaMessageEnvelope the corresponding kafka message envelope of this control message * @param controlMessage the control message we're processing * @param partition the partition this control message was delivered to * @param partitionConsumptionState the pcs of the consuming node * @param version the store version that received this message */ public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState, int version) { // Optionally act on Control Message } + + /** + * A store could have many views, and to reduce the impact to write throughput we want to check and enable producer + * optimizations that can be configured at the store level. To change the producer optimization configs the ingestion + * task needs to be re-initialized, meaning either a new version push or a server restart after the store-level config + * change, and this is by design. + * @param configBuilder to be configured with the producer optimizations + * @return the configBuilder with the producer optimizations applied + */ + protected VeniceWriterOptions.Builder setProducerOptimizations(VeniceWriterOptions.Builder configBuilder) { + return configBuilder.setProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled()) + .setProducerCount(store.getNearlineProducerCountPerWriter()); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java index f65552b5267..644159a7c61 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME; import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; +import static org.mockito.Mockito.mock; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.davinci.config.VeniceConfigLoader; @@ -12,7 +13,9 @@ import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; @@ -60,14 +63,14 @@ public void testConstructVersionSwapMessage() { highWaterMarks.put(LOR_1, 111L); highWaterMarks.put(LTX_1,
99L); highWaterMarks.put(LVA_1, 22222L); - PartitionConsumptionState mockLeaderPartitionConsumptionState = Mockito.mock(PartitionConsumptionState.class); + PartitionConsumptionState mockLeaderPartitionConsumptionState = mock(PartitionConsumptionState.class); Mockito.when(mockLeaderPartitionConsumptionState.getLeaderFollowerState()) .thenReturn(LeaderFollowerStateType.LEADER); Mockito.when(mockLeaderPartitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()) .thenReturn(highWaterMarks); Mockito.when(mockLeaderPartitionConsumptionState.getPartition()).thenReturn(1); - PartitionConsumptionState mockFollowerPartitionConsumptionState = Mockito.mock(PartitionConsumptionState.class); + PartitionConsumptionState mockFollowerPartitionConsumptionState = mock(PartitionConsumptionState.class); Mockito.when(mockFollowerPartitionConsumptionState.getLeaderFollowerState()) .thenReturn(LeaderFollowerStateType.STANDBY); Mockito.when(mockFollowerPartitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()) @@ -78,29 +81,31 @@ public void testConstructVersionSwapMessage() { versionSwapMessage.oldServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 1); versionSwapMessage.newServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 2); + KafkaKey kafkaKey = mock(KafkaKey.class); + KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class); ControlMessage controlMessage = new ControlMessage(); controlMessage.controlMessageUnion = versionSwapMessage; - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); VeniceProperties props = VeniceProperties.empty(); Object2IntMap urlMappingMap = new Object2IntOpenHashMap<>(); // Add ID's to the region's to name the sort order of the RMD urlMappingMap.put(LTX_1, 0); urlMappingMap.put(LVA_1, 1); urlMappingMap.put(LOR_1, 2); - CompletableFuture mockFuture = Mockito.mock(CompletableFuture.class); + CompletableFuture mockFuture = mock(CompletableFuture.class); - VeniceWriter mockVeniceWriter = Mockito.mock(VeniceWriter.class); + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); Mockito.when(mockVeniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(mockFuture); - VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); Mockito.when(mockVeniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(urlMappingMap); - PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = Mockito.mock(PubSubProducerAdapterFactory.class); - PubSubClientsFactory mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class); + PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + PubSubClientsFactory mockPubSubClientsFactory = mock(PubSubClientsFactory.class); Mockito.when(mockPubSubClientsFactory.getProducerAdapterFactory()).thenReturn(mockPubSubProducerAdapterFactory); Mockito.when(mockVeniceServerConfig.getPubSubClientsFactory()).thenReturn(mockPubSubClientsFactory); - VeniceConfigLoader mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class); + VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class); Mockito.when(mockVeniceConfigLoader.getCombinedProperties()).thenReturn(props); Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig); @@ -110,14 +115,26 @@ public void testConstructVersionSwapMessage() { changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter); // 
Verify that we never produce the version swap from a follower replica - changeCaptureViewWriter.processControlMessage(controlMessage, 1, mockFollowerPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + controlMessage, + 1, + mockFollowerPartitionConsumptionState, + 1); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); // Verify that we never produce anything if it's not a VersionSwap Message ControlMessage ignoredControlMessage = new ControlMessage(); ignoredControlMessage.controlMessageUnion = new EndOfIncrementalPush(); - changeCaptureViewWriter.processControlMessage(ignoredControlMessage, 1, mockLeaderPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + ignoredControlMessage, + 1, + mockLeaderPartitionConsumptionState, + 1); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); @@ -126,11 +143,23 @@ public void testConstructVersionSwapMessage() { ignoredVersionSwapMessage.oldServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 2); ignoredVersionSwapMessage.newServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 3); ignoredControlMessage.controlMessageUnion = ignoredVersionSwapMessage; - changeCaptureViewWriter.processControlMessage(ignoredControlMessage, 1, mockLeaderPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + ignoredControlMessage, + 1, + mockLeaderPartitionConsumptionState, + 1); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); - changeCaptureViewWriter.processControlMessage(controlMessage, 1, mockLeaderPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + controlMessage, + 1, + mockLeaderPartitionConsumptionState, + 1); ArgumentCaptor messageArgumentCaptor = ArgumentCaptor.forClass(ControlMessage.class); // Verify and capture input @@ -156,7 +185,7 @@ public void testConstructVersionSwapMessage() { @Test public void testBuildWriterOptions() { // Set up mocks - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); Version version = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID); Mockito.when(mockStore.getVersionOrThrow(1)).thenReturn(version); @@ -164,18 +193,18 @@ public void testBuildWriterOptions() { VeniceProperties props = VeniceProperties.empty(); Object2IntMap urlMappingMap = new Object2IntOpenHashMap<>(); - CompletableFuture mockFuture = Mockito.mock(CompletableFuture.class); + CompletableFuture mockFuture = mock(CompletableFuture.class); - VeniceWriter mockVeniceWriter = Mockito.mock(VeniceWriter.class); + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); Mockito.when(mockVeniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(mockFuture); - VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); Mockito.when(mockVeniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(urlMappingMap); - VeniceConfigLoader mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class); + VeniceConfigLoader mockVeniceConfigLoader = 
mock(VeniceConfigLoader.class); Mockito.when(mockVeniceConfigLoader.getCombinedProperties()).thenReturn(props); - PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = Mockito.mock(PubSubProducerAdapterFactory.class); - PubSubClientsFactory mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class); + PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + PubSubClientsFactory mockPubSubClientsFactory = mock(PubSubClientsFactory.class); Mockito.when(mockPubSubClientsFactory.getProducerAdapterFactory()).thenReturn(mockPubSubProducerAdapterFactory); Mockito.when(mockVeniceServerConfig.getPubSubClientsFactory()).thenReturn(mockPubSubClientsFactory); Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig); @@ -196,20 +225,20 @@ public void testBuildWriterOptions() { @Test public void testProcessRecord() throws ExecutionException, InterruptedException { // Set up mocks - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); VeniceProperties props = VeniceProperties.empty(); Object2IntMap urlMappingMap = new Object2IntOpenHashMap<>(); - CompletableFuture mockFuture = Mockito.mock(CompletableFuture.class); + CompletableFuture mockFuture = mock(CompletableFuture.class); - VeniceWriter mockVeniceWriter = Mockito.mock(VeniceWriter.class); + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); Mockito.when(mockVeniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(mockFuture); - VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); Mockito.when(mockVeniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(urlMappingMap); - VeniceConfigLoader mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class); - PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = Mockito.mock(PubSubProducerAdapterFactory.class); - PubSubClientsFactory mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class); + VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class); + PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + PubSubClientsFactory mockPubSubClientsFactory = mock(PubSubClientsFactory.class); Mockito.when(mockPubSubClientsFactory.getProducerAdapterFactory()).thenReturn(mockPubSubProducerAdapterFactory); Mockito.when(mockVeniceServerConfig.getPubSubClientsFactory()).thenReturn(mockPubSubClientsFactory); Mockito.when(mockVeniceConfigLoader.getCombinedProperties()).thenReturn(props); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java deleted file mode 100644 index 1944ca1d508..00000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.linkedin.venice.meta; - -public enum ViewParameterKeys { - /** - * Parameter key used to specify the re-partition view name. - */ - MATERIALIZED_VIEW_NAME, - /** - * Parameter key used to specify the partitioner for the re-partition view. - */ - MATERIALIZED_VIEW_PARTITIONER, - /** - * Parameter key used to specify the partitioner parameters for the partitioner associated with the re-partition view. 
- */ - MATERIALIZED_VIEW_PARTITIONER_PARAMS, - /** - * Parameter key used to specify the partition count for the re-partition view. - */ - MATERIALIZED_VIEW_PARTITION_COUNT; -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java new file mode 100644 index 00000000000..a8001c21c78 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java @@ -0,0 +1,80 @@ +package com.linkedin.venice.meta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.linkedin.venice.utils.ObjectMapperFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + + +public enum ViewParameters { + /** + * Parameter key used to specify the re-partition view name. + */ + MATERIALIZED_VIEW_NAME, + /** + * Parameter key used to specify the partitioner for the re-partition view. + */ + MATERIALIZED_VIEW_PARTITIONER, + /** + * Parameter key used to specify the partitioner parameters for the partitioner associated with the re-partition view. + */ + MATERIALIZED_VIEW_PARTITIONER_PARAMS, + /** + * Parameter key used to specify the partition count for the re-partition view. + */ + MATERIALIZED_VIEW_PARTITION_COUNT; + + public static class Builder { + private String viewName; + private String partitioner; + private String partitionerParams; + private String partitionCount; + + public Builder(String viewName) { + this.viewName = Objects.requireNonNull(viewName, "View name cannot be null for ViewParameters"); + } + + public Builder(String viewName, Map viewParams) { + this.viewName = viewName; + this.partitioner = viewParams.get(MATERIALIZED_VIEW_PARTITIONER.name()); + this.partitionerParams = viewParams.get(MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); + this.partitionCount = viewParams.get(MATERIALIZED_VIEW_PARTITION_COUNT.name()); + } + + public Builder setPartitioner(String partitioner) { + this.partitioner = partitioner; + return this; + } + + public Builder setPartitionerParams(String partitionerParams) { + this.partitionerParams = partitionerParams; + return this; + } + + public Builder setPartitionerParams(Map partitionerParams) throws JsonProcessingException { + this.partitionerParams = ObjectMapperFactory.getInstance().writeValueAsString(partitionerParams); + return this; + } + + public Builder setPartitionCount(int partitionCount) { + this.partitionCount = Integer.toString(partitionCount); + return this; + } + + public Map build() { + Map viewParams = new HashMap<>(); + viewParams.put(MATERIALIZED_VIEW_NAME.name(), viewName); + if (partitioner != null) { + viewParams.put(MATERIALIZED_VIEW_PARTITIONER.name(), partitioner); + } + if (partitionerParams != null) { + viewParams.put(MATERIALIZED_VIEW_PARTITIONER_PARAMS.name(), partitionerParams); + } + if (partitionCount != null) { + viewParams.put(MATERIALIZED_VIEW_PARTITION_COUNT.name(), partitionCount); + } + return viewParams; + } + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java index 70492cdfea2..3cbfdf5726b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java @@ -430,14 +430,13 @@ private void addHistoricStatus(ExecutionStatus status, String 
incrementalPushVer * Checks whether at least one replica of each partition has returned {@link ExecutionStatus#END_OF_PUSH_RECEIVED} * * This is intended for {@link OfflinePushStatus} instances which belong to Hybrid Stores, though there - * should be no negative side-effects if called on an instance tied to a non-hybrid store, as the logic - * should consistently return false in that case. + * should be no negative side-effects if called on an instance tied to a non-hybrid store. * * @return true if at least one replica of each partition has consumed an EOP control message, false otherwise */ - public boolean isReadyToStartBufferReplay(boolean isDataRecovery) { + public boolean isEOPReceivedInEveryPartition(boolean isDataRecovery) { // Only allow the push in STARTED status to start buffer replay. It could avoid: - // 1. Send duplicated start buffer replay message. + // 1. Send duplicated CM such as the start buffer replay message. // 2. Send start buffer replay message when a push had already been terminated. if (!getCurrentStatus().equals(STARTED)) { return false; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java index 4ecf0f4456c..68b14e69583 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java @@ -1,5 +1,6 @@ package com.linkedin.venice.utils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.PartitionerConfig; @@ -7,6 +8,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; +import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; @@ -95,6 +97,20 @@ public static VenicePartitioner getUserPartitionLevelVenicePartitioner(Partition return getVenicePartitioner(config.getPartitionerClass(), new VeniceProperties(params)); } + public static VenicePartitioner getVenicePartitioner(String partitionerClass, String partitionerParamsString) { + Properties params = new Properties(); + if (partitionerParamsString != null) { + Map partitionerParamsMap = null; + try { + partitionerParamsMap = ObjectMapperFactory.getInstance().readValue(partitionerParamsString, Map.class); + } catch (JsonProcessingException e) { + throw new VeniceException("Invalid partitioner params string: " + partitionerParamsString, e); + } + params.putAll(partitionerParamsMap); + } + return getVenicePartitioner(partitionerClass, new VeniceProperties(params), null); + } + public static VenicePartitioner getVenicePartitioner(String partitionerClass, VeniceProperties params) { return getVenicePartitioner(partitionerClass, params, null); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java index 5bc07fb71c3..05f743b9bbb 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java @@ -33,10 +33,12 @@ public String getWriterClassName() { @Override public void validateConfigs() { - super.validateConfigs(); 
- // change capture requires chunking to be enabled. This is because it's logistically difficult to insist + // change capture requires A/A and chunking to be enabled. This is because it's logistically difficult to insist // that all rows be under 50% the chunking threshold (since we have to publish the before and after image of the // record to the change capture topic). So we make a blanket assertion. + if (!store.isActiveActiveReplicationEnabled()) { + throw new VeniceException("Views are not supported with non Active/Active stores!"); + } if (!store.isChunkingEnabled()) { throw new VeniceException("Change capture view are not supported with stores that don't have chunking enabled!"); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java index 87bf644c384..79db7698a55 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java @@ -1,11 +1,16 @@ package com.linkedin.venice.views; +import static com.linkedin.venice.views.ViewUtils.PARTITION_COUNT; + import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.partitioner.VenicePartitioner; +import com.linkedin.venice.utils.PartitionUtils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -13,16 +18,30 @@ public class MaterializedView extends VeniceView { public static final String MATERIALIZED_VIEW_TOPIC_SUFFIX = "_mv"; + public static final String MATERIALIZED_VIEW_WRITER_CLASS_NAME = + "com.linkedin.davinci.store.view.MaterializedViewWriter"; private static final String MISSING_PARAMETER_MESSAGE = "%s is required for materialized view!"; + private final int viewPartitionCount; + + private Lazy<VenicePartitioner> viewPartitioner; public MaterializedView(Properties props, Store store, Map viewParameters) { super(props, store, viewParameters); + // Override topic partition count config + viewPartitionCount = Integer.parseInt(viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + this.props.put(PARTITION_COUNT, viewPartitionCount); + viewPartitioner = Lazy.of(() -> { + String viewPartitionerClass = this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitionerParamsString = + this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); + return PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString); + }); } @Override public Map getTopicNamesAndConfigsForVersion(int version) { VeniceProperties properties = new VeniceProperties(props); - String viewName = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()); + String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()); return Collections.singletonMap( Version.composeKafkaTopic(store.getName(), version) + VIEW_TOPIC_SEPARATOR + viewName + MATERIALIZED_VIEW_TOPIC_SUFFIX, @@ -30,36 +49,34 @@ public Map getTopicNamesAndConfigsForVersion(int versi } /** - * {@link ViewParameterKeys#MATERIALIZED_VIEW_PARTITION_COUNT} is required to
configure a new re-partition view. - * {@link ViewParameterKeys#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level + * {@link ViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view. + * {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level * partitioner config if it's not specified in the view parameters. - * {@link ViewParameterKeys#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional. + * {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional. */ @Override public void validateConfigs() { - super.validateConfigs(); - String viewName = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()); + String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()); if (viewName == null) { - throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_NAME.name())); + throw new VeniceException(String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_NAME.name())); } if (store.getViewConfigs().containsKey(viewName)) { throw new VeniceException("A view config with the same view name already exist, view name: " + viewName); } - String viewPartitioner = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitioner = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); if (viewPartitioner == null) { throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name())); + String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())); } try { Class.forName(viewPartitioner); } catch (ClassNotFoundException e) { throw new VeniceException("Cannot find partitioner class: " + viewPartitioner); } - String partitionCountString = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()); + String partitionCountString = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()); if (partitionCountString == null) { throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); } int viewPartitionCount = Integer.parseInt(partitionCountString); // A materialized view with the exact same partitioner and partition count as the store is not allwoed @@ -73,9 +90,9 @@ public void validateConfigs() { ViewConfig viewConfig = viewConfigEntries.getValue(); if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) { String configPartitioner = - viewConfig.getViewParameters().get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name()); + viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); int configPartitionCount = Integer - .parseInt(viewConfig.getViewParameters().get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + .parseInt(viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); if (configPartitionCount == viewPartitionCount && configPartitioner.equals(viewPartitioner)) { throw new VeniceException( "A view with identical view configs already exist, view name: " + viewConfigEntries.getKey()); @@ -83,4 +100,17 @@ public void validateConfigs() { } } } + + @Override + public String getWriterClassName() { + return 
MATERIALIZED_VIEW_WRITER_CLASS_NAME; + } + + public int getViewPartitionCount() { + return viewPartitionCount; + } + + public VenicePartitioner getViewPartitioner() { + return viewPartitioner.get(); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java index 381db477fa8..7b5d2f5fe8d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java @@ -65,13 +65,10 @@ public String getWriterClassName() { /** * Validate that the configs set up for this view for this store are valid. If not, throw an exception. Implementors - * should override this function to add their own validation logic, and need only call this base implementation optionally. + * should override this function to add their own validation logic. */ public void validateConfigs() { - // All views which publish data only work with A/A. Views which don't publish data should override this validation - if (!store.isActiveActiveReplicationEnabled()) { - throw new VeniceException("Views are not supported with non Active/Active stores!"); - } + // validation based on view implementation } public void close() { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 5d8cd79cb28..6c938d828f2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -1906,6 +1906,25 @@ public CompletableFuture sendHeartbeat( callback); } + public CompletableFuture sendHeartbeat( + String topicName, + int partitionNumber, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + boolean addLeaderCompleteState, + LeaderCompleteState leaderCompleteState, + long originTimeStampMs) { + KafkaMessageEnvelope kafkaMessageEnvelope = + getHeartbeatKME(originTimeStampMs, leaderMetadataWrapper, heartBeatMessage, writerId); + return producerAdapter.sendMessage( + topicName, + partitionNumber, + KafkaKey.HEART_BEAT, + kafkaMessageEnvelope, + getHeaders(kafkaMessageEnvelope.getProducerMetadata(), addLeaderCompleteState, leaderCompleteState), + callback); + } + /** * The Key part of the {@link KafkaKey} needs to be unique in order to avoid clobbering each other during * Kafka's Log Compaction. 
Since there is no key per se associated with control messages, we generate one diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java index 4e05a0453fb..159ec39c0e7 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java @@ -109,7 +109,7 @@ public void testIsReadyToStartBufferReplay() { } offlinePushStatus.setPartitionStatuses(Collections.singletonList(partitionStatus)); Assert.assertTrue( - offlinePushStatus.isReadyToStartBufferReplay(false), + offlinePushStatus.isEOPReceivedInEveryPartition(false), "Buffer replay should be allowed to start since END_OF_PUSH_RECEIVED was already received"); } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/views/MaterializedViewTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java similarity index 77% rename from internal/venice-common/src/test/java/com/linkedin/venice/views/MaterializedViewTest.java rename to internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java index 394216ef9eb..44566c64f98 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/views/MaterializedViewTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java @@ -9,7 +9,7 @@ import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.utils.VeniceProperties; @@ -20,7 +20,7 @@ import org.testng.annotations.Test; -public class MaterializedViewTest { +public class TestMaterializedView { @Test public void testValidateConfigs() { Properties properties = new Properties(); @@ -28,46 +28,43 @@ public void testValidateConfigs() { Store testStore = getMockStore("test-store", 12); // Fail due to missing view name assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name(), "test-view"); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), "test-view"); // Fail due to missing partition count assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); // Fail due to same partitioner and partition count assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - ConstantVenicePartitioner.class.getCanonicalName()); + viewParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); // Pass, same partition count but different partitioner new MaterializedView(properties, testStore, viewParams).validateConfigs(); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); + 
viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); // Pass, same partitioner but different partition count new MaterializedView(properties, testStore, viewParams).validateConfigs(); viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), + ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName() + "DNE"); // Fail due to invalid partitioner class assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - ConstantVenicePartitioner.class.getCanonicalName()); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); + viewParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); Store storeWithExistingViews = getMockStore("test-store-existing-config", 12); ViewConfig viewConfig = mock(ViewConfig.class); doReturn(Collections.singletonMap("test-view", viewConfig)).when(storeWithExistingViews).getViewConfigs(); // Fail due to same view name assertThrows(() -> new MaterializedView(properties, storeWithExistingViews, viewParams).validateConfigs()); Map existingViewConfigParams = new HashMap<>(); - existingViewConfigParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - ConstantVenicePartitioner.class.getCanonicalName()); - existingViewConfigParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12)); + existingViewConfigParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); + existingViewConfigParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12)); doReturn(existingViewConfigParams).when(viewConfig).getViewParameters(); doReturn(MaterializedView.class.getCanonicalName()).when(viewConfig).getViewClassName(); doReturn(Collections.singletonMap("old-view", viewConfig)).when(storeWithExistingViews).getViewConfigs(); // Fail due to existing identical view config assertThrows(() -> new MaterializedView(properties, storeWithExistingViews, viewParams).validateConfigs()); - existingViewConfigParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36)); + existingViewConfigParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36)); // Pass, same partitioner but different partition count new MaterializedView(properties, testStore, viewParams).validateConfigs(); } @@ -79,8 +76,8 @@ public void testRePartitionViewTopicProcessing() { int version = 8; Store testStore = getMockStore(storeName, 6); String rePartitionViewName = "test-view"; - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); MaterializedView materializedView = new MaterializedView(new Properties(), testStore, viewParams); Map rePartitionViewTopicMap = materializedView.getTopicNamesAndConfigsForVersion(version); assertEquals(rePartitionViewTopicMap.size(), 1); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java new file mode 100644 index 00000000000..7f172acab1f --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java @@ -0,0 +1,151 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; +import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; +import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; +import static com.linkedin.venice.views.MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX; +import static com.linkedin.venice.views.VeniceView.VIEW_TOPIC_SEPARATOR; +import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP; +import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP; + +import com.linkedin.venice.ConfigKeys; +import com.linkedin.venice.controller.VeniceHelixAdmin; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.views.MaterializedView; +import it.unimi.dsi.fastutil.ints.Int2LongMap; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class MaterializedViewTest { + private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE; + private static final String[] CLUSTER_NAMES = + IntStream.range(0, 1).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); + + private List childDatacenters; + private List parentControllers; + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + private String clusterName; + + @BeforeClass(alwaysRun = true) + public void setUp() { + Properties serverProperties = new Properties(); + serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1)); + serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false); + serverProperties.put( + CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." 
+ DEFAULT_PARENT_DATA_CENTER_REGION_NAME, + "localhost:" + TestUtils.getFreePort()); + multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + 1, + 1, + 1, + 1, + 1, + 1, + 1, + Optional.empty(), + Optional.empty(), + Optional.of(serverProperties), + false); + + childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); + parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); + clusterName = CLUSTER_NAMES[0]; + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + multiRegionMultiClusterWrapper.close(); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testLFIngestionWithMaterializedView() throws IOException { + // Create a non-A/A store with materialized view and run batch push job with 100 records + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("store"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setHybridRewindSeconds(500) + .setHybridOffsetLagThreshold(8) + .setChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3); + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + String testViewName = "MaterializedViewTest"; + ViewParameters.Builder viewParamBuilder = new ViewParameters.Builder(testViewName).setPartitionCount(6); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + Assert.assertEquals(viewConfigMap.get(testViewName).getViewParameters().size(), 3); + }); + + TestWriteUtils.runPushJob("Run push job", props); + // TODO we will verify the actual content once the DVC consumption part of the view topic is completed. + // For now just check for topic existence and that they contain some records. 
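+ // The view topic name mirrors MaterializedView#getTopicNamesAndConfigsForVersion: version topic + view topic separator + view name + "_mv" suffix.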
+ String viewTopicName = Version.composeKafkaTopic(storeName, 1) + VIEW_TOPIC_SEPARATOR + testViewName + + MATERIALIZED_VIEW_TOPIC_SUFFIX; + String versionTopicName = Version.composeKafkaTopic(storeName, 1); + for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { + VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); + PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); + PubSubTopic versionPubSubTopic = admin.getPubSubTopicRepository().getTopic(versionTopicName); + Assert.assertTrue(admin.getTopicManager().containsTopic(viewPubSubTopic)); + long records = 0; + long versionTopicRecords = 0; + Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); + Int2LongMap versionTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(versionPubSubTopic); + Assert.assertEquals(versionTopicOffsetMap.keySet().size(), 3, "Unexpected version partition count"); + Assert.assertEquals(viewTopicOffsetMap.keySet().size(), 6, "Unexpected view partition count"); + for (long endOffset: viewTopicOffsetMap.values()) { + records += endOffset; + } + for (long endOffset: versionTopicOffsetMap.values()) { + versionTopicRecords += endOffset; + } + Assert.assertTrue(versionTopicRecords > 100, "Version topic records size: " + versionTopicRecords); + Assert.assertTrue(records > 100, "View topic records size: " + records); + } + } + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java index f8f67d2a9e1..f429fe9e59e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java @@ -5,7 +5,9 @@ import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubProduceResult; @@ -43,8 +45,20 @@ public CompletableFuture processRecord( } + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId) { + internalView.incrementRecordCount(store.getName()); + return CompletableFuture.completedFuture(null); + } + @Override public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index 0a1182c9be7..429e2c6fe9e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -165,7 +165,8 @@ public HelixVeniceClusterResources( helixAdminClient, config, admin.getPushStatusStoreReader(), - 
admin.getDisabledPartitionStats(clusterName)); + admin.getDisabledPartitionStats(clusterName), + admin.getVeniceWriterFactory()); this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService( clusterName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 9c65c13eded..e5f381ec118 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -216,6 +216,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.utils.locks.AutoCloseableLock; +import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.VeniceWriter; @@ -2409,25 +2410,25 @@ public void createHelixResourceAndStartMonitoring(String clusterName, String sto * @param store the store to create these resources for * @param version the store version to create these resources for */ - private void constructViewResources(Properties params, Store store, int version) { - Map viewConfigs = store.getViewConfigs(); + private void constructViewResources(Properties params, Store store, Version version, String compressionDictionary) { + Map viewConfigs = version.getViewConfigs(); if (viewConfigs == null || viewConfigs.isEmpty()) { return; } // Construct Kafka topics // TODO: Today we only have support for creating Kafka topics as a resource for a given view, but later we would - // like - // to add support for potentially other resource types (maybe helix RG's as an example?) + // like to add support for potentially other resource types (maybe helix RG's as an example?) 
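+ // For every view config on this version, collect its version-specific topic(s) and create them; materialized view topics additionally get a StartOfPush broadcast below.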
Map topicNamesAndConfigs = new HashMap<>(); for (ViewConfig rawView: viewConfigs.values()) { VeniceView adminView = ViewUtils.getVeniceView(rawView.getViewClassName(), params, store, rawView.getViewParameters()); - topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version)); + topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version.getNumber())); } TopicManager topicManager = getTopicManager(); for (Map.Entry topicNameAndConfigs: topicNamesAndConfigs.entrySet()) { - PubSubTopic kafkaTopic = pubSubTopicRepository.getTopic(topicNameAndConfigs.getKey()); + String materializedViewTopicName = topicNameAndConfigs.getKey(); + PubSubTopic kafkaTopic = pubSubTopicRepository.getTopic(materializedViewTopicName); VeniceProperties kafkaTopicConfigs = topicNameAndConfigs.getValue(); topicManager.createTopic( kafkaTopic, @@ -2437,6 +2438,26 @@ private void constructViewResources(Properties params, Store store, int version) kafkaTopicConfigs.getBoolean(LOG_COMPACTION_ENABLED), kafkaTopicConfigs.getOptionalInt(KAFKA_MIN_IN_SYNC_REPLICAS), kafkaTopicConfigs.getBoolean(USE_FAST_KAFKA_OPERATION_TIMEOUT)); + if (topicNameAndConfigs.getKey().endsWith(MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX)) { + // Send SOP CM to materialized view topic with sorted flag equal to false due to reshuffle + VeniceWriterOptions.Builder vwOptionsBuilder = + new VeniceWriterOptions.Builder(materializedViewTopicName).setUseKafkaKeySerializer(true) + .setPartitionCount(kafkaTopicConfigs.getInt(PARTITION_COUNT)); + ByteBuffer compressionDictBuffer = null; + if (compressionDictionary != null) { + compressionDictBuffer = ByteBuffer.wrap(EncodingUtils.base64DecodeFromString(compressionDictionary)); + } else if (version.getCompressionStrategy().equals(CompressionStrategy.ZSTD_WITH_DICT)) { + compressionDictBuffer = emptyPushZSTDDictionary.get(); + } + try (VeniceWriter veniceWriter = getVeniceWriterFactory().createVeniceWriter(vwOptionsBuilder.build())) { + veniceWriter.broadcastStartOfPush( + false, + version.isChunkingEnabled(), + version.getCompressionStrategy(), + Optional.ofNullable(compressionDictBuffer), + Collections.emptyMap()); + } + } } } @@ -2803,16 +2824,6 @@ private Pair addVersion( version.setRepushSourceVersion(repushSourceVersion); } - Properties veniceViewProperties = new Properties(); - veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions); - veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout); - veniceViewProperties.putAll(clusterConfig.getProps().toProperties()); - veniceViewProperties.put(LOG_COMPACTION_ENABLED, false); - veniceViewProperties.put(KAFKA_REPLICATION_FACTOR, clusterConfig.getKafkaReplicationFactor()); - veniceViewProperties.put(ETERNAL_TOPIC_RETENTION_ENABLED, true); - - constructViewResources(veniceViewProperties, store, version.getNumber()); - repository.updateStore(store); LOGGER.info("Add version: {} for store: {}", version.getNumber(), storeName); @@ -2838,12 +2849,25 @@ private Pair addVersion( useFastKafkaOperationTimeout); } + // We shouldn't need to create view resources in parent fabric + if (!multiClusterConfigs.isParent()) { + Properties veniceViewProperties = new Properties(); + veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions); + veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout); + veniceViewProperties.putAll(clusterConfig.getProps().toProperties()); + veniceViewProperties.put(LOG_COMPACTION_ENABLED, false); + 
veniceViewProperties.put(KAFKA_REPLICATION_FACTOR, clusterConfig.getKafkaReplicationFactor()); + veniceViewProperties.put(ETERNAL_TOPIC_RETENTION_ENABLED, true); + + constructViewResources(veniceViewProperties, store, version, compressionDictionary); + } + if (sendStartOfPush) { ByteBuffer compressionDictionaryBuffer = null; if (compressionDictionary != null) { compressionDictionaryBuffer = ByteBuffer.wrap(EncodingUtils.base64DecodeFromString(compressionDictionary)); - } else if (store.getCompressionStrategy().equals(CompressionStrategy.ZSTD_WITH_DICT)) { + } else if (version.getCompressionStrategy().equals(CompressionStrategy.ZSTD_WITH_DICT)) { // This compression strategy needs a dictionary even if there is no input data, // so we generate a dictionary based on synthetic data. This is done in vpj driver // as well, but this code will be triggered in cases like Samza batch push job @@ -2851,28 +2875,27 @@ private Pair addVersion( compressionDictionaryBuffer = emptyPushZSTDDictionary.get(); } - final Version finalVersion = version; VeniceWriter veniceWriter = null; try { VeniceWriterOptions.Builder vwOptionsBuilder = - new VeniceWriterOptions.Builder(finalVersion.kafkaTopicName()).setUseKafkaKeySerializer(true) + new VeniceWriterOptions.Builder(version.kafkaTopicName()).setUseKafkaKeySerializer(true) .setPartitionCount(numberOfPartitions); - if (multiClusterConfigs.isParent() && finalVersion.isNativeReplicationEnabled()) { + if (multiClusterConfigs.isParent() && version.isNativeReplicationEnabled()) { // Produce directly into one of the child fabric - vwOptionsBuilder.setBrokerAddress(finalVersion.getPushStreamSourceAddress()); + vwOptionsBuilder.setBrokerAddress(version.getPushStreamSourceAddress()); } veniceWriter = getVeniceWriterFactory().createVeniceWriter(vwOptionsBuilder.build()); veniceWriter.broadcastStartOfPush( sorted, - finalVersion.isChunkingEnabled(), - finalVersion.getCompressionStrategy(), + version.isChunkingEnabled(), + version.getCompressionStrategy(), Optional.ofNullable(compressionDictionaryBuffer), Collections.emptyMap()); if (pushType.isStreamReprocessing()) { // Send TS message to version topic to inform leader to switch to the stream reprocessing topic veniceWriter.broadcastTopicSwitch( Collections.singletonList(getKafkaBootstrapServers(isSslToKafka())), - Version.composeStreamReprocessingTopic(finalVersion.getStoreName(), finalVersion.getNumber()), + Version.composeStreamReprocessingTopic(version.getStoreName(), version.getNumber()), -1L, // -1 indicates rewinding from the beginning of the source topic new HashMap<>()); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 5020f60f6a8..4c0aa6b9993 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -193,7 +193,7 @@ import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.persona.StoragePersona; import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -2815,28 +2815,21 @@ private ViewConfig 
validateAndDecorateStoreViewConfig(Store store, ViewConfig vi String.format("Materialized View name cannot contain version separator: %s", VERSION_SEPARATOR)); } Map viewParams = viewConfig.getViewParameters(); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name(), viewName); - if (!viewParams.containsKey(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name())) { - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - store.getPartitionerConfig().getPartitionerClass()); + ViewParameters.Builder decoratedViewParamBuilder = new ViewParameters.Builder(viewName, viewParams); + if (!viewParams.containsKey(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())) { + decoratedViewParamBuilder.setPartitioner(store.getPartitionerConfig().getPartitionerClass()); if (!store.getPartitionerConfig().getPartitionerParams().isEmpty()) { try { - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name(), - ObjectMapperFactory.getInstance() - .writeValueAsString(store.getPartitionerConfig().getPartitionerParams())); + decoratedViewParamBuilder.setPartitionerParams(store.getPartitionerConfig().getPartitionerParams()); } catch (JsonProcessingException e) { throw new VeniceException("Failed to convert store partitioner params to string", e); } } } - if (!viewParams.containsKey(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), - Integer.toString(store.getPartitionCount())); + if (!viewParams.containsKey(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { + decoratedViewParamBuilder.setPartitionCount(store.getPartitionCount()); } - viewConfig.setViewParameters(viewParams); + viewConfig.setViewParameters(decoratedViewParamBuilder.build()); } VeniceView view = ViewUtils.getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 9a5a78e61f0..973a81cde86 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -28,6 +28,7 @@ import com.linkedin.venice.meta.UncompletedReplica; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.HelixUtils; @@ -35,6 +36,12 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.views.ViewUtils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,6 +51,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import 
java.util.stream.Collectors; @@ -88,6 +96,7 @@ public abstract class AbstractPushMonitor private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; private final DisabledPartitionStats disabledPartitionStats; + private final VeniceWriterFactory veniceWriterFactory; public AbstractPushMonitor( String clusterName, @@ -103,7 +112,8 @@ public AbstractPushMonitor( HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - DisabledPartitionStats disabledPartitionStats) { + DisabledPartitionStats disabledPartitionStats, + VeniceWriterFactory veniceWriterFactory) { this.clusterName = clusterName; this.offlinePushAccessor = offlinePushAccessor; this.storeCleaner = storeCleaner; @@ -134,6 +144,7 @@ public AbstractPushMonitor( controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), controllerConfig.useDaVinciSpecificExecutionStatusForError()); this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); + this.veniceWriterFactory = veniceWriterFactory; pushStatusCollector.start(); } @@ -172,7 +183,7 @@ private void loadAllPushes(List offlinePushStatusList) { if (statusWithDetails.getStatus().isTerminal()) { handleTerminalOfflinePushUpdate(offlinePushStatus, statusWithDetails); } else { - checkWhetherToStartBufferReplayForHybrid(offlinePushStatus); + checkWhetherToStartEOPProcedures(offlinePushStatus); } } else { // In any case, we found the offline push status is STARTED, but the related version could not be found. @@ -794,7 +805,7 @@ public void onPartitionStatusChange(String topic, ReadOnlyPartitionStatus partit } protected void onPartitionStatusChange(OfflinePushStatus offlinePushStatus) { - checkWhetherToStartBufferReplayForHybrid(offlinePushStatus); + checkWhetherToStartEOPProcedures(offlinePushStatus); } protected DisableReplicaCallback getDisableReplicaCallback(String kafkaTopic) { @@ -864,7 +875,7 @@ public void onExternalViewChange(PartitionAssignment partitionAssignment) { handleTerminalOfflinePushUpdate(pushStatus, statusWithDetails); } else if (statusWithDetails.getStatus().equals(ExecutionStatus.END_OF_PUSH_RECEIVED)) { // For all partitions, at least one replica has received the EOP. Check if it's time to start buffer replay. - checkWhetherToStartBufferReplayForHybrid(pushStatus); + checkWhetherToStartEOPProcedures(pushStatus); } } } else { @@ -906,7 +917,7 @@ public void onRoutingDataDeleted(String kafkaTopic) { } } - protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlinePushStatus) { + protected void checkWhetherToStartEOPProcedures(OfflinePushStatus offlinePushStatus) { // As the outer method already locked on this instance, so this method is thread-safe. 
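+ // Once EOP has been received in every partition, this method may kick off buffer replay for hybrid stores and broadcast EOP to the version's materialized view topic(s).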
String storeName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic()); Store store = getReadWriteStoreRepository().getStore(storeName); @@ -923,35 +934,74 @@ protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlin } } - if (store.isHybrid()) { - Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic())); - boolean isDataRecovery = version != null && version.getDataRecoveryVersionConfig() != null; - if (offlinePushStatus.isReadyToStartBufferReplay(isDataRecovery)) { - LOGGER.info("{} is ready to start buffer replay.", offlinePushStatus.getKafkaTopic()); - RealTimeTopicSwitcher realTimeTopicSwitcher = getRealTimeTopicSwitcher(); - try { - String newStatusDetails; + Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic())); + if (version == null) { + throw new IllegalStateException("Could not find Version object for: " + offlinePushStatus.getKafkaTopic()); + } + Map viewConfigMap = version.getViewConfigs(); + if (!store.isHybrid() && (viewConfigMap == null || viewConfigMap.isEmpty())) { + // The only procedures that may start after EOP is received in every partition are: + // 1. start buffer replay for hybrid store + // 2. send EOP for materialized view (there could be more view procedures in the future) + return; + } + try { + boolean isDataRecovery = version.getDataRecoveryVersionConfig() != null; + boolean isEOPReceivedInAllPartitions = offlinePushStatus.isEOPReceivedInEveryPartition(isDataRecovery); + StringBuilder newStatusDetails = new StringBuilder(); + // Check whether to start buffer replay + if (store.isHybrid()) { + if (isEOPReceivedInAllPartitions) { + LOGGER.info("{} is ready to start buffer replay.", offlinePushStatus.getKafkaTopic()); + RealTimeTopicSwitcher realTimeTopicSwitcher = getRealTimeTopicSwitcher(); realTimeTopicSwitcher.switchToRealTimeTopic( Version.composeRealTimeTopic(storeName), offlinePushStatus.getKafkaTopic(), store, aggregateRealTimeSourceKafkaUrl, activeActiveRealTimeSourceKafkaURLs); - newStatusDetails = "kicked off buffer replay"; - updatePushStatus(offlinePushStatus, ExecutionStatus.END_OF_PUSH_RECEIVED, Optional.of(newStatusDetails)); - LOGGER.info("Successfully {} for offlinePushStatus: {}", newStatusDetails, offlinePushStatus); - } catch (Exception e) { - // TODO: Figure out a better error handling... - String newStatusDetails = "Failed to kick off the buffer replay"; - handleTerminalOfflinePushUpdate(offlinePushStatus, new ExecutionStatusWithDetails(ERROR, newStatusDetails)); - LOGGER.error("{} for offlinePushStatus: {}", newStatusDetails, offlinePushStatus, e); + newStatusDetails.append("kicked off buffer replay"); + } else if (!offlinePushStatus.getCurrentStatus().isTerminal()) { + LOGGER.info( + "{} is not ready to start buffer replay. Current state: {}", + offlinePushStatus.getKafkaTopic(), + offlinePushStatus.getCurrentStatus().toString()); } - } else if (!offlinePushStatus.getCurrentStatus().isTerminal()) { - LOGGER.info( - "{} is not ready to start buffer replay. 
Current state: {}", - offlinePushStatus.getKafkaTopic(), - offlinePushStatus.getCurrentStatus().toString()); } + + if (isEOPReceivedInAllPartitions) { + // Check whether to send EOP for materialized view topic(s) + for (ViewConfig rawView: viewConfigMap.values()) { + VeniceView veniceView = + ViewUtils.getVeniceView(rawView.getViewClassName(), new Properties(), store, rawView.getViewParameters()); + if (veniceView instanceof MaterializedView) { + MaterializedView materializedView = (MaterializedView) veniceView; + for (String materializedViewTopicName: materializedView + .getTopicNamesAndConfigsForVersion(version.getNumber()) + .keySet()) { + VeniceWriterOptions.Builder vwOptionsBuilder = + new VeniceWriterOptions.Builder(materializedViewTopicName).setUseKafkaKeySerializer(true) + .setPartitionCount(materializedView.getViewPartitionCount()); + try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(vwOptionsBuilder.build())) { + veniceWriter.broadcastEndOfPush(Collections.emptyMap()); + } + if (newStatusDetails.length() > 0) { + newStatusDetails.append(", "); + } + newStatusDetails.append("broadcast EOP to materialized view topic: ").append(materializedViewTopicName); + } + } + } + updatePushStatus( + offlinePushStatus, + ExecutionStatus.END_OF_PUSH_RECEIVED, + Optional.of(newStatusDetails.toString())); + LOGGER.info("Successfully {} for offlinePushStatus: {}", newStatusDetails.toString(), offlinePushStatus); + } + } catch (Exception e) { + String newStatusDetails = "Failed to start EOP procedures"; + handleTerminalOfflinePushUpdate(offlinePushStatus, new ExecutionStatusWithDetails(ERROR, newStatusDetails)); + LOGGER.error("{} for offlinePushStatus: {}", newStatusDetails, offlinePushStatus, e); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java index 8929d108ae2..2fe30cc4260 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java @@ -11,6 +11,7 @@ import com.linkedin.venice.meta.StoreCleaner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.writer.VeniceWriterFactory; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +39,8 @@ public PartitionStatusBasedPushMonitor( HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - DisabledPartitionStats disabledPartitionStats) { + DisabledPartitionStats disabledPartitionStats, + VeniceWriterFactory veniceWriterFactory) { super( clusterName, offlinePushAccessor, @@ -53,7 +55,8 @@ public PartitionStatusBasedPushMonitor( helixAdminClient, controllerConfig, pushStatusStoreReader, - disabledPartitionStats); + disabledPartitionStats, + veniceWriterFactory); } @Override diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java index 1ead143b0a6..d567e246e98 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java +++ 
b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java @@ -19,6 +19,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.writer.VeniceWriterFactory; import java.util.List; import java.util.Map; import java.util.Optional; @@ -57,7 +58,8 @@ public PushMonitorDelegator( HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - DisabledPartitionStats disabledPartitionStats) { + DisabledPartitionStats disabledPartitionStats, + VeniceWriterFactory veniceWriterFactory) { this.clusterName = clusterName; this.metadataRepository = metadataRepository; @@ -75,7 +77,8 @@ public PushMonitorDelegator( helixAdminClient, controllerConfig, pushStatusStoreReader, - disabledPartitionStats); + disabledPartitionStats, + veniceWriterFactory); this.clusterLockManager = clusterLockManager; this.topicToPushMonitorMap = new VeniceConcurrentHashMap<>(); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index b8a8a2fc903..054a30824d9 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -70,7 +70,7 @@ import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.InvalidKeySchemaPartitioner; @@ -1974,7 +1974,7 @@ public void testSetRePartitionViewConfig() { String viewString = String.format( rePartitionViewConfigString, MaterializedView.class.getCanonicalName(), - ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), + ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), rePartitionViewPartitionCount); // Invalid re-partition view name @@ -1991,13 +1991,13 @@ public void testSetRePartitionViewConfig() { Assert.assertTrue(updateStore.getViews().containsKey(rePartitionViewName)); Map rePartitionViewParameters = updateStore.getViews().get(rePartitionViewName).viewParameters; - Assert.assertNotNull(rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name())); + Assert.assertNotNull(rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name())); Assert.assertEquals( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()).toString(), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), rePartitionViewName); Assert.assertEquals( Integer.parseInt( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), rePartitionViewPartitionCount); } @@ -2032,9 +2032,8 @@ public void testInsertMaterializedViewConfig() { String rePartitionViewName = "rePartitionViewA"; int rePartitionViewPartitionCount = 10; Map viewClassParams = new HashMap<>(); - viewClassParams.put( 
- ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), - Integer.toString(rePartitionViewPartitionCount)); + viewClassParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(rePartitionViewPartitionCount)); // Invalid re-partition view name Assert.assertThrows( @@ -2058,13 +2057,13 @@ public void testInsertMaterializedViewConfig() { Assert.assertTrue(updateStore.getViews().containsKey(rePartitionViewName)); Map rePartitionViewParameters = updateStore.getViews().get(rePartitionViewName).viewParameters; - Assert.assertNotNull(rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name())); + Assert.assertNotNull(rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name())); Assert.assertEquals( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()).toString(), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), rePartitionViewName); Assert.assertEquals( Integer.parseInt( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), rePartitionViewPartitionCount); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java index 1d8f06eecba..ec06fb0cd4e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java @@ -46,12 +46,22 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.views.ViewUtils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import java.util.ArrayList; @@ -61,10 +71,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -85,6 +97,10 @@ public abstract class AbstractPushMonitorTest { protected MetricsRepository mockMetricRepo; + protected VeniceWriterFactory mockVeniceWriterFactory; + + protected VeniceWriter mockVeniceWriter; + private final static String clusterName = Utils.getUniqueString("test_cluster"); private final static String 
aggregateRealTimeSourceKafkaUrl = "aggregate-real-time-source-kafka-url"; private String storeName; @@ -114,12 +130,15 @@ public void setUp() { mockPushHealthStats = mock(AggPushHealthStats.class); clusterLockManager = new ClusterLockManager(clusterName); mockControllerConfig = mock(VeniceControllerClusterConfig.class); + mockVeniceWriterFactory = mock(VeniceWriterFactory.class); + mockVeniceWriter = mock(VeniceWriter.class); when(mockMetricRepo.sensor(anyString(), any())).thenReturn(mock(Sensor.class)); when(mockControllerConfig.isErrorLeaderReplicaFailOverEnabled()).thenReturn(true); when(mockControllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true); when(mockControllerConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5); when(mockControllerConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L); when(mockControllerConfig.getDaVinciPushStatusScanThreadNumber()).thenReturn(4); + when(mockVeniceWriterFactory.createVeniceWriter(any())).thenReturn(mockVeniceWriter); monitor = getPushMonitor(); } @@ -1081,10 +1100,71 @@ public void testGetPushStatusAndDetails() { "Details should change as a side effect of calling getPushStatusAndDetails."); } - protected Store prepareMockStore(String topic, VersionStatus status) { + @Test + public void testEOPReceivedProcedures() { + Map viewConfigMap = new HashMap<>(); + Map viewParams = new HashMap<>(); + String viewName = "testView"; + int viewPartitionCount = 10; + viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), viewName); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(viewPartitionCount)); + viewParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParams); + viewConfigMap.put(viewName, viewConfig); + String topic = getTopic(); + int versionNumber = Version.parseVersionFromKafkaTopicName(topic); + Store store = prepareMockStore(topic, VersionStatus.STARTED, viewConfigMap); + VeniceView veniceView = + ViewUtils.getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters()); + assertTrue(veniceView instanceof MaterializedView); + MaterializedView materializedView = (MaterializedView) veniceView; + String viewTopicName = + materializedView.getTopicNamesAndConfigsForVersion(versionNumber).keySet().stream().findAny().get(); + assertNotNull(viewTopicName); + + monitor.startMonitorOfflinePush( + topic, + numberOfPartition, + replicationFactor, + OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION); + // Prepare the new partition status + List replicaStatuses = new ArrayList<>(); + for (int i = 0; i < replicationFactor; i++) { + ReplicaStatus replicaStatus = new ReplicaStatus("test" + i); + replicaStatuses.add(replicaStatus); + } + // All replicas are in STARTED status + ReadOnlyPartitionStatus partitionStatus = new ReadOnlyPartitionStatus(0, replicaStatuses); + doReturn(true).when(mockRoutingDataRepo).containsKafkaTopic(topic); + doReturn(new PartitionAssignment(topic, 1)).when(mockRoutingDataRepo).getPartitionAssignments(topic); + // Check hybrid push status + monitor.onPartitionStatusChange(topic, partitionStatus); + // Not ready to send EOP to view topics + verify(mockVeniceWriterFactory, never()).createVeniceWriter(any()); + verify(mockVeniceWriter, never()).broadcastEndOfPush(any()); + + // One replica received end of push + 
replicaStatuses.get(0).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED); + monitor.onPartitionStatusChange(topic, partitionStatus); + ArgumentCaptor vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class); + verify(mockVeniceWriterFactory, times(1)).createVeniceWriter(vwOptionsCaptor.capture()); + assertEquals(vwOptionsCaptor.getValue().getPartitionCount(), Integer.valueOf(viewPartitionCount)); + assertEquals(vwOptionsCaptor.getValue().getTopicName(), viewTopicName); + verify(mockVeniceWriter, times(1)).broadcastEndOfPush(any()); + assertEquals(monitor.getOfflinePushOrThrow(topic).getCurrentStatus(), ExecutionStatus.END_OF_PUSH_RECEIVED); + + // Another replica received end of push. We shouldn't write multiple EOP to view topic + replicaStatuses.get(1).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED); + monitor.onPartitionStatusChange(topic, partitionStatus); + verify(mockVeniceWriter, times(1)).broadcastEndOfPush(any()); + } + + protected Store prepareMockStore(String topic, VersionStatus status, Map viewConfigMap) { String storeName = Version.parseStoreFromKafkaTopicName(topic); int versionNumber = Version.parseVersionFromKafkaTopicName(topic); Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setViewConfigs(viewConfigMap); Version version = new VersionImpl(storeName, versionNumber); version.setStatus(status); store.addVersion(version); @@ -1092,6 +1172,10 @@ protected Store prepareMockStore(String topic, VersionStatus status) { return store; } + protected Store prepareMockStore(String topic, VersionStatus status) { + return prepareMockStore(topic, status, Collections.emptyMap()); + } + protected Store prepareMockStore(String topic) { return prepareMockStore(topic, VersionStatus.STARTED); } @@ -1151,4 +1235,8 @@ protected ClusterLockManager getClusterLockManager() { protected String getAggregateRealTimeSourceKafkaUrl() { return aggregateRealTimeSourceKafkaUrl; } + + protected VeniceWriterFactory getMockVeniceWriterFactory() { + return mockVeniceWriterFactory; + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java index 7c71932dcbe..f2d1793f38a 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java @@ -66,7 +66,8 @@ protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) { helixAdminClient, getMockControllerConfig(), null, - mock(DisabledPartitionStats.class)); + mock(DisabledPartitionStats.class), + getMockVeniceWriterFactory()); } @Override @@ -85,7 +86,8 @@ protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher mockRealTimeT mock(HelixAdminClient.class), getMockControllerConfig(), null, - mock(DisabledPartitionStats.class)); + mock(DisabledPartitionStats.class), + getMockVeniceWriterFactory()); } @Test