From 88ec04bce10476c68878f1541ca5fd5fc4ce2d4f Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Mon, 13 Jan 2025 17:51:33 -0800 Subject: [PATCH] Handle leader complete state and refactoring for code dup and testing --- .../ActiveActiveStoreIngestionTask.java | 63 +++------ .../LeaderFollowerStoreIngestionTask.java | 122 ++++++++++-------- .../LeaderFollowerStoreIngestionTaskTest.java | 96 +++++++++++--- .../datawriter/reduce/VeniceReducer.java | 23 ++-- .../datawriter/AbstractPartitionWriter.java | 38 ++++-- .../mapreduce/AbstractTestVeniceMR.java | 4 + .../datawriter/reduce/TestVeniceReducer.java | 47 +++++-- .../venice/pushmonitor/OfflinePushStatus.java | 18 +++ .../pushmonitor/AbstractPushMonitor.java | 118 +++++++++++++---- .../pushmonitor/AbstractPushMonitorTest.java | 107 +++++++++++++-- .../PartitionStatusBasedPushMonitorTest.java | 1 + 11 files changed, 438 insertions(+), 199 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 747e0af1ff..a292792657 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -22,7 +22,6 @@ import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.record.ByteBufferValueRecord; import com.linkedin.davinci.store.record.ValueRecord; -import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.utils.ByteArrayKey; import com.linkedin.venice.exceptions.PersistenceFailureException; import com.linkedin.venice.exceptions.VeniceException; @@ -68,7 +67,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; @@ -641,7 +639,7 @@ protected void processMessageAndMaybeProduceToKafka( // call in this context much less obtrusive, however, it implies that all views can only work for AA stores // Write to views - if (viewWriters != null && !this.viewWriters.isEmpty()) { + if (hasViewWriters()) { /** * The ordering guarantees we want is the following: * @@ -649,32 +647,27 @@ protected void processMessageAndMaybeProduceToKafka( * 2. Write to the VT only after we get the ack for all views AND the previous write to VT was queued into the * producer (but not necessarily acked). */ - long preprocessingTime = System.currentTimeMillis(); - CompletableFuture currentVersionTopicWrite = new CompletableFuture(); - CompletableFuture[] viewWriterFutures = - processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper); - hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); - CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { - hostLevelIngestionStats - .recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); - if (exception == null) { - producePutOrDeleteToKafka( + ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); + int oldValueSchemaId = + oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); + queueUpVersionTopicWritesWithViewWriters( + partitionConsumptionState, + (viewWriter) -> viewWriter.processRecord( + mergeConflictResultWrapper.getUpdatedValueBytes(), + oldValueBB, + keyBytes, + mergeConflictResult.getValueSchemaId(), + oldValueSchemaId, + mergeConflictResult.getRmdRecord()), + (pcs) -> producePutOrDeleteToKafka( mergeConflictResultWrapper, - partitionConsumptionState, + pcs, keyBytes, consumerRecord, partition, kafkaUrl, kafkaClusterId, - beforeProcessingRecordTimestampNs); - currentVersionTopicWrite.complete(null); - } else { - VeniceException veniceException = new VeniceException(exception); - this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); - currentVersionTopicWrite.completeExceptionally(veniceException); - } - }); - partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite); + beforeProcessingRecordTimestampNs)); } else { // This function may modify the original record in KME and it is unsafe to use the payload from KME directly // after @@ -1428,30 +1421,6 @@ public boolean isReadyToServeAnnouncedWithRTLag() { return false; } - CompletableFuture[] processViewWriters( - PartitionConsumptionState partitionConsumptionState, - byte[] keyBytes, - MergeConflictResultWrapper mergeConflictResultWrapper) { - CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; - MergeConflictResult mergeConflictResult = mergeConflictResultWrapper.getMergeConflictResult(); - int index = 0; - // The first future is for the previous write to VT - viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); - ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); - int oldValueSchemaId = - oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); - for (VeniceViewWriter writer: viewWriters.values()) { - viewWriterFutures[index++] = writer.processRecord( - mergeConflictResultWrapper.getUpdatedValueBytes(), - oldValueBB, - keyBytes, - mergeConflictResult.getValueSchemaId(), - oldValueSchemaId, - mergeConflictResult.getRmdRecord()); - } - return viewWriterFutures; - } - Runnable buildRepairTask( String sourceKafkaUrl, PubSubTopicPartition sourceTopicPartition, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index f7e82c419a..933a47be8d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -110,6 +110,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.LongPredicate; import java.util.function.Predicate; import java.util.function.Supplier; @@ -2555,14 +2557,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( * * We want to ensure correct ordering for any SOS and EOS that we do decide to write to VT. This is done by * coordinating with the corresponding {@link PartitionConsumptionState#getLastVTProduceCallFuture}. + * However, this coordination is only needed if there are view writers. i.e. the VT writes and CM writes + * need to be in the same mode. Either both coordinate with lastVTProduceCallFuture or neither. */ if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) { - final LeaderProducedRecordContext segmentCMLeaderProduceRecordContext = leaderProducedRecordContext; - CompletableFuture propagateSegmentCMWrite = new CompletableFuture<>(); - partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> { - if (exception == null) { - produceToLocalKafka( + maybeQueueCMWritesToVersionTopic( + partitionConsumptionState, + () -> produceToLocalKafka( consumerRecord, partitionConsumptionState, segmentCMLeaderProduceRecordContext, @@ -2577,38 +2579,21 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( partition, kafkaUrl, kafkaClusterId, - beforeProcessingPerRecordTimestampNs); - propagateSegmentCMWrite.complete(null); - } else { - VeniceException veniceException = new VeniceException(exception); - this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); - propagateSegmentCMWrite.completeExceptionally(veniceException); - } - }); - partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite); + beforeProcessingPerRecordTimestampNs)); } else { if (controlMessageType == START_OF_SEGMENT && Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) { - CompletableFuture propagateHeartbeatWrite = new CompletableFuture<>(); final LeaderProducedRecordContext heartbeatLeaderProducedRecordContext = leaderProducedRecordContext; - partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> { - if (exception == null) { - propagateHeartbeatFromUpstreamTopicToLocalVersionTopic( + maybeQueueCMWritesToVersionTopic( + partitionConsumptionState, + () -> propagateHeartbeatFromUpstreamTopicToLocalVersionTopic( partitionConsumptionState, consumerRecord, heartbeatLeaderProducedRecordContext, partition, kafkaUrl, kafkaClusterId, - beforeProcessingPerRecordTimestampNs); - propagateHeartbeatWrite.complete(null); - } else { - VeniceException veniceException = new VeniceException(exception); - this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); - propagateHeartbeatWrite.completeExceptionally(veniceException); - } - }); - partitionConsumptionState.setLastVTProduceCallFuture(propagateHeartbeatWrite); + beforeProcessingPerRecordTimestampNs)); } else { /** * Based on current design handling this case (specially EOS) is tricky as we don't produce the SOS/EOS @@ -3385,31 +3370,19 @@ protected void processMessageAndMaybeProduceToKafka( return; } // Write to views - if (viewWriters != null && !viewWriters.isEmpty()) { - long preprocessingTime = System.currentTimeMillis(); - CompletableFuture currentVersionTopicWrite = new CompletableFuture<>(); - CompletableFuture[] viewWriterFutures = - processViewWriters(partitionConsumptionState, keyBytes, writeComputeResultWrapper); - hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); - CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { - hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); - if (exception == null) { - produceToLocalKafkaHelper( + if (hasViewWriters()) { + Put newPut = writeComputeResultWrapper.getNewPut(); + queueUpVersionTopicWritesWithViewWriters( + partitionConsumptionState, + (viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId), + (pcs) -> produceToLocalKafkaHelper( consumerRecord, - partitionConsumptionState, + pcs, writeComputeResultWrapper, partition, kafkaUrl, kafkaClusterId, - beforeProcessingRecordTimestampNs); - currentVersionTopicWrite.complete(null); - } else { - VeniceException veniceException = new VeniceException(exception); - this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); - currentVersionTopicWrite.completeExceptionally(veniceException); - } - }); - partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite); + beforeProcessingRecordTimestampNs)); } else { produceToLocalKafkaHelper( consumerRecord, @@ -3976,21 +3949,33 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio } } - CompletableFuture[] processViewWriters( + protected void queueUpVersionTopicWritesWithViewWriters( PartitionConsumptionState partitionConsumptionState, - byte[] keyBytes, - WriteComputeResultWrapper writeComputeResultWrapper) { + Function> viewWriterRecordProcessor, + Consumer versionTopicWrite) { + long preprocessingTime = System.currentTimeMillis(); + CompletableFuture currentVersionTopicWrite = new CompletableFuture<>(); CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1]; int index = 0; // The first future is for the previous write to VT viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture(); for (VeniceViewWriter writer: viewWriters.values()) { - viewWriterFutures[index++] = writer.processRecord( - writeComputeResultWrapper.getNewPut().putValue, - keyBytes, - writeComputeResultWrapper.getNewPut().schemaId); - } - return viewWriterFutures; + viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer); + } + hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); + CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { + hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); + if (exception == null) { + versionTopicWrite.accept(partitionConsumptionState); + currentVersionTopicWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + currentVersionTopicWrite.completeExceptionally(veniceException); + } + }); + + partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite); } /** @@ -4023,4 +4008,29 @@ private void checkAndWaitForLastVTProduceFuture(PartitionConsumptionState partit throws ExecutionException, InterruptedException { partitionConsumptionState.getLastVTProduceCallFuture().get(); } + + protected boolean hasViewWriters() { + return viewWriters != null && !viewWriters.isEmpty(); + } + + private void maybeQueueCMWritesToVersionTopic( + PartitionConsumptionState partitionConsumptionState, + Runnable produceCall) { + if (hasViewWriters()) { + CompletableFuture propagateSegmentCMWrite = new CompletableFuture<>(); + partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> { + if (exception == null) { + produceCall.run(); + propagateSegmentCMWrite.complete(null); + } else { + VeniceException veniceException = new VeniceException(exception); + this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); + propagateSegmentCMWrite.completeExceptionally(veniceException); + } + }); + partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite); + } else { + produceCall.run(); + } + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index af02d93c75..490cc1bb17 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -1,8 +1,8 @@ package com.linkedin.davinci.kafka.consumer; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -17,12 +17,15 @@ import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel; +import com.linkedin.davinci.stats.AggHostLevelIngestionStats; +import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; import com.linkedin.venice.kafka.protocol.enums.MessageType; @@ -46,12 +49,18 @@ import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.writer.VeniceWriter; import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; +import org.mockito.ArgumentCaptor; +import org.mockito.internal.verification.VerificationModeFactory; +import org.mockito.verification.Timeout; import org.testng.annotations.Test; @@ -67,6 +76,7 @@ public class LeaderFollowerStoreIngestionTaskTest { private VeniceStoreVersionConfig mockVeniceStoreVersionConfig; private VeniceViewWriterFactory mockVeniceViewWriterFactory; + private HostLevelIngestionStats hostLevelIngestionStats; @Test public void testCheckWhetherToCloseUnusedVeniceWriter() { @@ -174,10 +184,14 @@ public void setUp() throws InterruptedException { VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); doReturn(Object2IntMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterUrlToIdMap(); PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + hostLevelIngestionStats = mock(HostLevelIngestionStats.class); + AggHostLevelIngestionStats aggHostLevelIngestionStats = mock(AggHostLevelIngestionStats.class); + doReturn(hostLevelIngestionStats).when(aggHostLevelIngestionStats).getStoreStats(storeName); StoreIngestionTaskFactory.Builder builder = TestUtils.getStoreIngestionTaskBuilder(storeName) .setServerConfig(mockVeniceServerConfig) .setPubSubTopicRepository(pubSubTopicRepository) - .setVeniceViewWriterFactory(mockVeniceViewWriterFactory); + .setVeniceViewWriterFactory(mockVeniceViewWriterFactory) + .setHostLevelIngestionStats(aggHostLevelIngestionStats); when(builder.getSchemaRepo().getKeySchema(storeName)).thenReturn(new SchemaEntry(1, "\"string\"")); mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName); Version version = mockStore.getVersion(versionNumber); @@ -258,7 +272,7 @@ public void testVeniceWriterInProcessConsumerAction() throws InterruptedExceptio } @Test - public void testProcessViewWriters() throws InterruptedException { + public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedException { mockVeniceViewWriterFactory = mock(VeniceViewWriterFactory.class); Map viewWriterMap = new HashMap<>(); MaterializedViewWriter materializedViewWriter = mock(MaterializedViewWriter.class); @@ -271,11 +285,30 @@ public void testProcessViewWriters() throws InterruptedException { Put put = new Put(); put.schemaId = 1; when(mockResult.getNewPut()).thenReturn(put); - CompletableFuture[] futures = - leaderFollowerStoreIngestionTask.processViewWriters(mockPartitionConsumptionState, new byte[1], mockResult); - assertEquals(futures.length, 2); + AtomicBoolean writeToVersionTopic = new AtomicBoolean(false); + when(mockPartitionConsumptionState.getLastVTProduceCallFuture()) + .thenReturn(CompletableFuture.completedFuture(null)); + leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters( + mockPartitionConsumptionState, + (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1), + (pcs) -> writeToVersionTopic.set(true)); verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture(); + ArgumentCaptor vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class); + verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture()); verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt()); + verify(hostLevelIngestionStats, times(1)).recordViewProducerLatency(anyDouble()); + verify(hostLevelIngestionStats, never()).recordViewProducerAckLatency(anyDouble()); + assertFalse(writeToVersionTopic.get()); + assertFalse(vtWriteFutureCaptor.getValue().isDone()); + assertFalse(vtWriteFutureCaptor.getValue().isCompletedExceptionally()); + viewWriterFuture.complete(null); + TestUtils.waitForNonDeterministicAssertion( + 1, + TimeUnit.SECONDS, + () -> assertTrue(vtWriteFutureCaptor.getValue().isDone())); + assertFalse(vtWriteFutureCaptor.getValue().isCompletedExceptionally()); + assertTrue(writeToVersionTopic.get()); + verify(hostLevelIngestionStats, times(1)).recordViewProducerAckLatency(anyDouble()); } /** @@ -284,7 +317,38 @@ public void testProcessViewWriters() throws InterruptedException { */ @Test public void testControlMessagesAreInOrderWithPassthroughDIV() throws InterruptedException { + mockVeniceViewWriterFactory = mock(VeniceViewWriterFactory.class); + Map viewWriterMap = new HashMap<>(); + MaterializedViewWriter materializedViewWriter = mock(MaterializedViewWriter.class); + viewWriterMap.put("testView", materializedViewWriter); + when(mockVeniceViewWriterFactory.buildStoreViewWriters(any(), anyInt(), any())).thenReturn(viewWriterMap); setUp(); + PubSubMessageProcessedResultWrapper firstCM = getMockMessage(1); + PubSubMessageProcessedResultWrapper secondCM = getMockMessage(2); + CompletableFuture lastVTWriteFuture = new CompletableFuture<>(); + CompletableFuture nextVTWriteFuture = new CompletableFuture<>(); + when(mockPartitionConsumptionState.getLastVTProduceCallFuture()).thenReturn(lastVTWriteFuture) + .thenReturn(nextVTWriteFuture); + VeniceWriter veniceWriter = mock(VeniceWriter.class); + doReturn(Lazy.of(() -> veniceWriter)).when(mockPartitionConsumptionState).getVeniceWriterLazyRef(); + leaderFollowerStoreIngestionTask.delegateConsumerRecord(firstCM, 0, "testURL", 0, 0, 0); + leaderFollowerStoreIngestionTask.delegateConsumerRecord(secondCM, 0, "testURL", 0, 0, 0); + // The CM write should be queued but not executed yet since the previous VT write future is still incomplete + verify(veniceWriter, never()).put(any(), any(), any(), anyInt(), any()); + lastVTWriteFuture.complete(null); + verify(veniceWriter, timeout(1000)).put(any(), any(), any(), anyInt(), any()); + nextVTWriteFuture.complete(null); + // The CM should be written once the previous VT write is completed + ArgumentCaptor kafkaValueCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + verify(veniceWriter, new Timeout(1000, VerificationModeFactory.times(2))) + .put(any(), kafkaValueCaptor.capture(), any(), anyInt(), any()); + int seqNumber = 1; + for (KafkaMessageEnvelope value: kafkaValueCaptor.getAllValues()) { + assertEquals(seqNumber++, value.getProducerMetadata().getMessageSequenceNumber()); + } + } + + private PubSubMessageProcessedResultWrapper getMockMessage(int seqNumber) { PubSubMessageProcessedResultWrapper pubSubMessageProcessedResultWrapper = mock(PubSubMessageProcessedResultWrapper.class); PubSubMessage pubSubMessage = mock(PubSubMessage.class); @@ -293,6 +357,9 @@ public void testControlMessagesAreInOrderWithPassthroughDIV() throws Interrupted doReturn(kafkaKey).when(pubSubMessage).getKey(); KafkaMessageEnvelope kafkaValue = mock(KafkaMessageEnvelope.class); doReturn(MessageType.CONTROL_MESSAGE.getValue()).when(kafkaValue).getMessageType(); + ProducerMetadata producerMetadata = mock(ProducerMetadata.class); + doReturn(seqNumber).when(producerMetadata).getMessageSequenceNumber(); + doReturn(producerMetadata).when(kafkaValue).getProducerMetadata(); doReturn(kafkaValue).when(pubSubMessage).getValue(); doReturn(true).when(mockPartitionConsumptionState).consumeRemotely(); doReturn(LeaderFollowerStateType.LEADER).when(mockPartitionConsumptionState).getLeaderFollowerState(); @@ -306,19 +373,8 @@ public void testControlMessagesAreInOrderWithPassthroughDIV() throws Interrupted doReturn(true).when(kafkaKey).isControlMessage(); ControlMessage controlMessage = mock(ControlMessage.class); doReturn(controlMessage).when(kafkaValue).getPayloadUnion(); - doReturn(ControlMessageType.END_OF_SEGMENT.getValue()).when(controlMessage).getControlMessageType(); - doReturn(0L).when(pubSubMessage).getOffset(); - CompletableFuture lastVTWriteFuture = new CompletableFuture<>(); - doReturn(lastVTWriteFuture).when(mockPartitionConsumptionState).getLastVTProduceCallFuture(); - VeniceWriter veniceWriter = mock(VeniceWriter.class); - doReturn(Lazy.of(() -> veniceWriter)).when(mockPartitionConsumptionState).getVeniceWriterLazyRef(); - - leaderFollowerStoreIngestionTask.delegateConsumerRecord(pubSubMessageProcessedResultWrapper, 0, "testURL", 0, 0, 0); - Thread.sleep(1000); - // The CM write should be queued but not executed yet since the previous VT write future is still incomplete - verify(veniceWriter, never()).put(eq(kafkaKey), eq(kafkaValue), any(), anyInt(), any()); - lastVTWriteFuture.complete(null); - // The CM should be written once the previous VT write is completed - verify(veniceWriter, timeout(1000)).put(eq(kafkaKey), eq(kafkaValue), any(), anyInt(), any()); + doReturn(ControlMessageType.START_OF_SEGMENT.getValue()).when(controlMessage).getControlMessageType(); + doReturn((long) seqNumber).when(pubSubMessage).getOffset(); + return pubSubMessageProcessedResultWrapper; } } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java index e202ced5d0..e398741803 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/VeniceReducer.java @@ -13,7 +13,6 @@ import com.linkedin.venice.utils.IteratorUtils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.AbstractVeniceWriter; -import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import java.io.IOException; import java.util.Iterator; @@ -179,19 +178,13 @@ protected void setHadoopJobClientProvider(HadoopJobClientProvider hadoopJobClien // Visible for testing @Override - protected AbstractVeniceWriter createCompositeVeniceWriter( - VeniceWriterFactory factory, - VeniceWriter mainWriter, - String flatViewConfigMapString, - String topicName, - boolean chunkingEnabled, - boolean rmdChunkingEnabled) { - return super.createCompositeVeniceWriter( - factory, - mainWriter, - flatViewConfigMapString, - topicName, - chunkingEnabled, - rmdChunkingEnabled); + protected AbstractVeniceWriter createBasicVeniceWriter() { + return super.createBasicVeniceWriter(); + } + + // Visible for testing + @Override + protected void setVeniceWriterFactory(VeniceWriterFactory factory) { + super.setVeniceWriterFactory(factory); } } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index 4789f38a3b..e45a50b30c 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ -35,6 +35,7 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.AbstractVeniceWriter; @@ -142,6 +143,7 @@ public int getValueSchemaId() { private long lastTimeThroughputWasLoggedInNS = System.nanoTime(); private long lastMessageCompletedCount = 0; + private Lazy veniceWriterFactory; private AbstractVeniceWriter veniceWriter = null; private VeniceWriter mainWriter = null; private VeniceWriter[] childWriters = null; @@ -216,6 +218,11 @@ public void processValuesForKey(byte[] key, Iterator values, DataWriterT updateExecutionTimeStatus(timeOfLastReduceFunctionStartInNS); } + // For testing purpose + protected void setVeniceWriterFactory(VeniceWriterFactory factory) { + this.veniceWriterFactory = Lazy.of(() -> factory); + } + protected DataWriterTaskTracker getDataWriterTaskTracker() { return dataWriterTaskTracker; } @@ -338,20 +345,10 @@ private void sendMessageToKafka( dataWriterTaskTracker.trackRecordSentToPubSub(); } - private AbstractVeniceWriter createBasicVeniceWriter() { - Properties writerProps = props.toProperties(); - // Closing segments based on elapsed time should always be disabled in data writer compute jobs to prevent storage - // nodes from consuming out of order keys when speculative execution is enabled. - writerProps.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, -1); - + protected AbstractVeniceWriter createBasicVeniceWriter() { EngineTaskConfigProvider engineTaskConfigProvider = getEngineTaskConfigProvider(); Properties jobProps = engineTaskConfigProvider.getJobProps(); - - // Use the UUID bits created by the VPJ driver to build a producerGUID deterministically - writerProps.put(GuidUtils.GUID_GENERATOR_IMPLEMENTATION, GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION); - writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS)); - writerProps.put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS)); - VeniceWriterFactory veniceWriterFactoryFactory = new VeniceWriterFactory(writerProps); + VeniceWriterFactory veniceWriterFactoryFactory = veniceWriterFactory.get(); boolean chunkingEnabled = props.getBoolean(VeniceWriter.ENABLE_CHUNKING, false); boolean rmdChunkingEnabled = props.getBoolean(VeniceWriter.ENABLE_RMD_CHUNKING, false); String maxRecordSizeBytesStr = (String) jobProps @@ -384,8 +381,7 @@ private AbstractVeniceWriter createBasicVeniceWriter() { } } - // protected and package private for testing purposes - protected AbstractVeniceWriter createCompositeVeniceWriter( + private AbstractVeniceWriter createCompositeVeniceWriter( VeniceWriterFactory factory, VeniceWriter mainWriter, String flatViewConfigMapString, @@ -522,6 +518,20 @@ protected void configureTask(VeniceProperties props) { this.dataWriterTaskTracker.heartbeat(); } }, 0, 5, TimeUnit.MINUTES); + + veniceWriterFactory = Lazy.of(() -> { + Properties writerProps = this.props.toProperties(); + // Closing segments based on elapsed time should always be disabled in data writer compute jobs to prevent storage + // nodes from consuming out of order keys when speculative execution is enabled. + writerProps.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, -1); + EngineTaskConfigProvider engineTaskConfigProvider = getEngineTaskConfigProvider(); + Properties jobProps = engineTaskConfigProvider.getJobProps(); + // Use the UUID bits created by the VPJ driver to build a producerGUID deterministically + writerProps.put(GuidUtils.GUID_GENERATOR_IMPLEMENTATION, GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION); + writerProps.put(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS)); + writerProps.put(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, jobProps.getProperty(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS)); + return new VeniceWriterFactory(writerProps); + }); } private void initStorageQuotaFields(VeniceProperties props) { diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/AbstractTestVeniceMR.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/AbstractTestVeniceMR.java index f3b5878c33..55d8f7b081 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/AbstractTestVeniceMR.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/AbstractTestVeniceMR.java @@ -1,5 +1,7 @@ package com.linkedin.venice.hadoop.mapreduce; +import static com.linkedin.venice.ConfigKeys.PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS; +import static com.linkedin.venice.ConfigKeys.PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS; import static com.linkedin.venice.vpj.VenicePushJobConstants.ALLOW_DUPLICATE_KEY; import static com.linkedin.venice.vpj.VenicePushJobConstants.COMPRESSION_METRIC_COLLECTION_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.COMPRESSION_STRATEGY; @@ -85,6 +87,8 @@ protected Configuration getDefaultJobConfiguration(int partitionCount) { config.set(VeniceReducer.MAP_REDUCE_JOB_ID_PROP, "job_200707121733_0003"); config.setBoolean(VeniceWriter.ENABLE_CHUNKING, false); config.setInt(PARTITION_COUNT, partitionCount); + config.setLong(PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS, 1L); + config.setLong(PUSH_JOB_GUID_LEAST_SIGNIFICANT_BITS, 1L); return new JobConf(config); } diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java index d7abaee150..d50b237a22 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java @@ -1,5 +1,6 @@ package com.linkedin.venice.hadoop.mapreduce.datawriter.reduce; +import static com.linkedin.venice.ConfigKeys.PUSH_JOB_VIEW_CONFIGS; import static com.linkedin.venice.hadoop.mapreduce.counter.MRJobCounterHelper.TOTAL_KEY_SIZE_GROUP_COUNTER_NAME; import static com.linkedin.venice.hadoop.mapreduce.counter.MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME; import static com.linkedin.venice.vpj.VenicePushJobConstants.ALLOW_DUPLICATE_KEY; @@ -7,6 +8,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.ENABLE_WRITE_COMPUTE; import static com.linkedin.venice.vpj.VenicePushJobConstants.STORAGE_QUOTA_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.TELEMETRY_MESSAGE_INTERVAL; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; @@ -18,7 +20,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.core.JsonProcessingException; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.exceptions.RecordTooLargeException; import com.linkedin.venice.exceptions.VeniceException; @@ -32,6 +33,7 @@ import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker; import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; @@ -631,10 +633,10 @@ public void close() throws IOException { } @Test - public void testCreateCompositeVeniceWriter() throws JsonProcessingException { + public void testCreateAndCloseCompositeVeniceWriter() throws IOException { VeniceReducer reducer = new VeniceReducer(); - VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); - VeniceWriter mainWriter = mock(VeniceWriter.class); + VeniceWriter mainWriter = mock(VeniceWriter.class); + VeniceWriter childWriter = mock(VeniceWriter.class); Map viewConfigMap = new HashMap<>(); String view1Name = "view1"; MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(view1Name); @@ -649,21 +651,38 @@ public void testCreateCompositeVeniceWriter() throws JsonProcessingException { ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); viewConfigMap.put(view2Name, viewConfig2); String flatViewConfigMapString = ViewUtils.flatViewConfigMapString(viewConfigMap); - String topicName = "test_v1"; - reducer.createCompositeVeniceWriter(writerFactory, mainWriter, flatViewConfigMapString, topicName, true, true); + Configuration configuration = getDefaultJobConfiguration(10); + configuration.setStrings(PUSH_JOB_VIEW_CONFIGS, flatViewConfigMapString); + reducer.configure(new JobConf(configuration)); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + reducer.setVeniceWriterFactory(writerFactory); + when(writerFactory.createVeniceWriter(any())).thenReturn(mainWriter).thenReturn(childWriter); + reducer.setVeniceWriter(reducer.createBasicVeniceWriter()); ArgumentCaptor vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class); - verify(writerFactory, times(2)).createVeniceWriter(vwOptionsCaptor.capture()); + verify(writerFactory, times(3)).createVeniceWriter(vwOptionsCaptor.capture()); Map verifyPartitionToViewsMap = new HashMap<>(); + String storeName = Version.parseStoreFromKafkaTopicName(TOPIC_NAME); verifyPartitionToViewsMap.put( 6, - ViewUtils - .getVeniceView(viewConfig1.getViewClassName(), new Properties(), "test", viewConfig1.getViewParameters())); + ViewUtils.getVeniceView( + viewConfig1.getViewClassName(), + new Properties(), + storeName, + viewConfig1.getViewParameters())); verifyPartitionToViewsMap.put( 12, - ViewUtils - .getVeniceView(viewConfig2.getViewClassName(), new Properties(), "test", viewConfig2.getViewParameters())); + ViewUtils.getVeniceView( + viewConfig2.getViewClassName(), + new Properties(), + storeName, + viewConfig2.getViewParameters())); for (VeniceWriterOptions options: vwOptionsCaptor.getAllValues()) { int partitionCount = options.getPartitionCount(); + if (partitionCount == 10) { + // main writer + Assert.assertEquals(options.getTopicName(), TOPIC_NAME); + continue; + } Assert.assertTrue(verifyPartitionToViewsMap.containsKey(partitionCount)); VeniceView veniceView = verifyPartitionToViewsMap.get(partitionCount); Assert.assertTrue(veniceView instanceof MaterializedView); @@ -673,6 +692,12 @@ public void testCreateCompositeVeniceWriter() throws JsonProcessingException { materializedView.getTopicNamesAndConfigsForVersion(1).keySet().stream().findAny().get()); Assert.assertTrue(materializedView.getViewPartitioner() instanceof DefaultVenicePartitioner); } + reducer.close(); + // All child writers and main writers should be flushed and closed + verify(mainWriter, times(1)).flush(); + verify(childWriter, times(2)).flush(); + verify(mainWriter, times(1)).close(anyBoolean()); + verify(childWriter, times(2)).close(anyBoolean()); } private Reporter createZeroCountReporterMock() { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java index 3cbfdf5726..7eb7188b8b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java @@ -469,6 +469,24 @@ public boolean isEOPReceivedInEveryPartition(boolean isDataRecovery) { return isReady; } + public boolean isLeaderCompletedInEveryPartition() { + boolean isLeaderCompleted = true; + for (PartitionStatus partitionStatus: getPartitionStatuses()) { + boolean proceedToNextPartition = false; + for (ReplicaStatus replicaStatus: partitionStatus.getReplicaStatuses()) { + if (replicaStatus.getCurrentStatus() == COMPLETED) { + proceedToNextPartition = true; + break; + } + } + if (!proceedToNextPartition) { + isLeaderCompleted = false; + break; + } + } + return isLeaderCompleted; + } + public Map getPushProperties() { return pushProperties; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 47ec699c9a..111232bf75 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -41,6 +41,7 @@ import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; +import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; @@ -85,6 +86,7 @@ public abstract class AbstractPushMonitor private final StoreCleaner storeCleaner; private final AggPushHealthStats aggPushHealthStats; private final Map topicToPushMap = new VeniceConcurrentHashMap<>(); + private final Map topicToLeaderCompleteTimestampMap = new VeniceConcurrentHashMap<>(); private RealTimeTopicSwitcher realTimeTopicSwitcher; private final ClusterLockManager clusterLockManager; private final String aggregateRealTimeSourceKafkaUrl; @@ -189,6 +191,7 @@ private void loadAllPushes(List offlinePushStatusList) { } else { checkWhetherToStartEOPProcedures(offlinePushStatus); } + checkWhetherToStartLeaderCompletedProcedures(offlinePushStatus); } else { // In any case, we found the offline push status is STARTED, but the related version could not be found. // We only log it as cleaning up here was found to prematurely delete push jobs during controller failover @@ -663,6 +666,7 @@ private void cleanupPushStatus(OfflinePushStatus offlinePushStatus, boolean dele String storeName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic()); try (AutoCloseableLock ignore = clusterLockManager.createStoreWriteLock(storeName)) { topicToPushMap.remove(offlinePushStatus.getKafkaTopic()); + topicToLeaderCompleteTimestampMap.remove(offlinePushStatus.getKafkaTopic()); if (deletePushStatus) { offlinePushAccessor.deleteOfflinePushStatusAndItsPartitionStatuses(offlinePushStatus.getKafkaTopic()); } @@ -810,6 +814,7 @@ public void onPartitionStatusChange(String topic, ReadOnlyPartitionStatus partit protected void onPartitionStatusChange(OfflinePushStatus offlinePushStatus) { checkWhetherToStartEOPProcedures(offlinePushStatus); + checkWhetherToStartLeaderCompletedProcedures(offlinePushStatus); } protected DisableReplicaCallback getDisableReplicaCallback(String kafkaTopic) { @@ -881,6 +886,7 @@ public void onExternalViewChange(PartitionAssignment partitionAssignment) { // For all partitions, at least one replica has received the EOP. Check if it's time to start buffer replay. checkWhetherToStartEOPProcedures(pushStatus); } + checkWhetherToStartLeaderCompletedProcedures(pushStatus); } } else { LOGGER.info( @@ -923,25 +929,9 @@ public void onRoutingDataDeleted(String kafkaTopic) { protected void checkWhetherToStartEOPProcedures(OfflinePushStatus offlinePushStatus) { // As the outer method already locked on this instance, so this method is thread-safe. - String storeName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic()); - Store store = getReadWriteStoreRepository().getStore(storeName); - if (store == null) { - LOGGER - .info("Got a null store from metadataRepository for store name: '{}'. Will attempt a refresh().", storeName); - store = getReadWriteStoreRepository().refreshOneStore(storeName); - if (store == null) { - throw new IllegalStateException( - "checkHybridPushStatus could not find a store named '" + storeName - + "' in the metadataRepository, even after refresh()!"); - } else { - LOGGER.info("metadataRepository.refresh() allowed us to retrieve store: '{}'!", storeName); - } - } + Store store = getStoreOrThrow(offlinePushStatus); - Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(offlinePushStatus.getKafkaTopic())); - if (version == null) { - throw new IllegalStateException("Could not find Version object for: " + offlinePushStatus.getKafkaTopic()); - } + Version version = getVersionOrThrow(store, offlinePushStatus.getKafkaTopic()); Map viewConfigMap = version.getViewConfigs(); if (!store.isHybrid() && (viewConfigMap == null || viewConfigMap.isEmpty())) { // The only procedures that may start after EOP is received in every partition are: @@ -976,12 +966,12 @@ protected void checkWhetherToStartEOPProcedures(OfflinePushStatus offlinePushSta if (isEOPReceivedInAllPartitions) { // Check whether to send EOP for materialized view topic(s) for (ViewConfig rawView: viewConfigMap.values()) { - VeniceView veniceView = ViewUtils.getVeniceView( - rawView.getViewClassName(), - new Properties(), - store.getName(), - rawView.getViewParameters()); - if (veniceView instanceof MaterializedView) { + if (MaterializedView.class.getCanonicalName().equals(rawView.getViewClassName())) { + VeniceView veniceView = ViewUtils.getVeniceView( + rawView.getViewClassName(), + new Properties(), + store.getName(), + rawView.getViewParameters()); MaterializedView materializedView = (MaterializedView) veniceView; for (String materializedViewTopicName: materializedView .getTopicNamesAndConfigsForVersion(version.getNumber()) @@ -1012,6 +1002,86 @@ protected void checkWhetherToStartEOPProcedures(OfflinePushStatus offlinePushSta } } + /** + * For hybrid stores we use {@link com.linkedin.venice.writer.LeaderCompleteState} to prevent follower replicas from + * reporting COMPLETED prematurely. The same safeguard is needed for view topic consumers. Due to the obfuscated + * partition mapping between version topic leaders and view topic partitions we can only send LEADER_COMPLETED once + * all version topic partition leaders are completed. This is equivalent to at least one completed replica for all + * partitions. + */ + protected void checkWhetherToStartLeaderCompletedProcedures(OfflinePushStatus offlinePushStatus) { + if (topicToLeaderCompleteTimestampMap.containsKey(offlinePushStatus.getKafkaTopic())) { + // We've sent the LeaderCompleteState for all view topics of this version topic already. Duplicate heartbeats are + // harmless, so we are only tracking it in memory (not across restarts) to avoid heartbeat spam + return; + } + Store store = getStoreOrThrow(offlinePushStatus); + Version version = getVersionOrThrow(store, offlinePushStatus.getKafkaTopic()); + Map viewConfigMap = version.getViewConfigs(); + if (!version.isHybrid() || viewConfigMap == null || viewConfigMap.isEmpty()) { + return; + } + if (offlinePushStatus.isLeaderCompletedInEveryPartition()) { + // broadcast heartbeat with LeaderCompleteState to view topic partitions + long heartbeatTimestamp = System.currentTimeMillis(); + for (ViewConfig rawViewConfig: viewConfigMap.values()) { + if (MaterializedView.class.getCanonicalName().equals(rawViewConfig.getViewClassName())) { + VeniceView veniceView = ViewUtils.getVeniceView( + rawViewConfig.getViewClassName(), + new Properties(), + store.getName(), + rawViewConfig.getViewParameters()); + MaterializedView materializedView = (MaterializedView) veniceView; + for (String materializedViewTopicName: materializedView.getTopicNamesAndConfigsForVersion(version.getNumber()) + .keySet()) { + VeniceWriterOptions.Builder vwOptionsBuilder = + new VeniceWriterOptions.Builder(materializedViewTopicName).setUseKafkaKeySerializer(true) + .setPartitionCount(materializedView.getViewPartitionCount()); + try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(vwOptionsBuilder.build())) { + for (int p = 0; p < materializedView.getViewPartitionCount(); p++) { + veniceWriter.sendHeartbeat( + materializedViewTopicName, + p, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + true, + LeaderCompleteState.getLeaderCompleteState(true), + heartbeatTimestamp); + } + } + } + } + } + topicToLeaderCompleteTimestampMap.put(offlinePushStatus.getKafkaTopic(), heartbeatTimestamp); + } + } + + private Store getStoreOrThrow(OfflinePushStatus offlinePushStatus) { + String storeName = Version.parseStoreFromKafkaTopicName(offlinePushStatus.getKafkaTopic()); + Store store = getReadWriteStoreRepository().getStore(storeName); + if (store == null) { + LOGGER + .info("Got a null store from metadataRepository for store name: '{}'. Will attempt a refresh().", storeName); + store = getReadWriteStoreRepository().refreshOneStore(storeName); + if (store == null) { + throw new IllegalStateException( + "checkHybridPushStatus could not find a store named '" + storeName + + "' in the metadataRepository, even after refresh()!"); + } else { + LOGGER.info("metadataRepository.refresh() allowed us to retrieve store: '{}'!", storeName); + } + } + return store; + } + + private Version getVersionOrThrow(Store store, String topic) { + Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(topic)); + if (version == null) { + throw new IllegalStateException("Could not find Version object for: " + topic); + } + return version; + } + /** * This method will unsubscribe external view changes and is intended to be called when the statues are terminable. */ diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java index 94a3c38c79..419fd83b7f 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java @@ -7,6 +7,7 @@ import static com.linkedin.venice.pushmonitor.OfflinePushStatus.HELIX_ASSIGNMENT_COMPLETED; import static com.linkedin.venice.pushmonitor.OfflinePushStatus.HELIX_RESOURCE_NOT_CREATED; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; @@ -34,6 +35,7 @@ import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher; import com.linkedin.venice.meta.BufferReplayPolicy; import com.linkedin.venice.meta.DataReplicationPolicy; +import com.linkedin.venice.meta.HybridStoreConfig; import com.linkedin.venice.meta.HybridStoreConfigImpl; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.MaterializedViewParameters; @@ -59,6 +61,7 @@ import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; +import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; @@ -1103,20 +1106,18 @@ public void testGetPushStatusAndDetails() { @Test public void testEOPReceivedProcedures() { Map viewConfigMap = new HashMap<>(); - Map viewParams = new HashMap<>(); String viewName = "testView"; int viewPartitionCount = 10; - viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name(), viewName); - viewParams - .put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(viewPartitionCount)); - viewParams.put( - MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name(), - DefaultVenicePartitioner.class.getCanonicalName()); - ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParams); + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); + viewParamsBuilder.setPartitionCount(viewPartitionCount); + viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParamsBuilder.build()); viewConfigMap.put(viewName, viewConfig); String topic = getTopic(); int versionNumber = Version.parseVersionFromKafkaTopicName(topic); - Store store = prepareMockStore(topic, VersionStatus.STARTED, viewConfigMap); + HybridStoreConfig hybridStoreConfig = + new HybridStoreConfigImpl(1, 1, 1, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); + Store store = prepareMockStore(topic, VersionStatus.STARTED, viewConfigMap, hybridStoreConfig); VeniceView veniceView = ViewUtils.getVeniceView( viewConfig.getViewClassName(), new Properties(), @@ -1159,18 +1160,100 @@ public void testEOPReceivedProcedures() { verify(mockVeniceWriter, times(1)).broadcastEndOfPush(any()); assertEquals(monitor.getOfflinePushOrThrow(topic).getCurrentStatus(), ExecutionStatus.END_OF_PUSH_RECEIVED); - // Another replica received end of push. We shouldn't write multiple EOP to view topic + // Another replica received end of push. We shouldn't write multiple EOP to view topic(s) replicaStatuses.get(1).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED); monitor.onPartitionStatusChange(topic, partitionStatus); verify(mockVeniceWriter, times(1)).broadcastEndOfPush(any()); } - protected Store prepareMockStore(String topic, VersionStatus status, Map viewConfigMap) { + @Test + public void testLeaderCompleteStateProcedures() { + Map viewConfigMap = new HashMap<>(); + String viewName = "testView"; + int viewPartitionCount = 10; + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); + viewParamsBuilder.setPartitionCount(viewPartitionCount); + viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParamsBuilder.build()); + viewConfigMap.put(viewName, viewConfig); + String topic = getTopic(); + int versionNumber = Version.parseVersionFromKafkaTopicName(topic); + HybridStoreConfig hybridStoreConfig = + new HybridStoreConfigImpl(1, 1, 1, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); + Store store = prepareMockStore(topic, VersionStatus.STARTED, viewConfigMap, hybridStoreConfig); + VeniceView veniceView = ViewUtils.getVeniceView( + viewConfig.getViewClassName(), + new Properties(), + store.getName(), + viewConfig.getViewParameters()); + assertTrue(veniceView instanceof MaterializedView); + MaterializedView materializedView = (MaterializedView) veniceView; + String viewTopicName = + materializedView.getTopicNamesAndConfigsForVersion(versionNumber).keySet().stream().findAny().get(); + assertNotNull(viewTopicName); + monitor.startMonitorOfflinePush( + topic, + numberOfPartition, + replicationFactor, + OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION); + // Prepare the new partition status + List replicaStatuses = new ArrayList<>(); + for (int i = 0; i < replicationFactor; i++) { + ReplicaStatus replicaStatus = new ReplicaStatus("test" + i); + replicaStatuses.add(replicaStatus); + } + // All replicas are in STARTED status + ReadOnlyPartitionStatus partitionStatus = new ReadOnlyPartitionStatus(0, replicaStatuses); + doReturn(true).when(mockRoutingDataRepo).containsKafkaTopic(topic); + doReturn(new PartitionAssignment(topic, 1)).when(mockRoutingDataRepo).getPartitionAssignments(topic); + monitor.onPartitionStatusChange(topic, partitionStatus); + // Not ready to send leader complete state to view topics + verify(mockVeniceWriterFactory, never()).createVeniceWriter(any()); + verify(mockVeniceWriter, never()) + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); + + // One replica reports COMPLETED + replicaStatuses.get(0).updateStatus(ExecutionStatus.COMPLETED); + monitor.onPartitionStatusChange(topic, partitionStatus); + ArgumentCaptor vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class); + verify(mockVeniceWriterFactory, times(1)).createVeniceWriter(vwOptionsCaptor.capture()); + assertEquals(vwOptionsCaptor.getValue().getPartitionCount(), Integer.valueOf(viewPartitionCount)); + assertEquals(vwOptionsCaptor.getValue().getTopicName(), viewTopicName); + verify(mockVeniceWriter, times(10)).sendHeartbeat( + eq(viewTopicName), + anyInt(), + any(), + any(), + eq(true), + eq(LeaderCompleteState.getLeaderCompleteState(true)), + anyLong()); + // Another replica reports COMPLETED. We shouldn't write multiple leader completed heartbeat to view topic(s) + replicaStatuses.get(1).updateStatus(ExecutionStatus.COMPLETED); + monitor.onPartitionStatusChange(topic, partitionStatus); + verify(mockVeniceWriterFactory, times(1)).createVeniceWriter(any()); + verify(mockVeniceWriter, times(10)).sendHeartbeat( + eq(viewTopicName), + anyInt(), + any(), + any(), + eq(true), + eq(LeaderCompleteState.getLeaderCompleteState(true)), + anyLong()); + } + + protected Store prepareMockStore( + String topic, + VersionStatus status, + Map viewConfigMap, + HybridStoreConfig hybridStoreConfig) { String storeName = Version.parseStoreFromKafkaTopicName(topic); int versionNumber = Version.parseVersionFromKafkaTopicName(topic); Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); store.setViewConfigs(viewConfigMap); Version version = new VersionImpl(storeName, versionNumber); + if (hybridStoreConfig != null) { + version.setHybridStoreConfig(hybridStoreConfig); + } version.setStatus(status); store.addVersion(version); doReturn(store).when(mockStoreRepo).getStore(storeName); @@ -1178,7 +1261,7 @@ protected Store prepareMockStore(String topic, VersionStatus status, Map> map = new HashMap<>(); String kafkaTopic = Version.composeKafkaTopic(store.getName(), 1);