
[duckdb] Cleanup and refactor DVRT interface to be more configurable #1445

Merged · 12 commits · Jan 16, 2025
DaVinciBackend.java
@@ -9,7 +9,7 @@

import com.linkedin.davinci.blobtransfer.BlobTransferManager;
import com.linkedin.davinci.blobtransfer.BlobTransferUtil;
-import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
+import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.StoreBackendConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
@@ -121,7 +121,7 @@ public DaVinciBackend(
Optional<Set<String>> managedClients,
ICProvider icProvider,
Optional<ObjectCacheConfig> cacheConfig,
-DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
+DaVinciRecordTransformerConfig recordTransformerConfig) {
LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
try {
VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();
@@ -270,7 +270,7 @@ public DaVinciBackend(
false,
compressorFactory,
cacheBackend,
-recordTransformerFunction,
+recordTransformerConfig,
true,
// TODO: consider how/if a repair task would be valid for Davinci users?
null,
@@ -293,7 +293,7 @@
}

if (backendConfig.isBlobTransferManagerEnabled()) {
-if (recordTransformerFunction != null) {
+if (recordTransformerConfig != null) {
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

AvroGenericDaVinciClient.java
@@ -732,7 +732,7 @@ protected void initBackend(
Optional<Set<String>> managedClients,
ICProvider icProvider,
Optional<ObjectCacheConfig> cacheConfig,
-DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
+DaVinciRecordTransformerConfig recordTransformerConfig) {
synchronized (AvroGenericDaVinciClient.class) {
if (daVinciBackend == null) {
logger
@@ -744,7 +744,7 @@ protected void initBackend(
managedClients,
icProvider,
cacheConfig,
-recordTransformerFunction),
+recordTransformerConfig),
backend -> {
// Ensure that existing backend is fully closed before a new one can be created.
synchronized (AvroGenericDaVinciClient.class) {
@@ -782,13 +782,7 @@ public synchronized void start() {
logger.info("Starting client, storeName=" + getStoreName());
VeniceConfigLoader configLoader = buildVeniceConfig();
Optional<ObjectCacheConfig> cacheConfig = Optional.ofNullable(daVinciConfig.getCacheConfig());
-initBackend(
-clientConfig,
-configLoader,
-managedClients,
-icProvider,
-cacheConfig,
-daVinciConfig.getRecordTransformerFunction());
+initBackend(clientConfig, configLoader, managedClients, icProvider, cacheConfig, recordTransformerConfig);

try {
getBackend().verifyCacheConfigEquality(daVinciConfig.getCacheConfig(), getStoreName());
BlockingDaVinciRecordTransformer.java
@@ -19,19 +19,16 @@ public class BlockingDaVinciRecordTransformer<K, V, O> extends DaVinciRecordTran
private final DaVinciRecordTransformer recordTransformer;
private final CountDownLatch startLatch = new CountDownLatch(1);

-public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer, boolean storeRecordsInDaVinci) {
-super(recordTransformer.getStoreVersion(), storeRecordsInDaVinci);
+public BlockingDaVinciRecordTransformer(
+DaVinciRecordTransformer recordTransformer,
+Schema keySchema,
+Schema inputValueSchema,
+Schema outputValueSchema,
+boolean storeRecordsInDaVinci) {
+super(recordTransformer.getStoreVersion(), keySchema, inputValueSchema, outputValueSchema, storeRecordsInDaVinci);
this.recordTransformer = recordTransformer;
}

-public Schema getKeySchema() {
-return this.recordTransformer.getKeySchema();
-}
-
-public Schema getOutputValueSchema() {
-return this.recordTransformer.getOutputValueSchema();
-}
-
public DaVinciRecordTransformerResult<O> transform(Lazy<K> key, Lazy<V> value) {
return this.recordTransformer.transform(key, value);
}
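Note: with this change the wrapper no longer pulls schemas from the wrapped transformer's abstract methods; callers pass them explicitly. A minimal sketch of the new wiring, assuming the caller already holds a concrete transformer (the helper method below is illustrative, not part of this PR):

    // Illustrative helper, not from this PR: wrap a concrete transformer in the
    // blocking decorator by forwarding the schemas it already exposes.
    static <K, V, O> BlockingDaVinciRecordTransformer<K, V, O> wrapBlocking(
        DaVinciRecordTransformer<K, V, O> userTransformer) {
      return new BlockingDaVinciRecordTransformer<>(
          userTransformer,
          userTransformer.getKeySchema(),
          userTransformer.getInputValueSchema(),
          userTransformer.getOutputValueSchema(),
          userTransformer.getStoreRecordsInDaVinci());
    }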
DaVinciConfig.java
@@ -127,20 +127,6 @@ public DaVinciRecordTransformerConfig getRecordTransformerConfig() {
return recordTransformerConfig;
}

-public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) {
-if (recordTransformerConfig == null) {
-return null;
-}
-return recordTransformerConfig.getRecordTransformer(storeVersion);
-}
-
-public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction() {
-if (recordTransformerConfig == null) {
-return null;
-}
-return recordTransformerConfig.getRecordTransformerFunction();
-}
-
public boolean isReadMetricsEnabled() {
return readMetricsEnabled;
}
DaVinciRecordTransformer.java
@@ -40,33 +40,46 @@ public abstract class DaVinciRecordTransformer<K, V, O> {
*/
private final boolean storeRecordsInDaVinci;

+/**
+* The key schema, which is immutable.
+*/
+private final Schema keySchema;
+
+/**
+* The value schema before transformation, which is provided by the DaVinciClient.
+*/
+private final Schema inputValueSchema;
+
+/**
+* The value schema after transformation, which is provided by the user.
+*/
+private final Schema outputValueSchema;
+
private final DaVinciRecordTransformerUtility<K, O> recordTransformerUtility;

/**
* @param storeVersion the version of the store
+* @param keySchema the key schema, which is immutable
+* @param inputValueSchema the value schema before transformation
+* @param outputValueSchema the value schema after transformation
* @param storeRecordsInDaVinci set this to false if you intend to store records in a custom storage,
-* and not in the Da Vinci Client.
-*/
-public DaVinciRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
+* and not in the Da Vinci Client
+*/
+public DaVinciRecordTransformer(
+int storeVersion,
+Schema keySchema,
+Schema inputValueSchema,
+Schema outputValueSchema,
+boolean storeRecordsInDaVinci) {
this.storeVersion = storeVersion;
this.storeRecordsInDaVinci = storeRecordsInDaVinci;
+this.keySchema = keySchema;
+// ToDo: Make use of inputValueSchema to support reader/writer schemas
+this.inputValueSchema = inputValueSchema;
+this.outputValueSchema = outputValueSchema;
this.recordTransformerUtility = new DaVinciRecordTransformerUtility<>(this);
}

-/**
-* Returns the schema for the key used in {@link DaVinciClient}'s operations.
-*
-* @return a {@link Schema} corresponding to the type of {@link K}.
-*/
-public abstract Schema getKeySchema();
-
-/**
-* Returns the schema for the output value used in {@link DaVinciClient}'s operations.
-*
-* @return a {@link Schema} corresponding to the type of {@link O}.
-*/
-public abstract Schema getOutputValueSchema();
-
/**
* Implement this method to transform records before they are stored.
* This can be useful for tasks such as filtering out unused fields to save storage space.
@@ -206,6 +219,33 @@ public final boolean getStoreRecordsInDaVinci() {
return storeRecordsInDaVinci;
}

+/**
+* Returns the schema for the key used in {@link DaVinciClient}'s operations.
+*
+* @return a {@link Schema} corresponding to the type of {@link K}.
+*/
+public final Schema getKeySchema() {
+return keySchema;
+}
+
+/**
+* Returns the schema for the input value used in {@link DaVinciClient}'s operations.
+*
+* @return a {@link Schema} corresponding to the type of {@link V}.
+*/
+public final Schema getInputValueSchema() {
+return inputValueSchema;
+}
+
+/**
+* Returns the schema for the output value used in {@link DaVinciClient}'s operations.
+*
+* @return a {@link Schema} corresponding to the type of {@link O}.
+*/
+public final Schema getOutputValueSchema() {
+return outputValueSchema;
+}
+
/**
* @return {@link #recordTransformerUtility}
*/
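Note: since schemas are now supplied through the constructor, getKeySchema and getOutputValueSchema become final accessors instead of abstract methods, so an implementation only overrides the record hooks. A minimal sketch against the new interface (the class, the transform logic, and the import paths for Lazy and the result type are assumptions, not code from this PR):

    import com.linkedin.davinci.client.DaVinciRecordTransformer;
    import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
    import com.linkedin.venice.utils.lazy.Lazy;
    import org.apache.avro.Schema;

    public class StringRecordTransformer extends DaVinciRecordTransformer<Integer, String, String> {
      public StringRecordTransformer(
          int storeVersion,
          Schema keySchema,
          Schema inputValueSchema,
          Schema outputValueSchema,
          boolean storeRecordsInDaVinci) {
        // Schemas are handed to the base class rather than returned by abstract methods.
        super(storeVersion, keySchema, inputValueSchema, outputValueSchema, storeRecordsInDaVinci);
      }

      @Override
      public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<String> value) {
        // Return a TRANSFORMED result carrying the modified value.
        return new DaVinciRecordTransformerResult<>(
            DaVinciRecordTransformerResult.Result.TRANSFORMED, value.get() + "Transformed");
      }

      @Override
      public void processPut(Lazy<Integer> key, Lazy<String> value) {
        // No extra side effects in this sketch; records are stored in Da Vinci as usual.
      }
    }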
DaVinciRecordTransformerConfig.java
@@ -32,14 +32,6 @@ public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction(
return recordTransformerFunction;
}

-/**
-* @param storeVersion the store version
-* @return a new {@link DaVinciRecordTransformer}
-*/
-public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) {
-return recordTransformerFunction.apply(storeVersion);
-}
-
/**
* @return {@link #outputValueClass}
*/
@@ -53,5 +45,4 @@ public Class getOutputValueClass() {
public Schema getOutputValueSchema() {
return outputValueSchema;
}
-
}
DaVinciRecordTransformerFunctionalInterface.java
@@ -1,10 +1,17 @@
package com.linkedin.davinci.client;

+import org.apache.avro.Schema;
+
+
/**
* This describes the implementation for the functional interface of {@link DaVinciRecordTransformer}
*/

@FunctionalInterface
public interface DaVinciRecordTransformerFunctionalInterface {
-DaVinciRecordTransformer apply(Integer storeVersion);
+DaVinciRecordTransformer apply(
+Integer storeVersion,
+Schema keySchema,
+Schema inputValueSchema,
+Schema outputValueSchema);
}
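Note: combined with the constructor change above, client-side wiring might look like the sketch below. The DaVinciRecordTransformerConfig constructor shape is inferred from the getters visible in this diff, setRecordTransformerConfig is assumed to exist on DaVinciConfig, and StringRecordTransformer is the illustrative subclass sketched earlier.

    // Factory matching the new four-argument functional interface.
    DaVinciRecordTransformerFunctionalInterface factory =
        (storeVersion, keySchema, inputValueSchema, outputValueSchema) ->
            new StringRecordTransformer(storeVersion, keySchema, inputValueSchema, outputValueSchema, true);

    // Constructor arguments inferred from getRecordTransformerFunction / getOutputValueClass /
    // getOutputValueSchema; treat the exact signature as an assumption.
    DaVinciRecordTransformerConfig recordTransformerConfig =
        new DaVinciRecordTransformerConfig(factory, String.class, Schema.create(Schema.Type.STRING));

    DaVinciConfig daVinciConfig = new DaVinciConfig();
    daVinciConfig.setRecordTransformerConfig(recordTransformerConfig); // assumed setter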
DaVinciRecordTransformerUtility.java
@@ -24,12 +24,18 @@
*/
public class DaVinciRecordTransformerUtility<K, O> {
private final DaVinciRecordTransformer recordTransformer;
-private AvroGenericDeserializer<K> keyDeserializer;
-private AvroGenericDeserializer<O> outputValueDeserializer;
-private AvroSerializer<O> outputValueSerializer;
+private final AvroGenericDeserializer<K> keyDeserializer;
+private final AvroGenericDeserializer<O> outputValueDeserializer;
+private final AvroSerializer<O> outputValueSerializer;

public DaVinciRecordTransformerUtility(DaVinciRecordTransformer recordTransformer) {
this.recordTransformer = recordTransformer;
+
+Schema keySchema = recordTransformer.getKeySchema();
+Schema outputValueSchema = recordTransformer.getOutputValueSchema();
+this.keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema);
+this.outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema);
+this.outputValueSerializer = new AvroSerializer<>(outputValueSchema);
}

/**
@@ -40,7 +46,7 @@ public DaVinciRecordTransformerUtility(DaVinciRecordTransforme
* @return a ByteBuffer containing the schema ID followed by the serialized and compressed value
*/
public final ByteBuffer prependSchemaIdToHeader(O value, int schemaId, VeniceCompressor compressor) {
-byte[] serializedValue = getOutputValueSerializer().serialize(value);
+byte[] serializedValue = outputValueSerializer.serialize(value);
byte[] compressedValue;
try {
compressedValue = compressor.compress(serializedValue);
@@ -116,7 +122,7 @@ public final void onRecovery(
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
byte[] keyBytes = iterator.key();
byte[] valueBytes = iterator.value();
-Lazy<K> lazyKey = Lazy.of(() -> getKeyDeserializer().deserialize(ByteBuffer.wrap(keyBytes)));
+Lazy<K> lazyKey = Lazy.of(() -> keyDeserializer.deserialize(keyBytes));
Lazy<O> lazyValue = Lazy.of(() -> {
ByteBuffer valueByteBuffer = ByteBuffer.wrap(valueBytes);
// Skip schema id
@@ -127,35 +133,11 @@ public final void onRecovery(
} catch (IOException e) {
throw new RuntimeException(e);
}
-return getOutputValueDeserializer().deserialize(decompressedValueBytes);
+return outputValueDeserializer.deserialize(decompressedValueBytes);
});

recordTransformer.processPut(lazyKey, lazyValue);
}
}
}
-
-public AvroGenericDeserializer<K> getKeyDeserializer() {
-if (keyDeserializer == null) {
-Schema keySchema = recordTransformer.getKeySchema();
-keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema);
-}
-return keyDeserializer;
-}
-
-public AvroGenericDeserializer<O> getOutputValueDeserializer() {
-if (outputValueDeserializer == null) {
-Schema outputValueSchema = recordTransformer.getOutputValueSchema();
-outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema);
-}
-return outputValueDeserializer;
-}
-
-public AvroSerializer<O> getOutputValueSerializer() {
-if (outputValueSerializer == null) {
-Schema outputValueSchema = recordTransformer.getOutputValueSchema();
-outputValueSerializer = new AvroSerializer<>(outputValueSchema);
-}
-return outputValueSerializer;
-}
}
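Note: prependSchemaIdToHeader frames the stored bytes as the schema ID followed by the compressed payload. A standalone sketch of that framing, assuming the ID is written as a 4-byte int (serialization and compression elided):

    import java.nio.ByteBuffer;

    // Framing only; the real method first serializes with outputValueSerializer
    // and compresses with a VeniceCompressor before prepending the schema ID.
    static ByteBuffer prependSchemaId(int schemaId, byte[] compressedValue) {
      ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + compressedValue.length);
      buf.putInt(schemaId);
      buf.put(compressedValue);
      buf.flip(); // position 0, limit = 4 + payload length, ready for reading
      return buf;
    }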
ActiveActiveStoreIngestionTask.java
@@ -5,7 +5,7 @@
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
-import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
+import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
@@ -114,7 +114,7 @@ public ActiveActiveStoreIngestionTask(
int errorPartitionId,
boolean isIsolatedIngestion,
Optional<ObjectCacheBackend> cacheBackend,
-DaVinciRecordTransformerFunctionalInterface recordTransformerFunction,
+DaVinciRecordTransformerConfig recordTransformerConfig,
Lazy<ZKHelixAdmin> zkHelixAdmin) {
super(
storageService,
@@ -127,7 +127,7 @@
errorPartitionId,
isIsolatedIngestion,
cacheBackend,
-recordTransformerFunction,
+recordTransformerConfig,
zkHelixAdmin);

this.rmdProtocolVersionId = version.getRmdVersionId();