Skip to content

Commit

Permalink
[duckdb] Do not wipe current version in onEndVersionIngestion (#1448)
Browse files Browse the repository at this point in the history
Only wipe the non-current versions (e.g. backup).
  • Loading branch information
FelixGV authored Jan 16, 2025
1 parent c77004a commit d1abe37
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,14 @@ public void onEndVersionIngestion(int currentVersion) {
String createViewStatement = String.format(createViewStatementTemplate, currentVersionTableName);
stmt.execute(createViewStatement);

// Unable to convert to prepared statement as table and column names can't be parameterized
// Drop DuckDB table for storeVersion as it's retired
String dropTableStatement = String.format(dropTableStatementTemplate, versionTableName);
stmt.execute(dropTableStatement);
if (currentVersion != getStoreVersion()) {
// Only drop non-current versions, e.g., the backup version getting retired.

// Unable to convert to prepared statement as table and column names can't be parameterized
// Drop DuckDB table for storeVersion as it's retired
String dropTableStatement = String.format(dropTableStatementTemplate, versionTableName);
stmt.execute(dropTableStatement);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Properties;
Expand Down Expand Up @@ -129,6 +130,7 @@ public void testRecordTransformer() throws Exception {

VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled);
MetricsRepository metricsRepository = new MetricsRepository();
String duckDBUrl = "jdbc:duckdb:" + tmpDir.getAbsolutePath() + "/my_database.duckdb";

try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
Expand All @@ -155,21 +157,25 @@ public void testRecordTransformer() throws Exception {

clientWithRecordTransformer.subscribeAll().get();

String duckDBUrl = "jdbc:duckdb:" + tmpDir.getAbsolutePath() + "/my_database.duckdb";
try (Connection connection = DriverManager.getConnection(duckDBUrl);
Statement stmt = connection.createStatement()) {

try (ResultSet rs = stmt.executeQuery("SELECT * FROM current_version")) {
int rowCount = 0;
while (rs.next()) {
rowCount++;
}
assertEquals(rowCount, DEFAULT_USER_DATA_RECORD_COUNT);
}
}
assertRowCount(duckDBUrl, "subscribeAll() finishes!");

clientWithRecordTransformer.unsubscribeAll();
}

assertRowCount(duckDBUrl, "DVC gets closed!");
}

private void assertRowCount(String duckDBUrl, String assertionErrorMsg) throws SQLException {
try (Connection connection = DriverManager.getConnection(duckDBUrl);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT count(*) FROM current_version")) {
assertTrue(rs.next());
int rowCount = rs.getInt(1);
assertEquals(
rowCount,
DEFAULT_USER_DATA_RECORD_COUNT,
"The DB should contain " + DEFAULT_USER_DATA_RECORD_COUNT + " right after " + assertionErrorMsg);
}
}

protected void setUpStore(
Expand Down

0 comments on commit d1abe37

Please sign in to comment.