diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 1ceaa95b653..582591544c8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -638,7 +638,7 @@ protected void processMessageAndMaybeProduceToKafka( // call in this context much less obtrusive, however, it implies that all views can only work for AA stores // Write to views - if (this.viewWriters.size() > 0) { + if (!this.viewWriters.isEmpty()) { /** * The ordering guarantees we want is the following: * @@ -647,24 +647,9 @@ protected void processMessageAndMaybeProduceToKafka( * producer (but not necessarily acked). */ long preprocessingTime = System.currentTimeMillis(); - CompletableFuture currentVersionTopicWrite = new CompletableFuture(); - CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; - int index = 0; - // The first future is for the previous write to VT - viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); - ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); - int oldValueSchemaId = - oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); - for (VeniceViewWriter writer: viewWriters.values()) { - viewWriterFutures[index++] = writer.processRecord( - mergeConflictResult.getNewValue(), - oldValueBB, - keyBytes, - versionNumber, - mergeConflictResult.getValueSchemaId(), - oldValueSchemaId, - mergeConflictResult.getRmdRecord()); - } + CompletableFuture currentVersionTopicWrite = new CompletableFuture(); + CompletableFuture[] viewWriterFutures = + processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper, null); CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); if (exception == null) { @@ -1512,6 +1497,33 @@ public boolean isReadyToServeAnnouncedWithRTLag() { return false; } + @Override + protected CompletableFuture[] processViewWriters( + PartitionConsumptionState partitionConsumptionState, + byte[] keyBytes, + MergeConflictResultWrapper mergeConflictResultWrapper, + WriteComputeResultWrapper writeComputeResultWrapper) { + CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; + MergeConflictResult mergeConflictResult = mergeConflictResultWrapper.getMergeConflictResult(); + int index = 0; + // The first future is for the previous write to VT + viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); + ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); + int oldValueSchemaId = + oldValueBB == null ? 
-1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); + for (VeniceViewWriter writer: viewWriters.values()) { + viewWriterFutures[index++] = writer.processRecord( + mergeConflictResult.getNewValue(), + oldValueBB, + keyBytes, + versionNumber, + mergeConflictResult.getValueSchemaId(), + oldValueSchemaId, + mergeConflictResult.getRmdRecord()); + } + return viewWriterFutures; + } + Runnable buildRepairTask( String sourceKafkaUrl, PubSubTopicPartition sourceTopicPartition, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 9e0415d8755..3ff2bd51dd5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -9,6 +9,7 @@ import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT; +import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER; import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; @@ -30,6 +31,7 @@ import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.view.ChangeCaptureViewWriter; +import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.KafkaDataIntegrityValidator; import com.linkedin.davinci.validation.PartitionTracker; @@ -55,6 +57,7 @@ import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; @@ -196,6 +199,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { protected final Map viewWriters; protected final boolean hasChangeCaptureView; + protected final boolean hasMaterializedView; + protected final boolean hasVersionCompleted; protected final AvroStoreDeserializerCache storeDeserializerCache; @@ -326,17 +331,25 @@ public LeaderFollowerStoreIngestionTask( version.getNumber(), schemaRepository.getKeySchema(store.getName()).getSchema()); boolean tmpValueForHasChangeCaptureViewWriter = false; + boolean tmpValueForHasMaterializedViewWriter = false; for (Map.Entry viewWriter: viewWriters.entrySet()) { if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) { tmpValueForHasChangeCaptureViewWriter = true; + } else if (viewWriter.getValue() instanceof MaterializedViewWriter) { + tmpValueForHasMaterializedViewWriter = true; + } + if (tmpValueForHasChangeCaptureViewWriter && tmpValueForHasMaterializedViewWriter) { break; } } hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter; + hasMaterializedView = tmpValueForHasMaterializedViewWriter; } 
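The hunks above (and the LeaderFollowerStoreIngestionTask changes later in this patch) all follow the same ordering pattern: the previous version-topic produce future occupies the first slot of the array returned by processViewWriters, all view-writer futures are awaited together, and only then is the current version-topic write completed and stored back on the PartitionConsumptionState. A minimal standalone sketch of that pattern, with hypothetical names and no Venice dependencies:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ViewWriteOrderingSketch {
  // Stand-in for PartitionConsumptionState#getLastVTProduceCallFuture / setLastVTProduceCallFuture (hypothetical).
  private CompletableFuture<Void> lastVtProduceCallFuture = CompletableFuture.completedFuture(null);

  CompletableFuture<Void> writeWithViewOrdering(
      List<CompletableFuture<Void>> viewWriterFutures,
      Runnable produceToVersionTopic) {
    CompletableFuture<Void> currentVtWrite = new CompletableFuture<>();
    List<CompletableFuture<Void>> all = new ArrayList<>();
    // Slot 0 is the previous write to the version topic, so per-partition VT ordering is preserved.
    all.add(lastVtProduceCallFuture);
    all.addAll(viewWriterFutures);
    CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).whenCompleteAsync((ignored, exception) -> {
      if (exception == null) {
        // Only after every view producer has acked is the record handed to the version-topic producer.
        produceToVersionTopic.run();
        currentVtWrite.complete(null);
      } else {
        currentVtWrite.completeExceptionally(exception);
      }
    });
    lastVtProduceCallFuture = currentVtWrite;
    return currentVtWrite;
  }

  public static void main(String[] args) {
    ViewWriteOrderingSketch sketch = new ViewWriteOrderingSketch();
    sketch.writeWithViewOrdering(
        List.of(CompletableFuture.completedFuture(null)),
        () -> System.out.println("produced to version topic")).join();
  }
}

The effect is that a record reaches every view producer before it is given to the version-topic producer, while successive records for the same partition still land in the version topic in order.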
else { viewWriters = Collections.emptyMap(); hasChangeCaptureView = false; + hasMaterializedView = false; } + hasVersionCompleted = version.getStatus().equals(VersionStatus.ONLINE); this.storeDeserializerCache = new AvroStoreDeserializerCache( builder.getSchemaRepo(), getStoreName(), @@ -2389,7 +2402,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState); // UPDATE message is only expected in LEADER which must be produced to kafka. MessageType msgType = MessageType.valueOf(kafkaValue); - if (msgType == MessageType.UPDATE && !produceToLocalKafka) { + if (msgType == UPDATE && !produceToLocalKafka) { throw new VeniceMessageException( ingestionTaskName + " hasProducedToKafka: Received UPDATE message in non-leader for: " + consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset()); @@ -2423,6 +2436,39 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( partitionConsumptionState.getVeniceWriterLazyRef().ifPresent(vw -> vw.flush()); partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime); } + /** + * For materialized view we still need to produce to the view topic when we are consuming batch data from the + * local VT for the very first time for a new version. Once the version has completed any new SIT will have its + * {@link hasVersionCompleted} set to true to prevent duplicate processing of batch records by the view writers. + * Duplicate processing is still possible in the case when leader replica crashes before the version becomes + * ONLINE, but it should be harmless since eventually it will be correct. + * TODO We need to introduce additional checkpointing and rewind for L/F transition during batch processing to + * prevent data loss in the view topic for the following scenario: + * 1. VT contains 100 batch records between SOP and EOP + * 2. Initial leader consumed to 50th record while followers consumed to 80th + * 3. Initial leader crashes and one of the followers becomes the new leader and start processing and writing + * to view topic from 80th record until ingestion is completed and version becomes ONLINE. + * 4. View topic will have a gap for batch data between 50th record and 80th record. 
+ */ + if (hasMaterializedView && shouldViewWritersProcessBatchRecords(partitionConsumptionState) + && msgType == MessageType.PUT) { + WriteComputeResultWrapper writeComputeResultWrapper = + new WriteComputeResultWrapper((Put) kafkaValue.payloadUnion, null, false); + long preprocessingTime = System.currentTimeMillis(); + CompletableFuture currentVersionTopicWrite = new CompletableFuture(); + CompletableFuture[] viewWriterFutures = + processViewWriters(partitionConsumptionState, kafkaKey.getKey(), null, writeComputeResultWrapper); + CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { + hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); + if (exception == null) { + currentVersionTopicWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + currentVersionTopicWrite.completeExceptionally(veniceException); + } + }); + } return DelegateConsumerRecordResult.QUEUED_TO_DRAINER; } @@ -3321,9 +3367,60 @@ protected void processMessageAndMaybeProduceToKafka( beforeProcessingRecordTimestampNs, beforeProcessingBatchRecordsTimestampMs).getWriteComputeResultWrapper(); } + if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) { + return; + } + // Write to views + if (viewWriters != null && !viewWriters.isEmpty()) { + long preprocessingTime = System.currentTimeMillis(); + CompletableFuture currentVersionTopicWrite = new CompletableFuture(); + CompletableFuture[] viewWriterFutures = + processViewWriters(partitionConsumptionState, keyBytes, null, writeComputeResultWrapper); + CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { + hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); + if (exception == null) { + produceToLocalKafkaHelper( + consumerRecord, + partitionConsumptionState, + writeComputeResultWrapper, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingRecordTimestampNs); + currentVersionTopicWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + currentVersionTopicWrite.completeExceptionally(veniceException); + } + }); + partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite); + } else { + produceToLocalKafkaHelper( + consumerRecord, + partitionConsumptionState, + writeComputeResultWrapper, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingRecordTimestampNs); + } + } - Put newPut = writeComputeResultWrapper.getNewPut(); + private void produceToLocalKafkaHelper( + PubSubMessage consumerRecord, + PartitionConsumptionState partitionConsumptionState, + WriteComputeResultWrapper writeComputeResultWrapper, + int partition, + String kafkaUrl, + int kafkaClusterId, + long beforeProcessingRecordTimestampNs) { + KafkaKey kafkaKey = consumerRecord.getKey(); + KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); + byte[] keyBytes = kafkaKey.getKey(); + MessageType msgType = MessageType.valueOf(kafkaValue.messageType); LeaderProducedRecordContext leaderProducedRecordContext; + Put newPut = writeComputeResultWrapper.getNewPut(); switch (msgType) { case PUT: leaderProducedRecordContext = @@ -3377,10 +3474,6 @@ protected void processMessageAndMaybeProduceToKafka( break; case UPDATE: - if 
(writeComputeResultWrapper.isSkipProduce()) { - return; - } - leaderProducedRecordContext = LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, newPut); BiConsumer produceFunction = @@ -3594,15 +3687,22 @@ protected long measureRTOffsetLagForSingleRegion( } @Override - protected void processVersionSwapMessage( + protected void processControlMessageForViews( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) { // Iterate through list of views for the store and process the control message. for (VeniceViewWriter viewWriter: viewWriters.values()) { - // TODO: at some point, we should do this on more or all control messages potentially as we add more view types - viewWriter.processControlMessage(controlMessage, partition, partitionConsumptionState, this.versionNumber); + viewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + controlMessage, + partition, + partitionConsumptionState, + this.versionNumber); } } @@ -3861,6 +3961,33 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio } } + protected CompletableFuture[] processViewWriters( + PartitionConsumptionState partitionConsumptionState, + byte[] keyBytes, + MergeConflictResultWrapper mergeConflictResultWrapper, + WriteComputeResultWrapper writeComputeResultWrapper) { + CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; + int index = 0; + // The first future is for the previous write to VT + viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); + for (VeniceViewWriter writer: viewWriters.values()) { + viewWriterFutures[index++] = writer.processRecord( + writeComputeResultWrapper.getNewPut().putValue, + keyBytes, + versionNumber, + writeComputeResultWrapper.getNewPut().schemaId); + } + return viewWriterFutures; + } + + private boolean shouldViewWritersProcessBatchRecords(PartitionConsumptionState partitionConsumptionState) { + if (hasVersionCompleted) { + return false; + } + return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LEADER) + || Objects.equals(partitionConsumptionState.getLeaderFollowerState(), IN_TRANSITION_FROM_STANDBY_TO_LEADER); + } + /** * Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp} * such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 1e544f62f32..6270ee5e3b0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2976,10 +2976,12 @@ protected void processEndOfIncrementalPush( } /** - * This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in - * {@link ActiveActiveStoreIngestionTask} + * This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation in + * {@link LeaderFollowerStoreIngestionTask} */ - protected void processVersionSwapMessage( + protected void processControlMessageForViews( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, 
int partition, PartitionConsumptionState partitionConsumptionState) { @@ -3001,6 +3003,7 @@ protected boolean processTopicSwitch( * offset is stale and is not updated until the very end */ private boolean processControlMessage( + KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, @@ -3034,6 +3037,7 @@ private boolean processControlMessage( break; case START_OF_SEGMENT: case END_OF_SEGMENT: + case VERSION_SWAP: /** * Nothing to do here as all the processing is being done in {@link StoreIngestionTask#delegateConsumerRecord(ConsumerRecord, int, String)}. */ @@ -3048,13 +3052,11 @@ private boolean processControlMessage( checkReadyToServeAfterProcess = processTopicSwitch(controlMessage, partition, offset, partitionConsumptionState); break; - case VERSION_SWAP: - processVersionSwapMessage(controlMessage, partition, partitionConsumptionState); - break; default: throw new UnsupportedMessageTypeException( "Unrecognized Control message type " + controlMessage.controlMessageType); } + processControlMessageForViews(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState); return checkReadyToServeAfterProcess; } @@ -3172,6 +3174,7 @@ private int internalProcessConsumerRecord( ? (ControlMessage) kafkaValue.payloadUnion : (ControlMessage) leaderProducedRecordContext.getValueUnion()); checkReadyToServeAfterProcess = processControlMessage( + kafkaKey, kafkaValue, controlMessage, consumerRecord.getTopicPartition().getPartitionNumber(), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index 3bbba39b2cc..8cfe6a211fe 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -8,8 +8,10 @@ import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent; import com.linkedin.venice.client.change.capture.protocol.ValueBytes; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; @@ -81,8 +83,20 @@ public CompletableFuture processRecord( return veniceWriter.put(key, recordChangeEvent, 1); } + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId) { + // No op + return CompletableFuture.completedFuture(null); + } + @Override public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState, @@ -176,7 +190,7 @@ VeniceWriterOptions buildWriterOptions(int version) { } configBuilder.setChunkingEnabled(storeVersionConfig.isChunkingEnabled()); - return configBuilder.build(); + return setProducerOptimizations(configBuilder).build(); } synchronized private void initializeVeniceWriter(int version) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java new file mode 100644 index 00000000000..96e5a7ba249 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -0,0 +1,237 @@ +package com.linkedin.davinci.store.view; + +import com.linkedin.davinci.config.VeniceConfigLoader; +import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.utils.RedundantExceptionFilter; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.writer.LeaderCompleteState; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class MaterializedViewWriter extends VeniceViewWriter { + private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory; + private final MaterializedView internalView; + private final ReentrantLock broadcastHBLock = new ReentrantLock(); + private final Map partitionToHeartbeatTimestampMap = new HashMap<>(); + private final Clock clock; + private VeniceWriter veniceWriter; + private String materializedViewTopicName; + private long lastHBBroadcastTimestamp; + + /** + * These configs can be exposed to view parameters if or server configs if needed + */ + private static final long DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + private static final long DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD = TimeUnit.MINUTES.toMillis(5); + private static final int DEFAULT_PARTITION_TO_ALWAYS_BROADCAST = 0; + private static final Logger LOGGER = LogManager.getLogger(MaterializedViewWriter.class); + private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = + RedundantExceptionFilter.getRedundantExceptionFilter(); + + public MaterializedViewWriter( + VeniceConfigLoader props, + Store store, + Schema keySchema, + Map extraViewParameters, + Clock clock) { + super(props, store, keySchema, extraViewParameters); + pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory(); + internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters); + this.clock = clock; + } + + public MaterializedViewWriter( + VeniceConfigLoader props, + Store store, + Schema keySchema, + Map extraViewParameters) { + 
this(props, store, keySchema, extraViewParameters, Clock.systemUTC()); + } + + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + ByteBuffer oldValue, + byte[] key, + int version, + int newValueSchemaId, + int oldValueSchemaId, + GenericRecord replicationMetadataRecord) { + return processRecord(newValue, key, version, newValueSchemaId); + } + + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId) { + if (veniceWriter == null) { + initializeVeniceWriter(version); + } + return veniceWriter.put(key, newValue.array(), newValueSchemaId); + } + + @Override + public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, + ControlMessage controlMessage, + int partition, + PartitionConsumptionState partitionConsumptionState, + int version) { + final ControlMessageType type = ControlMessageType.valueOf(controlMessage); + // Ignore other control messages for materialized view. + if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) { + maybePropagateHeartbeatLowWatermarkToViewTopic( + partition, + partitionConsumptionState, + kafkaMessageEnvelope.producerMetadata.messageTimestamp, + version); + } + } + + @Override + public String getWriterClassName() { + return internalView.getWriterClassName(); + } + + // package private for testing purposes. + VeniceWriterOptions buildWriterOptions(int version) { + // We need to change this and have a map of writers if one materialized view will have many topics. + materializedViewTopicName = + internalView.getTopicNamesAndConfigsForVersion(version).keySet().stream().findAny().get(); + VeniceWriterOptions.Builder configBuilder = new VeniceWriterOptions.Builder(materializedViewTopicName); + Version storeVersionConfig = store.getVersionOrThrow(version); + configBuilder.setPartitionCount(internalView.getViewPartitionCount()); + configBuilder.setChunkingEnabled(storeVersionConfig.isChunkingEnabled()); + configBuilder.setRmdChunkingEnabled(storeVersionConfig.isRmdChunkingEnabled()); + configBuilder.setPartitioner(internalView.getViewPartitioner()); + return setProducerOptimizations(configBuilder).build(); + } + + synchronized private void initializeVeniceWriter(int version) { + if (veniceWriter == null) { + veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null) + .createVeniceWriter(buildWriterOptions(version)); + } + } + + /** + * View topic's partitioner and partition count could be different from the VT. In order to ensure we are capturing + * all potential lag in the VT ingestion from the view topic, we will broadcast the low watermark observed from every + * VT leader to all partitions of the view topic. To reduce the heartbeat spam we can use a strategy as follows: + * 1. Leader of partition 0 always broadcasts its low watermark timestamp to all view topic partitions. + * 2. Leader of other partitions will only broadcast its heartbeat low watermark timestamp if it's sufficiently + * stale. This is configurable but by default it could be >= 5 minutes. This is because broadcasting redundant + * up-to-date heartbeat in view topic is not meaningful when the main goal here is just to identify if there + * are any lagging partitions or the largest lag amongst all VT partitions. Since lag in any VT partition could + * result in lag in one or more view topic partitions. + * 3. 
This broadcasting heartbeat mechanism will only provide lag info to view topic consumers if the corresponding + * VT consumption is not stuck. e.g. if one VT partition is stuck we won't be able to detect such issue from the + * view topic heartbeats because VT partitions that are not stuck will be broadcasting heartbeats. Due to this + * reason we can also clear and rebuild the partition to timestamp map to simplify the maintenance logic. + */ + private void maybePropagateHeartbeatLowWatermarkToViewTopic( + int partition, + PartitionConsumptionState partitionConsumptionState, + long heartbeatTimestamp, + int version) { + boolean propagate = false; + long oldestHeartbeatTimestamp; + broadcastHBLock.lock(); + try { + partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp); + long now = clock.millis(); + if (now > lastHBBroadcastTimestamp + DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS + && !partitionToHeartbeatTimestampMap.isEmpty()) { + oldestHeartbeatTimestamp = Collections.min(partitionToHeartbeatTimestampMap.values()); + if (partition == DEFAULT_PARTITION_TO_ALWAYS_BROADCAST + || now - oldestHeartbeatTimestamp > DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD) { + propagate = true; + lastHBBroadcastTimestamp = now; + } + partitionToHeartbeatTimestampMap.clear(); + } + } finally { + broadcastHBLock.unlock(); + } + if (propagate) { + if (veniceWriter == null) { + initializeVeniceWriter(version); + } + LeaderCompleteState leaderCompleteState = + LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported()); + Set failedPartitions = VeniceConcurrentHashMap.newKeySet(); + Set> heartbeatFutures = VeniceConcurrentHashMap.newKeySet(); + AtomicReference completionException = new AtomicReference<>(); + for (int p = 0; p < internalView.getViewPartitionCount(); p++) { + // Due to the intertwined partition mapping, the actual LeaderMetadataWrapper is meaningless for materialized + // view consumers. Similarly, we will propagate the LeaderCompleteState, but it will only guarantee that at + // least + // one partition leader has completed. 
+ final int viewPartitionNumber = p; + CompletableFuture heartBeatFuture = veniceWriter.sendHeartbeat( + materializedViewTopicName, + viewPartitionNumber, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + true, + leaderCompleteState, + heartbeatTimestamp); + heartBeatFuture.whenComplete((ignore, throwable) -> { + if (throwable != null) { + completionException.set(new CompletionException(throwable)); + failedPartitions.add(String.valueOf(viewPartitionNumber)); + } + }); + heartbeatFutures.add(heartBeatFuture); + } + if (!heartbeatFutures.isEmpty()) { + CompletableFuture.allOf(heartbeatFutures.toArray(new CompletableFuture[0])) + .whenCompleteAsync((ignore, throwable) -> { + if (!failedPartitions.isEmpty()) { + int failedCount = failedPartitions.size(); + String logMessage = String.format( + "Broadcast materialized view heartbeat for %d partitions of topic %s: %d succeeded, %d failed for partitions %s", + heartbeatFutures.size(), + materializedViewTopicName, + heartbeatFutures.size() - failedCount, + failedCount, + String.join(",", failedPartitions)); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) { + LOGGER.error(logMessage, completionException.get()); + } + } + }); + } + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 64824b19e71..3a02a435a7d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -3,9 +3,12 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -45,36 +48,65 @@ public VeniceViewWriter( * @param oldValueSchemaId the schemaId of the old record * @param replicationMetadataRecord the associated RMD for the incoming record. */ - public CompletableFuture processRecord( + public abstract CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, int version, int newValueSchemaId, int oldValueSchemaId, - GenericRecord replicationMetadataRecord) { - return CompletableFuture.completedFuture(null); - } + GenericRecord replicationMetadataRecord); + + /** + * To be called as a given ingestion task consumes each record. This is called prior to writing to a + * VT or to persistent storage. + * + * @param newValue the incoming fully specified value which hasn't yet been committed to Venice + * @param key the key of the record that designates newValue and oldValue + * @param version the version of the store taking this record + * @param newValueSchemaId the schemaId of the incoming record + */ + public abstract CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId); /** * Called when the server encounters a control message. 
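A condensed, self-contained sketch of the broadcast throttle implemented by maybePropagateHeartbeatLowWatermarkToViewTopic above (class name and use of a synchronized method instead of the ReentrantLock are illustrative): every leader records its partition's latest heartbeat, and at most once per interval a broadcast is triggered, either by partition 0 or when the oldest recorded heartbeat is older than the staleness threshold; the map is then cleared and rebuilt from subsequent heartbeats.

import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class HeartbeatBroadcastThrottleSketch {
  private static final long INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
  private static final long STALE_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(5);
  private final Map<Integer, Long> partitionToHeartbeatTimestamp = new HashMap<>();
  private final Clock clock;
  private long lastBroadcastTimestamp;

  HeartbeatBroadcastThrottleSketch(Clock clock) {
    this.clock = clock;
  }

  synchronized boolean shouldBroadcast(int partition, long heartbeatTimestamp) {
    partitionToHeartbeatTimestamp.put(partition, heartbeatTimestamp);
    long now = clock.millis();
    if (now <= lastBroadcastTimestamp + INTERVAL_MS) {
      return false; // rate limit: at most one broadcast per interval per writer
    }
    long oldestHeartbeat = Collections.min(partitionToHeartbeatTimestamp.values());
    boolean broadcast = partition == 0 || now - oldestHeartbeat > STALE_THRESHOLD_MS;
    if (broadcast) {
      lastBroadcastTimestamp = now;
    }
    // Clear and rebuild from later heartbeats, as described in the javadoc above.
    partitionToHeartbeatTimestamp.clear();
    return broadcast;
  }

  public static void main(String[] args) {
    HeartbeatBroadcastThrottleSketch throttle = new HeartbeatBroadcastThrottleSketch(Clock.systemUTC());
    // true: partition 0 always broadcasts once the interval has elapsed since the last broadcast.
    System.out.println(throttle.shouldBroadcast(0, System.currentTimeMillis()));
  }
}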
There isn't (today) a strict ordering * on if the rest of the server alters it's state completely or not based on the incoming control message * relative to the given view. * - * TODO: Today this is only invoked for VERSION_SWAP control message, but we - * may in the future call this method for all control messages so that certain - * view types can act accordingly. + * Different view types may be interested in different control messages and act differently. The corresponding + * view writer should implement this method accordingly. * + * @param kafkaKey the corresponding kafka key of this control message + * @param kafkaMessageEnvelope the corresponding kafka message envelope of this control message * @param controlMessage the control message we're processing * @param partition the partition this control message was delivered to * @param partitionConsumptionState the pcs of the consuming node * @param version the store version that received this message */ public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState, int version) { // Optionally act on Control Message } + + /** + * A store could have many views and to reduce the impact to write throughput we want to check and enable producer + * optimizations that can be configured at the store level. To change the producer optimization configs the ingestion + * task needs to be re-initialized. Meaning either a new version push or server restart after the store level config + * change and this is by design. + * @param configBuilder to be configured with the producer optimizations + * @return + */ + protected VeniceWriterOptions.Builder setProducerOptimizations(VeniceWriterOptions.Builder configBuilder) { + return configBuilder.setProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled()) + .setProducerCount(store.getNearlineProducerCountPerWriter()); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java index f65552b5267..644159a7c61 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME; import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; +import static org.mockito.Mockito.mock; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.davinci.config.VeniceConfigLoader; @@ -12,7 +13,9 @@ import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; @@ -60,14 +63,14 @@ public void testConstructVersionSwapMessage() { highWaterMarks.put(LOR_1, 111L); highWaterMarks.put(LTX_1, 
99L); highWaterMarks.put(LVA_1, 22222L); - PartitionConsumptionState mockLeaderPartitionConsumptionState = Mockito.mock(PartitionConsumptionState.class); + PartitionConsumptionState mockLeaderPartitionConsumptionState = mock(PartitionConsumptionState.class); Mockito.when(mockLeaderPartitionConsumptionState.getLeaderFollowerState()) .thenReturn(LeaderFollowerStateType.LEADER); Mockito.when(mockLeaderPartitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()) .thenReturn(highWaterMarks); Mockito.when(mockLeaderPartitionConsumptionState.getPartition()).thenReturn(1); - PartitionConsumptionState mockFollowerPartitionConsumptionState = Mockito.mock(PartitionConsumptionState.class); + PartitionConsumptionState mockFollowerPartitionConsumptionState = mock(PartitionConsumptionState.class); Mockito.when(mockFollowerPartitionConsumptionState.getLeaderFollowerState()) .thenReturn(LeaderFollowerStateType.STANDBY); Mockito.when(mockFollowerPartitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()) @@ -78,29 +81,31 @@ public void testConstructVersionSwapMessage() { versionSwapMessage.oldServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 1); versionSwapMessage.newServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 2); + KafkaKey kafkaKey = mock(KafkaKey.class); + KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class); ControlMessage controlMessage = new ControlMessage(); controlMessage.controlMessageUnion = versionSwapMessage; - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); VeniceProperties props = VeniceProperties.empty(); Object2IntMap urlMappingMap = new Object2IntOpenHashMap<>(); // Add ID's to the region's to name the sort order of the RMD urlMappingMap.put(LTX_1, 0); urlMappingMap.put(LVA_1, 1); urlMappingMap.put(LOR_1, 2); - CompletableFuture mockFuture = Mockito.mock(CompletableFuture.class); + CompletableFuture mockFuture = mock(CompletableFuture.class); - VeniceWriter mockVeniceWriter = Mockito.mock(VeniceWriter.class); + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); Mockito.when(mockVeniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(mockFuture); - VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); Mockito.when(mockVeniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(urlMappingMap); - PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = Mockito.mock(PubSubProducerAdapterFactory.class); - PubSubClientsFactory mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class); + PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + PubSubClientsFactory mockPubSubClientsFactory = mock(PubSubClientsFactory.class); Mockito.when(mockPubSubClientsFactory.getProducerAdapterFactory()).thenReturn(mockPubSubProducerAdapterFactory); Mockito.when(mockVeniceServerConfig.getPubSubClientsFactory()).thenReturn(mockPubSubClientsFactory); - VeniceConfigLoader mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class); + VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class); Mockito.when(mockVeniceConfigLoader.getCombinedProperties()).thenReturn(props); Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig); @@ -110,14 +115,26 @@ public void testConstructVersionSwapMessage() { changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter); // 
Verify that we never produce the version swap from a follower replica - changeCaptureViewWriter.processControlMessage(controlMessage, 1, mockFollowerPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + controlMessage, + 1, + mockFollowerPartitionConsumptionState, + 1); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); // Verify that we never produce anything if it's not a VersionSwap Message ControlMessage ignoredControlMessage = new ControlMessage(); ignoredControlMessage.controlMessageUnion = new EndOfIncrementalPush(); - changeCaptureViewWriter.processControlMessage(ignoredControlMessage, 1, mockLeaderPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + ignoredControlMessage, + 1, + mockLeaderPartitionConsumptionState, + 1); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); @@ -126,11 +143,23 @@ public void testConstructVersionSwapMessage() { ignoredVersionSwapMessage.oldServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 2); ignoredVersionSwapMessage.newServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 3); ignoredControlMessage.controlMessageUnion = ignoredVersionSwapMessage; - changeCaptureViewWriter.processControlMessage(ignoredControlMessage, 1, mockLeaderPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + ignoredControlMessage, + 1, + mockLeaderPartitionConsumptionState, + 1); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); - changeCaptureViewWriter.processControlMessage(controlMessage, 1, mockLeaderPartitionConsumptionState, 1); + changeCaptureViewWriter.processControlMessage( + kafkaKey, + kafkaMessageEnvelope, + controlMessage, + 1, + mockLeaderPartitionConsumptionState, + 1); ArgumentCaptor messageArgumentCaptor = ArgumentCaptor.forClass(ControlMessage.class); // Verify and capture input @@ -156,7 +185,7 @@ public void testConstructVersionSwapMessage() { @Test public void testBuildWriterOptions() { // Set up mocks - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); Version version = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID); Mockito.when(mockStore.getVersionOrThrow(1)).thenReturn(version); @@ -164,18 +193,18 @@ public void testBuildWriterOptions() { VeniceProperties props = VeniceProperties.empty(); Object2IntMap urlMappingMap = new Object2IntOpenHashMap<>(); - CompletableFuture mockFuture = Mockito.mock(CompletableFuture.class); + CompletableFuture mockFuture = mock(CompletableFuture.class); - VeniceWriter mockVeniceWriter = Mockito.mock(VeniceWriter.class); + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); Mockito.when(mockVeniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(mockFuture); - VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); Mockito.when(mockVeniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(urlMappingMap); - VeniceConfigLoader mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class); + VeniceConfigLoader mockVeniceConfigLoader = 
mock(VeniceConfigLoader.class); Mockito.when(mockVeniceConfigLoader.getCombinedProperties()).thenReturn(props); - PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = Mockito.mock(PubSubProducerAdapterFactory.class); - PubSubClientsFactory mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class); + PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + PubSubClientsFactory mockPubSubClientsFactory = mock(PubSubClientsFactory.class); Mockito.when(mockPubSubClientsFactory.getProducerAdapterFactory()).thenReturn(mockPubSubProducerAdapterFactory); Mockito.when(mockVeniceServerConfig.getPubSubClientsFactory()).thenReturn(mockPubSubClientsFactory); Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig); @@ -196,20 +225,20 @@ public void testBuildWriterOptions() { @Test public void testProcessRecord() throws ExecutionException, InterruptedException { // Set up mocks - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); VeniceProperties props = VeniceProperties.empty(); Object2IntMap urlMappingMap = new Object2IntOpenHashMap<>(); - CompletableFuture mockFuture = Mockito.mock(CompletableFuture.class); + CompletableFuture mockFuture = mock(CompletableFuture.class); - VeniceWriter mockVeniceWriter = Mockito.mock(VeniceWriter.class); + VeniceWriter mockVeniceWriter = mock(VeniceWriter.class); Mockito.when(mockVeniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(mockFuture); - VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); Mockito.when(mockVeniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(urlMappingMap); - VeniceConfigLoader mockVeniceConfigLoader = Mockito.mock(VeniceConfigLoader.class); - PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = Mockito.mock(PubSubProducerAdapterFactory.class); - PubSubClientsFactory mockPubSubClientsFactory = Mockito.mock(PubSubClientsFactory.class); + VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class); + PubSubProducerAdapterFactory mockPubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + PubSubClientsFactory mockPubSubClientsFactory = mock(PubSubClientsFactory.class); Mockito.when(mockPubSubClientsFactory.getProducerAdapterFactory()).thenReturn(mockPubSubProducerAdapterFactory); Mockito.when(mockVeniceServerConfig.getPubSubClientsFactory()).thenReturn(mockPubSubClientsFactory); Mockito.when(mockVeniceConfigLoader.getCombinedProperties()).thenReturn(props); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java deleted file mode 100644 index 1944ca1d508..00000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameterKeys.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.linkedin.venice.meta; - -public enum ViewParameterKeys { - /** - * Parameter key used to specify the re-partition view name. - */ - MATERIALIZED_VIEW_NAME, - /** - * Parameter key used to specify the partitioner for the re-partition view. - */ - MATERIALIZED_VIEW_PARTITIONER, - /** - * Parameter key used to specify the partitioner parameters for the partitioner associated with the re-partition view. 
- */ - MATERIALIZED_VIEW_PARTITIONER_PARAMS, - /** - * Parameter key used to specify the partition count for the re-partition view. - */ - MATERIALIZED_VIEW_PARTITION_COUNT; -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java new file mode 100644 index 00000000000..a8001c21c78 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ViewParameters.java @@ -0,0 +1,80 @@ +package com.linkedin.venice.meta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.linkedin.venice.utils.ObjectMapperFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + + +public enum ViewParameters { + /** + * Parameter key used to specify the re-partition view name. + */ + MATERIALIZED_VIEW_NAME, + /** + * Parameter key used to specify the partitioner for the re-partition view. + */ + MATERIALIZED_VIEW_PARTITIONER, + /** + * Parameter key used to specify the partitioner parameters for the partitioner associated with the re-partition view. + */ + MATERIALIZED_VIEW_PARTITIONER_PARAMS, + /** + * Parameter key used to specify the partition count for the re-partition view. + */ + MATERIALIZED_VIEW_PARTITION_COUNT; + + public static class Builder { + private String viewName; + private String partitioner; + private String partitionerParams; + private String partitionCount; + + public Builder(String viewName) { + this.viewName = Objects.requireNonNull(viewName, "View name cannot be null for ViewParameters"); + } + + public Builder(String viewName, Map viewParams) { + this.viewName = viewName; + this.partitioner = viewParams.get(MATERIALIZED_VIEW_PARTITIONER.name()); + this.partitionerParams = viewParams.get(MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); + this.partitionCount = viewParams.get(MATERIALIZED_VIEW_PARTITION_COUNT.name()); + } + + public Builder setPartitioner(String partitioner) { + this.partitioner = partitioner; + return this; + } + + public Builder setPartitionerParams(String partitionerParams) { + this.partitionerParams = partitionerParams; + return this; + } + + public Builder setPartitionerParams(Map partitionerParams) throws JsonProcessingException { + this.partitionerParams = ObjectMapperFactory.getInstance().writeValueAsString(partitionerParams); + return this; + } + + public Builder setPartitionCount(int partitionCount) { + this.partitionCount = Integer.toString(partitionCount); + return this; + } + + public Map build() { + Map viewParams = new HashMap<>(); + viewParams.put(MATERIALIZED_VIEW_NAME.name(), viewName); + if (partitioner != null) { + viewParams.put(MATERIALIZED_VIEW_PARTITIONER.name(), partitioner); + } + if (partitionerParams != null) { + viewParams.put(MATERIALIZED_VIEW_PARTITIONER_PARAMS.name(), partitionerParams); + } + if (partitionCount != null) { + viewParams.put(MATERIALIZED_VIEW_PARTITION_COUNT.name(), partitionCount); + } + return viewParams; + } + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java index 70492cdfea2..3cbfdf5726b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java @@ -430,14 +430,13 @@ private void addHistoricStatus(ExecutionStatus status, String 
incrementalPushVer * Checks whether at least one replica of each partition has returned {@link ExecutionStatus#END_OF_PUSH_RECEIVED} * * This is intended for {@link OfflinePushStatus} instances which belong to Hybrid Stores, though there - * should be no negative side-effects if called on an instance tied to a non-hybrid store, as the logic - * should consistently return false in that case. + * should be no negative side-effects if called on an instance tied to a non-hybrid store. * * @return true if at least one replica of each partition has consumed an EOP control message, false otherwise */ - public boolean isReadyToStartBufferReplay(boolean isDataRecovery) { + public boolean isEOPReceivedInEveryPartition(boolean isDataRecovery) { // Only allow the push in STARTED status to start buffer replay. It could avoid: - // 1. Send duplicated start buffer replay message. + // 1. Send duplicated CM such as the start buffer replay message. // 2. Send start buffer replay message when a push had already been terminated. if (!getCurrentStatus().equals(STARTED)) { return false; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java index 4ecf0f4456c..68b14e69583 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java @@ -1,5 +1,6 @@ package com.linkedin.venice.utils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.PartitionerConfig; @@ -7,6 +8,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; +import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; @@ -95,6 +97,20 @@ public static VenicePartitioner getUserPartitionLevelVenicePartitioner(Partition return getVenicePartitioner(config.getPartitionerClass(), new VeniceProperties(params)); } + public static VenicePartitioner getVenicePartitioner(String partitionerClass, String partitionerParamsString) { + Properties params = new Properties(); + if (partitionerParamsString != null) { + Map partitionerParamsMap = null; + try { + partitionerParamsMap = ObjectMapperFactory.getInstance().readValue(partitionerParamsString, Map.class); + } catch (JsonProcessingException e) { + throw new VeniceException("Invalid partitioner params string: " + partitionerParamsString, e); + } + params.putAll(partitionerParamsMap); + } + return getVenicePartitioner(partitionerClass, new VeniceProperties(params), null); + } + public static VenicePartitioner getVenicePartitioner(String partitionerClass, VeniceProperties params) { return getVenicePartitioner(partitionerClass, params, null); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java index 5bc07fb71c3..05f743b9bbb 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java @@ -33,10 +33,12 @@ public String getWriterClassName() { @Override public void validateConfigs() { - super.validateConfigs(); 
- // change capture requires chunking to be enabled. This is because it's logistically difficult to insist + // change capture requires A/A and chunking to be enabled. This is because it's logistically difficult to insist // that all rows be under 50% the chunking threshhold (since we have to publish the before and after image of the // record to the change capture topic). So we make a blanket assertion. + if (!store.isActiveActiveReplicationEnabled()) { + throw new VeniceException("Views are not supported with non Active/Active stores!"); + } if (!store.isChunkingEnabled()) { throw new VeniceException("Change capture view are not supported with stores that don't have chunking enabled!"); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java index 87bf644c384..79db7698a55 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java @@ -1,11 +1,16 @@ package com.linkedin.venice.views; +import static com.linkedin.venice.views.ViewUtils.PARTITION_COUNT; + import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.partitioner.VenicePartitioner; +import com.linkedin.venice.utils.PartitionUtils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -13,16 +18,30 @@ public class MaterializedView extends VeniceView { public static final String MATERIALIZED_VIEW_TOPIC_SUFFIX = "_mv"; + public static final String MATERIALIZED_VIEW_WRITER_CLASS_NAME = + "com.linkedin.davinci.store.view.MaterializedViewWriter"; private static final String MISSING_PARAMETER_MESSAGE = "%s is required for materialized view!"; + private final int viewPartitionCount; + + private Lazy viewPartitioner; public MaterializedView(Properties props, Store store, Map viewParameters) { super(props, store, viewParameters); + // Override topic partition count config + viewPartitionCount = Integer.parseInt(viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + this.props.put(PARTITION_COUNT, viewPartitionCount); + viewPartitioner = Lazy.of(() -> { + String viewPartitionerClass = this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitionerParamsString = + this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); + return PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString); + }); } @Override public Map getTopicNamesAndConfigsForVersion(int version) { VeniceProperties properties = new VeniceProperties(props); - String viewName = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()); + String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()); return Collections.singletonMap( Version.composeKafkaTopic(store.getName(), version) + VIEW_TOPIC_SEPARATOR + viewName + MATERIALIZED_VIEW_TOPIC_SUFFIX, @@ -30,36 +49,34 @@ public Map getTopicNamesAndConfigsForVersion(int versi } /** - * {@link ViewParameterKeys#MATERIALIZED_VIEW_PARTITION_COUNT} is required to 
configure a new re-partition view. - * {@link ViewParameterKeys#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level + * {@link ViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view. + * {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level * partitioner config if it's not specified in the view parameters. - * {@link ViewParameterKeys#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional. + * {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional. */ @Override public void validateConfigs() { - super.validateConfigs(); - String viewName = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()); + String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()); if (viewName == null) { - throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_NAME.name())); + throw new VeniceException(String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_NAME.name())); } if (store.getViewConfigs().containsKey(viewName)) { throw new VeniceException("A view config with the same view name already exist, view name: " + viewName); } - String viewPartitioner = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitioner = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); if (viewPartitioner == null) { throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name())); + String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())); } try { Class.forName(viewPartitioner); } catch (ClassNotFoundException e) { throw new VeniceException("Cannot find partitioner class: " + viewPartitioner); } - String partitionCountString = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()); + String partitionCountString = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()); if (partitionCountString == null) { throw new VeniceException( - String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); } int viewPartitionCount = Integer.parseInt(partitionCountString); // A materialized view with the exact same partitioner and partition count as the store is not allwoed @@ -73,9 +90,9 @@ public void validateConfigs() { ViewConfig viewConfig = viewConfigEntries.getValue(); if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) { String configPartitioner = - viewConfig.getViewParameters().get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name()); + viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); int configPartitionCount = Integer - .parseInt(viewConfig.getViewParameters().get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name())); + .parseInt(viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); if (configPartitionCount == viewPartitionCount && configPartitioner.equals(viewPartitioner)) { throw new VeniceException( "A view with identical view configs already exist, view name: " + viewConfigEntries.getKey()); @@ -83,4 +100,17 @@ public void validateConfigs() { } } } + + @Override + public String getWriterClassName() { + return 
MATERIALIZED_VIEW_WRITER_CLASS_NAME; + } + + public int getViewPartitionCount() { + return viewPartitionCount; + } + + public VenicePartitioner getViewPartitioner() { + return viewPartitioner.get(); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java index 381db477fa8..7b5d2f5fe8d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java @@ -65,13 +65,10 @@ public String getWriterClassName() { /** * Validate that the configs set up for this view for this store are valid. If not, throw an exception. Implementors - * should override this function to add their own validation logic, and need only call this base implementation optionally. + * should override this function to add their own validation logic. */ public void validateConfigs() { - // All views which publish data only work with A/A. Views which don't publish data should override this validation - if (!store.isActiveActiveReplicationEnabled()) { - throw new VeniceException("Views are not supported with non Active/Active stores!"); - } + // validation based on view implementation } public void close() { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 5d8cd79cb28..6c938d828f2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -1906,6 +1906,25 @@ public CompletableFuture sendHeartbeat( callback); } + public CompletableFuture sendHeartbeat( + String topicName, + int partitionNumber, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + boolean addLeaderCompleteState, + LeaderCompleteState leaderCompleteState, + long originTimeStampMs) { + KafkaMessageEnvelope kafkaMessageEnvelope = + getHeartbeatKME(originTimeStampMs, leaderMetadataWrapper, heartBeatMessage, writerId); + return producerAdapter.sendMessage( + topicName, + partitionNumber, + KafkaKey.HEART_BEAT, + kafkaMessageEnvelope, + getHeaders(kafkaMessageEnvelope.getProducerMetadata(), addLeaderCompleteState, leaderCompleteState), + callback); + } + /** * The Key part of the {@link KafkaKey} needs to be unique in order to avoid clobbering each other during * Kafka's Log Compaction. 
Since there is no key per se associated with control messages, we generate one diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java index 4e05a0453fb..159ec39c0e7 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java @@ -109,7 +109,7 @@ public void testIsReadyToStartBufferReplay() { } offlinePushStatus.setPartitionStatuses(Collections.singletonList(partitionStatus)); Assert.assertTrue( - offlinePushStatus.isReadyToStartBufferReplay(false), + offlinePushStatus.isEOPReceivedInEveryPartition(false), "Buffer replay should be allowed to start since END_OF_PUSH_RECEIVED was already received"); } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/views/MaterializedViewTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java similarity index 77% rename from internal/venice-common/src/test/java/com/linkedin/venice/views/MaterializedViewTest.java rename to internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java index 394216ef9eb..44566c64f98 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/views/MaterializedViewTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java @@ -9,7 +9,7 @@ import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.ViewConfig; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.utils.VeniceProperties; @@ -20,7 +20,7 @@ import org.testng.annotations.Test; -public class MaterializedViewTest { +public class TestMaterializedView { @Test public void testValidateConfigs() { Properties properties = new Properties(); @@ -28,46 +28,43 @@ public void testValidateConfigs() { Store testStore = getMockStore("test-store", 12); // Fail due to missing view name assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name(), "test-view"); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), "test-view"); // Fail due to missing partition count assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); // Fail due to same partitioner and partition count assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - ConstantVenicePartitioner.class.getCanonicalName()); + viewParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); // Pass, same partition count but different partitioner new MaterializedView(properties, testStore, viewParams).validateConfigs(); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); + 
viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); // Pass, same partitioner but different partition count new MaterializedView(properties, testStore, viewParams).validateConfigs(); viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), + ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName() + "DNE"); // Fail due to invalid partitioner class assertThrows(() -> new MaterializedView(properties, testStore, viewParams).validateConfigs()); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - ConstantVenicePartitioner.class.getCanonicalName()); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "12"); + viewParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); Store storeWithExistingViews = getMockStore("test-store-existing-config", 12); ViewConfig viewConfig = mock(ViewConfig.class); doReturn(Collections.singletonMap("test-view", viewConfig)).when(storeWithExistingViews).getViewConfigs(); // Fail due to same view name assertThrows(() -> new MaterializedView(properties, storeWithExistingViews, viewParams).validateConfigs()); Map existingViewConfigParams = new HashMap<>(); - existingViewConfigParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - ConstantVenicePartitioner.class.getCanonicalName()); - existingViewConfigParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12)); + existingViewConfigParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), ConstantVenicePartitioner.class.getCanonicalName()); + existingViewConfigParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12)); doReturn(existingViewConfigParams).when(viewConfig).getViewParameters(); doReturn(MaterializedView.class.getCanonicalName()).when(viewConfig).getViewClassName(); doReturn(Collections.singletonMap("old-view", viewConfig)).when(storeWithExistingViews).getViewConfigs(); // Fail due to existing identical view config assertThrows(() -> new MaterializedView(properties, storeWithExistingViews, viewParams).validateConfigs()); - existingViewConfigParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36)); + existingViewConfigParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36)); // Pass, same partitioner but different partition count new MaterializedView(properties, testStore, viewParams).validateConfigs(); } @@ -79,8 +76,8 @@ public void testRePartitionViewTopicProcessing() { int version = 8; Store testStore = getMockStore(storeName, 6); String rePartitionViewName = "test-view"; - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); MaterializedView materializedView = new MaterializedView(new Properties(), testStore, viewParams); Map rePartitionViewTopicMap = materializedView.getTopicNamesAndConfigsForVersion(version); assertEquals(rePartitionViewTopicMap.size(), 1); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java new file mode 100644 index 00000000000..7f172acab1f --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java @@ -0,0 +1,151 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; +import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; +import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; +import static com.linkedin.venice.views.MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX; +import static com.linkedin.venice.views.VeniceView.VIEW_TOPIC_SEPARATOR; +import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP; +import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP; + +import com.linkedin.venice.ConfigKeys; +import com.linkedin.venice.controller.VeniceHelixAdmin; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.views.MaterializedView; +import it.unimi.dsi.fastutil.ints.Int2LongMap; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class MaterializedViewTest { + private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE; + private static final String[] CLUSTER_NAMES = + IntStream.range(0, 1).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); + + private List childDatacenters; + private List parentControllers; + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + private String clusterName; + + @BeforeClass(alwaysRun = true) + public void setUp() { + Properties serverProperties = new Properties(); + serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1)); + serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false); + serverProperties.put( + CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." 
+ DEFAULT_PARENT_DATA_CENTER_REGION_NAME, + "localhost:" + TestUtils.getFreePort()); + multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + 1, + 1, + 1, + 1, + 1, + 1, + 1, + Optional.empty(), + Optional.empty(), + Optional.of(serverProperties), + false); + + childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); + parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); + clusterName = CLUSTER_NAMES[0]; + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + multiRegionMultiClusterWrapper.close(); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testLFIngestionWithMaterializedView() throws IOException { + // Create a non-A/A store with materialized view and run batch push job with 100 records + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("store"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setHybridRewindSeconds(500) + .setHybridOffsetLagThreshold(8) + .setChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3); + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + String testViewName = "MaterializedViewTest"; + ViewParameters.Builder viewParamBuilder = new ViewParameters.Builder(testViewName).setPartitionCount(6); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + Assert.assertEquals(viewConfigMap.get(testViewName).getViewParameters().size(), 3); + }); + + TestWriteUtils.runPushJob("Run push job", props); + // TODO we will verify the actual content once the DVC consumption part of the view topic is completed. + // For now just check for topic existence and that they contain some records. 
+ String viewTopicName = Version.composeKafkaTopic(storeName, 1) + VIEW_TOPIC_SEPARATOR + testViewName + + MATERIALIZED_VIEW_TOPIC_SUFFIX; + String versionTopicName = Version.composeKafkaTopic(storeName, 1); + for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { + VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); + PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); + PubSubTopic versionPubSubTopic = admin.getPubSubTopicRepository().getTopic(versionTopicName); + Assert.assertTrue(admin.getTopicManager().containsTopic(viewPubSubTopic)); + long records = 0; + long versionTopicRecords = 0; + Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); + Int2LongMap versionTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(versionPubSubTopic); + Assert.assertEquals(versionTopicOffsetMap.keySet().size(), 3, "Unexpected version partition count"); + Assert.assertEquals(viewTopicOffsetMap.keySet().size(), 6, "Unexpected view partition count"); + for (long endOffset: viewTopicOffsetMap.values()) { + records += endOffset; + } + for (long endOffset: versionTopicOffsetMap.values()) { + versionTopicRecords += endOffset; + } + Assert.assertTrue(versionTopicRecords > 100, "Version topic records size: " + versionTopicRecords); + Assert.assertTrue(records > 100, "View topic records size: " + records); + } + } + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java index f8f67d2a9e1..f429fe9e59e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java @@ -5,7 +5,9 @@ import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubProduceResult; @@ -43,8 +45,20 @@ public CompletableFuture processRecord( } + @Override + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int version, + int newValueSchemaId) { + internalView.incrementRecordCount(store.getName()); + return CompletableFuture.completedFuture(null); + } + @Override public void processControlMessage( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index 0a1182c9be7..429e2c6fe9e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -165,7 +165,8 @@ public HelixVeniceClusterResources( helixAdminClient, config, admin.getPushStatusStoreReader(), - 
admin.getDisabledPartitionStats(clusterName)); + admin.getDisabledPartitionStats(clusterName), + admin.getVeniceWriterFactory()); this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService( clusterName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 9c65c13eded..e5f381ec118 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -216,6 +216,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.utils.locks.AutoCloseableLock; +import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.VeniceWriter; @@ -2409,25 +2410,25 @@ public void createHelixResourceAndStartMonitoring(String clusterName, String sto * @param store the store to create these resources for * @param version the store version to create these resources for */ - private void constructViewResources(Properties params, Store store, int version) { - Map viewConfigs = store.getViewConfigs(); + private void constructViewResources(Properties params, Store store, Version version, String compressionDictionary) { + Map viewConfigs = version.getViewConfigs(); if (viewConfigs == null || viewConfigs.isEmpty()) { return; } // Construct Kafka topics // TODO: Today we only have support for creating Kafka topics as a resource for a given view, but later we would - // like - // to add support for potentially other resource types (maybe helix RG's as an example?) + // like to add support for potentially other resource types (maybe helix RG's as an example?) 
Map topicNamesAndConfigs = new HashMap<>(); for (ViewConfig rawView: viewConfigs.values()) { VeniceView adminView = ViewUtils.getVeniceView(rawView.getViewClassName(), params, store, rawView.getViewParameters()); - topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version)); + topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version.getNumber())); } TopicManager topicManager = getTopicManager(); for (Map.Entry topicNameAndConfigs: topicNamesAndConfigs.entrySet()) { - PubSubTopic kafkaTopic = pubSubTopicRepository.getTopic(topicNameAndConfigs.getKey()); + String materializedViewTopicName = topicNameAndConfigs.getKey(); + PubSubTopic kafkaTopic = pubSubTopicRepository.getTopic(materializedViewTopicName); VeniceProperties kafkaTopicConfigs = topicNameAndConfigs.getValue(); topicManager.createTopic( kafkaTopic, @@ -2437,6 +2438,26 @@ private void constructViewResources(Properties params, Store store, int version) kafkaTopicConfigs.getBoolean(LOG_COMPACTION_ENABLED), kafkaTopicConfigs.getOptionalInt(KAFKA_MIN_IN_SYNC_REPLICAS), kafkaTopicConfigs.getBoolean(USE_FAST_KAFKA_OPERATION_TIMEOUT)); + if (topicNameAndConfigs.getKey().endsWith(MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX)) { + // Send SOP CM to materialized view topic with sorted flag equal to false due to reshuffle + VeniceWriterOptions.Builder vwOptionsBuilder = + new VeniceWriterOptions.Builder(materializedViewTopicName).setUseKafkaKeySerializer(true) + .setPartitionCount(kafkaTopicConfigs.getInt(PARTITION_COUNT)); + ByteBuffer compressionDictBuffer = null; + if (compressionDictionary != null) { + compressionDictBuffer = ByteBuffer.wrap(EncodingUtils.base64DecodeFromString(compressionDictionary)); + } else if (version.getCompressionStrategy().equals(CompressionStrategy.ZSTD_WITH_DICT)) { + compressionDictBuffer = emptyPushZSTDDictionary.get(); + } + try (VeniceWriter veniceWriter = getVeniceWriterFactory().createVeniceWriter(vwOptionsBuilder.build())) { + veniceWriter.broadcastStartOfPush( + false, + version.isChunkingEnabled(), + version.getCompressionStrategy(), + Optional.ofNullable(compressionDictBuffer), + Collections.emptyMap()); + } + } } } @@ -2803,16 +2824,6 @@ private Pair addVersion( version.setRepushSourceVersion(repushSourceVersion); } - Properties veniceViewProperties = new Properties(); - veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions); - veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout); - veniceViewProperties.putAll(clusterConfig.getProps().toProperties()); - veniceViewProperties.put(LOG_COMPACTION_ENABLED, false); - veniceViewProperties.put(KAFKA_REPLICATION_FACTOR, clusterConfig.getKafkaReplicationFactor()); - veniceViewProperties.put(ETERNAL_TOPIC_RETENTION_ENABLED, true); - - constructViewResources(veniceViewProperties, store, version.getNumber()); - repository.updateStore(store); LOGGER.info("Add version: {} for store: {}", version.getNumber(), storeName); @@ -2838,12 +2849,25 @@ private Pair addVersion( useFastKafkaOperationTimeout); } + // We shouldn't need to create view resources in parent fabric + if (!multiClusterConfigs.isParent()) { + Properties veniceViewProperties = new Properties(); + veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions); + veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout); + veniceViewProperties.putAll(clusterConfig.getProps().toProperties()); + veniceViewProperties.put(LOG_COMPACTION_ENABLED, false); + 
veniceViewProperties.put(KAFKA_REPLICATION_FACTOR, clusterConfig.getKafkaReplicationFactor()); + veniceViewProperties.put(ETERNAL_TOPIC_RETENTION_ENABLED, true); + + constructViewResources(veniceViewProperties, store, version, compressionDictionary); + } + if (sendStartOfPush) { ByteBuffer compressionDictionaryBuffer = null; if (compressionDictionary != null) { compressionDictionaryBuffer = ByteBuffer.wrap(EncodingUtils.base64DecodeFromString(compressionDictionary)); - } else if (store.getCompressionStrategy().equals(CompressionStrategy.ZSTD_WITH_DICT)) { + } else if (version.getCompressionStrategy().equals(CompressionStrategy.ZSTD_WITH_DICT)) { // This compression strategy needs a dictionary even if there is no input data, // so we generate a dictionary based on synthetic data. This is done in vpj driver // as well, but this code will be triggered in cases like Samza batch push job @@ -2851,28 +2875,27 @@ private Pair addVersion( compressionDictionaryBuffer = emptyPushZSTDDictionary.get(); } - final Version finalVersion = version; VeniceWriter veniceWriter = null; try { VeniceWriterOptions.Builder vwOptionsBuilder = - new VeniceWriterOptions.Builder(finalVersion.kafkaTopicName()).setUseKafkaKeySerializer(true) + new VeniceWriterOptions.Builder(version.kafkaTopicName()).setUseKafkaKeySerializer(true) .setPartitionCount(numberOfPartitions); - if (multiClusterConfigs.isParent() && finalVersion.isNativeReplicationEnabled()) { + if (multiClusterConfigs.isParent() && version.isNativeReplicationEnabled()) { // Produce directly into one of the child fabric - vwOptionsBuilder.setBrokerAddress(finalVersion.getPushStreamSourceAddress()); + vwOptionsBuilder.setBrokerAddress(version.getPushStreamSourceAddress()); } veniceWriter = getVeniceWriterFactory().createVeniceWriter(vwOptionsBuilder.build()); veniceWriter.broadcastStartOfPush( sorted, - finalVersion.isChunkingEnabled(), - finalVersion.getCompressionStrategy(), + version.isChunkingEnabled(), + version.getCompressionStrategy(), Optional.ofNullable(compressionDictionaryBuffer), Collections.emptyMap()); if (pushType.isStreamReprocessing()) { // Send TS message to version topic to inform leader to switch to the stream reprocessing topic veniceWriter.broadcastTopicSwitch( Collections.singletonList(getKafkaBootstrapServers(isSslToKafka())), - Version.composeStreamReprocessingTopic(finalVersion.getStoreName(), finalVersion.getNumber()), + Version.composeStreamReprocessingTopic(version.getStoreName(), version.getNumber()), -1L, // -1 indicates rewinding from the beginning of the source topic new HashMap<>()); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 5020f60f6a8..4c0aa6b9993 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -193,7 +193,7 @@ import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.persona.StoragePersona; import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -2815,28 +2815,21 @@ private ViewConfig 
validateAndDecorateStoreViewConfig(Store store, ViewConfig vi String.format("Materialized View name cannot contain version separator: %s", VERSION_SEPARATOR)); } Map viewParams = viewConfig.getViewParameters(); - viewParams.put(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name(), viewName); - if (!viewParams.containsKey(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name())) { - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name(), - store.getPartitionerConfig().getPartitionerClass()); + ViewParameters.Builder decoratedViewParamBuilder = new ViewParameters.Builder(viewName, viewParams); + if (!viewParams.containsKey(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name())) { + decoratedViewParamBuilder.setPartitioner(store.getPartitionerConfig().getPartitionerClass()); if (!store.getPartitionerConfig().getPartitionerParams().isEmpty()) { try { - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name(), - ObjectMapperFactory.getInstance() - .writeValueAsString(store.getPartitionerConfig().getPartitionerParams())); + decoratedViewParamBuilder.setPartitionerParams(store.getPartitionerConfig().getPartitionerParams()); } catch (JsonProcessingException e) { throw new VeniceException("Failed to convert store partitioner params to string", e); } } } - if (!viewParams.containsKey(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { - viewParams.put( - ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), - Integer.toString(store.getPartitionCount())); + if (!viewParams.containsKey(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())) { + decoratedViewParamBuilder.setPartitionCount(store.getPartitionCount()); } - viewConfig.setViewParameters(viewParams); + viewConfig.setViewParameters(decoratedViewParamBuilder.build()); } VeniceView view = ViewUtils.getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 9a5a78e61f0..973a81cde86 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -28,6 +28,7 @@ import com.linkedin.venice.meta.UncompletedReplica; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.HelixUtils; @@ -35,6 +36,12 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.views.ViewUtils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,6 +51,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import 
java.util.stream.Collectors; @@ -88,6 +96,7 @@ public abstract class AbstractPushMonitor private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; private final DisabledPartitionStats disabledPartitionStats; + private final VeniceWriterFactory veniceWriterFactory; public AbstractPushMonitor( String clusterName, @@ -103,7 +112,8 @@ public AbstractPushMonitor( HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - DisabledPartitionStats disabledPartitionStats) { + DisabledPartitionStats disabledPartitionStats, + VeniceWriterFactory veniceWriterFactory) { this.clusterName = clusterName; this.offlinePushAccessor = offlinePushAccessor; this.storeCleaner = storeCleaner; @@ -134,6 +144,7 @@ public AbstractPushMonitor( controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), controllerConfig.useDaVinciSpecificExecutionStatusForError()); this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); + this.veniceWriterFactory = veniceWriterFactory; pushStatusCollector.start(); } @@ -172,7 +183,7 @@ private void loadAllPushes(List offlinePushStatusList) { if (statusWithDetails.getStatus().isTerminal()) { handleTerminalOfflinePushUpdate(offlinePushStatus, statusWithDetails); } else { - checkWhetherToStartBufferReplayForHybrid(offlinePushStatus); + checkWhetherToStartEOPProcedures(offlinePushStatus); } } else { // In any case, we found the offline push status is STARTED, but the related version could not be found. @@ -794,7 +805,7 @@ public void onPartitionStatusChange(String topic, ReadOnlyPartitionStatus partit } protected void onPartitionStatusChange(OfflinePushStatus offlinePushStatus) { - checkWhetherToStartBufferReplayForHybrid(offlinePushStatus); + checkWhetherToStartEOPProcedures(offlinePushStatus); } protected DisableReplicaCallback getDisableReplicaCallback(String kafkaTopic) { @@ -864,7 +875,7 @@ public void onExternalViewChange(PartitionAssignment partitionAssignment) { handleTerminalOfflinePushUpdate(pushStatus, statusWithDetails); } else if (statusWithDetails.getStatus().equals(ExecutionStatus.END_OF_PUSH_RECEIVED)) { // For all partitions, at least one replica has received the EOP. Check if it's time to start buffer replay. - checkWhetherToStartBufferReplayForHybrid(pushStatus); + checkWhetherToStartEOPProcedures(pushStatus); } } } else { @@ -906,7 +917,7 @@ public void onRoutingDataDeleted(String kafkaTopic) { } } - protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlinePushStatus) { + protected void checkWhetherToStartEOPProcedures(OfflinePushStatus offlinePushStatus) { // The outer method already locked on this instance, so this method is thread-safe.
String storeName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic()); Store store = getReadWriteStoreRepository().getStore(storeName); @@ -923,35 +934,74 @@ protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlin } } - if (store.isHybrid()) { - Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic())); - boolean isDataRecovery = version != null && version.getDataRecoveryVersionConfig() != null; - if (offlinePushStatus.isReadyToStartBufferReplay(isDataRecovery)) { - LOGGER.info("{} is ready to start buffer replay.", offlinePushStatus.getKafkaTopic()); - RealTimeTopicSwitcher realTimeTopicSwitcher = getRealTimeTopicSwitcher(); - try { - String newStatusDetails; + Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic())); + if (version == null) { + throw new IllegalStateException("Could not find Version object for: " + offlinePushStatus.getKafkaTopic()); + } + Map viewConfigMap = version.getViewConfigs(); + if (!store.isHybrid() && (viewConfigMap == null || viewConfigMap.isEmpty())) { + // The only procedures that may start after EOP is received in every partition are: + // 1. start buffer replay for hybrid store + // 2. send EOP for materialized view (there could be more view procedures in the future) + return; + } + try { + boolean isDataRecovery = version.getDataRecoveryVersionConfig() != null; + boolean isEOPReceivedInAllPartitions = offlinePushStatus.isEOPReceivedInEveryPartition(isDataRecovery); + StringBuilder newStatusDetails = new StringBuilder(); + // Check whether to start buffer replay + if (store.isHybrid()) { + if (isEOPReceivedInAllPartitions) { + LOGGER.info("{} is ready to start buffer replay.", offlinePushStatus.getKafkaTopic()); + RealTimeTopicSwitcher realTimeTopicSwitcher = getRealTimeTopicSwitcher(); realTimeTopicSwitcher.switchToRealTimeTopic( Version.composeRealTimeTopic(storeName), offlinePushStatus.getKafkaTopic(), store, aggregateRealTimeSourceKafkaUrl, activeActiveRealTimeSourceKafkaURLs); - newStatusDetails = "kicked off buffer replay"; - updatePushStatus(offlinePushStatus, ExecutionStatus.END_OF_PUSH_RECEIVED, Optional.of(newStatusDetails)); - LOGGER.info("Successfully {} for offlinePushStatus: {}", newStatusDetails, offlinePushStatus); - } catch (Exception e) { - // TODO: Figure out a better error handling... - String newStatusDetails = "Failed to kick off the buffer replay"; - handleTerminalOfflinePushUpdate(offlinePushStatus, new ExecutionStatusWithDetails(ERROR, newStatusDetails)); - LOGGER.error("{} for offlinePushStatus: {}", newStatusDetails, offlinePushStatus, e); + newStatusDetails.append("kicked off buffer replay"); + } else if (!offlinePushStatus.getCurrentStatus().isTerminal()) { + LOGGER.info( + "{} is not ready to start buffer replay. Current state: {}", + offlinePushStatus.getKafkaTopic(), + offlinePushStatus.getCurrentStatus().toString()); } - } else if (!offlinePushStatus.getCurrentStatus().isTerminal()) { - LOGGER.info( - "{} is not ready to start buffer replay. 
Current state: {}", - offlinePushStatus.getKafkaTopic(), - offlinePushStatus.getCurrentStatus().toString()); } + + if (isEOPReceivedInAllPartitions) { + // Check whether to send EOP for materialized view topic(s) + for (ViewConfig rawView: viewConfigMap.values()) { + VeniceView veniceView = + ViewUtils.getVeniceView(rawView.getViewClassName(), new Properties(), store, rawView.getViewParameters()); + if (veniceView instanceof MaterializedView) { + MaterializedView materializedView = (MaterializedView) veniceView; + for (String materializedViewTopicName: materializedView + .getTopicNamesAndConfigsForVersion(version.getNumber()) + .keySet()) { + VeniceWriterOptions.Builder vwOptionsBuilder = + new VeniceWriterOptions.Builder(materializedViewTopicName).setUseKafkaKeySerializer(true) + .setPartitionCount(materializedView.getViewPartitionCount()); + try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(vwOptionsBuilder.build())) { + veniceWriter.broadcastEndOfPush(Collections.emptyMap()); + } + if (newStatusDetails.length() > 0) { + newStatusDetails.append(", "); + } + newStatusDetails.append("broadcast EOP to materialized view topic: ").append(materializedViewTopicName); + } + } + } + updatePushStatus( + offlinePushStatus, + ExecutionStatus.END_OF_PUSH_RECEIVED, + Optional.of(newStatusDetails.toString())); + LOGGER.info("Successfully {} for offlinePushStatus: {}", newStatusDetails.toString(), offlinePushStatus); + } + } catch (Exception e) { + String newStatusDetails = "Failed to start EOP procedures"; + handleTerminalOfflinePushUpdate(offlinePushStatus, new ExecutionStatusWithDetails(ERROR, newStatusDetails)); + LOGGER.error("{} for offlinePushStatus: {}", newStatusDetails, offlinePushStatus, e); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java index 8929d108ae2..2fe30cc4260 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java @@ -11,6 +11,7 @@ import com.linkedin.venice.meta.StoreCleaner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.writer.VeniceWriterFactory; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +39,8 @@ public PartitionStatusBasedPushMonitor( HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - DisabledPartitionStats disabledPartitionStats) { + DisabledPartitionStats disabledPartitionStats, + VeniceWriterFactory veniceWriterFactory) { super( clusterName, offlinePushAccessor, @@ -53,7 +55,8 @@ public PartitionStatusBasedPushMonitor( helixAdminClient, controllerConfig, pushStatusStoreReader, - disabledPartitionStats); + disabledPartitionStats, + veniceWriterFactory); } @Override diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java index 1ead143b0a6..d567e246e98 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java +++ 
b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java @@ -19,6 +19,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.writer.VeniceWriterFactory; import java.util.List; import java.util.Map; import java.util.Optional; @@ -57,7 +58,8 @@ public PushMonitorDelegator( HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - DisabledPartitionStats disabledPartitionStats) { + DisabledPartitionStats disabledPartitionStats, + VeniceWriterFactory veniceWriterFactory) { this.clusterName = clusterName; this.metadataRepository = metadataRepository; @@ -75,7 +77,8 @@ public PushMonitorDelegator( helixAdminClient, controllerConfig, pushStatusStoreReader, - disabledPartitionStats); + disabledPartitionStats, + veniceWriterFactory); this.clusterLockManager = clusterLockManager; this.topicToPushMonitorMap = new VeniceConcurrentHashMap<>(); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index b8a8a2fc903..054a30824d9 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -70,7 +70,7 @@ import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfigImpl; -import com.linkedin.venice.meta.ViewParameterKeys; +import com.linkedin.venice.meta.ViewParameters; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.InvalidKeySchemaPartitioner; @@ -1974,7 +1974,7 @@ public void testSetRePartitionViewConfig() { String viewString = String.format( rePartitionViewConfigString, MaterializedView.class.getCanonicalName(), - ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), + ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), rePartitionViewPartitionCount); // Invalid re-partition view name @@ -1991,13 +1991,13 @@ public void testSetRePartitionViewConfig() { Assert.assertTrue(updateStore.getViews().containsKey(rePartitionViewName)); Map rePartitionViewParameters = updateStore.getViews().get(rePartitionViewName).viewParameters; - Assert.assertNotNull(rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name())); + Assert.assertNotNull(rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name())); Assert.assertEquals( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()).toString(), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), rePartitionViewName); Assert.assertEquals( Integer.parseInt( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), rePartitionViewPartitionCount); } @@ -2032,9 +2032,8 @@ public void testInsertMaterializedViewConfig() { String rePartitionViewName = "rePartitionViewA"; int rePartitionViewPartitionCount = 10; Map viewClassParams = new HashMap<>(); - viewClassParams.put( 
- ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), - Integer.toString(rePartitionViewPartitionCount)); + viewClassParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(rePartitionViewPartitionCount)); // Invalid re-partition view name Assert.assertThrows( @@ -2058,13 +2057,13 @@ public void testInsertMaterializedViewConfig() { Assert.assertTrue(updateStore.getViews().containsKey(rePartitionViewName)); Map rePartitionViewParameters = updateStore.getViews().get(rePartitionViewName).viewParameters; - Assert.assertNotNull(rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name())); + Assert.assertNotNull(rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name())); Assert.assertEquals( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()).toString(), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()).toString(), rePartitionViewName); Assert.assertEquals( Integer.parseInt( - rePartitionViewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), + rePartitionViewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()).toString()), rePartitionViewPartitionCount); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java index 1d8f06eecba..ec06fb0cd4e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java @@ -46,12 +46,22 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.views.ViewUtils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.VeniceWriterOptions; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import java.util.ArrayList; @@ -61,10 +71,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -85,6 +97,10 @@ public abstract class AbstractPushMonitorTest { protected MetricsRepository mockMetricRepo; + protected VeniceWriterFactory mockVeniceWriterFactory; + + protected VeniceWriter mockVeniceWriter; + private final static String clusterName = Utils.getUniqueString("test_cluster"); private final static String 
aggregateRealTimeSourceKafkaUrl = "aggregate-real-time-source-kafka-url"; private String storeName; @@ -114,12 +130,15 @@ public void setUp() { mockPushHealthStats = mock(AggPushHealthStats.class); clusterLockManager = new ClusterLockManager(clusterName); mockControllerConfig = mock(VeniceControllerClusterConfig.class); + mockVeniceWriterFactory = mock(VeniceWriterFactory.class); + mockVeniceWriter = mock(VeniceWriter.class); when(mockMetricRepo.sensor(anyString(), any())).thenReturn(mock(Sensor.class)); when(mockControllerConfig.isErrorLeaderReplicaFailOverEnabled()).thenReturn(true); when(mockControllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true); when(mockControllerConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5); when(mockControllerConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L); when(mockControllerConfig.getDaVinciPushStatusScanThreadNumber()).thenReturn(4); + when(mockVeniceWriterFactory.createVeniceWriter(any())).thenReturn(mockVeniceWriter); monitor = getPushMonitor(); } @@ -1081,10 +1100,71 @@ public void testGetPushStatusAndDetails() { "Details should change as a side effect of calling getPushStatusAndDetails."); } - protected Store prepareMockStore(String topic, VersionStatus status) { + @Test + public void testEOPReceivedProcedures() { + Map viewConfigMap = new HashMap<>(); + Map viewParams = new HashMap<>(); + String viewName = "testView"; + int viewPartitionCount = 10; + viewParams.put(ViewParameters.MATERIALIZED_VIEW_NAME.name(), viewName); + viewParams.put(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(viewPartitionCount)); + viewParams + .put(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParams); + viewConfigMap.put(viewName, viewConfig); + String topic = getTopic(); + int versionNumber = Version.parseVersionFromKafkaTopicName(topic); + Store store = prepareMockStore(topic, VersionStatus.STARTED, viewConfigMap); + VeniceView veniceView = + ViewUtils.getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters()); + assertTrue(veniceView instanceof MaterializedView); + MaterializedView materializedView = (MaterializedView) veniceView; + String viewTopicName = + materializedView.getTopicNamesAndConfigsForVersion(versionNumber).keySet().stream().findAny().get(); + assertNotNull(viewTopicName); + + monitor.startMonitorOfflinePush( + topic, + numberOfPartition, + replicationFactor, + OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION); + // Prepare the new partition status + List replicaStatuses = new ArrayList<>(); + for (int i = 0; i < replicationFactor; i++) { + ReplicaStatus replicaStatus = new ReplicaStatus("test" + i); + replicaStatuses.add(replicaStatus); + } + // All replicas are in STARTED status + ReadOnlyPartitionStatus partitionStatus = new ReadOnlyPartitionStatus(0, replicaStatuses); + doReturn(true).when(mockRoutingDataRepo).containsKafkaTopic(topic); + doReturn(new PartitionAssignment(topic, 1)).when(mockRoutingDataRepo).getPartitionAssignments(topic); + // Check hybrid push status + monitor.onPartitionStatusChange(topic, partitionStatus); + // Not ready to send EOP to view topics + verify(mockVeniceWriterFactory, never()).createVeniceWriter(any()); + verify(mockVeniceWriter, never()).broadcastEndOfPush(any()); + + // One replica received end of push + 
replicaStatuses.get(0).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED); + monitor.onPartitionStatusChange(topic, partitionStatus); + ArgumentCaptor vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class); + verify(mockVeniceWriterFactory, times(1)).createVeniceWriter(vwOptionsCaptor.capture()); + assertEquals(vwOptionsCaptor.getValue().getPartitionCount(), Integer.valueOf(viewPartitionCount)); + assertEquals(vwOptionsCaptor.getValue().getTopicName(), viewTopicName); + verify(mockVeniceWriter, times(1)).broadcastEndOfPush(any()); + assertEquals(monitor.getOfflinePushOrThrow(topic).getCurrentStatus(), ExecutionStatus.END_OF_PUSH_RECEIVED); + + // Another replica received end of push. We shouldn't write multiple EOP to view topic + replicaStatuses.get(1).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED); + monitor.onPartitionStatusChange(topic, partitionStatus); + verify(mockVeniceWriter, times(1)).broadcastEndOfPush(any()); + } + + protected Store prepareMockStore(String topic, VersionStatus status, Map viewConfigMap) { String storeName = Version.parseStoreFromKafkaTopicName(topic); int versionNumber = Version.parseVersionFromKafkaTopicName(topic); Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setViewConfigs(viewConfigMap); Version version = new VersionImpl(storeName, versionNumber); version.setStatus(status); store.addVersion(version); @@ -1092,6 +1172,10 @@ protected Store prepareMockStore(String topic, VersionStatus status) { return store; } + protected Store prepareMockStore(String topic, VersionStatus status) { + return prepareMockStore(topic, status, Collections.emptyMap()); + } + protected Store prepareMockStore(String topic) { return prepareMockStore(topic, VersionStatus.STARTED); } @@ -1151,4 +1235,8 @@ protected ClusterLockManager getClusterLockManager() { protected String getAggregateRealTimeSourceKafkaUrl() { return aggregateRealTimeSourceKafkaUrl; } + + protected VeniceWriterFactory getMockVeniceWriterFactory() { + return mockVeniceWriterFactory; + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java index 7c71932dcbe..f2d1793f38a 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java @@ -66,7 +66,8 @@ protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) { helixAdminClient, getMockControllerConfig(), null, - mock(DisabledPartitionStats.class)); + mock(DisabledPartitionStats.class), + getMockVeniceWriterFactory()); } @Override @@ -85,7 +86,8 @@ protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher mockRealTimeT mock(HelixAdminClient.class), getMockControllerConfig(), null, - mock(DisabledPartitionStats.class)); + mock(DisabledPartitionStats.class), + getMockVeniceWriterFactory()); } @Test