Handle leader complete state and refactoring for code dup and testing
xunyin8 committed Jan 14, 2025
1 parent 8c21b85 commit 3ad1097
Showing 11 changed files with 438 additions and 199 deletions.

@@ -22,7 +22,6 @@
 import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
 import com.linkedin.davinci.store.record.ByteBufferValueRecord;
 import com.linkedin.davinci.store.record.ValueRecord;
-import com.linkedin.davinci.store.view.VeniceViewWriter;
 import com.linkedin.davinci.utils.ByteArrayKey;
 import com.linkedin.venice.exceptions.PersistenceFailureException;
 import com.linkedin.venice.exceptions.VeniceException;
@@ -68,7 +67,6 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
@@ -641,40 +639,35 @@ protected void processMessageAndMaybeProduceToKafka(
     // call in this context much less obtrusive, however, it implies that all views can only work for AA stores

     // Write to views
-    if (viewWriters != null && !this.viewWriters.isEmpty()) {
+    if (hasViewWriters()) {
      /**
       * The ordering guarantees we want are the following:
       *
       * 1. Write to all view topics (in parallel).
       * 2. Write to the VT only after we get the ack for all views AND the previous write to VT was queued into the
       *    producer (but not necessarily acked).
       */
-      long preprocessingTime = System.currentTimeMillis();
-      CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
-      CompletableFuture[] viewWriterFutures =
-          processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper);
-      hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
-      CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
-        hostLevelIngestionStats
-            .recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
-        if (exception == null) {
-          producePutOrDeleteToKafka(
+      ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
+      int oldValueSchemaId =
+          oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
+      queueUpVersionTopicWritesWithViewWriters(
+          partitionConsumptionState,
+          (viewWriter) -> viewWriter.processRecord(
+              mergeConflictResultWrapper.getUpdatedValueBytes(),
+              oldValueBB,
+              keyBytes,
+              mergeConflictResult.getValueSchemaId(),
+              oldValueSchemaId,
+              mergeConflictResult.getRmdRecord()),
+          (pcs) -> producePutOrDeleteToKafka(
               mergeConflictResultWrapper,
-              partitionConsumptionState,
+              pcs,
               keyBytes,
               consumerRecord,
               partition,
               kafkaUrl,
               kafkaClusterId,
-              beforeProcessingRecordTimestampNs);
-          currentVersionTopicWrite.complete(null);
-        } else {
-          VeniceException veniceException = new VeniceException(exception);
-          this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
-          currentVersionTopicWrite.completeExceptionally(veniceException);
-        }
-      });
-      partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
+              beforeProcessingRecordTimestampNs));
     } else {
       // This function may modify the original record in KME and it is unsafe to use the payload from KME directly
       // after
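
The ordering contract in the javadoc above is easier to see outside the diff. Below is a minimal, self-contained sketch of the pattern that the new queueUpVersionTopicWritesWithViewWriters helper implements; the names here (VtWriteChain, queueWrite) are illustrative, not the Venice API. View writes fan out in parallel, and the version-topic write is queued only after every view write is acked and the previous version-topic write has completed.

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the write-ordering pattern; not Venice code.
final class VtWriteChain {
  private CompletableFuture<Void> lastVtWrite = CompletableFuture.completedFuture(null);

  synchronized CompletableFuture<Void> queueWrite(List<CompletableFuture<?>> viewWrites, Runnable vtWrite) {
    CompletableFuture<?>[] futures = new CompletableFuture[viewWrites.size() + 1];
    futures[0] = lastVtWrite; // gate on the previous version-topic write
    for (int i = 0; i < viewWrites.size(); i++) {
      futures[i + 1] = viewWrites.get(i);
    }
    CompletableFuture<Void> current = new CompletableFuture<>();
    CompletableFuture.allOf(futures).whenCompleteAsync((none, error) -> {
      if (error == null) {
        vtWrite.run(); // queue into the producer; the ack is not awaited here
        current.complete(null);
      } else {
        current.completeExceptionally(error); // a failed view write poisons the chain
      }
    });
    lastVtWrite = current; // the next record chains behind this write
    return current;
  }
}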
@@ -1428,30 +1421,6 @@ public boolean isReadyToServeAnnouncedWithRTLag() {
     return false;
   }

-  CompletableFuture[] processViewWriters(
-      PartitionConsumptionState partitionConsumptionState,
-      byte[] keyBytes,
-      MergeConflictResultWrapper mergeConflictResultWrapper) {
-    CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
-    MergeConflictResult mergeConflictResult = mergeConflictResultWrapper.getMergeConflictResult();
-    int index = 0;
-    // The first future is for the previous write to VT
-    viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
-    ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
-    int oldValueSchemaId =
-        oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
-    for (VeniceViewWriter writer: viewWriters.values()) {
-      viewWriterFutures[index++] = writer.processRecord(
-          mergeConflictResultWrapper.getUpdatedValueBytes(),
-          oldValueBB,
-          keyBytes,
-          mergeConflictResult.getValueSchemaId(),
-          oldValueSchemaId,
-          mergeConflictResult.getRmdRecord());
-    }
-    return viewWriterFutures;
-  }
-
   Runnable buildRepairTask(
       String sourceKafkaUrl,
       PubSubTopicPartition sourceTopicPartition,

@@ -110,6 +110,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.LongPredicate;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -2554,14 +2556,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
       *
       * We want to ensure correct ordering for any SOS and EOS that we do decide to write to VT. This is done by
       * coordinating with the corresponding {@link PartitionConsumptionState#getLastVTProduceCallFuture}.
+      * However, this coordination is only needed if there are view writers, i.e. the VT writes and CM writes
+      * need to be in the same mode: either both coordinate with lastVTProduceCallFuture or neither.
       */
      if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
-
        final LeaderProducedRecordContext segmentCMLeaderProduceRecordContext = leaderProducedRecordContext;
-       CompletableFuture<Void> propagateSegmentCMWrite = new CompletableFuture<>();
-       partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> {
-         if (exception == null) {
-           produceToLocalKafka(
+       maybeQueueCMWritesToVersionTopic(
+           partitionConsumptionState,
+           () -> produceToLocalKafka(
                consumerRecord,
                partitionConsumptionState,
                segmentCMLeaderProduceRecordContext,
@@ -2576,38 +2578,21 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
                partition,
                kafkaUrl,
                kafkaClusterId,
-               beforeProcessingPerRecordTimestampNs);
-           propagateSegmentCMWrite.complete(null);
-         } else {
-           VeniceException veniceException = new VeniceException(exception);
-           this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
-           propagateSegmentCMWrite.completeExceptionally(veniceException);
-         }
-       });
-       partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite);
+               beforeProcessingPerRecordTimestampNs));
      } else {
        if (controlMessageType == START_OF_SEGMENT
            && Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
-         CompletableFuture<Void> propagateHeartbeatWrite = new CompletableFuture<>();
          final LeaderProducedRecordContext heartbeatLeaderProducedRecordContext = leaderProducedRecordContext;
-         partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> {
-           if (exception == null) {
-             propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
+         maybeQueueCMWritesToVersionTopic(
+             partitionConsumptionState,
+             () -> propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
                  partitionConsumptionState,
                  consumerRecord,
                  heartbeatLeaderProducedRecordContext,
                  partition,
                  kafkaUrl,
                  kafkaClusterId,
-                 beforeProcessingPerRecordTimestampNs);
-             propagateHeartbeatWrite.complete(null);
-           } else {
-             VeniceException veniceException = new VeniceException(exception);
-             this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
-             propagateHeartbeatWrite.completeExceptionally(veniceException);
-           }
-         });
-         partitionConsumptionState.setLastVTProduceCallFuture(propagateHeartbeatWrite);
+                 beforeProcessingPerRecordTimestampNs));
        } else {
          /**
           * Based on current design handling this case (especially EOS) is tricky as we don't produce the SOS/EOS
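
A quick aside on the "same mode" rule the new javadoc introduces: if data writes are queued asynchronously behind lastVTProduceCallFuture but control messages were produced inline, a control message could land in the version topic ahead of data consumed before it. Below is a minimal, self-contained sketch of the rule with illustrative names, not the Venice API; the real helper, maybeQueueCMWritesToVersionTopic at the bottom of this diff, additionally propagates a failed prior write into the chained future.

import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the "same mode" rule; names are not the Venice API.
final class CmWriteGate {
  private final boolean hasViewWriters;
  private CompletableFuture<Void> lastVtWrite = CompletableFuture.completedFuture(null);

  CmWriteGate(boolean hasViewWriters) {
    this.hasViewWriters = hasViewWriters;
  }

  void produceControlMessage(Runnable produceCall) {
    if (hasViewWriters) {
      // Data writes are queued asynchronously behind lastVtWrite, so a control
      // message must queue behind the same future or it could land in the
      // version topic ahead of data that was consumed before it.
      lastVtWrite = lastVtWrite.thenRun(produceCall);
    } else {
      // Without view writers, data writes happen inline; produce inline too.
      produceCall.run();
    }
  }
}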
@@ -3384,31 +3369,19 @@ protected void processMessageAndMaybeProduceToKafka(
       return;
     }
     // Write to views
-    if (viewWriters != null && !viewWriters.isEmpty()) {
-      long preprocessingTime = System.currentTimeMillis();
-      CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
-      CompletableFuture[] viewWriterFutures =
-          processViewWriters(partitionConsumptionState, keyBytes, writeComputeResultWrapper);
-      hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
-      CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
-        hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
-        if (exception == null) {
-          produceToLocalKafkaHelper(
+    if (hasViewWriters()) {
+      Put newPut = writeComputeResultWrapper.getNewPut();
+      queueUpVersionTopicWritesWithViewWriters(
+          partitionConsumptionState,
+          (viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
+          (pcs) -> produceToLocalKafkaHelper(
              consumerRecord,
-             partitionConsumptionState,
+             pcs,
              writeComputeResultWrapper,
              partition,
              kafkaUrl,
              kafkaClusterId,
-             beforeProcessingRecordTimestampNs);
-          currentVersionTopicWrite.complete(null);
-        } else {
-          VeniceException veniceException = new VeniceException(exception);
-          this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
-          currentVersionTopicWrite.completeExceptionally(veniceException);
-        }
-      });
-      partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
+             beforeProcessingRecordTimestampNs));
     } else {
       produceToLocalKafkaHelper(
           consumerRecord,
@@ -3975,21 +3948,33 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
     }
   }

-  CompletableFuture[] processViewWriters(
+  protected void queueUpVersionTopicWritesWithViewWriters(
       PartitionConsumptionState partitionConsumptionState,
-      byte[] keyBytes,
-      WriteComputeResultWrapper writeComputeResultWrapper) {
+      Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
+      Consumer<PartitionConsumptionState> versionTopicWrite) {
+    long preprocessingTime = System.currentTimeMillis();
+    CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
     CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
     int index = 0;
     // The first future is for the previous write to VT
     viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
     for (VeniceViewWriter writer: viewWriters.values()) {
-      viewWriterFutures[index++] = writer.processRecord(
-          writeComputeResultWrapper.getNewPut().putValue,
-          keyBytes,
-          writeComputeResultWrapper.getNewPut().schemaId);
-    }
-    return viewWriterFutures;
+      viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer);
+    }
+    hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
+    CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
+      hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
+      if (exception == null) {
+        versionTopicWrite.accept(partitionConsumptionState);
+        currentVersionTopicWrite.complete(null);
+      } else {
+        VeniceException veniceException = new VeniceException(exception);
+        this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
+        currentVersionTopicWrite.completeExceptionally(veniceException);
+      }
+    });
+
+    partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
   }

   /**
Expand Down Expand Up @@ -4022,4 +4007,29 @@ private void checkAndWaitForLastVTProduceFuture(PartitionConsumptionState partit
throws ExecutionException, InterruptedException {
partitionConsumptionState.getLastVTProduceCallFuture().get();
}

protected boolean hasViewWriters() {
return viewWriters != null && !viewWriters.isEmpty();
}

private void maybeQueueCMWritesToVersionTopic(
PartitionConsumptionState partitionConsumptionState,
Runnable produceCall) {
if (hasViewWriters()) {
CompletableFuture<Void> propagateSegmentCMWrite = new CompletableFuture<>();
partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> {
if (exception == null) {
produceCall.run();
propagateSegmentCMWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
propagateSegmentCMWrite.completeExceptionally(veniceException);
}
});
partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite);
} else {
produceCall.run();
}
}
}
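
Finally, on the "testing" half of the commit title: with the chaining consolidated into queueUpVersionTopicWritesWithViewWriters and maybeQueueCMWritesToVersionTopic, the ordering contract becomes checkable in isolation. Below is a framework-free sketch of the two properties worth asserting; the code is illustrative, and the real tests would drive the helpers through the ingestion task.

import java.util.concurrent.CompletableFuture;

// Framework-free check of the two ordering properties; illustrative code only.
public final class VtOrderingCheck {
  public static void main(String[] args) {
    // Property 1: the VT write runs only after the view write is acked.
    CompletableFuture<Void> viewAck = new CompletableFuture<>();
    StringBuilder order = new StringBuilder();
    CompletableFuture<Void> vtWrite =
        CompletableFuture.allOf(viewAck).thenRun(() -> order.append("vt-write"));
    order.append("view-pending;");
    viewAck.complete(null); // ack arrives; only now may the VT write run
    vtWrite.join();
    System.out.println(order); // prints "view-pending;vt-write"

    // Property 2: a failed view write fails the chained VT future,
    // and the VT write itself never runs.
    CompletableFuture<Void> failedView = new CompletableFuture<>();
    CompletableFuture<Void> chained =
        CompletableFuture.allOf(failedView).thenRun(() -> order.append("must-not-run"));
    failedView.completeExceptionally(new RuntimeException("view write failed"));
    System.out.println(chained.isCompletedExceptionally()); // prints "true"
  }
}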