diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index fe5a695e72..f7e82c419a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -2475,7 +2475,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( if (kafkaKey.isControlMessage()) { boolean producedFinally = true; - ControlMessage controlMessage = (ControlMessage) kafkaValue.payloadUnion; + ControlMessage controlMessage = (ControlMessage) kafkaValue.getPayloadUnion(); ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage); leaderProducedRecordContext = LeaderProducedRecordContext .newControlMessageRecord(kafkaClusterId, consumerRecord.getOffset(), kafkaKey.getKey(), controlMessage); @@ -2502,10 +2502,11 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( case END_OF_PUSH: // CMs that are produced with DIV pass-through mode can break DIV without synchronization with view writers. // This is because for data (PUT) records we queue their produceToLocalKafka behind the completion of view - // writers. The main SIT will move on to subsequent messages and for CMs we are producing them directly - // because we don't need to replicate these CMs to view topics. If we don't synchronize before producing the + // writers. The main SIT will move on to subsequent messages and for CMs that don't need to be propagated + // to view topics we are producing them directly. If we don't check the previous write before producing the // CMs then in the VT we might get out of order messages and with pass-through DIV that's going to be an // issue. e.g. a PUT record belonging to seg:0 can come after the EOS of seg:0 due to view writer delays. + // Since SOP and EOP are rare we can simply wait for the last VT produce future. checkAndWaitForLastVTProduceFuture(partitionConsumptionState); /** * Simply produce this EOP to local VT. It will be processed in order in the drainer queue later @@ -2551,38 +2552,63 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( * * There is one exception that overrules the above conditions. i.e. if the SOS is a heartbeat from the RT topic. * In such case the heartbeat is produced to VT with updated {@link LeaderMetadataWrapper}. + * + * We want to ensure correct ordering for any SOS and EOS that we do decide to write to VT. This is done by + * coordinating with the corresponding {@link PartitionConsumptionState#getLastVTProduceCallFuture}. 
*/ if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) { - checkAndWaitForLastVTProduceFuture(partitionConsumptionState); - produceToLocalKafka( - consumerRecord, - partitionConsumptionState, - leaderProducedRecordContext, - (callback, leaderMetadataWrapper) -> partitionConsumptionState.getVeniceWriterLazyRef() - .get() - .put( - consumerRecord.getKey(), - consumerRecord.getValue(), - callback, - consumerRecord.getTopicPartition().getPartitionNumber(), - leaderMetadataWrapper), - partition, - kafkaUrl, - kafkaClusterId, - beforeProcessingPerRecordTimestampNs); + + final LeaderProducedRecordContext segmentCMLeaderProduceRecordContext = leaderProducedRecordContext; + CompletableFuture propagateSegmentCMWrite = new CompletableFuture<>(); + partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> { + if (exception == null) { + produceToLocalKafka( + consumerRecord, + partitionConsumptionState, + segmentCMLeaderProduceRecordContext, + (callback, leaderMetadataWrapper) -> partitionConsumptionState.getVeniceWriterLazyRef() + .get() + .put( + consumerRecord.getKey(), + consumerRecord.getValue(), + callback, + consumerRecord.getTopicPartition().getPartitionNumber(), + leaderMetadataWrapper), + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingPerRecordTimestampNs); + propagateSegmentCMWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + propagateSegmentCMWrite.completeExceptionally(veniceException); + } + }); + partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite); } else { if (controlMessageType == START_OF_SEGMENT && Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) { - // We also want to synchronize with view writers for heartbeat CMs, so we can detect hanging VWs - checkAndWaitForLastVTProduceFuture(partitionConsumptionState); - propagateHeartbeatFromUpstreamTopicToLocalVersionTopic( - partitionConsumptionState, - consumerRecord, - leaderProducedRecordContext, - partition, - kafkaUrl, - kafkaClusterId, - beforeProcessingPerRecordTimestampNs); + CompletableFuture propagateHeartbeatWrite = new CompletableFuture<>(); + final LeaderProducedRecordContext heartbeatLeaderProducedRecordContext = leaderProducedRecordContext; + partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> { + if (exception == null) { + propagateHeartbeatFromUpstreamTopicToLocalVersionTopic( + partitionConsumptionState, + consumerRecord, + heartbeatLeaderProducedRecordContext, + partition, + kafkaUrl, + kafkaClusterId, + beforeProcessingPerRecordTimestampNs); + propagateHeartbeatWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + propagateHeartbeatWrite.completeExceptionally(veniceException); + } + }); + partitionConsumptionState.setLastVTProduceCallFuture(propagateHeartbeatWrite); } else { /** * Based on current design handling this case (specially EOS) is tricky as we don't produce the SOS/EOS diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java index b3646850ae..a529d0aef1 100644 --- 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -4,91 +4,50 @@ import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; -import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.utils.ByteUtils; -import com.linkedin.venice.utils.RedundantExceptionFilter; -import com.linkedin.venice.utils.SystemTime; -import com.linkedin.venice.utils.Time; -import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; -import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * Materialized view writer is responsible for processing input records from the version topic and write them to the * materialized view topic based on parameters defined in {@link com.linkedin.venice.meta.MaterializedViewParameters}. - * This writer has its own {@link VeniceWriter} and will also propagate heartbeat messages differently. See details in - * the doc for {@link #maybePropagateHeartbeatLowWatermarkToViewTopic} method. + * This writer has its own {@link VeniceWriter}. 
*/ public class MaterializedViewWriter extends VeniceViewWriter { private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory; private final MaterializedView internalView; - private final ReentrantLock broadcastHBLock = new ReentrantLock(); - private final Map partitionToHeartbeatTimestampMap = new HashMap<>(); - private final Time time; private final String materializedViewTopicName; private Lazy veniceWriter; - private long lastHBBroadcastTimestamp; - - /** - * These configs can be exposed to view parameters if or server configs if needed - */ - private static final long DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); - private static final long DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD = TimeUnit.MINUTES.toMillis(5); - private static final int DEFAULT_PARTITION_TO_ALWAYS_BROADCAST = 0; - private static final Logger LOGGER = LogManager.getLogger(MaterializedViewWriter.class); - private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = - RedundantExceptionFilter.getRedundantExceptionFilter(); public MaterializedViewWriter( VeniceConfigLoader props, Version version, Schema keySchema, - Map extraViewParameters, - Time time) { + Map extraViewParameters) { super(props, version, keySchema, extraViewParameters); pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory(); internalView = new MaterializedView(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters); materializedViewTopicName = internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get(); - this.time = time; this.veniceWriter = Lazy.of( () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null) .createVeniceWriter(buildWriterOptions())); } - public MaterializedViewWriter( - VeniceConfigLoader props, - Version version, - Schema keySchema, - Map extraViewParameters) { - this(props, version, keySchema, extraViewParameters, SystemTime.INSTANCE); - } - /** * package private for testing purpose */ @@ -123,14 +82,8 @@ public void processControlMessage( ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) { - final ControlMessageType type = ControlMessageType.valueOf(controlMessage); - // Ignore other control messages for materialized view. - if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) { - maybePropagateHeartbeatLowWatermarkToViewTopic( - partition, - partitionConsumptionState, - kafkaMessageEnvelope.getProducerMetadata().getMessageTimestamp()); - } + // Ignore all control messages for materialized view for now. Will revisit on the client side time lag monitoring. + // TODO we need to handle new version CM for CC for materialized view. } @Override @@ -142,95 +95,4 @@ public String getWriterClassName() { VeniceWriterOptions buildWriterOptions() { return setProducerOptimizations(internalView.getWriterOptionsBuilder(materializedViewTopicName, version)).build(); } - - /** - * View topic's partitioner and partition count could be different from the VT. In order to ensure we are capturing - * all potential lag in the VT ingestion from the view topic, we will broadcast the low watermark observed from every - * VT leader to all partitions of the view topic. To reduce the heartbeat spam we can use a strategy as follows: - * 1. 
Leader of partition 0 always broadcasts its low watermark timestamp to all view topic partitions. - * 2. Leader of other partitions will only broadcast its heartbeat low watermark timestamp if it's sufficiently - * stale. This is configurable but by default it could be >= 5 minutes. This is because broadcasting redundant - * up-to-date heartbeat in view topic is not meaningful when the main goal here is just to identify if there - * are any lagging partitions or the largest lag amongst all VT partitions. Since lag in any VT partition could - * result in lag in one or more view topic partitions. - * 3. This broadcasting heartbeat mechanism will only provide lag info to view topic consumers if the corresponding - * VT consumption is not stuck. e.g. if one VT partition is stuck we won't be able to detect such issue from the - * view topic heartbeats because VT partitions that are not stuck will be broadcasting heartbeats. Due to this - * reason we can also clear and rebuild the partition to timestamp map to simplify the maintenance logic. - */ - private void maybePropagateHeartbeatLowWatermarkToViewTopic( - int partition, - PartitionConsumptionState partitionConsumptionState, - long heartbeatTimestamp) { - boolean propagate = false; - long oldestHeartbeatTimestamp = 0; - broadcastHBLock.lock(); - try { - partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp); - long now = time.getMilliseconds(); - if (now > lastHBBroadcastTimestamp + DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS - && !partitionToHeartbeatTimestampMap.isEmpty()) { - oldestHeartbeatTimestamp = Collections.min(partitionToHeartbeatTimestampMap.values()); - if (partition == DEFAULT_PARTITION_TO_ALWAYS_BROADCAST - || now - oldestHeartbeatTimestamp > DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD) { - propagate = true; - lastHBBroadcastTimestamp = now; - } - // We have determined that the oldestHeartbeatTimestamp offers no value in monitoring the lag for this view - // topic since it's within the DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD. We are also clearing the map, so we - // don't need to worry about removing timestamps belonging to partitions that we are no longer leader of. - partitionToHeartbeatTimestampMap.clear(); - } - } finally { - broadcastHBLock.unlock(); - } - if (propagate && oldestHeartbeatTimestamp > 0) { - LeaderCompleteState leaderCompleteState = - LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported()); - Set failedPartitions = VeniceConcurrentHashMap.newKeySet(); - Set> heartbeatFutures = VeniceConcurrentHashMap.newKeySet(); - AtomicReference completionException = new AtomicReference<>(); - for (int p = 0; p < internalView.getViewPartitionCount(); p++) { - // Due to the intertwined partition mapping, the actual LeaderMetadataWrapper is meaningless for materialized - // view consumers. Similarly, we will propagate the LeaderCompleteState, but it will only guarantee that at - // least - // one partition leader has completed. 
- final int viewPartitionNumber = p; - CompletableFuture heartBeatFuture = veniceWriter.get() - .sendHeartbeat( - materializedViewTopicName, - viewPartitionNumber, - null, - VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, - true, - leaderCompleteState, - oldestHeartbeatTimestamp); - heartBeatFuture.whenComplete((ignore, throwable) -> { - if (throwable != null) { - completionException.set(new CompletionException(throwable)); - failedPartitions.add(String.valueOf(viewPartitionNumber)); - } - }); - heartbeatFutures.add(heartBeatFuture); - } - if (!heartbeatFutures.isEmpty()) { - CompletableFuture.allOf(heartbeatFutures.toArray(new CompletableFuture[0])) - .whenCompleteAsync((ignore, throwable) -> { - if (!failedPartitions.isEmpty()) { - int failedCount = failedPartitions.size(); - String logMessage = String.format( - "Broadcast materialized view heartbeat for %d partitions of topic %s: %d succeeded, %d failed for partitions %s", - heartbeatFutures.size(), - materializedViewTopicName, - heartbeatFutures.size() - failedCount, - failedCount, - String.join(",", failedPartitions)); - if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) { - LOGGER.error(logMessage, completionException.get()); - } - } - }); - } - } - } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 85de83f49f..48919afaa4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -1,11 +1,13 @@ package com.linkedin.davinci.store.view; +import static com.linkedin.venice.views.ViewUtils.NEARLINE_PRODUCER_COMPRESSION_ENABLED; +import static com.linkedin.venice.views.ViewUtils.NEARLINE_PRODUCER_COUNT_PER_WRITER; + import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; -import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.views.VeniceView; @@ -43,6 +45,14 @@ public VeniceViewWriter( super(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters); this.version = version; this.versionNumber = version.getNumber(); + if (extraViewParameters.containsKey(NEARLINE_PRODUCER_COMPRESSION_ENABLED)) { + isNearlineProducerCompressionEnabled = + Optional.of(Boolean.valueOf(extraViewParameters.get(NEARLINE_PRODUCER_COMPRESSION_ENABLED))); + } + if (extraViewParameters.containsKey(NEARLINE_PRODUCER_COUNT_PER_WRITER)) { + nearlineProducerCountPerWriter = + Optional.of(Integer.valueOf(extraViewParameters.get(NEARLINE_PRODUCER_COUNT_PER_WRITER))); + } } /** @@ -102,15 +112,6 @@ public void processControlMessage( // Optionally act on Control Message } - /** - * Configure view writer options based on the configs of the provided Store - * @param store to extract the relevant configs from - */ - public void configureWriterOptions(Store store) { - isNearlineProducerCompressionEnabled = Optional.of(store.isNearlineProducerCompressionEnabled()); - nearlineProducerCountPerWriter = Optional.of(store.getNearlineProducerCountPerWriter()); - } - /** * A store could have many views 
and to reduce the impact to write throughput we want to check and enable producer * optimizations that can be configured at the store level. To change the producer optimization configs the ingestion diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java index e49edcf5e4..20d0b18af1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java @@ -25,7 +25,6 @@ public Map buildStoreViewWriters(Store store, int vers Map extraParams = viewConfig.getValue().getViewParameters(); VeniceViewWriter viewWriter = ViewWriterUtils.getVeniceViewWriter(className, properties, store, version, keySchema, extraParams); - viewWriter.configureWriterOptions(store); storeViewWriters.put(viewConfig.getKey(), viewWriter); } return storeViewWriters; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java index 1e8fd83910..ef098e85e5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java @@ -6,6 +6,7 @@ import com.linkedin.venice.utils.ReflectUtils; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; @@ -25,11 +26,15 @@ public static VeniceViewWriter getVeniceViewWriter( new Class[] { Properties.class, String.class, Map.class }, new Object[] { params, store.getName(), extraViewParameters }); - VeniceViewWriter viewWriter = ReflectUtils.callConstructor( + // Make a copy of the view parameters map to insert producer configs + Map viewParamsWithProducerConfigs = new HashMap<>(extraViewParameters); + viewParamsWithProducerConfigs + .put(NEARLINE_PRODUCER_COMPRESSION_ENABLED, Boolean.toString(store.isNearlineProducerCompressionEnabled())); + viewParamsWithProducerConfigs + .put(NEARLINE_PRODUCER_COUNT_PER_WRITER, Integer.toString(store.getNearlineProducerCountPerWriter())); + return ReflectUtils.callConstructor( ReflectUtils.loadClass(view.getWriterClassName()), new Class[] { VeniceConfigLoader.class, Version.class, Schema.class, Map.class }, - new Object[] { configLoader, store.getVersionOrThrow(version), keySchema, extraViewParameters }); - - return viewWriter; + new Object[] { configLoader, store.getVersionOrThrow(version), keySchema, viewParamsWithProducerConfigs }); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index 15c2d83203..af02d93c75 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -2,9 +2,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static 
org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,7 +21,12 @@ import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; @@ -28,7 +35,9 @@ import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.utils.TestUtils; @@ -268,4 +277,48 @@ public void testProcessViewWriters() throws InterruptedException { verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture(); verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt()); } + + /** + * This test is to ensure that if there are view writers, the CMs produced to the VT don't get out of order due to previous + * writes to the VT getting delayed by corresponding view writers,
since during NR we write to the view topic(s) before the VT. + */ + @Test + public void testControlMessagesAreInOrderWithPassthroughDIV() throws InterruptedException { + setUp(); + PubSubMessageProcessedResultWrapper pubSubMessageProcessedResultWrapper = + mock(PubSubMessageProcessedResultWrapper.class); + PubSubMessage pubSubMessage = mock(PubSubMessage.class); + doReturn(pubSubMessage).when(pubSubMessageProcessedResultWrapper).getMessage(); + KafkaKey kafkaKey = mock(KafkaKey.class); + doReturn(kafkaKey).when(pubSubMessage).getKey(); + KafkaMessageEnvelope kafkaValue = mock(KafkaMessageEnvelope.class); + doReturn(MessageType.CONTROL_MESSAGE.getValue()).when(kafkaValue).getMessageType(); + doReturn(kafkaValue).when(pubSubMessage).getValue(); + doReturn(true).when(mockPartitionConsumptionState).consumeRemotely(); + doReturn(LeaderFollowerStateType.LEADER).when(mockPartitionConsumptionState).getLeaderFollowerState(); + OffsetRecord offsetRecord = mock(OffsetRecord.class); + doReturn(offsetRecord).when(mockPartitionConsumptionState).getOffsetRecord(); + PubSubTopicPartition pubSubTopicPartition = mock(PubSubTopicPartition.class); + doReturn(pubSubTopicPartition).when(pubSubMessage).getTopicPartition(); + PubSubTopic pubSubTopic = mock(PubSubTopic.class); + doReturn(pubSubTopic).when(pubSubTopicPartition).getPubSubTopic(); + doReturn(false).when(pubSubTopic).isRealTime(); + doReturn(true).when(kafkaKey).isControlMessage(); + ControlMessage controlMessage = mock(ControlMessage.class); + doReturn(controlMessage).when(kafkaValue).getPayloadUnion(); + doReturn(ControlMessageType.END_OF_SEGMENT.getValue()).when(controlMessage).getControlMessageType(); + doReturn(0L).when(pubSubMessage).getOffset(); + CompletableFuture lastVTWriteFuture = new CompletableFuture<>(); + doReturn(lastVTWriteFuture).when(mockPartitionConsumptionState).getLastVTProduceCallFuture(); + VeniceWriter veniceWriter = mock(VeniceWriter.class); + doReturn(Lazy.of(() -> veniceWriter)).when(mockPartitionConsumptionState).getVeniceWriterLazyRef(); + + leaderFollowerStoreIngestionTask.delegateConsumerRecord(pubSubMessageProcessedResultWrapper, 0, "testURL", 0, 0, 0); + Thread.sleep(1000); + // The CM write should be queued but not executed yet since the previous VT write future is still incomplete + verify(veniceWriter, never()).put(eq(kafkaKey), eq(kafkaValue), any(), anyInt(), any()); + lastVTWriteFuture.complete(null); + // The CM should be written once the previous VT write is completed + verify(veniceWriter, timeout(1000)).put(eq(kafkaKey), eq(kafkaValue), any(), anyInt(), any()); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index a8fa513abe..2518d111d5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -8,7 +8,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,30 +18,29 @@ import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; -import
com.linkedin.venice.kafka.protocol.ProducerMetadata; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.utils.ObjectMapperFactory; -import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; -import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.Test; @@ -85,9 +83,13 @@ public void testBuildWriterOptions() { viewParamsBuilder.setPartitionCount(6); viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); Map viewParamsMap = viewParamsBuilder.build(); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParamsMap); + doReturn(Collections.singletonMap(viewName, viewConfig)).when(version).getViewConfigs(); VeniceConfigLoader props = getMockProps(); - MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap); - materializedViewWriter.configureWriterOptions(store); + VeniceViewWriterFactory viewWriterFactory = new VeniceViewWriterFactory(props); + VeniceViewWriter viewWriter = viewWriterFactory.buildStoreViewWriters(store, 1, SCHEMA).get(viewName); + Assert.assertTrue(viewWriter instanceof MaterializedViewWriter); + MaterializedViewWriter materializedViewWriter = (MaterializedViewWriter) viewWriter; VeniceWriterOptions writerOptions = materializedViewWriter.buildWriterOptions(); Assert.assertEquals( writerOptions.getTopicName(), @@ -100,7 +102,7 @@ public void testBuildWriterOptions() { } @Test - public void testProcessIngestionHeartbeat() { + public void testProcessControlMessage() { String storeName = "testStore"; String viewName = "testMaterializedView"; Version version = mock(Version.class); @@ -112,11 +114,7 @@ public void testProcessIngestionHeartbeat() { viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); Map viewParamsMap = viewParamsBuilder.build(); VeniceConfigLoader props = getMockProps(); - Time time = mock(Time.class); - long startTime = System.currentTimeMillis(); - doReturn(startTime).when(time).getMilliseconds(); - MaterializedViewWriter materializedViewWriter = - new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap, time); + MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap); ControlMessage controlMessage = new ControlMessage(); controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue(); KafkaKey kafkaKey = mock(KafkaKey.class); @@ -128,56 +126,10 @@ public void testProcessIngestionHeartbeat() { .sendHeartbeat(anyString(), 
anyInt(), any(), any(), anyBoolean(), any(), anyLong()); materializedViewWriter.setVeniceWriter(veniceWriter); KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class); - ProducerMetadata producerMetadata = mock(ProducerMetadata.class); - doReturn(producerMetadata).when(kafkaMessageEnvelope).getProducerMetadata(); - doReturn(startTime).when(producerMetadata).getMessageTimestamp(); PartitionConsumptionState partitionConsumptionState = mock(PartitionConsumptionState.class); - doReturn(true).when(partitionConsumptionState).isCompletionReported(); - materializedViewWriter .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); - long newTime = startTime + TimeUnit.MINUTES.toMillis(4); - doReturn(newTime).when(time).getMilliseconds(); - doReturn(startTime + TimeUnit.MINUTES.toMillis(1)).when(producerMetadata).getMessageTimestamp(); - materializedViewWriter - .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); - // We still don't expect any broadcast from partition 1 leader because staleness is within 5 minutes - verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); - doReturn(newTime).when(producerMetadata).getMessageTimestamp(); - materializedViewWriter - .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 0, partitionConsumptionState); - // Partition 0's leader should broadcast based on last broadcast timestamp (0) regardless of staleness threshold - ArgumentCaptor heartbeatTimestampCaptor = ArgumentCaptor.forClass(Long.class); - verify(veniceWriter, times(6)) - .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture()); - // The low watermark for this materialized view writer should be the latest heartbeat stamp received by partition 0 - // since the low watermark from partition 1 was ignored due to DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD - for (Long timestamp: heartbeatTimestampCaptor.getAllValues()) { - Assert.assertEquals(timestamp, Long.valueOf(newTime)); - } - newTime = newTime + TimeUnit.SECONDS.toMillis(30); - doReturn(newTime).when(time).getMilliseconds(); - doReturn(startTime).when(producerMetadata).getMessageTimestamp(); - materializedViewWriter - .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 2, partitionConsumptionState); - doReturn(newTime).when(producerMetadata).getMessageTimestamp(); - materializedViewWriter - .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); - materializedViewWriter - .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 0, partitionConsumptionState); - // No new broadcast since it's still within DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS (1 minute) since last broadcast. 
- verify(veniceWriter, times(6)) - .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture()); - newTime = newTime + TimeUnit.MINUTES.toMillis(3); - doReturn(newTime).when(time).getMilliseconds(); - doReturn(newTime).when(producerMetadata).getMessageTimestamp(); - materializedViewWriter - .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); - // We should broadcast the stale heartbeat timestamp from partition 2 since it's > than the reporting threshold - verify(veniceWriter, times(12)) - .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture()); - Assert.assertEquals(heartbeatTimestampCaptor.getValue(), Long.valueOf(startTime)); } private VeniceConfigLoader getMockProps() { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index 87c522e121..4789f38a3b 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ -143,6 +143,8 @@ public int getValueSchemaId() { private long lastMessageCompletedCount = 0; private AbstractVeniceWriter veniceWriter = null; + private VeniceWriter mainWriter = null; + private VeniceWriter[] childWriters = null; private int valueSchemaId = -1; private int derivedValueSchemaId = -1; private boolean enableWriteCompute = false; @@ -369,9 +371,10 @@ private AbstractVeniceWriter createBasicVeniceWriter() { .build(); String flatViewConfigMapString = props.getString(PUSH_JOB_VIEW_CONFIGS, ""); if (!flatViewConfigMapString.isEmpty()) { + mainWriter = veniceWriterFactoryFactory.createVeniceWriter(options); return createCompositeVeniceWriter( veniceWriterFactoryFactory, - veniceWriterFactoryFactory.createVeniceWriter(options), + mainWriter, flatViewConfigMapString, topicName, chunkingEnabled, @@ -391,7 +394,7 @@ protected AbstractVeniceWriter createCompositeVeniceWrit boolean rmdChunkingEnabled) { try { Map viewConfigMap = ViewUtils.parseViewConfigMapString(flatViewConfigMapString); - VeniceWriter[] childWriters = new VeniceWriter[viewConfigMap.size()]; + childWriters = new VeniceWriter[viewConfigMap.size()]; String storeName = Version.parseStoreFromKafkaTopicName(topicName); int versionNumber = Version.parseVersionFromKafkaTopicName(topicName); // TODO using a dummy Version to get venice writer options could be error prone. 
Alternatively we could change @@ -467,6 +470,16 @@ public void close() throws IOException { } finally { veniceWriter.close(shouldEndAllSegments); } + if (veniceWriter instanceof CompositeVeniceWriter) { + if (childWriters != null) { + for (VeniceWriter childWriter: childWriters) { + childWriter.close(shouldEndAllSegments); + } + } + if (mainWriter != null) { + mainWriter.close(shouldEndAllSegments); + } + } } maybePropagateCallbackException(); LOGGER.info("Kafka message progress after flushing and closing producer:"); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java index a1a3e33566..0a189df377 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java @@ -84,7 +84,7 @@ public static ControlMessageType valueOf(int value) { } public static ControlMessageType valueOf(ControlMessage controlMessage) { - return valueOf(controlMessage.controlMessageType); + return valueOf(controlMessage.getControlMessageType()); } public int getShallowClassOverhead() { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java index 68e99e5346..b982c9eb23 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java @@ -79,7 +79,7 @@ public static MessageType valueOf(int value) { } public static MessageType valueOf(KafkaMessageEnvelope kafkaMessageEnvelope) { - return valueOf(kafkaMessageEnvelope.messageType); + return valueOf(kafkaMessageEnvelope.getMessageType()); } public static class Constants { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java index 3e77ea0e27..3b9cd61681 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/ViewUtils.java @@ -18,6 +18,10 @@ public class ViewUtils { public static final String ETERNAL_TOPIC_RETENTION_ENABLED = "eternal.topic.retention.enabled"; + public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline.producer.compression.enabled"; + + public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline.producer.count.per.writer"; + public static VeniceView getVeniceView( String viewClass, Properties params, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java index 792dbd5ab6..d900d81ead 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java @@ -3,12 +3,14 @@ import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; +import com.linkedin.venice.annotation.NotThreadsafe; import 
com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.function.Function; /** @@ -18,6 +20,7 @@ * VeniceViewWriter here. However, the current implementation of VeniceViewWriter involves PCS which is something * specific to the ingestion path that we don't want to leak into venice-common. */ +@NotThreadsafe public class CompositeVeniceWriter extends AbstractVeniceWriter { private final VeniceWriter mainWriter; private final VeniceWriter[] childWriters; @@ -38,10 +41,7 @@ public CompositeVeniceWriter( @Override public void close(boolean gracefulClose) throws IOException { - for (VeniceWriter veniceWriter: childWriters) { - veniceWriter.close(gracefulClose); - } - mainWriter.close(gracefulClose); + // no op, child writers and the main writer are initialized outside the class and should be closed elsewhere. } @Override @@ -50,7 +50,9 @@ public CompletableFuture put( V value, int valueSchemaId, PubSubProducerCallback callback) { - return put(key, value, valueSchemaId, callback, null, false); + return compositeOperation( + (writer) -> writer.put(key, value, valueSchemaId, childCallback), + (writer) -> writer.put(key, value, valueSchemaId, callback)); } @Override @@ -60,58 +62,23 @@ public Future put( int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata) { - return put(key, value, valueSchemaId, callback, putMetadata, true); - } - - CompletableFuture put( - K key, - V value, - int valueSchemaId, - PubSubProducerCallback callback, - PutMetadata putMetadata, - boolean hasPutMetadata) { - CompletableFuture finalFuture = new CompletableFuture<>(); - CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1]; - int index = 0; - childFutures[index++] = lastWriteFuture; - for (VeniceWriter writer: childWriters) { - childFutures[index++] = hasPutMetadata - ? writer.put( - key, - value, - valueSchemaId, - childCallback, - DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS, - putMetadata) - : writer.put(key, value, valueSchemaId, childCallback); - } - CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> { - if (childException == null) { - CompletableFuture mainFuture = hasPutMetadata - ? 
mainWriter.put( - key, - value, - valueSchemaId, - callback, - DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS, - putMetadata) - : mainWriter.put(key, value, valueSchemaId, callback); - mainFuture.whenCompleteAsync((result, mainWriteException) -> { - if (mainWriteException == null) { - finalFuture.complete(result); - } else { - finalFuture.completeExceptionally(new VeniceException(mainWriteException)); - } - }); - } else { - VeniceException veniceException = new VeniceException(childException); - finalFuture.completeExceptionally(veniceException); - } - }); - lastWriteFuture = finalFuture; - return finalFuture; + return compositeOperation( + (writer) -> writer.put( + key, + value, + valueSchemaId, + childCallback, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS, + putMetadata), + (writer) -> writer.put( + key, + value, + valueSchemaId, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS, + putMetadata)); } @Override @@ -119,31 +86,9 @@ public CompletableFuture delete( K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata) { - CompletableFuture finalFuture = new CompletableFuture<>(); - CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1]; - int index = 0; - childFutures[index++] = lastWriteFuture; - for (VeniceWriter writer: childWriters) { - childFutures[index++] = writer.delete(key, callback, deleteMetadata); - } - CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> { - if (childException == null) { - CompletableFuture mainFuture = mainWriter.delete(key, callback, deleteMetadata); - mainFuture.whenCompleteAsync((result, mainWriteException) -> { - if (mainWriteException == null) { - finalFuture.complete(result); - } else { - finalFuture.completeExceptionally(new VeniceException(mainWriteException)); - } - }); - } else { - VeniceException veniceException = new VeniceException(childException); - finalFuture.completeExceptionally(veniceException); - } - }); - - lastWriteFuture = finalFuture; - return finalFuture; + return compositeOperation( + (writer) -> writer.delete(key, callback, deleteMetadata), + (writer) -> writer.delete(key, callback, deleteMetadata)); } /** @@ -179,4 +124,36 @@ public void flush() { public void close() throws IOException { close(true); } + + /** + * Helper function to perform a composite operation where the childWriterOp is first executed for all childWriters + * and then mainWriterOp is executed for mainWriter. The returned completable future is completed when the mainWriter + * completes the mainWriterOp. 
+ */ + private CompletableFuture<PubSubProduceResult> compositeOperation( + Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> childWriterOp, + Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> mainWriterOp) { + CompletableFuture<PubSubProduceResult> finalFuture = new CompletableFuture<>(); + CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1]; + int index = 0; + childFutures[index++] = lastWriteFuture; + for (VeniceWriter writer: childWriters) { + childFutures[index++] = childWriterOp.apply(writer); + } + CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> { + if (childException == null) { + mainWriterOp.apply(mainWriter).whenCompleteAsync((result, mainWriterException) -> { + if (mainWriterException == null) { + finalFuture.complete(result); + } else { + finalFuture.completeExceptionally(new VeniceException(mainWriterException)); + } + }); + } else { + finalFuture.completeExceptionally(new VeniceException(childException)); + } + }); + lastWriteFuture = finalFuture; + return finalFuture; + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java index 6765220d45..1297ceaea3 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java @@ -11,6 +11,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import java.util.concurrent.CompletableFuture; import org.testng.Assert; import org.testng.annotations.Test; @@ -35,18 +36,27 @@ public void testWritesAreInOrder() throws InterruptedException { VeniceWriter mockMainWriter = mock(VeniceWriter.class); CompletableFuture mainWriterFuture = CompletableFuture.completedFuture(null); doReturn(mainWriterFuture).when(mockMainWriter).put(any(), any(), anyInt(), eq(null)); + PubSubProducerCallback deletePubSubProducerCallback = mock(PubSubProducerCallback.class); + DeleteMetadata deleteMetadata = mock(DeleteMetadata.class); + doReturn(mainWriterFuture).when(mockMainWriter).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata)); VeniceWriter mockChildWriter = mock(VeniceWriter.class); CompletableFuture childWriterFuture = new CompletableFuture<>(); doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(null)); + doReturn(childWriterFuture).when(mockChildWriter) + .delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata)); VeniceWriter[] childWriters = new VeniceWriter[1]; childWriters[0] = mockChildWriter; AbstractVeniceWriter compositeVeniceWriter = new CompositeVeniceWriter("test_v1", mockMainWriter, childWriters, null); compositeVeniceWriter.put(new byte[1], new byte[1], 1, null); + compositeVeniceWriter.delete(new byte[1], deletePubSubProducerCallback, deleteMetadata); verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null)); + verify(mockMainWriter, never()).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata)); Thread.sleep(1000); verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null)); + verify(mockMainWriter, never()).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata)); childWriterFuture.complete(null); verify(mockMainWriter, timeout(1000)).put(any(), any(), anyInt(), eq(null)); + verify(mockMainWriter, timeout(1000)).delete(any(),
eq(deletePubSubProducerCallback), eq(deleteMetadata)); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index ad324d2cea..6cab04f0c3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -2498,8 +2498,8 @@ private void constructViewResources(Properties params, Store store, Version vers } } - private void cleanUpViewResources(Properties params, Store store, int version) { - Map viewConfigs = store.getViewConfigs(); + private void cleanUpViewResources(Properties params, Store store, Version version) { + Map viewConfigs = version.getViewConfigs(); if (viewConfigs == null || viewConfigs.isEmpty()) { return; } @@ -2511,11 +2511,11 @@ private void cleanUpViewResources(Properties params, Store store, int version) { for (ViewConfig rawView: viewConfigs.values()) { VeniceView adminView = ViewUtils.getVeniceView(rawView.getViewClassName(), params, store.getName(), rawView.getViewParameters()); - topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version)); + topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version.getNumber())); } Set versionTopicsToDelete = topicNamesAndConfigs.keySet() .stream() - .filter(t -> VeniceView.parseVersionFromViewTopic(t) == version) + .filter(t -> VeniceView.parseVersionFromViewTopic(t) == version.getNumber()) .collect(Collectors.toSet()); for (String topic: versionTopicsToDelete) { truncateKafkaTopic(topic); @@ -3765,7 +3765,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver if (deletedVersion.get().getPushType().isStreamReprocessing()) { truncateKafkaTopic(Version.composeStreamReprocessingTopic(storeName, versionNumber)); } - cleanUpViewResources(new Properties(), store, deletedVersion.get().getNumber()); + cleanUpViewResources(new Properties(), store, deletedVersion.get()); } if (store.isDaVinciPushStatusStoreEnabled() && !isParent()) { ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -3963,6 +3963,33 @@ public void topicCleanupWhenPushComplete(String clusterName, String storeName, i true, expectedMinCompactionLagMs, expectedMaxCompactionLagMs > 0 ? Optional.of(expectedMaxCompactionLagMs) : Optional.empty()); + + // Compaction settings should also be applied to corresponding materialized view topics + Map viewConfigs = store.getVersionOrThrow(versionNumber).getViewConfigs(); + if (viewConfigs != null && !viewConfigs.isEmpty()) { + Set viewTopicsToUpdate = new HashSet<>(); + for (ViewConfig rawViewConfig: viewConfigs.values()) { + if (MaterializedView.class.getCanonicalName().equals(rawViewConfig.getViewClassName())) { + viewTopicsToUpdate.addAll( + ViewUtils + .getVeniceView( + rawViewConfig.getViewClassName(), + new Properties(), + storeName, + rawViewConfig.getViewParameters()) + .getTopicNamesAndConfigsForVersion(versionNumber) + .keySet()); + } + } + for (String topic: viewTopicsToUpdate) { + PubSubTopic viewTopic = pubSubTopicRepository.getTopic(topic); + getTopicManager().updateTopicCompactionPolicy( + viewTopic, + true, + expectedMinCompactionLagMs, + expectedMaxCompactionLagMs > 0 ? Optional.of(expectedMaxCompactionLagMs) : Optional.empty()); + } + } } }
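
For reference, below is a minimal standalone sketch (plain Java, hypothetical names, no Venice dependencies) of the ordering pattern the delegateConsumerRecord change applies: each control-message produce is chained behind the partition's last VT produce future via whenCompleteAsync, and the chained future then becomes the new "last" future, so a delayed view-writer write cannot reorder the CM ahead of earlier data records.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for PartitionConsumptionState's last-VT-produce-future chaining.
final class VtProduceOrdering {
  // Future of the most recent produce call issued to the version topic (VT).
  private final AtomicReference<CompletableFuture<Void>> lastVtProduceFuture =
      new AtomicReference<>(CompletableFuture.completedFuture(null));

  /** Queues produceToVt so it only runs after every previously queued VT produce has completed. */
  CompletableFuture<Void> produceAfterPreviousWrites(Runnable produceToVt) {
    CompletableFuture<Void> thisWrite = new CompletableFuture<>();
    lastVtProduceFuture.getAndSet(thisWrite).whenCompleteAsync((ignored, previousFailure) -> {
      if (previousFailure == null) {
        try {
          produceToVt.run();           // e.g. the control-message put to the local VT
          thisWrite.complete(null);
        } catch (Exception e) {
          thisWrite.completeExceptionally(e);
        }
      } else {
        // Propagate the earlier failure instead of producing out of order.
        thisWrite.completeExceptionally(previousFailure);
      }
    });
    return thisWrite;
  }
}

In the actual change the failure branch additionally records the exception against the partition (setIngestionException) so the ingestion task surfaces it; the sketch only shows the ordering aspect.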
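
Similarly, a hedged sketch (simplified generic types, not the actual Venice signatures) of what the CompositeVeniceWriter.compositeOperation helper does: the operation is issued to every child (view) writer first, and the main-writer operation is only issued once all child futures and the previous composite write have completed, which keeps main-topic writes ordered behind their view-topic counterparts.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Simplified stand-in for CompositeVeniceWriter's write sequencing; W is a placeholder writer type.
final class CompositeWriteSketch<W> {
  private final W mainWriter;
  private final List<W> childWriters;
  private CompletableFuture<?> lastWriteFuture = CompletableFuture.completedFuture(null);

  CompositeWriteSketch(W mainWriter, List<W> childWriters) {
    this.mainWriter = mainWriter;
    this.childWriters = childWriters;
  }

  /** Runs childOp on every child writer, then mainOp on the main writer, preserving write order. */
  <R> CompletableFuture<R> compositeOperation(
      Function<W, CompletableFuture<R>> childOp, Function<W, CompletableFuture<R>> mainOp) {
    CompletableFuture<R> finalFuture = new CompletableFuture<>();
    CompletableFuture<?>[] gate = new CompletableFuture<?>[childWriters.size() + 1];
    gate[0] = lastWriteFuture;                       // also wait for the previous composite write
    for (int i = 0; i < childWriters.size(); i++) {
      gate[i + 1] = childOp.apply(childWriters.get(i));
    }
    CompletableFuture.allOf(gate).whenCompleteAsync((ignored, childFailure) -> {
      if (childFailure == null) {
        mainOp.apply(mainWriter).whenComplete((result, mainFailure) -> {
          if (mainFailure == null) {
            finalFuture.complete(result);
          } else {
            finalFuture.completeExceptionally(mainFailure);
          }
        });
      } else {
        finalFuture.completeExceptionally(childFailure);
      }
    });
    lastWriteFuture = finalFuture;
    return finalFuture;
  }
}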