Skip to content

Commit

Permalink
Added CompositeVeniceWriter to write batch records from VPJ in NR source fabric
Browse files Browse the repository at this point in the history
  • Loading branch information
xunyin8 committed Nov 24, 2024
1 parent d180bd6 commit 6b541a1
Show file tree
Hide file tree
Showing 29 changed files with 544 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2489,8 +2489,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* consumes the first message; potential message type: SOS, EOS, SOP, EOP, data message (consider server restart).
*/
case END_OF_PUSH:
// CMs that may be produced with DIV pass-through mode can break DIV without synchronization with view
// writers
// CMs that are produced with DIV pass-through mode can break DIV without synchronization with view writers
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
/**
* Simply produce this EOP to local VT. It will be processed in order in the drainer queue later
Expand Down Expand Up @@ -2558,6 +2557,8 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
} else {
if (controlMessageType == START_OF_SEGMENT
&& Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
// We also want to synchronize with view writers for heartbeat CMs, so we can detect hanging VWs
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
partitionConsumptionState,
consumerRecord,
Expand Down Expand Up @@ -3344,7 +3345,7 @@ protected void processMessageAndMaybeProduceToKafka(
// Write to views
if (!viewWriters.isEmpty()) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
CompletableFuture[] viewWriterFutures =
processViewWriters(partitionConsumptionState, keyBytes, null, writeComputeResultWrapper);
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,10 @@
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.views.ChangeCaptureView;
import com.linkedin.venice.writer.VeniceWriter;
Expand All @@ -45,18 +39,23 @@ public class ChangeCaptureViewWriter extends VeniceViewWriter {
private final int maxColoIdValue;

private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
private final String changeCaptureTopicName;

public ChangeCaptureViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Version version,
Schema keySchema,
Map<String, String> extraViewParameters) {
super(props, store, version, keySchema, extraViewParameters);
internalView = new ChangeCaptureView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
super(props, version, keySchema, extraViewParameters);
internalView = new ChangeCaptureView(
props.getCombinedProperties().toProperties(),
version.getStoreName(),
extraViewParameters);
kafkaClusterUrlToIdMap = props.getVeniceServerConfig().getKafkaClusterUrlToIdMap();
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
maxColoIdValue = kafkaClusterUrlToIdMap.values().stream().max(Integer::compareTo).orElse(-1);
changeCaptureTopicName =
this.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
}

@Override
Expand Down Expand Up @@ -111,7 +110,8 @@ public void processControlMessage(
VersionSwap versionSwapMessage = (VersionSwap) controlMessage.getControlMessageUnion();

// Only the version we're transiting FROM needs to populate the topic switch message into the change capture topic
if (Version.parseVersionFromVersionTopicName(versionSwapMessage.oldServingVersionTopic.toString()) != version) {
if (Version
.parseVersionFromVersionTopicName(versionSwapMessage.oldServingVersionTopic.toString()) != versionNumber) {
return;
}

Expand Down Expand Up @@ -165,35 +165,16 @@ void setVeniceWriter(VeniceWriter veniceWriter) {
this.veniceWriter = veniceWriter;
}

VeniceWriterOptions buildWriterOptions(int version) {
String changeCaptureTopicName = this.getTopicNamesAndConfigsForVersion(version).keySet().stream().findAny().get();

// Build key/value Serializers for the kafka producer
VeniceWriterOptions.Builder configBuilder = new VeniceWriterOptions.Builder(changeCaptureTopicName);
VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(RecordChangeEvent.getClassSchema());
configBuilder.setValueSerializer(valueSerializer);

// Set writer properties based on the store version config
Version storeVersionConfig = store.getVersionOrThrow(version);
PartitionerConfig partitionerConfig = storeVersionConfig.getPartitionerConfig();

if (partitionerConfig != null) {
// TODO: It would make sense to give the option to set a different partitioner for this view. Might
// want to consider adding it as a param available to this view type.
VenicePartitioner venicePartitioner = PartitionUtils.getVenicePartitioner(partitionerConfig);
configBuilder.setPartitioner(venicePartitioner);
}

configBuilder.setChunkingEnabled(storeVersionConfig.isChunkingEnabled());
return setProducerOptimizations(configBuilder).build();
VeniceWriterOptions buildWriterOptions() {
return internalView.getWriterOptionsBuilder(changeCaptureTopicName, version).build();
}

synchronized private void initializeVeniceWriter() {
if (veniceWriter != null) {
return;
}
veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null)
.createVeniceWriter(buildWriterOptions(version));
veniceWriter =
new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null).createVeniceWriter(buildWriterOptions());
}

private ValueBytes constructValueBytes(ByteBuffer value, int schemaId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
Expand Down Expand Up @@ -41,8 +40,8 @@ public class MaterializedViewWriter extends VeniceViewWriter {
private final ReentrantLock broadcastHBLock = new ReentrantLock();
private final Map<Integer, Long> partitionToHeartbeatTimestampMap = new HashMap<>();
private final Clock clock;
private final String materializedViewTopicName;
private VeniceWriter veniceWriter;
private String materializedViewTopicName;
private long lastHBBroadcastTimestamp;

/**
Expand All @@ -57,24 +56,25 @@ public class MaterializedViewWriter extends VeniceViewWriter {

public MaterializedViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Version version,
Schema keySchema,
Map<String, String> extraViewParameters,
Clock clock) {
super(props, store, version, keySchema, extraViewParameters);
super(props, version, keySchema, extraViewParameters);
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
internalView =
new MaterializedView(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters);
materializedViewTopicName =
internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
this.clock = clock;
}

public MaterializedViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Version version,
Schema keySchema,
Map<String, String> extraViewParameters) {
this(props, store, version, keySchema, extraViewParameters, Clock.systemUTC());
this(props, version, keySchema, extraViewParameters, Clock.systemUTC());
}

/**
Expand Down Expand Up @@ -125,24 +125,15 @@ public String getWriterClassName() {
return internalView.getWriterClassName();
}

// package private for testing purposes.
VeniceWriterOptions buildWriterOptions(int version) {
// We need to change this and have a map of writers if one materialized view will have many topics.
materializedViewTopicName =
internalView.getTopicNamesAndConfigsForVersion(version).keySet().stream().findAny().get();
VeniceWriterOptions.Builder configBuilder = new VeniceWriterOptions.Builder(materializedViewTopicName);
Version storeVersionConfig = store.getVersionOrThrow(version);
configBuilder.setPartitionCount(internalView.getViewPartitionCount());
configBuilder.setChunkingEnabled(storeVersionConfig.isChunkingEnabled());
configBuilder.setRmdChunkingEnabled(storeVersionConfig.isRmdChunkingEnabled());
configBuilder.setPartitioner(internalView.getViewPartitioner());
return setProducerOptimizations(configBuilder).build();
// Package private for testing
VeniceWriterOptions buildWriterOptions() {
return setProducerOptimizations(internalView.getWriterOptionsBuilder(materializedViewTopicName, version)).build();
}

synchronized private void initializeVeniceWriter() {
if (veniceWriter == null) {
veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null)
.createVeniceWriter(buildWriterOptions(version));
veniceWriter =
new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null).createVeniceWriter(buildWriterOptions());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -28,16 +30,19 @@
* view implementations.
*/
public abstract class VeniceViewWriter extends VeniceView {
protected final int version;
protected final Version version;
protected final int versionNumber;
protected Optional<Boolean> isNearlineProducerCompressionEnabled = Optional.empty();
protected Optional<Integer> nearlineProducerCountPerWriter = Optional.empty();

public VeniceViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Version version,
Schema keySchema,
Map<String, String> extraViewParameters) {
super(props.getCombinedProperties().toProperties(), store, extraViewParameters);
super(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters);
this.version = version;
this.versionNumber = version.getNumber();
}

