Add unit tests and make VeniceViewWriter version specific (it already is implicitly)
xunyin8 committed Nov 22, 2024
1 parent 198c3ae commit d180bd6
Showing 12 changed files with 262 additions and 134 deletions.
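In effect, each VeniceViewWriter is now constructed for one specific store version, so the version number no longer has to be threaded through every processRecord and processControlMessage call. A minimal sketch of the resulting API shape, using abbreviated stand-ins for the real Venice classes:

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Sketch only: the version is bound once at construction instead of per call.
abstract class VersionedViewWriterSketch {
  protected final int version; // fixed for the writer's lifetime

  protected VersionedViewWriterSketch(int version) {
    this.version = version;
  }

  // No per-call version parameter; implementations read the field instead.
  abstract CompletableFuture<Void> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId);
}

This mirrors the constructor and signature changes in the files below; the real classes carry considerably more state.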
@@ -1516,7 +1516,6 @@ protected CompletableFuture[] processViewWriters(
mergeConflictResult.getNewValue(),
oldValueBB,
keyBytes,
versionNumber,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord());
@@ -31,7 +31,6 @@
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.view.ChangeCaptureViewWriter;
import com.linkedin.davinci.store.view.MaterializedViewWriter;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.davinci.validation.PartitionTracker;
@@ -57,7 +56,6 @@
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
@@ -103,6 +101,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -199,8 +198,6 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {

protected final Map<String, VeniceViewWriter> viewWriters;
protected final boolean hasChangeCaptureView;
protected final boolean hasMaterializedView;
protected final boolean hasVersionCompleted;

protected final AvroStoreDeserializerCache storeDeserializerCache;

@@ -331,25 +328,19 @@ public LeaderFollowerStoreIngestionTask(
version.getNumber(),
schemaRepository.getKeySchema(store.getName()).getSchema());
boolean tmpValueForHasChangeCaptureViewWriter = false;
boolean tmpValueForHasMaterializedViewWriter = false;
for (Map.Entry<String, VeniceViewWriter> viewWriter: viewWriters.entrySet()) {
if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) {
tmpValueForHasChangeCaptureViewWriter = true;
} else if (viewWriter.getValue() instanceof MaterializedViewWriter) {
tmpValueForHasMaterializedViewWriter = true;
}
if (tmpValueForHasChangeCaptureViewWriter && tmpValueForHasMaterializedViewWriter) {
if (tmpValueForHasChangeCaptureViewWriter) {
break;
}
}
hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter;
hasMaterializedView = tmpValueForHasMaterializedViewWriter;
} else {
viewWriters = Collections.emptyMap();
hasChangeCaptureView = false;
hasMaterializedView = false;
}
hasVersionCompleted = version.getStatus().equals(VersionStatus.ONLINE);
this.storeDeserializerCache = new AvroStoreDeserializerCache(
builder.getSchemaRepo(),
getStoreName(),
@@ -2437,38 +2428,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime);
}
/**
* For materialized view we still need to produce to the view topic when we are consuming batch data from the
* local VT for the very first time for a new version. Once the version has completed any new SIT will have its
* {@link hasVersionCompleted} set to true to prevent duplicate processing of batch records by the view writers.
* Duplicate processing is still possible in the case when leader replica crashes before the version becomes
* ONLINE, but it should be harmless since eventually it will be correct.
* TODO We need to introduce additional checkpointing and rewind for L/F transition during batch processing to
* prevent data loss in the view topic for the following scenario:
* 1. VT contains 100 batch records between SOP and EOP
* 2. Initial leader consumed to 50th record while followers consumed to 80th
* 3. Initial leader crashes and one of the followers becomes the new leader and start processing and writing
* to view topic from 80th record until ingestion is completed and version becomes ONLINE.
* 4. View topic will have a gap for batch data between 50th record and 80th record.
* Materialized views need to produce to the corresponding view topic for the batch portion of the data. This is
* achieved in the following ways:
* 1. Remote fabric(s) will leverage NR where the leader will replicate VT from the NR source fabric and produce
* to local view topic(s).
* 2. The NR source fabric's view topic will be produced by VPJ. This is because there is no checkpointing, and no
* easy way to add checkpointing, for leaders consuming the local VT, which would make it difficult and error
* prone to let the leader produce to view topic(s) in the NR source fabric.
*/
if (hasMaterializedView && shouldViewWritersProcessBatchRecords(partitionConsumptionState)
&& msgType == MessageType.PUT) {
WriteComputeResultWrapper writeComputeResultWrapper =
new WriteComputeResultWrapper((Put) kafkaValue.payloadUnion, null, false);
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures =
processViewWriters(partitionConsumptionState, kafkaKey.getKey(), null, writeComputeResultWrapper);
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});
}
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
}

@@ -2522,6 +2489,9 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* consumes the first message; potential message type: SOS, EOS, SOP, EOP, data message (consider server restart).
*/
case END_OF_PUSH:
// CMs that may be produced in DIV pass-through mode can break DIV unless we first synchronize with the view
// writers.
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
/**
* Simply produce this EOP to local VT. It will be processed in order in the drainer queue later
* after successfully producing to kafka.
@@ -2568,6 +2538,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* In such case the heartbeat is produced to VT with updated {@link LeaderMetadataWrapper}.
*/
if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
produceToLocalKafka(
consumerRecord,
partitionConsumptionState,
@@ -3371,7 +3342,7 @@ protected void processMessageAndMaybeProduceToKafka(
return;
}
// Write to views
if (viewWriters != null && !viewWriters.isEmpty()) {
if (!viewWriters.isEmpty()) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures =
@@ -3696,13 +3667,8 @@ protected void processControlMessageForViews(

// Iterate through list of views for the store and process the control message.
for (VeniceViewWriter viewWriter: viewWriters.values()) {
viewWriter.processControlMessage(
kafkaKey,
kafkaMessageEnvelope,
controlMessage,
partition,
partitionConsumptionState,
this.versionNumber);
viewWriter
.processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState);
}
}

@@ -3974,20 +3940,11 @@ protected CompletableFuture[] processViewWriters(
viewWriterFutures[index++] = writer.processRecord(
writeComputeResultWrapper.getNewPut().putValue,
keyBytes,
versionNumber,
writeComputeResultWrapper.getNewPut().schemaId);
}
return viewWriterFutures;
}

private boolean shouldViewWritersProcessBatchRecords(PartitionConsumptionState partitionConsumptionState) {
if (hasVersionCompleted) {
return false;
}
return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LEADER)
|| Objects.equals(partitionConsumptionState.getLeaderFollowerState(), IN_TRANSITION_FROM_STANDBY_TO_LEADER);
}

/**
* Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp}
* such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics
@@ -4001,4 +3958,9 @@ void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolea
lastSendIngestionHeartbeatTimestamp.set(0);
}
}

private void checkAndWaitForLastVTProduceFuture(PartitionConsumptionState partitionConsumptionState)
throws ExecutionException, InterruptedException {
partitionConsumptionState.getLastVTProduceCallFuture().get();
}
}
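The two checkAndWaitForLastVTProduceFuture call sites above guard control messages (END_OF_PUSH and heartbeats) that can be produced to the version topic in DIV pass-through mode: the task first blocks on the last outstanding VT produce call so the control message cannot overtake in-flight view-writer work. A rough sketch of that ordering guard, with simplified stand-in types rather than the real PartitionConsumptionState API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class VtProduceOrderingSketch {
  // Stand-in for partitionConsumptionState.getLastVTProduceCallFuture().
  private CompletableFuture<Void> lastVTProduceCallFuture = CompletableFuture.completedFuture(null);

  void produceControlMessage(Runnable produceToVersionTopic) throws ExecutionException, InterruptedException {
    // Block until the previous produce call has finished, so the control
    // message cannot be written ahead of earlier records and break DIV.
    lastVTProduceCallFuture.get();
    produceToVersionTopic.run();
  }
}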
@@ -49,9 +49,10 @@ public class ChangeCaptureViewWriter extends VeniceViewWriter {
public ChangeCaptureViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters) {
super(props, store, keySchema, extraViewParameters);
super(props, store, version, keySchema, extraViewParameters);
internalView = new ChangeCaptureView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
kafkaClusterUrlToIdMap = props.getVeniceServerConfig().getKafkaClusterUrlToIdMap();
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
@@ -63,7 +64,6 @@ public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
int version,
int newValueSchemaId,
int oldValueSchemaId,
GenericRecord replicationMetadataRecord) {
@@ -77,18 +77,14 @@ public CompletableFuture<PubSubProduceResult> processRecord(
recordChangeEvent.replicationCheckpointVector = RmdUtils.extractOffsetVectorFromRmd(replicationMetadataRecord);

if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}
// TODO: RecordChangeEvent isn't versioned today.
return veniceWriter.put(key, recordChangeEvent, 1);
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
byte[] key,
int version,
int newValueSchemaId) {
public CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
// No op
return CompletableFuture.completedFuture(null);
}
@@ -99,8 +95,7 @@ public void processControlMessage(
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState,
int version) {
PartitionConsumptionState partitionConsumptionState) {

// We only care (for now) about version swap control Messages
if (!(controlMessage.getControlMessageUnion() instanceof VersionSwap)) {
@@ -136,7 +131,7 @@ public void processControlMessage(

// Write the message on veniceWriter to the change capture topic
if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}

veniceWriter.sendControlMessage(
@@ -193,7 +188,7 @@ VeniceWriterOptions buildWriterOptions(int version) {
return setProducerOptimizations(configBuilder).build();
}

synchronized private void initializeVeniceWriter(int version) {
synchronized private void initializeVeniceWriter() {
if (veniceWriter != null) {
return;
}
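ChangeCaptureViewWriter above and MaterializedViewWriter below now share the same lazy-initialization idea: the underlying topic writer is created on first use from the version captured at construction (buildWriterOptions(version)). A hedged sketch of the pattern, with a placeholder writer type:

class LazyVersionedWriterSketch {
  private final int version;
  private Object veniceWriter; // placeholder for the real VeniceWriter type

  LazyVersionedWriterSketch(int version) {
    this.version = version;
  }

  Object getOrCreateWriter() {
    if (veniceWriter == null) { // cheap unsynchronized fast path, as in the diff
      initializeVeniceWriter();
    }
    return veniceWriter;
  }

  private synchronized void initializeVeniceWriter() {
    if (veniceWriter != null) {
      return; // another thread finished initialization first
    }
    // Stand-in for VeniceWriterFactory.createVeniceWriter(buildWriterOptions(version)).
    veniceWriter = createWriterForVersion(version);
  }

  private Object createWriterForVersion(int version) {
    return new Object();
  }
}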
@@ -58,10 +58,11 @@ public class MaterializedViewWriter extends VeniceViewWriter {
public MaterializedViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters,
Clock clock) {
super(props, store, keySchema, extraViewParameters);
super(props, store, version, keySchema, extraViewParameters);
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
this.clock = clock;
@@ -70,31 +71,34 @@ public MaterializedViewWriter(
public MaterializedViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters) {
this(props, store, keySchema, extraViewParameters, Clock.systemUTC());
this(props, store, version, keySchema, extraViewParameters, Clock.systemUTC());
}

/**
* Package private, for testing purposes.
*/
void setVeniceWriter(VeniceWriter veniceWriter) {
this.veniceWriter = veniceWriter;
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
int version,
int newValueSchemaId,
int oldValueSchemaId,
GenericRecord replicationMetadataRecord) {
return processRecord(newValue, key, version, newValueSchemaId);
return processRecord(newValue, key, newValueSchemaId);
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
byte[] key,
int version,
int newValueSchemaId) {
public CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}
return veniceWriter.put(key, newValue.array(), newValueSchemaId);
}
@@ -105,16 +109,14 @@ public void processControlMessage(
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState,
int version) {
PartitionConsumptionState partitionConsumptionState) {
final ControlMessageType type = ControlMessageType.valueOf(controlMessage);
// Ignore other control messages for materialized view.
if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) {
maybePropagateHeartbeatLowWatermarkToViewTopic(
partition,
partitionConsumptionState,
kafkaMessageEnvelope.producerMetadata.messageTimestamp,
version);
kafkaMessageEnvelope.getProducerMetadata().getMessageTimestamp());
}
}

@@ -137,7 +139,7 @@ VeniceWriterOptions buildWriterOptions(int version) {
return setProducerOptimizations(configBuilder).build();
}

synchronized private void initializeVeniceWriter(int version) {
synchronized private void initializeVeniceWriter() {
if (veniceWriter == null) {
veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null)
.createVeniceWriter(buildWriterOptions(version));
Expand All @@ -162,10 +164,9 @@ synchronized private void initializeVeniceWriter(int version) {
private void maybePropagateHeartbeatLowWatermarkToViewTopic(
int partition,
PartitionConsumptionState partitionConsumptionState,
long heartbeatTimestamp,
int version) {
long heartbeatTimestamp) {
boolean propagate = false;
long oldestHeartbeatTimestamp;
long oldestHeartbeatTimestamp = 0;
broadcastHBLock.lock();
try {
partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp);
@@ -178,14 +179,17 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
propagate = true;
lastHBBroadcastTimestamp = now;
}
// We have determined that the oldestHeartbeatTimestamp offers no value in monitoring the lag for this view
// topic since it's within the DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD. We are also clearing the map, so we
// don't need to worry about removing timestamps belonging to partitions that we are no longer leader of.
partitionToHeartbeatTimestampMap.clear();
}
} finally {
broadcastHBLock.unlock();
}
if (propagate) {
if (propagate && oldestHeartbeatTimestamp > 0) {
if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}
LeaderCompleteState leaderCompleteState =
LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported());
@@ -205,7 +209,7 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
true,
leaderCompleteState,
heartbeatTimestamp);
oldestHeartbeatTimestamp);
heartBeatFuture.whenComplete((ignore, throwable) -> {
if (throwable != null) {
completionException.set(new CompletionException(throwable));
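The heartbeat hunks above implement a low-watermark broadcast: each leader records per-partition heartbeat timestamps under a lock and, at most once per interval, forwards only the oldest one to the view topic, so view-topic lag reflects the slowest partition rather than the most recent heartbeat seen. A simplified sketch under an assumed broadcast interval:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class HeartbeatLowWatermarkSketch {
  private static final long BROADCAST_INTERVAL_MS = 60_000; // assumed threshold, not the real constant
  private final Map<Integer, Long> partitionToHeartbeatTimestampMap = new HashMap<>();
  private final ReentrantLock broadcastHBLock = new ReentrantLock();
  private long lastHBBroadcastTimestamp;

  /** Returns the low-watermark timestamp to propagate, or -1 when nothing should be sent yet. */
  long maybeGetLowWatermark(int partition, long heartbeatTimestamp, long now) {
    broadcastHBLock.lock();
    try {
      partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp);
      if (now - lastHBBroadcastTimestamp < BROADCAST_INTERVAL_MS) {
        return -1; // within the delay threshold; skip this round
      }
      long oldest = partitionToHeartbeatTimestampMap.values().stream().min(Long::compare).orElse(-1L);
      lastHBBroadcastTimestamp = now;
      // Clearing also drops entries for partitions this host no longer leads.
      partitionToHeartbeatTimestampMap.clear();
      return oldest;
    } finally {
      broadcastHBLock.unlock();
    }
  }
}

As in the diff's propagate && oldestHeartbeatTimestamp > 0 guard, a non-positive result means no heartbeat is forwarded.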