Skip to content

Commit

Permalink
[duckdb] Utility to compare table schemas (#1442)
Browse files Browse the repository at this point in the history
This is used to check whether a pre-existing table on disk has a
schema equal to the one we want.

Did this by introducing various utility classes, including:

- ColumnDefinition
- TableDefinition
- IndexType
- SQLUtils

Also refactored unit tests to decouple from DuckDB, so that it's
easier to test other engines in the future (e.g., SQLite).
  • Loading branch information
FelixGV authored Jan 15, 2025
1 parent 5e0f862 commit 4bf1fa8
Show file tree
Hide file tree
Showing 14 changed files with 687 additions and 131 deletions.
6 changes: 5 additions & 1 deletion gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
<Class name="com.linkedin.davinci.StoreBackendTest"/>
<Class name="com.linkedin.venice.memory.ClassSizeEstimatorTest"/>
<Class name="com.linkedin.venice.controller.server.VeniceControllerAccessManagerTest"/>
<Class name="com.linkedin.venice.sql.SQLUtilsTest"/>
</Or>
</Match>
<Match>
Expand Down Expand Up @@ -500,9 +501,12 @@
<Match>
<Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"/>
<Or>
<Class name="com.linkedin.venice.duckdb.HelloWorldTest"/>
<Class name="com.linkedin.venice.duckdb.DuckDBHelloWorldTest"/>
<Class name="com.linkedin.venice.duckdb.DuckDBAvroToSQLTest"/>
<Class name="com.linkedin.venice.duckdb.DuckDBDaVinciRecordTransformer"/>
<Class name="com.linkedin.venice.sql.SQLHelloWorldTest"/>
<Class name="com.linkedin.venice.sql.SQLUtilsTest"/>
<Class name="com.linkedin.venice.sql.SQLUtils"/>
</Or>
</Match>
<Match>
Expand Down
1 change: 1 addition & 0 deletions integrations/venice-duckdb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
implementation libraries.avro
implementation libraries.avroUtilCompatHelper
implementation libraries.duckdbJdbc
api libraries.log4j2api

implementation project(':clients:da-vinci-client')
implementation project(':internal:venice-client-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.sql.AvroToSQL;
import com.linkedin.venice.sql.InsertProcessor;
import com.linkedin.venice.sql.SQLUtils;
import com.linkedin.venice.sql.TableDefinition;
import com.linkedin.venice.utils.lazy.Lazy;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -18,14 +21,16 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class DuckDBDaVinciRecordTransformer
extends DaVinciRecordTransformer<GenericRecord, GenericRecord, GenericRecord> {
private static final Logger LOGGER = LogManager.getLogger(DuckDBDaVinciRecordTransformer.class);
private static final String duckDBFilePath = "my_database.duckdb";
// ToDo: Don't hardcode the table name. Get it from the storeName
private static final String baseVersionTableName = "my_table_v";
private static final Set<String> primaryKeys = Collections.singleton("firstName");
private static final String deleteStatementTemplate = "DELETE FROM %s WHERE %s = ?;";
private static final String createViewStatementTemplate =
"CREATE OR REPLACE VIEW current_version AS SELECT * FROM %s;";
Expand Down Expand Up @@ -116,14 +121,25 @@ public void processDelete(Lazy<GenericRecord> key) {
public void onStartVersionIngestion(boolean isCurrentVersion) {
try (Connection connection = DriverManager.getConnection(duckDBUrl);
Statement stmt = connection.createStatement()) {
String createTableStatement = AvroToSQL.createTableStatement(
versionTableName,
TableDefinition desiredTableDefinition = AvroToSQL.getTableDefinition(
this.versionTableName,
getKeySchema(),
getOutputValueSchema(),
this.columnsToProject,
FAIL,
true);
stmt.execute(createTableStatement);
TableDefinition existingTableDefinition = SQLUtils.getTableDefinition(this.versionTableName, connection);
if (existingTableDefinition == null) {
LOGGER.info("Table '{}' not found on disk, will create it from scratch", this.versionTableName);
String createTableStatement = SQLUtils.createTableStatement(desiredTableDefinition);
stmt.execute(createTableStatement);
} else if (existingTableDefinition.equals(desiredTableDefinition)) {
LOGGER.info("Table '{}' found on disk and its schema is compatible. Will reuse.", this.versionTableName);
} else {
// TODO: Handle the wiping and re-bootstrap automatically.
throw new VeniceException(
"Table '" + this.versionTableName + "' found on disk, but its schema is incompatible. Please wipe.");
}

if (isCurrentVersion) {
// Unable to convert to prepared statement as table and column names can't be parameterized
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.sql;

import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedHashSet;
Expand All @@ -26,7 +27,7 @@ public enum UnsupportedTypeHandling {
private static final Map<Schema.Type, JDBCType> AVRO_TO_JDBC_TYPE_MAPPING;

static {
Map<Schema.Type, JDBCType> avroToJdbc = new EnumMap(Schema.Type.class);
Map<Schema.Type, JDBCType> avroToJdbc = new EnumMap<>(Schema.Type.class);

// avroToJdbc.put(Schema.Type.UNION, JDBCType.?); // Unions need special handling, see below
avroToJdbc.put(Schema.Type.FIXED, JDBCType.BINARY);
Expand Down Expand Up @@ -59,29 +60,17 @@ private AvroToSQL() {
*/
}

/**
* @param tableName the name of the table in the CREATE TABLE statement
* @param keySchema the Venice key schema
* @param valueSchema the Venice value schema
* @param columnsToProject if empty, then all columns are included, otherwise, only the specified ones
* @param unsupportedTypeHandling the policy of whether to skip or fail when encountering unsupported types
* @param primaryKey whether to define a PRIMARY KEY constraint on the table, including all key schema columns
* @return
*/
@Nonnull
public static String createTableStatement(
public static TableDefinition getTableDefinition(
@Nonnull String tableName,
@Nonnull Schema keySchema,
@Nonnull Schema valueSchema,
@Nonnull Set<String> columnsToProject,
@Nonnull UnsupportedTypeHandling unsupportedTypeHandling,
boolean primaryKey) {
Set<Schema.Field> allColumns = combineColumns(keySchema, valueSchema, columnsToProject);
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("CREATE TABLE " + cleanTableName(tableName) + "(");
boolean firstColumn = true;

for (Schema.Field field: allColumns) {
List<ColumnDefinition> columnDefinitions = new ArrayList<>();
int jdbcIndex = 1;
for (Schema.Field field: combineColumns(keySchema, valueSchema, columnsToProject)) {
JDBCType correspondingType = getCorrespondingType(field);
if (correspondingType == null) {
switch (unsupportedTypeHandling) {
Expand All @@ -97,31 +86,19 @@ public static String createTableStatement(
}
}

if (firstColumn) {
firstColumn = false;
} else {
stringBuffer.append(", ");
}

stringBuffer.append(cleanColumnName(field.name()) + " " + correspondingType.name());
boolean isPrimaryKey = primaryKey && keySchema.getFields().contains(field);
columnDefinitions.add(
new ColumnDefinition(
SQLUtils.cleanColumnName(field.name()),
correspondingType,
true, // TODO: plug nullability
isPrimaryKey ? IndexType.PRIMARY_KEY : null,
null, // TODO: plug default (if necessary)...
null,
jdbcIndex++));
}

firstColumn = true;
if (primaryKey) {
stringBuffer.append(", PRIMARY KEY(");
for (Schema.Field pkColumn: keySchema.getFields()) {
if (firstColumn) {
firstColumn = false;
} else {
stringBuffer.append(", ");
}
stringBuffer.append(cleanColumnName(pkColumn.name()));
}
stringBuffer.append(")");
}
stringBuffer.append(");");

return stringBuffer.toString();
return new TableDefinition(tableName, columnDefinitions);
}

@Nonnull
Expand All @@ -132,7 +109,7 @@ public static String upsertStatement(
@Nonnull Set<String> columnsToProject) {
Set<Schema.Field> allColumns = combineColumns(keySchema, valueSchema, columnsToProject);
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("INSERT OR REPLACE INTO " + cleanTableName(tableName) + " VALUES (");
stringBuffer.append("INSERT OR REPLACE INTO " + SQLUtils.cleanTableName(tableName) + " VALUES (");
boolean firstColumn = true;

for (Schema.Field field: allColumns) {
Expand Down Expand Up @@ -187,22 +164,6 @@ static JDBCType getCorrespondingType(Schema.Field field) {
return AVRO_TO_JDBC_TYPE_MAPPING.get(fieldType);
}

/**
* This function should encapsulate the handling of any illegal characters (by either failing or converting them).
*/
@Nonnull
private static String cleanTableName(@Nonnull String avroRecordName) {
return Objects.requireNonNull(avroRecordName);
}

/**
* This function should encapsulate the handling of any illegal characters (by either failing or converting them).
*/
@Nonnull
private static String cleanColumnName(@Nonnull String avroFieldName) {
return Objects.requireNonNull(avroFieldName);
}

@Nonnull
static Set<Schema.Field> combineColumns(
@Nonnull Schema keySchema,
Expand All @@ -216,9 +177,7 @@ static Set<Schema.Field> combineColumns(
throw new IllegalArgumentException("Only Avro records can have a corresponding CREATE TABLE statement.");
}
Set<Schema.Field> allColumns = new LinkedHashSet<>(keySchema.getFields().size() + valueSchema.getFields().size());
for (Schema.Field field: keySchema.getFields()) {
allColumns.add(field);
}
allColumns.addAll(keySchema.getFields());
for (Schema.Field field: valueSchema.getFields()) {
if (columnsToProject.isEmpty() || columnsToProject.contains(field.name())) {
if (!allColumns.add(field)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.linkedin.venice.sql;

import java.sql.JDBCType;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
 * Immutable description of a single SQL column: its name, JDBC type, nullability,
 * optional index participation, optional default value and extra DDL text, and the
 * 1-based ordinal position of the column within its table (JDBC convention).
 *
 * Instances are value objects: {@link #equals(Object)} compares all fields, and
 * {@link #hashCode()} is consistent with it.
 */
public class ColumnDefinition {
  @Nonnull
  private final String name;
  @Nonnull
  private final JDBCType type;
  private final boolean nullable;
  @Nullable
  private final IndexType indexType;
  @Nullable
  private final String defaultValue;
  @Nullable
  private final String extra;
  // 1-based, matching JDBC's column indexing convention.
  private final int jdbcIndex;

  /** Convenience constructor for a nullable, non-indexed column. */
  public ColumnDefinition(@Nonnull String name, @Nonnull JDBCType type, int jdbcIndex) {
    this(name, type, true, null, jdbcIndex);
  }

  /** Convenience constructor for a column with no default value and no extra DDL text. */
  public ColumnDefinition(
      @Nonnull String name,
      @Nonnull JDBCType type,
      boolean nullable,
      @Nullable IndexType indexType,
      int jdbcIndex) {
    this(name, type, nullable, indexType, null, null, jdbcIndex);
  }

  /**
   * @param name the column name, which cannot be null
   * @param type the {@link JDBCType} of the column, which cannot be null
   * @param nullable whether the column accepts NULL values
   * @param indexType the index the column participates in, or null if none
   * @param defaultValue the textual default value, or null if none
   * @param extra engine-specific extra column metadata, or null if none
   * @param jdbcIndex the 1-based ordinal position of the column
   * @throws IllegalArgumentException if {@code jdbcIndex} is less than 1
   */
  public ColumnDefinition(
      @Nonnull String name,
      @Nonnull JDBCType type,
      boolean nullable,
      @Nullable IndexType indexType,
      @Nullable String defaultValue,
      @Nullable String extra,
      int jdbcIndex) {
    // Validate before assigning so an invalid index is rejected up front.
    if (jdbcIndex < 1) {
      throw new IllegalArgumentException("The jdbcIndex must be at least 1");
    }
    this.name = Objects.requireNonNull(name);
    this.type = Objects.requireNonNull(type);
    this.nullable = nullable;
    this.indexType = indexType;
    this.defaultValue = defaultValue;
    this.extra = extra;
    this.jdbcIndex = jdbcIndex;
  }

  @Nonnull
  public String getName() {
    return name;
  }

  @Nonnull
  public JDBCType getType() {
    return type;
  }

  public boolean isNullable() {
    return nullable;
  }

  @Nullable
  public IndexType getIndexType() {
    return indexType;
  }

  @Nullable
  public String getDefaultValue() {
    return defaultValue;
  }

  @Nullable
  public String getExtra() {
    return extra;
  }

  public int getJdbcIndex() {
    return jdbcIndex;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    ColumnDefinition that = (ColumnDefinition) o;

    return this.nullable == that.nullable && this.jdbcIndex == that.jdbcIndex && this.name.equals(that.name)
        && this.type == that.type && this.indexType == that.indexType && Objects.equals(defaultValue, that.defaultValue)
        && Objects.equals(extra, that.extra);
  }

  @Override
  public int hashCode() {
    // Combine every field that participates in equals(), rather than only the name,
    // to reduce collisions while preserving the equals/hashCode contract.
    return Objects.hash(this.name, this.type, this.nullable, this.indexType, this.defaultValue, this.extra, this.jdbcIndex);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.linkedin.venice.sql;

/**
 * The kinds of column-level index constraints recognized when describing
 * or comparing table schemas.
 */
public enum IndexType {
  /** The column is part of the table's primary key. */
  PRIMARY_KEY,

  /** The column carries a UNIQUE constraint. */
  UNIQUE
}
Loading

0 comments on commit 4bf1fa8

Please sign in to comment.