From d180bd65d21722b60aeee1dcc063daffbf89dad2 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Tue, 12 Nov 2024 18:31:42 -0800 Subject: [PATCH] Add unit tests and make VeniceViewWriter version specific (it already is implicitly) --- .../ActiveActiveStoreIngestionTask.java | 1 - .../LeaderFollowerStoreIngestionTask.java | 80 +++----- .../store/view/ChangeCaptureViewWriter.java | 19 +- .../store/view/MaterializedViewWriter.java | 46 ++--- .../davinci/store/view/VeniceViewWriter.java | 10 +- .../store/view/VeniceViewWriterFactory.java | 2 +- .../davinci/store/view/ViewWriterUtils.java | 5 +- .../view/ChangeCaptureViewWriterTest.java | 30 ++- .../view/MaterializedViewWriterTest.java | 177 ++++++++++++++++++ .../store/view/ViewWriterUtilsTest.java | 1 + ...java => TestMaterializedViewEndToEnd.java} | 12 +- .../linkedin/venice/view/TestViewWriter.java | 13 +- 12 files changed, 262 insertions(+), 134 deletions(-) create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java rename internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/{MaterializedViewTest.java => TestMaterializedViewEndToEnd.java} (96%) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 582591544c8..43e3a544929 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -1516,7 +1516,6 @@ protected CompletableFuture[] processViewWriters( mergeConflictResult.getNewValue(), oldValueBB, keyBytes, - versionNumber, mergeConflictResult.getValueSchemaId(), oldValueSchemaId, mergeConflictResult.getRmdRecord()); diff --git 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 3ff2bd51dd5..cc1856ae260 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -31,7 +31,6 @@ import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.view.ChangeCaptureViewWriter; -import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.KafkaDataIntegrityValidator; import com.linkedin.davinci.validation.PartitionTracker; @@ -57,7 +56,6 @@ import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; @@ -103,6 +101,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -199,8 +198,6 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { protected final Map viewWriters; protected final boolean hasChangeCaptureView; - protected final boolean hasMaterializedView; - protected final boolean hasVersionCompleted; protected final AvroStoreDeserializerCache storeDeserializerCache; @@ -331,25 +328,19 @@ public 
LeaderFollowerStoreIngestionTask( version.getNumber(), schemaRepository.getKeySchema(store.getName()).getSchema()); boolean tmpValueForHasChangeCaptureViewWriter = false; - boolean tmpValueForHasMaterializedViewWriter = false; for (Map.Entry viewWriter: viewWriters.entrySet()) { if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) { tmpValueForHasChangeCaptureViewWriter = true; - } else if (viewWriter.getValue() instanceof MaterializedViewWriter) { - tmpValueForHasMaterializedViewWriter = true; } - if (tmpValueForHasChangeCaptureViewWriter && tmpValueForHasMaterializedViewWriter) { + if (tmpValueForHasChangeCaptureViewWriter) { break; } } hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter; - hasMaterializedView = tmpValueForHasMaterializedViewWriter; } else { viewWriters = Collections.emptyMap(); hasChangeCaptureView = false; - hasMaterializedView = false; } - hasVersionCompleted = version.getStatus().equals(VersionStatus.ONLINE); this.storeDeserializerCache = new AvroStoreDeserializerCache( builder.getSchemaRepo(), getStoreName(), @@ -2437,38 +2428,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime); } /** - * For materialized view we still need to produce to the view topic when we are consuming batch data from the - * local VT for the very first time for a new version. Once the version has completed any new SIT will have its - * {@link hasVersionCompleted} set to true to prevent duplicate processing of batch records by the view writers. - * Duplicate processing is still possible in the case when leader replica crashes before the version becomes - * ONLINE, but it should be harmless since eventually it will be correct. - * TODO We need to introduce additional checkpointing and rewind for L/F transition during batch processing to - * prevent data loss in the view topic for the following scenario: - * 1. 
VT contains 100 batch records between SOP and EOP - * 2. Initial leader consumed to 50th record while followers consumed to 80th - * 3. Initial leader crashes and one of the followers becomes the new leader and start processing and writing - * to view topic from 80th record until ingestion is completed and version becomes ONLINE. - * 4. View topic will have a gap for batch data between 50th record and 80th record. + * Materialized views need to produce to the corresponding view topic for the batch portion of the data. This is + * achieved in the following ways: + * 1. Remote fabric(s) will leverage NR where the leader will replicate VT from NR source fabric and produce + * to local view topic(s). + * 2. NR source fabric's view topic will be produced by VPJ. This is because there is no checkpointing, and no + * easy way to add checkpointing, for leaders consuming the local VT, which makes it difficult and error-prone + * to let the leader produce to view topic(s) in the NR source fabric. */ - if (hasMaterializedView && shouldViewWritersProcessBatchRecords(partitionConsumptionState) - && msgType == MessageType.PUT) { - WriteComputeResultWrapper writeComputeResultWrapper = - new WriteComputeResultWrapper((Put) kafkaValue.payloadUnion, null, false); - long preprocessingTime = System.currentTimeMillis(); - CompletableFuture currentVersionTopicWrite = new CompletableFuture(); - CompletableFuture[] viewWriterFutures = - processViewWriters(partitionConsumptionState, kafkaKey.getKey(), null, writeComputeResultWrapper); - CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> { - hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime)); - if (exception == null) { - currentVersionTopicWrite.complete(null); - } else { - VeniceException veniceException = new VeniceException(exception); - this.setIngestionException(partitionConsumptionState.getPartition(), veniceException); - 
currentVersionTopicWrite.completeExceptionally(veniceException); - } - }); - } return DelegateConsumerRecordResult.QUEUED_TO_DRAINER; } @@ -2522,6 +2489,9 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( * consumes the first message; potential message type: SOS, EOS, SOP, EOP, data message (consider server restart). */ case END_OF_PUSH: + // CMs that may be produced with DIV pass-through mode can break DIV without synchronization with view + // writers + checkAndWaitForLastVTProduceFuture(partitionConsumptionState); /** * Simply produce this EOP to local VT. It will be processed in order in the drainer queue later * after successfully producing to kafka. @@ -2568,6 +2538,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( * In such case the heartbeat is produced to VT with updated {@link LeaderMetadataWrapper}. */ if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) { + checkAndWaitForLastVTProduceFuture(partitionConsumptionState); produceToLocalKafka( consumerRecord, partitionConsumptionState, @@ -3371,7 +3342,7 @@ protected void processMessageAndMaybeProduceToKafka( return; } // Write to views - if (viewWriters != null && !viewWriters.isEmpty()) { + if (!viewWriters.isEmpty()) { long preprocessingTime = System.currentTimeMillis(); CompletableFuture currentVersionTopicWrite = new CompletableFuture(); CompletableFuture[] viewWriterFutures = @@ -3696,13 +3667,8 @@ protected void processControlMessageForViews( // Iterate through list of views for the store and process the control message. 
for (VeniceViewWriter viewWriter: viewWriters.values()) { - viewWriter.processControlMessage( - kafkaKey, - kafkaMessageEnvelope, - controlMessage, - partition, - partitionConsumptionState, - this.versionNumber); + viewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState); } } @@ -3974,20 +3940,11 @@ protected CompletableFuture[] processViewWriters( viewWriterFutures[index++] = writer.processRecord( writeComputeResultWrapper.getNewPut().putValue, keyBytes, - versionNumber, writeComputeResultWrapper.getNewPut().schemaId); } return viewWriterFutures; } - private boolean shouldViewWritersProcessBatchRecords(PartitionConsumptionState partitionConsumptionState) { - if (hasVersionCompleted) { - return false; - } - return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LEADER) - || Objects.equals(partitionConsumptionState.getLeaderFollowerState(), IN_TRANSITION_FROM_STANDBY_TO_LEADER); - } - /** * Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp} * such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics @@ -4001,4 +3958,9 @@ void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolea lastSendIngestionHeartbeatTimestamp.set(0); } } + + private void checkAndWaitForLastVTProduceFuture(PartitionConsumptionState partitionConsumptionState) + throws ExecutionException, InterruptedException { + partitionConsumptionState.getLastVTProduceCallFuture().get(); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index 8cfe6a211fe..c41725cb4df 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -49,9 +49,10 @@ public class ChangeCaptureViewWriter extends VeniceViewWriter { public ChangeCaptureViewWriter( VeniceConfigLoader props, Store store, + int version, Schema keySchema, Map extraViewParameters) { - super(props, store, keySchema, extraViewParameters); + super(props, store, version, keySchema, extraViewParameters); internalView = new ChangeCaptureView(props.getCombinedProperties().toProperties(), store, extraViewParameters); kafkaClusterUrlToIdMap = props.getVeniceServerConfig().getKafkaClusterUrlToIdMap(); pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory(); @@ -63,7 +64,6 @@ public CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, - int version, int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord) { @@ -77,18 +77,14 @@ public CompletableFuture processRecord( recordChangeEvent.replicationCheckpointVector = RmdUtils.extractOffsetVectorFromRmd(replicationMetadataRecord); if (veniceWriter == null) { - initializeVeniceWriter(version); + initializeVeniceWriter(); } // TODO: RecordChangeEvent isn't versioned today. 
return veniceWriter.put(key, recordChangeEvent, 1); } @Override - public CompletableFuture processRecord( - ByteBuffer newValue, - byte[] key, - int version, - int newValueSchemaId) { + public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) { // No op return CompletableFuture.completedFuture(null); } @@ -99,8 +95,7 @@ public void processControlMessage( KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, - PartitionConsumptionState partitionConsumptionState, - int version) { + PartitionConsumptionState partitionConsumptionState) { // We only care (for now) about version swap control Messages if (!(controlMessage.getControlMessageUnion() instanceof VersionSwap)) { @@ -136,7 +131,7 @@ public void processControlMessage( // Write the message on veniceWriter to the change capture topic if (veniceWriter == null) { - initializeVeniceWriter(version); + initializeVeniceWriter(); } veniceWriter.sendControlMessage( @@ -193,7 +188,7 @@ VeniceWriterOptions buildWriterOptions(int version) { return setProducerOptimizations(configBuilder).build(); } - synchronized private void initializeVeniceWriter(int version) { + synchronized private void initializeVeniceWriter() { if (veniceWriter != null) { return; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java index 96e5a7ba249..bb46e5c79a1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -58,10 +58,11 @@ public class MaterializedViewWriter extends VeniceViewWriter { public MaterializedViewWriter( VeniceConfigLoader props, Store store, + int version, Schema keySchema, Map extraViewParameters, Clock clock) { - super(props, store, keySchema, 
extraViewParameters); + super(props, store, version, keySchema, extraViewParameters); pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory(); internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters); this.clock = clock; @@ -70,9 +71,17 @@ public MaterializedViewWriter( public MaterializedViewWriter( VeniceConfigLoader props, Store store, + int version, Schema keySchema, Map extraViewParameters) { - this(props, store, keySchema, extraViewParameters, Clock.systemUTC()); + this(props, store, version, keySchema, extraViewParameters, Clock.systemUTC()); + } + + /** + * package private for testing purpose + */ + void setVeniceWriter(VeniceWriter veniceWriter) { + this.veniceWriter = veniceWriter; } @Override @@ -80,21 +89,16 @@ public CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, - int version, int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord) { - return processRecord(newValue, key, version, newValueSchemaId); + return processRecord(newValue, key, newValueSchemaId); } @Override - public CompletableFuture processRecord( - ByteBuffer newValue, - byte[] key, - int version, - int newValueSchemaId) { + public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) { if (veniceWriter == null) { - initializeVeniceWriter(version); + initializeVeniceWriter(); } return veniceWriter.put(key, newValue.array(), newValueSchemaId); } @@ -105,16 +109,14 @@ public void processControlMessage( KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, - PartitionConsumptionState partitionConsumptionState, - int version) { + PartitionConsumptionState partitionConsumptionState) { final ControlMessageType type = ControlMessageType.valueOf(controlMessage); // Ignore other control messages for materialized view. 
if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) { maybePropagateHeartbeatLowWatermarkToViewTopic( partition, partitionConsumptionState, - kafkaMessageEnvelope.producerMetadata.messageTimestamp, - version); + kafkaMessageEnvelope.getProducerMetadata().getMessageTimestamp()); } } @@ -137,7 +139,7 @@ VeniceWriterOptions buildWriterOptions(int version) { return setProducerOptimizations(configBuilder).build(); } - synchronized private void initializeVeniceWriter(int version) { + synchronized private void initializeVeniceWriter() { if (veniceWriter == null) { veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null) .createVeniceWriter(buildWriterOptions(version)); @@ -162,10 +164,9 @@ synchronized private void initializeVeniceWriter(int version) { private void maybePropagateHeartbeatLowWatermarkToViewTopic( int partition, PartitionConsumptionState partitionConsumptionState, - long heartbeatTimestamp, - int version) { + long heartbeatTimestamp) { boolean propagate = false; - long oldestHeartbeatTimestamp; + long oldestHeartbeatTimestamp = 0; broadcastHBLock.lock(); try { partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp); @@ -178,14 +179,17 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic( propagate = true; lastHBBroadcastTimestamp = now; } + // We have determined that the oldestHeartbeatTimestamp offers no value in monitoring the lag for this view + // topic since it's within the DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD. We are also clearing the map, so we + // don't need to worry about removing timestamps belonging to partitions that we are no longer leader of. 
partitionToHeartbeatTimestampMap.clear(); } } finally { broadcastHBLock.unlock(); } - if (propagate) { + if (propagate && oldestHeartbeatTimestamp > 0) { if (veniceWriter == null) { - initializeVeniceWriter(version); + initializeVeniceWriter(); } LeaderCompleteState leaderCompleteState = LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported()); @@ -205,7 +209,7 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic( VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, true, leaderCompleteState, - heartbeatTimestamp); + oldestHeartbeatTimestamp); heartBeatFuture.whenComplete((ignore, throwable) -> { if (throwable != null) { completionException.set(new CompletionException(throwable)); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 3a02a435a7d..d28755c7f3d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -28,12 +28,16 @@ * view implementations. 
*/ public abstract class VeniceViewWriter extends VeniceView { + protected final int version; + public VeniceViewWriter( VeniceConfigLoader props, Store store, + int version, Schema keySchema, Map extraViewParameters) { super(props.getCombinedProperties().toProperties(), store, extraViewParameters); + this.version = version; } /** @@ -52,7 +56,6 @@ public abstract CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, - int version, int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord); @@ -69,7 +72,6 @@ public abstract CompletableFuture processRecord( public abstract CompletableFuture processRecord( ByteBuffer newValue, byte[] key, - int version, int newValueSchemaId); /** @@ -85,15 +87,13 @@ public abstract CompletableFuture processRecord( * @param controlMessage the control message we're processing * @param partition the partition this control message was delivered to * @param partitionConsumptionState the pcs of the consuming node - * @param version the store version that received this message */ public void processControlMessage( KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, - PartitionConsumptionState partitionConsumptionState, - int version) { + PartitionConsumptionState partitionConsumptionState) { // Optionally act on Control Message } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java index 0e0bc514d74..20d0b18af1d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java @@ -24,7 +24,7 @@ public Map buildStoreViewWriters(Store store, int vers String className = viewConfig.getValue().getViewClassName(); Map 
extraParams = viewConfig.getValue().getViewParameters(); VeniceViewWriter viewWriter = - ViewWriterUtils.getVeniceViewWriter(className, properties, store, keySchema, extraParams); + ViewWriterUtils.getVeniceViewWriter(className, properties, store, version, keySchema, extraParams); storeViewWriters.put(viewConfig.getKey(), viewWriter); } return storeViewWriters; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java index b0151e89108..4a63abb84df 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java @@ -15,6 +15,7 @@ public static VeniceViewWriter getVeniceViewWriter( String viewClass, VeniceConfigLoader configLoader, Store store, + int version, Schema keySchema, Map extraViewParameters) { Properties params = configLoader.getCombinedProperties().toProperties(); @@ -25,8 +26,8 @@ public static VeniceViewWriter getVeniceViewWriter( VeniceViewWriter viewWriter = ReflectUtils.callConstructor( ReflectUtils.loadClass(view.getWriterClassName()), - new Class[] { VeniceConfigLoader.class, Store.class, Schema.class, Map.class }, - new Object[] { configLoader, store, keySchema, extraViewParameters }); + new Class[] { VeniceConfigLoader.class, Store.class, Integer.TYPE, Schema.class, Map.class }, + new Object[] { configLoader, store, version, keySchema, extraViewParameters }); return viewWriter; } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java index 644159a7c61..194e80dfe52 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java +++ 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java @@ -111,7 +111,7 @@ public void testConstructVersionSwapMessage() { // Build the change capture writer and set the mock writer ChangeCaptureViewWriter changeCaptureViewWriter = - new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap()); + new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap()); changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter); // Verify that we never produce the version swap from a follower replica @@ -120,8 +120,7 @@ public void testConstructVersionSwapMessage() { kafkaMessageEnvelope, controlMessage, 1, - mockFollowerPartitionConsumptionState, - 1); + mockFollowerPartitionConsumptionState); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); @@ -133,8 +132,7 @@ public void testConstructVersionSwapMessage() { kafkaMessageEnvelope, ignoredControlMessage, 1, - mockLeaderPartitionConsumptionState, - 1); + mockLeaderPartitionConsumptionState); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); @@ -148,18 +146,12 @@ public void testConstructVersionSwapMessage() { kafkaMessageEnvelope, ignoredControlMessage, 1, - mockLeaderPartitionConsumptionState, - 1); + mockLeaderPartitionConsumptionState); Mockito.verify(mockVeniceWriter, Mockito.never()) .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any()); - changeCaptureViewWriter.processControlMessage( - kafkaKey, - kafkaMessageEnvelope, - controlMessage, - 1, - mockLeaderPartitionConsumptionState, - 1); + changeCaptureViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, mockLeaderPartitionConsumptionState); ArgumentCaptor 
messageArgumentCaptor = ArgumentCaptor.forClass(ControlMessage.class); // Verify and capture input @@ -210,7 +202,7 @@ public void testBuildWriterOptions() { Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig); ChangeCaptureViewWriter changeCaptureViewWriter = - new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap()); + new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap()); VeniceWriterOptions writerOptions = changeCaptureViewWriter.buildWriterOptions(1); @@ -245,7 +237,7 @@ public void testProcessRecord() throws ExecutionException, InterruptedException Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig); ChangeCaptureViewWriter changeCaptureViewWriter = - new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap()); + new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap()); Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(SCHEMA, 1); List vectors = Arrays.asList(1L, 2L, 3L); @@ -255,13 +247,13 @@ public void testProcessRecord() throws ExecutionException, InterruptedException changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter); // Update Case - changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); + changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); // Insert Case - changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); + changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); // Deletion Case - changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); + changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, 
rmdRecordWithValueLevelTimeStamp).get(); // Set up argument captors ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java new file mode 100644 index 00000000000..be993ac33c2 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -0,0 +1,177 @@ +package com.linkedin.davinci.store.view; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.davinci.config.VeniceConfigLoader; +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewParameters; +import com.linkedin.venice.partitioner.DefaultVenicePartitioner; +import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import 
com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.time.Clock; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.mockito.ArgumentCaptor; +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Unit tests for {@link MaterializedViewWriter}, run against Mockito mocks of the store, version, config loader and + * Venice writer. Covers writer-option construction and heartbeat control-message handling. + */ +public class MaterializedViewWriterTest { + private static final Schema SCHEMA = AvroCompatibilityHelper.parse("\"string\""); + + /** + * Checks the {@link VeniceWriterOptions} returned by {@code buildWriterOptions(1)}: the topic name is the store's + * version topic followed by the view separator, the view name and the materialized view suffix; the partition count + * (6) and partitioner class match the configured view parameters; the producer count (3) and the producer + * compression flag match the values stubbed on the mocked store. + */ + @Test + public void testBuildWriterOptions() { + String storeName = "testStore"; + String viewName = "testMaterializedView"; + Version version = mock(Version.class); + doReturn(true).when(version).isChunkingEnabled(); + doReturn(true).when(version).isRmdChunkingEnabled(); + Store store = getMockStore(storeName, 1, version); + doReturn(true).when(store).isNearlineProducerCompressionEnabled(); + doReturn(3).when(store).getNearlineProducerCountPerWriter(); + ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName); + viewParamsBuilder.setPartitionCount(6); + viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + Map viewParamsMap = viewParamsBuilder.build(); + VeniceConfigLoader props = getMockProps(); + MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, store, 1, SCHEMA, viewParamsMap); + VeniceWriterOptions writerOptions = materializedViewWriter.buildWriterOptions(1); + Assert.assertEquals( + writerOptions.getTopicName(), + Version.composeKafkaTopic(storeName, 1) + VeniceView.VIEW_TOPIC_SEPARATOR + viewName + + MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX); + Assert.assertEquals(writerOptions.getPartitionCount(), Integer.valueOf(6)); + Assert.assertEquals(writerOptions.getPartitioner().getClass(),
DefaultVenicePartitioner.class); + Assert.assertEquals(writerOptions.getProducerCount(), 3); + Assert.assertTrue(writerOptions.isProducerCompressionEnabled()); + } + + /** + * Feeds heartbeat-keyed START_OF_SEGMENT control messages for partitions 0, 1 and 2 into + * {@code processControlMessage} while moving a mocked {@link Clock} forward, and verifies on the mocked + * {@link VeniceWriter} when {@code sendHeartbeat} is and is not invoked, how many times in total (6, then 12; the + * view is configured with 6 partitions), and which heartbeat timestamp is passed. + */ + @Test + public void testProcessIngestionHeartbeat() { + String storeName = "testStore"; + String viewName = "testMaterializedView"; + Version version = mock(Version.class); + doReturn(true).when(version).isChunkingEnabled(); + doReturn(true).when(version).isRmdChunkingEnabled(); + Store store = getMockStore(storeName, 1, version); + ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName); + viewParamsBuilder.setPartitionCount(6); + viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + Map viewParamsMap = viewParamsBuilder.build(); + VeniceConfigLoader props = getMockProps(); + Clock clock = mock(Clock.class); + long startTime = System.currentTimeMillis(); + doReturn(startTime).when(clock).millis(); + MaterializedViewWriter materializedViewWriter = + new MaterializedViewWriter(props, store, 1, SCHEMA, viewParamsMap, clock); + materializedViewWriter.buildWriterOptions(1); + ControlMessage controlMessage = new ControlMessage(); + controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue(); + KafkaKey kafkaKey = mock(KafkaKey.class); + doReturn(KafkaKey.HEART_BEAT.getKey()).when(kafkaKey).getKey(); + VeniceWriter veniceWriter = mock(VeniceWriter.class); + /* NOTE(review): the when(...).thenReturn(...) stub and the doReturn(...).when(...) stub below configure the same sendHeartbeat call with the same completed future; one of them looks redundant - confirm before removing. */ + when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(null)); + doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter) + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); + materializedViewWriter.setVeniceWriter(veniceWriter); + KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class); + ProducerMetadata producerMetadata = mock(ProducerMetadata.class); +
doReturn(producerMetadata).when(kafkaMessageEnvelope).getProducerMetadata(); + doReturn(startTime).when(producerMetadata).getMessageTimestamp(); + PartitionConsumptionState partitionConsumptionState = mock(PartitionConsumptionState.class); + doReturn(true).when(partitionConsumptionState).isCompletionReported(); + + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); + verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); + long newTime = startTime + TimeUnit.MINUTES.toMillis(4); + doReturn(newTime).when(clock).millis(); + doReturn(startTime + TimeUnit.MINUTES.toMillis(1)).when(producerMetadata).getMessageTimestamp(); + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); + // We still don't expect any broadcast from partition 1 leader because staleness is within 5 minutes + verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); + doReturn(newTime).when(producerMetadata).getMessageTimestamp(); + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 0, partitionConsumptionState); + // Partition 0's leader should broadcast based on last broadcast timestamp (0) regardless of staleness threshold + ArgumentCaptor heartbeatTimestampCaptor = ArgumentCaptor.forClass(Long.class); + verify(veniceWriter, times(6)) + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture()); + // The low watermark for this materialized view writer should be the latest heartbeat stamp received by partition 0 + // since the low watermark from partition 1 was ignored due to DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD + for (Long timestamp: heartbeatTimestampCaptor.getAllValues()) { + Assert.assertEquals(timestamp, Long.valueOf(newTime)); + } +
newTime = newTime + TimeUnit.SECONDS.toMillis(30); + doReturn(newTime).when(clock).millis(); + doReturn(startTime).when(producerMetadata).getMessageTimestamp(); + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 2, partitionConsumptionState); + doReturn(newTime).when(producerMetadata).getMessageTimestamp(); + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 0, partitionConsumptionState); + // No new broadcast since it's still within DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS (1 minute) since last broadcast. + verify(veniceWriter, times(6)) + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture()); + newTime = newTime + TimeUnit.MINUTES.toMillis(3); + doReturn(newTime).when(clock).millis(); + doReturn(newTime).when(producerMetadata).getMessageTimestamp(); + materializedViewWriter + .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState); + // We should broadcast the stale heartbeat timestamp from partition 2 since it's greater than the reporting threshold + verify(veniceWriter, times(12)) + .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture()); + Assert.assertEquals(heartbeatTimestampCaptor.getValue(), Long.valueOf(startTime)); + } + + /** + * Returns a mocked {@link VeniceConfigLoader} whose server config exposes a mocked {@link PubSubClientsFactory} + * (itself returning a mocked producer adapter factory) and whose combined properties are an empty + * {@link VeniceProperties}. + */ + private VeniceConfigLoader getMockProps() { + VeniceConfigLoader props = mock(VeniceConfigLoader.class); + VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); + PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class); + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); + doReturn(pubSubProducerAdapterFactory).when(pubSubClientsFactory).getProducerAdapterFactory(); +
doReturn(pubSubClientsFactory).when(serverConfig).getPubSubClientsFactory(); + doReturn(serverConfig).when(props).getVeniceServerConfig(); + VeniceProperties veniceProperties = new VeniceProperties(new Properties()); + doReturn(veniceProperties).when(props).getCombinedProperties(); + return props; + } + + /** + * Returns a mocked {@link Store} that reports {@code storeName} as its name and returns {@code version} from + * {@code getVersionOrThrow(versionNumber)}. + */ + private Store getMockStore(String storeName, int versionNumber, Version version) { + Store store = mock(Store.class); + doReturn(storeName).when(store).getName(); + doReturn(version).when(store).getVersionOrThrow(versionNumber); + return store; + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java index 9afe5a17270..5a180f7ed41 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java @@ -55,6 +55,7 @@ public void testGetVeniceViewWriter() { ChangeCaptureView.class.getCanonicalName(), mockVeniceConfigLoader, mockStore, + 1, SCHEMA, Collections.EMPTY_MAP); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java similarity index 96% rename from internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java rename to internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index 7f172acab1f..a710c19e4b0 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MaterializedViewTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -43,7 +43,7 @@ import
org.testng.annotations.Test; -public class MaterializedViewTest { +public class TestMaterializedViewEndToEnd { private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE; private static final String[] CLUSTER_NAMES = IntStream.range(0, 1).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); @@ -62,13 +62,13 @@ public void setUp() { CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." + DEFAULT_PARENT_DATA_CENTER_REGION_NAME, "localhost:" + TestUtils.getFreePort()); multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + 2, 1, 1, 1, + 2, 1, - 1, - 1, - 1, + 2, Optional.empty(), Optional.empty(), Optional.of(serverProperties), @@ -144,7 +144,9 @@ public void testLFIngestionWithMaterializedView() throws IOException { versionTopicRecords += endOffset; } Assert.assertTrue(versionTopicRecords > 100, "Version topic records size: " + versionTopicRecords); - Assert.assertTrue(records > 100, "View topic records size: " + records); + if (!veniceClusterWrapper.getRegionName().equals(childDatacenters.get(0).getRegionName())) { + Assert.assertTrue(records > 100, "View topic records size: " + records); + } } } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java index f429fe9e59e..34776b2184f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java @@ -25,9 +25,10 @@ public class TestViewWriter extends VeniceViewWriter { public TestViewWriter( VeniceConfigLoader props, Store store, + int version, Schema keySchema, Map extraViewParameters) { - super(props, store, keySchema, extraViewParameters); + super(props, store, version, keySchema, extraViewParameters); internalView = new 
TestView(props.getCombinedProperties().toProperties(), store, extraViewParameters); } @@ -36,7 +37,6 @@ public CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, - int version, int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord) { @@ -46,11 +46,7 @@ public CompletableFuture processRecord( } @Override - public CompletableFuture processRecord( - ByteBuffer newValue, - byte[] key, - int version, - int newValueSchemaId) { + public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) { internalView.incrementRecordCount(store.getName()); return CompletableFuture.completedFuture(null); } @@ -61,8 +57,7 @@ public void processControlMessage( KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, - PartitionConsumptionState partitionConsumptionState, - int version) { + PartitionConsumptionState partitionConsumptionState) { // TODO: The below logic only operates on VersionSwap. We might want to augment this // logic to handle other control messages.