/**
Expand Down Expand Up @@ -97,6 +102,15 @@ public void processControlMessage(
// Optionally act on Control Message
}

/**
* Configure view writer options based on the configs of the provided Store
* @param store to extract the relevant configs from
*/
public void configureWriterOptions(Store store) {
isNearlineProducerCompressionEnabled = Optional.of(store.isNearlineProducerCompressionEnabled());
nearlineProducerCountPerWriter = Optional.of(store.getNearlineProducerCountPerWriter());
}

/**
* A store could have many views and to reduce the impact to write throughput we want to check and enable producer
* optimizations that can be configured at the store level. To change the producer optimization configs the ingestion
Expand All @@ -106,7 +120,8 @@ public void processControlMessage(
* @return
*/
protected VeniceWriterOptions.Builder setProducerOptimizations(VeniceWriterOptions.Builder configBuilder) {
return configBuilder.setProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled())
.setProducerCount(store.getNearlineProducerCountPerWriter());
isNearlineProducerCompressionEnabled.ifPresent(configBuilder::setProducerCompressionEnabled);
nearlineProducerCountPerWriter.ifPresent(configBuilder::setProducerCount);
return configBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public Map<String, VeniceViewWriter> buildStoreViewWriters(Store store, int vers
Map<String, String> extraParams = viewConfig.getValue().getViewParameters();
VeniceViewWriter viewWriter =
ViewWriterUtils.getVeniceViewWriter(className, properties, store, version, keySchema, extraParams);
viewWriter.configureWriterOptions(store);
storeViewWriters.put(viewConfig.getKey(), viewWriter);
}
return storeViewWriters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.ReflectUtils;
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.views.ViewUtils;
Expand All @@ -21,13 +22,13 @@ public static VeniceViewWriter getVeniceViewWriter(
Properties params = configLoader.getCombinedProperties().toProperties();
VeniceView view = ReflectUtils.callConstructor(
ReflectUtils.loadClass(viewClass),
new Class<?>[] { Properties.class, Store.class, Map.class },
new Object[] { params, store, extraViewParameters });
new Class<?>[] { Properties.class, String.class, Map.class },
new Object[] { params, store.getName(), extraViewParameters });

