Skip to content

Commit

Permalink
[duckdb] Cleanup and refactor DVRT interface to be more configurable (#…
Browse files Browse the repository at this point in the history
…1445)

Previously, the user would have to define the key and output schema inside their DVRT implementation, which wasn't clean. Now, they're moved into the constructor. The user now only needs to provide the output schema, as the key schema will be automatically passed in by DVC.

I also added a constructor parameter for original output schema to support reader/writer schemas, but added a ToDo on how to implement the logic to pass in the correct schema.

Lastly, I make the DuckDB DVRT implementation more configurable, as it was previously more hardcoded.
  • Loading branch information
kvargha authored Jan 16, 2025
1 parent 7fb1577 commit c77004a
Show file tree
Hide file tree
Showing 26 changed files with 385 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.linkedin.davinci.blobtransfer.BlobTransferManager;
import com.linkedin.davinci.blobtransfer.BlobTransferUtil;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.StoreBackendConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
Expand Down Expand Up @@ -122,7 +122,7 @@ public DaVinciBackend(
Optional<Set<String>> managedClients,
ICProvider icProvider,
Optional<ObjectCacheConfig> cacheConfig,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
DaVinciRecordTransformerConfig recordTransformerConfig) {
LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
try {
VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();
Expand Down Expand Up @@ -271,7 +271,7 @@ public DaVinciBackend(
false,
compressorFactory,
cacheBackend,
recordTransformerFunction,
recordTransformerConfig,
true,
// TODO: consider how/if a repair task would be valid for Davinci users?
null,
Expand All @@ -294,7 +294,7 @@ public DaVinciBackend(
}

if (backendConfig.isBlobTransferManagerEnabled()) {
if (recordTransformerFunction != null) {
if (recordTransformerConfig != null) {
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ protected void initBackend(
Optional<Set<String>> managedClients,
ICProvider icProvider,
Optional<ObjectCacheConfig> cacheConfig,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
DaVinciRecordTransformerConfig recordTransformerConfig) {
synchronized (AvroGenericDaVinciClient.class) {
if (daVinciBackend == null) {
logger
Expand All @@ -744,7 +744,7 @@ protected void initBackend(
managedClients,
icProvider,
cacheConfig,
recordTransformerFunction),
recordTransformerConfig),
backend -> {
// Ensure that existing backend is fully closed before a new one can be created.
synchronized (AvroGenericDaVinciClient.class) {
Expand Down Expand Up @@ -782,13 +782,7 @@ public synchronized void start() {
logger.info("Starting client, storeName=" + getStoreName());
VeniceConfigLoader configLoader = buildVeniceConfig();
Optional<ObjectCacheConfig> cacheConfig = Optional.ofNullable(daVinciConfig.getCacheConfig());
initBackend(
clientConfig,
configLoader,
managedClients,
icProvider,
cacheConfig,
daVinciConfig.getRecordTransformerFunction());
initBackend(clientConfig, configLoader, managedClients, icProvider, cacheConfig, recordTransformerConfig);

try {
getBackend().verifyCacheConfigEquality(daVinciConfig.getCacheConfig(), getStoreName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ public class BlockingDaVinciRecordTransformer<K, V, O> extends DaVinciRecordTran
private final DaVinciRecordTransformer recordTransformer;
private final CountDownLatch startLatch = new CountDownLatch(1);

public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer, boolean storeRecordsInDaVinci) {
super(recordTransformer.getStoreVersion(), storeRecordsInDaVinci);
public BlockingDaVinciRecordTransformer(
DaVinciRecordTransformer recordTransformer,
Schema keySchema,
Schema inputValueSchema,
Schema outputValueSchema,
boolean storeRecordsInDaVinci) {
super(recordTransformer.getStoreVersion(), keySchema, inputValueSchema, outputValueSchema, storeRecordsInDaVinci);
this.recordTransformer = recordTransformer;
}

public Schema getKeySchema() {
return this.recordTransformer.getKeySchema();
}

public Schema getOutputValueSchema() {
return this.recordTransformer.getOutputValueSchema();
}

public DaVinciRecordTransformerResult<O> transform(Lazy<K> key, Lazy<V> value) {
return this.recordTransformer.transform(key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,6 @@ public DaVinciRecordTransformerConfig getRecordTransformerConfig() {
return recordTransformerConfig;
}

public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) {
if (recordTransformerConfig == null) {
return null;
}
return recordTransformerConfig.getRecordTransformer(storeVersion);
}

public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction() {
if (recordTransformerConfig == null) {
return null;
}
return recordTransformerConfig.getRecordTransformerFunction();
}

public boolean isReadMetricsEnabled() {
return readMetricsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,46 @@ public abstract class DaVinciRecordTransformer<K, V, O> {
*/
private final boolean storeRecordsInDaVinci;

/**
* The key schema, which is immutable inside DaVinciClient. Users can modify the key if they are storing records in an external storage engine, but this must be managed by the user.
*/
private final Schema keySchema;

/**
* The value schema before transformation, which is provided by the DaVinciClient.
*/
private final Schema inputValueSchema;

/**
* The value schema after transformation, which is provided by the user.
*/
private final Schema outputValueSchema;

private final DaVinciRecordTransformerUtility<K, O> recordTransformerUtility;

/**
* @param storeVersion the version of the store
* @param keySchema the key schema, which is immutable inside DaVinciClient. Users can modify the key if they are storing records in an external storage engine, but this must be managed by the user
* @param inputValueSchema the value schema before transformation
* @param outputValueSchema the value schema after transformation
* @param storeRecordsInDaVinci set this to false if you intend to store records in a custom storage,
* and not in the Da Vinci Client.
*/
public DaVinciRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
* and not in the Da Vinci Client
*/
public DaVinciRecordTransformer(
int storeVersion,
Schema keySchema,
Schema inputValueSchema,
Schema outputValueSchema,
boolean storeRecordsInDaVinci) {
this.storeVersion = storeVersion;
this.storeRecordsInDaVinci = storeRecordsInDaVinci;
this.keySchema = keySchema;
// ToDo: Make use of inputValueSchema to support reader/writer schemas
this.inputValueSchema = inputValueSchema;
this.outputValueSchema = outputValueSchema;
this.recordTransformerUtility = new DaVinciRecordTransformerUtility<>(this);
}

/**
* Returns the schema for the key used in {@link DaVinciClient}'s operations.
*
* @return a {@link Schema} corresponding to the type of {@link K}.
*/
public abstract Schema getKeySchema();

/**
* Returns the schema for the output value used in {@link DaVinciClient}'s operations.
*
* @return a {@link Schema} corresponding to the type of {@link O}.
*/
public abstract Schema getOutputValueSchema();

/**
* Implement this method to transform records before they are stored.
* This can be useful for tasks such as filtering out unused fields to save storage space.
Expand Down Expand Up @@ -206,6 +219,33 @@ public final boolean getStoreRecordsInDaVinci() {
return storeRecordsInDaVinci;
}

/**
* Returns the schema for the key used in {@link DaVinciClient}'s operations.
*
* @return a {@link Schema} corresponding to the type of {@link K}.
*/
public final Schema getKeySchema() {
return keySchema;
}

/**
* Returns the schema for the input value used in {@link DaVinciClient}'s operations.
*
* @return a {@link Schema} corresponding to the type of {@link V}.
*/
public final Schema getInputValueSchema() {
return inputValueSchema;
}

/**
* Returns the schema for the output value used in {@link DaVinciClient}'s operations.
*
* @return a {@link Schema} corresponding to the type of {@link O}.
*/
public final Schema getOutputValueSchema() {
return outputValueSchema;
}

/**
* @return {@link #recordTransformerUtility}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction(
return recordTransformerFunction;
}

/**
* @param storeVersion the store version
* @return a new {@link DaVinciRecordTransformer}
*/
public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) {
return recordTransformerFunction.apply(storeVersion);
}

/**
* @return {@link #outputValueClass}
*/
Expand All @@ -53,5 +45,4 @@ public Class getOutputValueClass() {
public Schema getOutputValueSchema() {
return outputValueSchema;
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.linkedin.davinci.client;

import org.apache.avro.Schema;


/**
* This describes the implementation for the functional interface of {@link DaVinciRecordTransformer}
*/

@FunctionalInterface
public interface DaVinciRecordTransformerFunctionalInterface {
DaVinciRecordTransformer apply(Integer storeVersion);
DaVinciRecordTransformer apply(
Integer storeVersion,
Schema keySchema,
Schema inputValueSchema,
Schema outputValueSchema);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@
*/
public class DaVinciRecordTransformerUtility<K, O> {
private final DaVinciRecordTransformer recordTransformer;
private AvroGenericDeserializer<K> keyDeserializer;
private AvroGenericDeserializer<O> outputValueDeserializer;
private AvroSerializer<O> outputValueSerializer;
private final AvroGenericDeserializer<K> keyDeserializer;
private final AvroGenericDeserializer<O> outputValueDeserializer;
private final AvroSerializer<O> outputValueSerializer;

public DaVinciRecordTransformerUtility(DaVinciRecordTransformer recordTransformer) {
this.recordTransformer = recordTransformer;

Schema keySchema = recordTransformer.getKeySchema();
Schema outputValueSchema = recordTransformer.getOutputValueSchema();
this.keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema);
this.outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema);
this.outputValueSerializer = new AvroSerializer<>(outputValueSchema);
}

/**
Expand All @@ -40,7 +46,7 @@ public DaVinciRecordTransformerUtility(DaVinciRecordTransformer recordTransforme
* @return a ByteBuffer containing the schema ID followed by the serialized and compressed value
*/
public final ByteBuffer prependSchemaIdToHeader(O value, int schemaId, VeniceCompressor compressor) {
byte[] serializedValue = getOutputValueSerializer().serialize(value);
byte[] serializedValue = outputValueSerializer.serialize(value);
byte[] compressedValue;
try {
compressedValue = compressor.compress(serializedValue);
Expand Down Expand Up @@ -116,7 +122,7 @@ public final void onRecovery(
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
byte[] keyBytes = iterator.key();
byte[] valueBytes = iterator.value();
Lazy<K> lazyKey = Lazy.of(() -> getKeyDeserializer().deserialize(ByteBuffer.wrap(keyBytes)));
Lazy<K> lazyKey = Lazy.of(() -> keyDeserializer.deserialize(keyBytes));
Lazy<O> lazyValue = Lazy.of(() -> {
ByteBuffer valueByteBuffer = ByteBuffer.wrap(valueBytes);
// Skip schema id
Expand All @@ -127,35 +133,11 @@ public final void onRecovery(
} catch (IOException e) {
throw new RuntimeException(e);
}
return getOutputValueDeserializer().deserialize(decompressedValueBytes);
return outputValueDeserializer.deserialize(decompressedValueBytes);
});

recordTransformer.processPut(lazyKey, lazyValue);
}
}
}

public AvroGenericDeserializer<K> getKeyDeserializer() {
if (keyDeserializer == null) {
Schema keySchema = recordTransformer.getKeySchema();
keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema);
}
return keyDeserializer;
}

public AvroGenericDeserializer<O> getOutputValueDeserializer() {
if (outputValueDeserializer == null) {
Schema outputValueSchema = recordTransformer.getOutputValueSchema();
outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema);
}
return outputValueDeserializer;
}

public AvroSerializer<O> getOutputValueSerializer() {
if (outputValueSerializer == null) {
Schema outputValueSchema = recordTransformer.getOutputValueSchema();
outputValueSerializer = new AvroSerializer<>(outputValueSchema);
}
return outputValueSerializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
Expand Down Expand Up @@ -114,7 +114,7 @@ public ActiveActiveStoreIngestionTask(
int errorPartitionId,
boolean isIsolatedIngestion,
Optional<ObjectCacheBackend> cacheBackend,
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction,
DaVinciRecordTransformerConfig recordTransformerConfig,
Lazy<ZKHelixAdmin> zkHelixAdmin) {
super(
storageService,
Expand All @@ -127,7 +127,7 @@ public ActiveActiveStoreIngestionTask(
errorPartitionId,
isIsolatedIngestion,
cacheBackend,
recordTransformerFunction,
recordTransformerConfig,
zkHelixAdmin);

this.rmdProtocolVersionId = version.getRmdVersionId();
Expand Down
Loading

0 comments on commit c77004a

Please sign in to comment.