From d1abe377453de956ffab2dc74d3806c38a653c0a Mon Sep 17 00:00:00 2001 From: Felix GV Date: Thu, 16 Jan 2025 16:59:50 -0500 Subject: [PATCH] [duckdb] Do not wipe current version in onEndVersionIngestion (#1448) Only wipe the non-current versions (e.g. backup). --- .../DuckDBDaVinciRecordTransformer.java | 12 +++++--- ...VinciRecordTransformerIntegrationTest.java | 30 +++++++++++-------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java b/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java index 0b12791e33..5215dfb80e 100644 --- a/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java +++ b/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java @@ -138,10 +138,14 @@ public void onEndVersionIngestion(int currentVersion) { String createViewStatement = String.format(createViewStatementTemplate, currentVersionTableName); stmt.execute(createViewStatement); - // Unable to convert to prepared statement as table and column names can't be parameterized - // Drop DuckDB table for storeVersion as it's retired - String dropTableStatement = String.format(dropTableStatementTemplate, versionTableName); - stmt.execute(dropTableStatement); + if (currentVersion != getStoreVersion()) { + // Only drop non-current versions, e.g., the backup version getting retired. + + // Unable to convert to prepared statement as table and column names can't be parameterized + // Drop DuckDB table for storeVersion as it's retired + String dropTableStatement = String.format(dropTableStatementTemplate, versionTableName); + stmt.execute(dropTableStatement); + } } catch (SQLException e) { throw new RuntimeException(e); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DuckDBDaVinciRecordTransformerIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DuckDBDaVinciRecordTransformerIntegrationTest.java index 3faaa99688..71b1acb0f2 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DuckDBDaVinciRecordTransformerIntegrationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DuckDBDaVinciRecordTransformerIntegrationTest.java @@ -52,6 +52,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.Collections; import java.util.Properties; @@ -129,6 +130,7 @@ public void testRecordTransformer() throws Exception { VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled); MetricsRepository metricsRepository = new MetricsRepository(); + String duckDBUrl = "jdbc:duckdb:" + tmpDir.getAbsolutePath() + "/my_database.duckdb"; try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, @@ -155,21 +157,25 @@ public void testRecordTransformer() throws Exception { clientWithRecordTransformer.subscribeAll().get(); - String duckDBUrl = "jdbc:duckdb:" + tmpDir.getAbsolutePath() + "/my_database.duckdb"; - try (Connection connection = DriverManager.getConnection(duckDBUrl); - Statement stmt = connection.createStatement()) { - - try (ResultSet rs = stmt.executeQuery("SELECT * FROM current_version")) { - int rowCount = 0; - while (rs.next()) { - rowCount++; - } - assertEquals(rowCount, DEFAULT_USER_DATA_RECORD_COUNT); - } - } + assertRowCount(duckDBUrl, "subscribeAll() finishes!"); clientWithRecordTransformer.unsubscribeAll(); } + + assertRowCount(duckDBUrl, "DVC gets closed!"); + } + + private void assertRowCount(String duckDBUrl, String assertionErrorMsg) throws SQLException { + try (Connection connection = DriverManager.getConnection(duckDBUrl); + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery("SELECT count(*) FROM current_version")) { + assertTrue(rs.next()); + int rowCount = rs.getInt(1); + assertEquals( + rowCount, + DEFAULT_USER_DATA_RECORD_COUNT, + "The DB should contain " + DEFAULT_USER_DATA_RECORD_COUNT + " right after " + assertionErrorMsg); + } } protected void setUpStore(