[server][controller] Add MaterializedViewWriter and support view writers in L/F

1. View writers are now invoked in the L/F SIT as well, instead of only in the A/A SIT. We rely
on view config validation to ensure that views which require A/A are only added to stores
with A/A enabled.

2. This PR only covers the creation of materialized view topics and the writing of data
records and control messages to those topics in the server and controller (a sketch of the
intended write ordering follows this list).
  - Materialized view topics are created at version creation time along with the other
    view topics.
  - SOP is sent at view topic creation time with the same chunking and compression
    configs as the store version.
  - EOP is sent when servers have reported EOP in every partition.
  - Incremental push control messages SOIP and EOIP are not propagated to the view topic
    for now because the end-to-end incremental push tracking story for view topics is not
    settled yet. Store owners will likely just disable the requirement to wait for view
    consumers to fully ingest the incremental push.
  - Ingestion heartbeats will be propagated in a broadcast manner (see the heartbeat
    sketch after this list and the implementation for details).
  - Version swap for CDC users will be implemented in a separate PR to keep this PR
    somewhat short for review.

3. TODO: one pending issue is that, while processing batch records in the native replication
source fabric (where we consume the local VT), a leader transfer could result in missing
records in the materialized view topic. This is because we don't do any global checkpointing
across leaders and followers when consuming the local VT.
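
A minimal, self-contained sketch of the write ordering referenced above, for reviewers who want
it in isolation: the previous version-topic produce future is chained together with one future
per view writer, and the version-topic write for the current record completes only after all of
them finish. The ViewWriter interface and class names below are illustrative assumptions, not
the Venice API; the real logic lives in the processViewWriters helpers added in this diff and
operates on VeniceViewWriter and PartitionConsumptionState.

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; not the Venice code. It mirrors the future-chaining
// pattern of the processViewWriters helpers in this PR.
public class ViewWriteOrderingSketch {
  // Hypothetical minimal view-writer contract assumed for this sketch.
  interface ViewWriter {
    CompletableFuture<Void> processRecord(byte[] key, byte[] value);
  }

  // Chains the previous version-topic (VT) produce future with one future per view writer.
  static CompletableFuture<?>[] processViewWriters(
      CompletableFuture<Void> lastVTProduceFuture,
      List<ViewWriter> viewWriters,
      byte[] key,
      byte[] value) {
    CompletableFuture<?>[] futures = new CompletableFuture<?>[viewWriters.size() + 1];
    int index = 0;
    // The first future is the previous write to the VT.
    futures[index++] = lastVTProduceFuture;
    for (ViewWriter writer: viewWriters) {
      futures[index++] = writer.processRecord(key, value);
    }
    return futures;
  }

  public static void main(String[] args) {
    ViewWriter noOpWriter = (key, value) -> CompletableFuture.completedFuture(null);
    CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
    CompletableFuture<?>[] futures = processViewWriters(
        CompletableFuture.completedFuture(null), List.of(noOpWriter), new byte[] { 1 }, new byte[] { 2 });
    // Complete the VT write for the current record only after the previous VT write and
    // all view-topic writes have finished.
    CompletableFuture.allOf(futures).whenComplete((ignored, exception) -> {
      if (exception == null) {
        currentVersionTopicWrite.complete(null);
      } else {
        currentVersionTopicWrite.completeExceptionally(exception);
      }
    });
    currentVersionTopicWrite.join();
    System.out.println("view writes observed before the VT write completed");
  }
}

The property this aims to show is that the VT write for the current record does not complete
before the previous VT write and the record's view-topic writes, matching the ordering comment
kept in the diff below.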
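
The heartbeat bullet above can likewise be read as the following sketch, under two stated
assumptions: the producer interface and the view topic name are made up for illustration
(they are not the Venice writer API), and the broadcast is presumed to target every partition
of the view topic because the view topic's partitioning need not line up with the version
topic's.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; see MaterializedViewWriter in this PR for the real heartbeat handling.
public class HeartbeatBroadcastSketch {
  // Hypothetical minimal producer contract assumed for this sketch.
  interface HeartbeatProducer {
    CompletableFuture<Void> sendHeartbeat(String topic, int partition, long timestampMs);
  }

  // Forwards a single ingestion heartbeat to every partition of the view topic ("broadcast").
  static CompletableFuture<Void> broadcastHeartbeat(
      HeartbeatProducer producer, String viewTopic, int viewTopicPartitionCount, long timestampMs) {
    List<CompletableFuture<Void>> sends = new ArrayList<>(viewTopicPartitionCount);
    for (int partition = 0; partition < viewTopicPartitionCount; partition++) {
      sends.add(producer.sendHeartbeat(viewTopic, partition, timestampMs));
    }
    return CompletableFuture.allOf(sends.toArray(new CompletableFuture[0]));
  }

  public static void main(String[] args) {
    HeartbeatProducer loggingProducer = (topic, partition, ts) -> {
      System.out.println("heartbeat -> " + topic + "-" + partition + " @ " + ts);
      return CompletableFuture.completedFuture(null);
    };
    // "store_v1_MyView_mv" is a placeholder topic name, not a real naming convention.
    broadcastHeartbeat(loggingProducer, "store_v1_MyView_mv", 3, System.currentTimeMillis()).join();
  }
}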
xunyin8 committed Nov 22, 2024
1 parent 0040d45 commit 198c3ae
Showing 28 changed files with 1,121 additions and 220 deletions.
@@ -638,7 +638,7 @@ protected void processMessageAndMaybeProduceToKafka(
// call in this context much less obtrusive, however, it implies that all views can only work for AA stores

// Write to views
if (this.viewWriters.size() > 0) {
if (!this.viewWriters.isEmpty()) {
/**
* The ordering guarantees we want is the following:
*
@@ -647,24 +647,9 @@ protected void processMessageAndMaybeProduceToKafka(
* producer (but not necessarily acked).
*/
long preprocessingTime = System.currentTimeMillis();
CompletableFuture currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
int oldValueSchemaId =
oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = writer.processRecord(
mergeConflictResult.getNewValue(),
oldValueBB,
keyBytes,
versionNumber,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord());
}
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures =
processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper, null);
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
@@ -1512,6 +1497,33 @@ public boolean isReadyToServeAnnouncedWithRTLag() {
return false;
}

@Override
protected CompletableFuture[] processViewWriters(
PartitionConsumptionState partitionConsumptionState,
byte[] keyBytes,
MergeConflictResultWrapper mergeConflictResultWrapper,
WriteComputeResultWrapper writeComputeResultWrapper) {
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
MergeConflictResult mergeConflictResult = mergeConflictResultWrapper.getMergeConflictResult();
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
int oldValueSchemaId =
oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = writer.processRecord(
mergeConflictResult.getNewValue(),
oldValueBB,
keyBytes,
versionNumber,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord());
}
return viewWriterFutures;
}

Runnable buildRepairTask(
String sourceKafkaUrl,
PubSubTopicPartition sourceTopicPartition,
@@ -9,6 +9,7 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE;
import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;
@@ -30,6 +31,7 @@
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.view.ChangeCaptureViewWriter;
import com.linkedin.davinci.store.view.MaterializedViewWriter;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.davinci.validation.PartitionTracker;
@@ -55,6 +57,7 @@
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
@@ -196,6 +199,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {

protected final Map<String, VeniceViewWriter> viewWriters;
protected final boolean hasChangeCaptureView;
protected final boolean hasMaterializedView;
protected final boolean hasVersionCompleted;

protected final AvroStoreDeserializerCache storeDeserializerCache;

@@ -326,17 +331,25 @@ public LeaderFollowerStoreIngestionTask(
version.getNumber(),
schemaRepository.getKeySchema(store.getName()).getSchema());
boolean tmpValueForHasChangeCaptureViewWriter = false;
boolean tmpValueForHasMaterializedViewWriter = false;
for (Map.Entry<String, VeniceViewWriter> viewWriter: viewWriters.entrySet()) {
if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) {
tmpValueForHasChangeCaptureViewWriter = true;
} else if (viewWriter.getValue() instanceof MaterializedViewWriter) {
tmpValueForHasMaterializedViewWriter = true;
}
if (tmpValueForHasChangeCaptureViewWriter && tmpValueForHasMaterializedViewWriter) {
break;
}
}
hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter;
hasMaterializedView = tmpValueForHasMaterializedViewWriter;
} else {
viewWriters = Collections.emptyMap();
hasChangeCaptureView = false;
hasMaterializedView = false;
}
hasVersionCompleted = version.getStatus().equals(VersionStatus.ONLINE);
this.storeDeserializerCache = new AvroStoreDeserializerCache(
builder.getSchemaRepo(),
getStoreName(),
@@ -2389,7 +2402,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
// UPDATE message is only expected in LEADER which must be produced to kafka.
MessageType msgType = MessageType.valueOf(kafkaValue);
if (msgType == MessageType.UPDATE && !produceToLocalKafka) {
if (msgType == UPDATE && !produceToLocalKafka) {
throw new VeniceMessageException(
ingestionTaskName + " hasProducedToKafka: Received UPDATE message in non-leader for: "
+ consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset());
@@ -2423,6 +2436,39 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
partitionConsumptionState.getVeniceWriterLazyRef().ifPresent(vw -> vw.flush());
partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime);
}
/**
* For materialized view we still need to produce to the view topic when we are consuming batch data from the
* local VT for the very first time for a new version. Once the version has completed any new SIT will have its
* {@link hasVersionCompleted} set to true to prevent duplicate processing of batch records by the view writers.
* Duplicate processing is still possible in the case when leader replica crashes before the version becomes
* ONLINE, but it should be harmless since eventually it will be correct.
* TODO We need to introduce additional checkpointing and rewind for L/F transition during batch processing to
* prevent data loss in the view topic for the following scenario:
* 1. VT contains 100 batch records between SOP and EOP
* 2. Initial leader consumed to 50th record while followers consumed to 80th
* 3. Initial leader crashes and one of the followers becomes the new leader and start processing and writing
* to view topic from 80th record until ingestion is completed and version becomes ONLINE.
* 4. View topic will have a gap for batch data between 50th record and 80th record.
*/
if (hasMaterializedView && shouldViewWritersProcessBatchRecords(partitionConsumptionState)
&& msgType == MessageType.PUT) {
WriteComputeResultWrapper writeComputeResultWrapper =
new WriteComputeResultWrapper((Put) kafkaValue.payloadUnion, null, false);
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures =
processViewWriters(partitionConsumptionState, kafkaKey.getKey(), null, writeComputeResultWrapper);
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});
}
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
}

@@ -3321,9 +3367,60 @@ protected void processMessageAndMaybeProduceToKafka(
beforeProcessingRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs).getWriteComputeResultWrapper();
}
if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) {
return;
}
// Write to views
if (viewWriters != null && !viewWriters.isEmpty()) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures =
processViewWriters(partitionConsumptionState, keyBytes, null, writeComputeResultWrapper);
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});
partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
} else {
produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
}
}

Put newPut = writeComputeResultWrapper.getNewPut();
private void produceToLocalKafkaHelper(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
PartitionConsumptionState partitionConsumptionState,
WriteComputeResultWrapper writeComputeResultWrapper,
int partition,
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs) {
KafkaKey kafkaKey = consumerRecord.getKey();
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
LeaderProducedRecordContext leaderProducedRecordContext;
Put newPut = writeComputeResultWrapper.getNewPut();
switch (msgType) {
case PUT:
leaderProducedRecordContext =
@@ -3377,10 +3474,6 @@ protected void processMessageAndMaybeProduceToKafka(
break;

case UPDATE:
if (writeComputeResultWrapper.isSkipProduce()) {
return;
}

leaderProducedRecordContext =
LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, newPut);
BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction =
@@ -3594,15 +3687,22 @@ protected long measureRTOffsetLagForSingleRegion(
}

@Override
protected void processVersionSwapMessage(
protected void processControlMessageForViews(
KafkaKey kafkaKey,
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState) {

// Iterate through list of views for the store and process the control message.
for (VeniceViewWriter viewWriter: viewWriters.values()) {
// TODO: at some point, we should do this on more or all control messages potentially as we add more view types
viewWriter.processControlMessage(controlMessage, partition, partitionConsumptionState, this.versionNumber);
viewWriter.processControlMessage(
kafkaKey,
kafkaMessageEnvelope,
controlMessage,
partition,
partitionConsumptionState,
this.versionNumber);
}
}

@@ -3861,6 +3961,33 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
}
}

protected CompletableFuture[] processViewWriters(
PartitionConsumptionState partitionConsumptionState,
byte[] keyBytes,
MergeConflictResultWrapper mergeConflictResultWrapper,
WriteComputeResultWrapper writeComputeResultWrapper) {
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = writer.processRecord(
writeComputeResultWrapper.getNewPut().putValue,
keyBytes,
versionNumber,
writeComputeResultWrapper.getNewPut().schemaId);
}
return viewWriterFutures;
}

private boolean shouldViewWritersProcessBatchRecords(PartitionConsumptionState partitionConsumptionState) {
if (hasVersionCompleted) {
return false;
}
return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LEADER)
|| Objects.equals(partitionConsumptionState.getLeaderFollowerState(), IN_TRANSITION_FROM_STANDBY_TO_LEADER);
}

/**
* Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp}
* such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics
@@ -2976,10 +2976,12 @@ protected void processEndOfIncrementalPush(
}

/**
* This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in
* {@link ActiveActiveStoreIngestionTask}
* This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation in
* {@link LeaderFollowerStoreIngestionTask}
*/
protected void processVersionSwapMessage(
protected void processControlMessageForViews(
KafkaKey kafkaKey,
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState) {
@@ -3001,6 +3003,7 @@ protected boolean processTopicSwitch(
* offset is stale and is not updated until the very end
*/
private boolean processControlMessage(
KafkaKey kafkaKey,
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
@@ -3034,6 +3037,7 @@ private boolean processControlMessage(
break;
case START_OF_SEGMENT:
case END_OF_SEGMENT:
case VERSION_SWAP:
/**
* Nothing to do here as all the processing is being done in {@link StoreIngestionTask#delegateConsumerRecord(ConsumerRecord, int, String)}.
*/
@@ -3048,13 +3052,11 @@ private boolean processControlMessage(
checkReadyToServeAfterProcess =
processTopicSwitch(controlMessage, partition, offset, partitionConsumptionState);
break;
case VERSION_SWAP:
processVersionSwapMessage(controlMessage, partition, partitionConsumptionState);
break;
default:
throw new UnsupportedMessageTypeException(
"Unrecognized Control message type " + controlMessage.controlMessageType);
}
processControlMessageForViews(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState);
return checkReadyToServeAfterProcess;
}

@@ -3172,6 +3174,7 @@ private int internalProcessConsumerRecord(
? (ControlMessage) kafkaValue.payloadUnion
: (ControlMessage) leaderProducedRecordContext.getValueUnion());
checkReadyToServeAfterProcess = processControlMessage(
kafkaKey,
kafkaValue,
controlMessage,
consumerRecord.getTopicPartition().getPartitionNumber(),