Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[duckdb] Utility to compare table schemas #1442

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
<Class name="com.linkedin.davinci.StoreBackendTest"/>
<Class name="com.linkedin.venice.memory.ClassSizeEstimatorTest"/>
<Class name="com.linkedin.venice.controller.server.VeniceControllerAccessManagerTest"/>
<Class name="com.linkedin.venice.sql.SQLUtilsTest"/>
</Or>
</Match>
<Match>
Expand Down Expand Up @@ -500,9 +501,12 @@
<Match>
<Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"/>
<Or>
<Class name="com.linkedin.venice.duckdb.HelloWorldTest"/>
<Class name="com.linkedin.venice.duckdb.DuckDBHelloWorldTest"/>
<Class name="com.linkedin.venice.duckdb.DuckDBAvroToSQLTest"/>
<Class name="com.linkedin.venice.duckdb.DuckDBDaVinciRecordTransformer"/>
<Class name="com.linkedin.venice.sql.SQLHelloWorldTest"/>
<Class name="com.linkedin.venice.sql.SQLUtilsTest"/>
<Class name="com.linkedin.venice.sql.SQLUtils"/>
</Or>
</Match>
<Match>
Expand Down
1 change: 1 addition & 0 deletions integrations/venice-duckdb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
implementation libraries.avro
implementation libraries.avroUtilCompatHelper
implementation libraries.duckdbJdbc
api libraries.log4j2api