VeniceViewWriter viewWriter = ReflectUtils.callConstructor(
ReflectUtils.loadClass(view.getWriterClassName()),
new Class<?>[] { VeniceConfigLoader.class, Store.class, Integer.TYPE, Schema.class, Map.class },
new Object[] { configLoader, store, version, keySchema, extraViewParameters });
new Class<?>[] { VeniceConfigLoader.class, Version.class, Schema.class, Map.class },
new Object[] { configLoader, store.getVersionOrThrow(version), keySchema, extraViewParameters });

return viewWriter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testConstructVersionSwapMessage() {
ControlMessage controlMessage = new ControlMessage();
controlMessage.controlMessageUnion = versionSwapMessage;

Store mockStore = mock(Store.class);
Version version = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
VeniceProperties props = VeniceProperties.empty();
Object2IntMap<String> urlMappingMap = new Object2IntOpenHashMap<>();
// Add ID's to the region's to name the sort order of the RMD
Expand All @@ -111,7 +111,7 @@ public void testConstructVersionSwapMessage() {

// Build the change capture writer and set the mock writer
ChangeCaptureViewWriter changeCaptureViewWriter =
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
new ChangeCaptureViewWriter(mockVeniceConfigLoader, version, SCHEMA, Collections.emptyMap());
changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter);

// Verify that we never produce the version swap from a follower replica
Expand Down Expand Up @@ -202,9 +202,9 @@ public void testBuildWriterOptions() {
Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig);

ChangeCaptureViewWriter changeCaptureViewWriter =
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
new ChangeCaptureViewWriter(mockVeniceConfigLoader, version, SCHEMA, Collections.emptyMap());

VeniceWriterOptions writerOptions = changeCaptureViewWriter.buildWriterOptions(1);
VeniceWriterOptions writerOptions = changeCaptureViewWriter.buildWriterOptions();

Assert
.assertEquals(writerOptions.getTopicName(), STORE_NAME + "_v1" + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
Expand All @@ -217,7 +217,7 @@ public void testBuildWriterOptions() {
@Test
public void testProcessRecord() throws ExecutionException, InterruptedException {
// Set up mocks
Store mockStore = mock(Store.class);
Version version = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
VeniceProperties props = VeniceProperties.empty();
Object2IntMap<String> urlMappingMap = new Object2IntOpenHashMap<>();
CompletableFuture<PubSubProduceResult> mockFuture = mock(CompletableFuture.class);
Expand All @@ -237,7 +237,7 @@ public void testProcessRecord() throws ExecutionException, InterruptedException
Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig);

ChangeCaptureViewWriter changeCaptureViewWriter =
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
new ChangeCaptureViewWriter(mockVeniceConfigLoader, version, SCHEMA, Collections.emptyMap());

Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(SCHEMA, 1);
List<Long> vectors = Arrays.asList(1L, 2L, 3L);
Expand Down
Loading

0 comments on commit 6b541a1

Please sign in to comment.