implementation project(':clients:da-vinci-client')
implementation project(':internal:venice-client-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.sql.AvroToSQL;
import com.linkedin.venice.sql.InsertProcessor;
import com.linkedin.venice.sql.SQLUtils;
import com.linkedin.venice.sql.TableDefinition;
import com.linkedin.venice.utils.lazy.Lazy;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -18,14 +21,16 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class DuckDBDaVinciRecordTransformer
extends DaVinciRecordTransformer<GenericRecord, GenericRecord, GenericRecord> {
private static final Logger LOGGER = LogManager.getLogger(DuckDBDaVinciRecordTransformer.class);
private static final String duckDBFilePath = "my_database.duckdb";
// ToDo: Don't hardcode the table name. Get it from the storeName
private static final String baseVersionTableName = "my_table_v";
private static final Set<String> primaryKeys = Collections.singleton("firstName");
private static final String deleteStatementTemplate = "DELETE FROM %s WHERE %s = ?;";
private static final String createViewStatementTemplate =
"CREATE OR REPLACE VIEW current_version AS SELECT * FROM %s;";
Expand Down Expand Up @@ -116,14 +121,25 @@ public void processDelete(Lazy<GenericRecord> key) {
public void onStartVersionIngestion(boolean isCurrentVersion) {
try (Connection connection = DriverManager.getConnection(duckDBUrl);
Statement stmt = connection.createStatement()) {
String createTableStatement = AvroToSQL.createTableStatement(
versionTableName,
TableDefinition desiredTableDefinition = AvroToSQL.getTableDefinition(
this.versionTableName,
getKeySchema(),
getOutputValueSchema(),
this.columnsToProject,
FAIL,
true);
stmt.execute(createTableStatement);
TableDefinition existingTableDefinition = SQLUtils.getTableDefinition(this.versionTableName, connection);
if (existingTableDefinition == null) {
LOGGER.info("Table '{}' not found on disk, will create it from scratch", this.versionTableName);
String createTableStatement = SQLUtils.createTableStatement(desiredTableDefinition);
stmt.execute(createTableStatement);
} else if (existingTableDefinition.equals(desiredTableDefinition)) {
LOGGER.info("Table '{}' found on disk and its schema is compatible. Will reuse.", this.versionTableName);
} else {
// TODO: Handle the wiping and re-bootstrap automatically.
throw new VeniceException(
"Table '" + this.versionTableName + "' found on disk, but its schema is incompatible. Please wipe.");
}

if (isCurrentVersion) {
// Unable to convert to prepared statement as table and column names can't be parameterized
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.sql;

import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedHashSet;
Expand All @@ -26,7 +27,7 @@ public enum UnsupportedTypeHandling {
private static final Map<Schema.Type, JDBCType> AVRO_TO_JDBC_TYPE_MAPPING;

static {
Map<Schema.Type, JDBCType> avroToJdbc = new EnumMap(Schema.Type.class);
Map<Schema.Type, JDBCType> avroToJdbc = new EnumMap<>(Schema.Type.class);

// avroToJdbc.put(Schema.Type.UNION, JDBCType.?); // Unions need special handling, see below
avroToJdbc.put(Schema.Type.FIXED, JDBCType.BINARY);
Expand Down Expand Up @@ -59,29 +60,17 @@ private AvroToSQL() {
*/
}

/**
* @param tableName the name of the table in the CREATE TABLE statement
* @param keySchema the Venice key schema
* @param valueSchema the Venice value schema
* @param columnsToProject if empty, then all columns are included, otherwise, only the specified ones
* @param unsupportedTypeHandling the policy of whether to skip or fail when encountering unsupported types
* @param primaryKey whether to define a PRIMARY KEY constraint on the table, including all key schema columns
* @return
*/
@Nonnull
public static String createTableStatement(
public static TableDefinition getTableDefinition(
@Nonnull String tableName,
@Nonnull Schema keySchema,
@Nonnull Schema valueSchema,
@Nonnull Set<String> columnsToProject,
@Nonnull UnsupportedTypeHandling unsupportedTypeHandling,
boolean primaryKey) {
Set<Schema.Field> allColumns = combineColumns(keySchema, valueSchema, columnsToProject);
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("CREATE TABLE " + cleanTableName(tableName) + "(");
boolean firstColumn = true;

for (Schema.Field field: allColumns) {
List<ColumnDefinition> columnDefinitions = new ArrayList<>();
int jdbcIndex = 1;
for (Schema.Field field: combineColumns(keySchema, valueSchema, columnsToProject)) {
JDBCType correspondingType = getCorrespondingType(field);
if (correspondingType == null) {
switch (unsupportedTypeHandling) {
Expand All @@ -97,31 +86,19 @@ public static String createTableStatement(
}
}

if (firstColumn) {
firstColumn = false;
} else {
stringBuffer.append(", ");
}

stringBuffer.append(cleanColumnName(field.name()) + " " + correspondingType.name());
boolean isPrimaryKey = primaryKey && keySchema.getFields().contains(field);
columnDefinitions.add(
new ColumnDefinition(
SQLUtils.cleanColumnName(field.name()),
correspondingType,
true, // TODO: plug nullability
isPrimaryKey ? IndexType.PRIMARY_KEY : null,
null, // TODO: plug default (if necessary)...
null,
jdbcIndex++));
}

firstColumn = true;
if (primaryKey) {
stringBuffer.append(", PRIMARY KEY(");
for (Schema.Field pkColumn: keySchema.getFields()) {
if (firstColumn) {
firstColumn = false;
} else {
stringBuffer.append(", ");
}
stringBuffer.append(cleanColumnName(pkColumn.name()));
}
stringBuffer.append(")");
}
stringBuffer.append(");");

return stringBuffer.toString();
return new TableDefinition(tableName, columnDefinitions);
}

@Nonnull
Expand All @@ -132,7 +109,7 @@ public static String upsertStatement(
@Nonnull Set<String> columnsToProject) {
Set<Schema.Field> allColumns = combineColumns(keySchema, valueSchema, columnsToProject);
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("INSERT OR REPLACE INTO " + cleanTableName(tableName) + " VALUES (");
stringBuffer.append("INSERT OR REPLACE INTO " + SQLUtils.cleanTableName(tableName) + " VALUES (");
boolean firstColumn = true;

for (Schema.Field field: allColumns) {
Expand Down Expand Up @@ -187,22 +164,6 @@ static JDBCType getCorrespondingType(Schema.Field field) {
return AVRO_TO_JDBC_TYPE_MAPPING.get(fieldType);
}

/**
* This function should encapsulate the handling of any illegal characters (by either failing or converting them).
*/
@Nonnull
private static String cleanTableName(@Nonnull String avroRecordName) {
return Objects.requireNonNull(avroRecordName);
}

/**
* This function should encapsulate the handling of any illegal characters (by either failing or converting them).
*/
@Nonnull
private static String cleanColumnName(@Nonnull String avroFieldName) {
return Objects.requireNonNull(avroFieldName);
}

@Nonnull
static Set<Schema.Field> combineColumns(
@Nonnull Schema keySchema,
Expand All @@ -216,9 +177,7 @@ static Set<Schema.Field> combineColumns(
throw new IllegalArgumentException("Only Avro records can have a corresponding CREATE TABLE statement.");
}
Set<Schema.Field> allColumns = new LinkedHashSet<>(keySchema.getFields().size() + valueSchema.getFields().size());
for (Schema.Field field: keySchema.getFields()) {
allColumns.add(field);
}
allColumns.addAll(keySchema.getFields());
for (Schema.Field field: valueSchema.getFields()) {
if (columnsToProject.isEmpty() || columnsToProject.contains(field.name())) {
if (!allColumns.add(field)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.linkedin.venice.sql;

import java.sql.JDBCType;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
 * Immutable description of a single column within a {@link TableDefinition}.
 *
 * Captures the column's name, JDBC type, nullability, optional index participation,
 * optional default value / extra DDL text, and its 1-based JDBC ordinal position.
 * Instances are value objects: {@link #equals(Object)} compares all fields so that
 * schema comparison (e.g. "does the table on disk match the desired schema?") is
 * a simple equality check.
 */
public class ColumnDefinition {
  @Nonnull
  private final String name;
  @Nonnull
  private final JDBCType type;
  private final boolean nullable;
  @Nullable
  private final IndexType indexType;
  @Nullable
  private final String defaultValue;
  @Nullable
  private final String extra;
  // 1-based ordinal position, matching JDBC's ResultSet/PreparedStatement indexing.
  private final int jdbcIndex;

  /** Convenience constructor for a nullable, non-indexed column. */
  public ColumnDefinition(@Nonnull String name, @Nonnull JDBCType type, int jdbcIndex) {
    this(name, type, true, null, jdbcIndex);
  }

  /** Convenience constructor for a column without a default value or extra DDL text. */
  public ColumnDefinition(
      @Nonnull String name,
      @Nonnull JDBCType type,
      boolean nullable,
      @Nullable IndexType indexType,
      int jdbcIndex) {
    this(name, type, nullable, indexType, null, null, jdbcIndex);
  }

  /**
   * Canonical constructor.
   *
   * @param name column name; must not be null
   * @param type JDBC type of the column; must not be null
   * @param nullable whether the column accepts NULL values
   * @param indexType index participation (primary key / unique), or null if not indexed
   * @param defaultValue textual default value, or null if none
   * @param extra additional column metadata, or null if none
   * @param jdbcIndex 1-based ordinal position of the column
   * @throws IllegalArgumentException if {@code jdbcIndex} is less than 1
   */
  public ColumnDefinition(
      @Nonnull String name,
      @Nonnull JDBCType type,
      boolean nullable,
      @Nullable IndexType indexType,
      @Nullable String defaultValue,
      @Nullable String extra,
      int jdbcIndex) {
    this.name = Objects.requireNonNull(name);
    this.type = Objects.requireNonNull(type);
    this.nullable = nullable;
    this.indexType = indexType;
    this.defaultValue = defaultValue;
    this.extra = extra;
    this.jdbcIndex = jdbcIndex;
    if (this.jdbcIndex < 1) {
      throw new IllegalArgumentException("The jdbcIndex must be at least 1");
    }
  }

  @Nonnull
  public String getName() {
    return name;
  }

  @Nonnull
  public JDBCType getType() {
    return type;
  }

  public boolean isNullable() {
    return nullable;
  }

  @Nullable
  public IndexType getIndexType() {
    return indexType;
  }

  @Nullable
  public String getDefaultValue() {
    return defaultValue;
  }

  @Nullable
  public String getExtra() {
    return extra;
  }

  public int getJdbcIndex() {
    return jdbcIndex;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    ColumnDefinition that = (ColumnDefinition) o;

    return this.nullable == that.nullable && this.jdbcIndex == that.jdbcIndex && this.name.equals(that.name)
        && this.type == that.type && this.indexType == that.indexType && Objects.equals(defaultValue, that.defaultValue)
        && Objects.equals(extra, that.extra);
  }

  @Override
  public int hashCode() {
    // Hash all fields participating in equals, so distinct definitions sharing a
    // name (e.g. same column name with different types) do not systematically collide.
    return Objects.hash(this.name, this.type, this.nullable, this.indexType, this.defaultValue, this.extra, this.jdbcIndex);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.linkedin.venice.sql;

/**
 * The kind of index constraint a column participates in.
 */
public enum IndexType {
  /** Column is part of the table's primary key. */
  PRIMARY_KEY,

  /** Column carries a uniqueness constraint. */
  UNIQUE
}
Loading
Loading