diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
index f4a0dbd13..0c26bd1d5 100644
--- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
+++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
@@ -445,10 +445,8 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
                 newPartitionList.add(curPartitionInfo.build());
             }
         } else if (commitOp.equals(CommitOp.CompactionCommit) || commitOp.equals(CommitOp.UpdateCommit)) {
-            if (readPartitionInfo != null) {
-                for (PartitionInfo p : readPartitionInfo) {
-                    readPartitionMap.put(p.getPartitionDesc(), p);
-                }
+            for (PartitionInfo p : readPartitionInfo) {
+                readPartitionMap.put(p.getPartitionDesc(), p);
             }
             for (PartitionInfo partitionInfo : listPartitionInfo) {
                 String partitionDesc = partitionInfo.getPartitionDesc();
@@ -507,6 +505,32 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
                 newMap.put(partitionDesc, curPartitionInfo.build());
                 newPartitionList.add(curPartitionInfo.build());
             }
+        } else if (commitOp.equals(CommitOp.DeleteCommit)) {
+            for (PartitionInfo p : readPartitionInfo) {
+                readPartitionMap.put(p.getPartitionDesc(), p);
+            }
+
+            for (PartitionInfo partitionInfo : listPartitionInfo) {
+                String partitionDesc = partitionInfo.getPartitionDesc();
+                PartitionInfo.Builder curPartitionInfo = getOrCreateCurPartitionInfo(curMap, partitionDesc, tableId).toBuilder();
+                int curVersion = curPartitionInfo.getVersion();
+
+                PartitionInfo readPartition = readPartitionMap.get(partitionDesc);
+                if (readPartition == null) {
+                    continue;
+                }
+
+                int newVersion = curVersion + 1;
+                curPartitionInfo
+                        .setVersion(newVersion)
+                        .setCommitOp(commitOp)
+                        .setExpression(partitionInfo.getExpression())
+                        .clearSnapshot();
+
+                newMap.put(partitionDesc, curPartitionInfo.build());
+                newPartitionList.add(curPartitionInfo.build());
+            }
+
         } else {
             throw new IllegalStateException("this operation is Illegal of the table:" + tableInfo.getTablePath());
         }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
index a7329aad8..b32694b01 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
@@ -103,14 +103,18 @@ public List commit(List listPartitionInfo) {
+    private List<Map<String, String>> remainingPartitions;
+
+    public LakeSoulRowLevelModificationScanContext(SupportsRowLevelModificationScan.RowLevelModificationType type, List<PartitionInfo> listPartitionInfo) {
+        this.type = type;
         sourcePartitionInfo = JniWrapper.newBuilder().addAllPartitionInfo(listPartitionInfo).build();
+        remainingPartitions = null;
     }
 
     public JniWrapper getSourcePartitionInfo() {
@@ -24,4 +29,16 @@ public JniWrapper getSourcePartitionInfo() {
     public String getBas64EncodedSourcePartitionInfo() {
         return Base64.getEncoder().encodeToString(getSourcePartitionInfo().toByteArray());
     }
+
+    public SupportsRowLevelModificationScan.RowLevelModificationType getType() {
+        return type;
+    }
+
+    public void setRemainingPartitions(List<Map<String, String>> remainingPartitions) {
+        this.remainingPartitions = remainingPartitions;
+    }
+
+    public List<Map<String, String>> getRemainingPartitions() {
+        return remainingPartitions;
+    }
 }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java
index f81fed287..40364ade6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java
@@ -4,7 +4,6 @@
 
 package org.apache.flink.lakesoul.table;
 
-import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
@@ -36,7 +35,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Base64;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -54,6 +52,7 @@ public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning
     private final List<String> primaryKeyList;
     private final List<String> partitionKeyList;
     private boolean overwrite;
+    private LakeSoulRowLevelModificationScanContext modificationContext;
 
     public LakeSoulTableSink(String summaryName, String tableName, DataType dataType,
                              List<String> primaryKeyList, List<String> partitionKeyList, ReadableConfig flinkConf, ResolvedSchema schema) {
@@ -113,6 +112,12 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
      */
     private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, Context sinkContext) throws IOException {
+
+        if (modificationContext != null) {
+            if (modificationContext.getRemainingPartitions() != null) {
+                flinkConf.set(DML_TYPE, PARTITION_DELETE);
+            }
+        }
         Path path = FlinkUtil.makeQualifiedPath(new Path(flinkConf.getString(CATALOG_PATH)));
         int bucketParallelism = flinkConf.getInteger(HASH_BUCKET_NUM);
         //rowData key tools
@@ -162,8 +167,10 @@ public void applyStaticPartition(Map<String, String> map) {
 
     @Override
     public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {
+
         if (context instanceof LakeSoulRowLevelModificationScanContext) {
-            flinkConf.set(SOURCE_PARTITION_INFO, ((LakeSoulRowLevelModificationScanContext) context).getBas64EncodedSourcePartitionInfo());
+            this.modificationContext = (LakeSoulRowLevelModificationScanContext) context;
+            flinkConf.set(SOURCE_PARTITION_INFO, modificationContext.getBas64EncodedSourcePartitionInfo());
             if (flinkConf.getBoolean(USE_CDC, false)) {
                 flinkConf.set(DML_TYPE, DELETE_CDC);
             } else {
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
index 2e2b0689b..1021eec75 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
@@ -61,6 +61,7 @@ public class LakeSoulTableSource
     protected List<Map<String, String>> remainingPartitions;
 
     protected FilterPredicate filter;
+    protected LakeSoulRowLevelModificationScanContext modificationContext;
 
     public LakeSoulTableSource(TableId tableId,
                                RowType rowType,
@@ -72,6 +73,7 @@ public LakeSoulTableSource(TableId tableId,
         this.isStreaming = isStreaming;
         this.pkColumns = pkColumns;
         this.optionParams = optionParams;
+        this.modificationContext = null;
     }
 
     @Override
@@ -134,10 +136,33 @@ public Optional<List<Map<String, String>>> listPartitions() {
 
     @Override
     public void applyPartitions(List<Map<String, String>> remainingPartitions) {
-        this.remainingPartitions = remainingPartitions;
+        if (isDelete()) {
+            this.remainingPartitions = complementPartition(remainingPartitions);
+            getModificationContext().setRemainingPartitions(this.remainingPartitions);
+        } else {
+            this.remainingPartitions = remainingPartitions;
+        }
         LOG.info("Applied partitions to native io: {}", this.remainingPartitions);
     }
 
+    private boolean isDelete() {
+        LakeSoulRowLevelModificationScanContext context = getModificationContext();
+        return context != null && context.getType() == RowLevelModificationType.DELETE;
+    }
+
+    private List<Map<String, String>> complementPartition(List<Map<String, String>> remainingPartitions) {
+        List<PartitionInfo> allPartitionInfo = listPartitionInfo();
+        Set<String> remainingPartitionDesc = remainingPartitions.stream().map(DBUtil::formatPartitionDesc).collect(Collectors.toSet());
+        List<Map<String, String>> partitions = new ArrayList<>();
+        for (PartitionInfo info : allPartitionInfo) {
+            String partitionDesc = info.getPartitionDesc();
+            if (!partitionDesc.equals(DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC) && !remainingPartitionDesc.contains(partitionDesc)) {
+                partitions.add(DBUtil.parsePartitionDesc(partitionDesc));
+            }
+        }
+        return partitions;
+    }
+
     @Override
     public boolean supportsNestedProjection() {
         return false;
@@ -242,9 +267,14 @@ public RowLevelModificationScanContext applyRowLevelModificationScan(
             @Nullable RowLevelModificationScanContext previousContext) {
         if (previousContext == null || previousContext instanceof LakeSoulRowLevelModificationScanContext) {
-            // TODO: 2024/3/22 partiontion pruning should be handled
-            return new LakeSoulRowLevelModificationScanContext(listPartitionInfo());
+            // TODO: 2024/3/22 partiontion pruning should be handled
+            this.modificationContext = new LakeSoulRowLevelModificationScanContext(rowLevelModificationType, listPartitionInfo());
+            return modificationContext;
         }
         throw new RuntimeException("LakeSoulTableSource.applyRowLevelModificationScan only supports LakeSoulRowLevelModificationScanContext");
     }
+
+    public LakeSoulRowLevelModificationScanContext getModificationContext() {
+        return modificationContext;
+    }
 }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java
index a623a2acd..4d00807a8 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java
@@ -11,7 +11,6 @@
 import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
 import com.dmetasoul.lakesoul.meta.entity.TableInfo;
 import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
@@ -22,7 +21,6 @@
 import org.apache.flink.table.api.Schema.Builder;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -38,12 +36,9 @@
 import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.io.IOException;
 import java.time.LocalDateTime;
@@ -56,26 +51,16 @@
 import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType;
-import static org.apache.spark.sql.types.DataTypes.StringType;
 
 public class FlinkUtil {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkUtil.class);
 
-    private FlinkUtil() {
-    }
-
-    private static final String NOT_NULL = " NOT NULL";
-
     public static String convert(TableSchema schema) {
         return schema.toRowDataType().toString();
     }
 
-    public static String getRangeValue(CatalogPartitionSpec cps) {
-        return "Null";
-    }
-
     public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(RowType rowType, Optional<String> cdcColumn) throws CatalogException {
         List<Field> fields = new ArrayList<>();
         String cdcColName = null;
@@ -145,95 +130,6 @@ public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchem
         return new org.apache.arrow.vector.types.pojo.Schema(fields);
     }
 
-    public static StructType toSparkSchema(RowType rowType, Optional<String> cdcColumn) throws CatalogException {
-        StructType stNew = new StructType();
-
-        for (RowType.RowField field : rowType.getFields()) {
-            String name = field.getName();
-            if (name.equals(SORT_FIELD)) continue;
-            LogicalType logicalType = field.getType();
-            org.apache.spark.sql.types.DataType
-                    dataType =
-                    org.apache.spark.sql.arrow.ArrowUtils.fromArrowField(ArrowUtils.toArrowField(name, logicalType));
-            stNew = stNew.add(name, dataType, logicalType.isNullable());
-        }
-
-        if (cdcColumn.isPresent()) {
-            String cdcColName = cdcColumn.get();
-            StructField cdcField = new StructField(cdcColName, StringType, false, null);
-            Option cdcFieldIndex = stNew.getFieldIndex(cdcColName);
-
-            if (cdcFieldIndex.isEmpty()) {
-                stNew = stNew.add(cdcField);
-            } else {
-                StructField field = stNew.fields()[(Integer) cdcFieldIndex.get()];
-                if (!field.toString().equals(cdcField.toString()))
-                    throw new CatalogException(CDC_CHANGE_COLUMN +
-                            "=" +
-                            cdcColName +
-                            "has an invalid field of" +
-                            field +
-                            "," +
-                            CDC_CHANGE_COLUMN +
-                            " require field of " +
-                            cdcField);
-            }
-        }
-        return stNew;
-    }
-
-    public static StructType toSparkSchema(TableSchema tsc, Optional<String> cdcColumn) throws CatalogException {
-        StructType stNew = new StructType();
-
-        for (int i = 0; i < tsc.getFieldCount(); i++) {
-            String name = tsc.getFieldName(i).get();
-            DataType dt = tsc.getFieldDataType(i).get();
-            org.apache.spark.sql.types.DataType
-                    dataType =
-                    org.apache.spark.sql.arrow.ArrowUtils.fromArrowField(ArrowUtils.toArrowField(name,
-                            dt.getLogicalType()));
-            stNew = stNew.add(name, dataType, dt.getLogicalType().isNullable());
-        }
-        if (cdcColumn.isPresent()) {
-            String cdcColName = cdcColumn.get();
-            StructField cdcField = new StructField(cdcColName, StringType, false, Metadata.empty());
-            Option cdcFieldIndex = stNew.getFieldIndex(cdcColName);
-
-            if (cdcFieldIndex.isEmpty()) {
-                stNew = stNew.add(cdcField);
-            } else {
-                StructField field = stNew.fields()[(Integer) cdcFieldIndex.get()];
-                if (!field.toString().equals(cdcField.toString()))
-                    throw new CatalogException(CDC_CHANGE_COLUMN +
-                            "=" +
-                            cdcColName +
-                            " has an invalid field of " +
-                            field +
-                            "," +
-                            CDC_CHANGE_COLUMN +
-                            " require field of " +
-                            cdcField);
-            }
-        }
-        return stNew;
-    }
-
-    public static StringData rowKindToOperation(String rowKind) {
-        if ("+I".equals(rowKind)) {
-            return StringData.fromString("insert");
-        }
-        if ("-U".equals(rowKind)) {
-            return StringData.fromString("delete");
-        }
-        if ("+U".equals(rowKind)) {
-            return StringData.fromString("update");
-        }
-        if ("-D".equals(rowKind)) {
-            return StringData.fromString("delete");
-        }
-        return null;
-    }
-
     public static StringData rowKindToOperation(RowKind rowKind) {
         if (RowKind.INSERT.equals(rowKind)) {
             return StringData.fromString("insert");
@@ -622,7 +518,7 @@ public static void createAndSetTableDirPermission(Path p) throws IOException {
         }
         org.apache.hadoop.fs.Path tbDir = HadoopFileSystem.toHadoopPath(p);
         if (hdfs.exists(tbDir)) {
-            throw new IOException("Table directory already exists: " + tbDir.toString());
+            throw new IOException("Table directory already exists: " + tbDir);
         }
         hdfs.mkdirs(tbDir);
         hdfs.setOwner(tbDir, userName, domain);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
index f4268032b..5a55fc0b6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
@@ -36,6 +36,7 @@ public class LakeSoulSinkOptions {
 
     public static final String DELETE_CDC = "delete_cdc";
 
+    public static final String PARTITION_DELETE = "part_delete";
     public static final String UPDATE = "update";
 
     public static final String INSERT = "insert";
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java
index 2db71aab9..cc4b614d2 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java
@@ -224,27 +224,27 @@ public void testLakeSoulTableSinkDeleteWithParallelismInBatch() {
         final TableEnvironment tEnv = LakeSoulTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
         testLakeSoulTableSinkDeleteWithParallelismBase(
                 tEnv, "== Abstract Syntax Tree ==\n" +
-                        "LogicalSink(table=[lakesoul.db1.test_table], fields=[id, real_col])\n" +
-                        "+- LogicalProject(id=[$0], real_col=[$1])\n" +
-                        "   +- LogicalFilter(condition=[false])\n" +
+                        "LogicalSink(table=[lakesoul.db1.test_table], fields=[id, real_col, part])\n" +
+                        "+- LogicalProject(id=[$0], real_col=[$1], part=[$2])\n" +
+                        "   +- LogicalFilter(condition=[NOT(=($2, _UTF-16LE'1'))])\n" +
                         "      +- LogicalTableScan(table=[[lakesoul, db1, test_table]])\n" +
                         "\n" +
                         "== Optimized Physical Plan ==\n" +
-                        "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col])\n" +
-                        "+- Values(tuples=[[]], values=[id, real_col])\n" +
+                        "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col, part])\n" +
+                        "+- TableSourceScan(table=[[lakesoul, db1, test_table, partitions=[]]], fields=[id, real_col, part])\n" +
                         "\n" +
                         "== Optimized Execution Plan ==\n" +
-                        "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col])\n" +
-                        "+- Values(tuples=[[]], values=[id, real_col])\n" +
+                        "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col, part])\n" +
+                        "+- TableSourceScan(table=[[lakesoul, db1, test_table, partitions=[]]], fields=[id, real_col, part])\n" +
                         "\n" +
                         "== Physical Execution Plan ==\n" +
                         "{\n" +
                         "  \"nodes\" : [ {\n" +
                         "    \"id\" : ,\n" +
-                        "    \"type\" : \"Source: Values[]\",\n" +
+                        "    \"type\" : \"Source: test_table[]\",\n" +
                         "    \"pact\" : \"Data Source\",\n" +
-                        "    \"contents\" : \"[]:Values(tuples=[[]], values=[id, real_col])\",\n" +
-                        "    \"parallelism\" : 1\n" +
+                        "    \"contents\" : \"[]:TableSourceScan(table=[[lakesoul, db1, test_table, partitions=[]]], fields=[id, real_col, part])\",\n" +
+                        "    \"parallelism\" : 2\n" +
                         "  }, {\n" +
                         "    \"id\" : ,\n" +
                         "    \"type\" : \"Sink: Writer\",\n" +
@@ -253,7 +253,7 @@
                         "    \"parallelism\" : 2,\n" +
                         "    \"predecessors\" : [ {\n" +
                         "      \"id\" : ,\n" +
-                        "      \"ship_strategy\" : \"REBALANCE\",\n" +
+                        "      \"ship_strategy\" : \"FORWARD\",\n" +
                         "      \"side\" : \"second\"\n" +
                         "    } ]\n" +
                         "  }, {\n" +
@@ -312,6 +312,7 @@ private void testLakeSoulTableSinkWithParallelismBase(
         final String actual = tEnv.explainSql(
                 "insert into test_table select 1, 1", ExplainDetail.JSON_EXECUTION_PLAN);
 
+        System.out.println(actual);
         String plan = replaceFlinkVersion(replaceNodeIdInOperator(replaceExecNodeId(replaceStreamNodeId(replaceStageId(actual)))));
         System.out.println(plan);
         assertEquals(expected, plan);
@@ -331,8 +332,11 @@ private void testLakeSoulTableSinkDeleteWithParallelismBase(
                 String.format(
                         "CREATE TABLE test_table ("
                                 + " id int,"
-                                + " real_col int"
-                                + ") WITH ("
+                                + " real_col int, "
+                                + " part string "
+                                + ") "
+                                + " PARTITIONED BY ( part )"
+                                + " WITH ("
                                 + "'"
                                 + HASH_BUCKET_NUM.key()
                                 + "'= '3',"
@@ -347,10 +351,10 @@ private void testLakeSoulTableSinkDeleteWithParallelismBase(
                                 + ")"));
         tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         tEnv.executeSql(
-                "insert into test_table select 1, 1");
+                "insert into test_table select 1, 1, '1'");
 
         final String actual = tEnv.explainSql(
-                "delete from test_table", ExplainDetail.JSON_EXECUTION_PLAN);
+                "delete from test_table where part='1' ", ExplainDetail.JSON_EXECUTION_PLAN);
         String plan = replaceFlinkVersion(replaceNodeIdInOperator(replaceExecNodeId(replaceStreamNodeId(replaceStageId(actual)))));
         System.out.println(plan);
         assertEquals(expected, plan);
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
index 68b2137f2..e89b362ea 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
@@ -36,6 +36,23 @@ public void testInsertSQL() throws ExecutionException, InterruptedException {
                 new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]", "+I[4, Mike, 70]"});
     }
 
+    @Test
+    public void testInsertPartitionTableSQL() throws ExecutionException, InterruptedException {
+        TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
+        createLakeSoulSourceTableUserWithRange(tEnv);
+        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75)").await();
+        StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE);
+        String testSelect = "select * from user_info_1";
+        TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
+        List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
+        TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]"});
+        tEnv.executeSql("INSERT INTO user_info_1 VALUES (4, 'Mike', 70)").await();
+        TableImpl flinkTable1 = (TableImpl) streamEnv.sqlQuery(testSelect);
+        List<Row> results1 = CollectionUtil.iteratorToList(flinkTable1.execute().collect());
+        TestUtils.checkEqualInAnyOrder(results1,
+                new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]", "+I[4, Mike, 70]"});
+    }
+
     @Test
     public void testUpdateNonPkAndPartitionSQL() throws ExecutionException, InterruptedException {
@@ -164,12 +181,13 @@ public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException
     }
 
     @Test
-    public void testDeletePartitionSQLNotSupported() throws ExecutionException, InterruptedException {
+    public void testDeletePartitionAndPkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95)").await();
+        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
         try {
-            tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await();
+            // LakeSoulTableSource::applyPartition will not be called and LakeSoulTableSource::applyFilters will be called
+            tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3 and name = 'Jack'").await();
         } catch (Throwable e) {
             System.out.println("Unsupported DELETE SQL");
         }
@@ -177,9 +195,26 @@ public void testDeletePartitionSQLNotSupported() throws ExecutionException, Inte
         String testSelect = "select * from user_info_1";
         TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
-        TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]", "+I[3, Amy, 95]"});
+        TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[4, Bob, 110]"});
     }
 
+    @Test
+    public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedException {
+        TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
+        createLakeSoulSourceTableUserWithRange(tEnv);
+        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        try {
+            // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
+            tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await();
+        } catch (Throwable e) {
+            System.out.println("Unsupported DELETE SQL");
+        }
+        StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE);
+        String testSelect = "select * from user_info_1";
+        TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
+        List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
+        TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"});
+    }
 
     private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
        String createUserSql = "create table user_info (" +
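
Note on the partition-only DELETE path in this patch, as I read it: for a statement like DELETE FROM t WHERE part='1', the planner inverts the predicate and partition pruning pushes the surviving partitions into LakeSoulTableSource#applyPartitions; the new complementPartition step flips that set into the partitions actually being deleted and records them on the scan context, and the sink then switches DML_TYPE to PARTITION_DELETE so the commit path can take the new DeleteCommit branch in DBManager, which bumps the partition version and clears its snapshot. Below is a minimal, self-contained sketch of the complement step; the class and method names and the import paths for DBUtil/DBConfig are my assumptions, not code from this patch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    import com.dmetasoul.lakesoul.meta.DBConfig;
    import com.dmetasoul.lakesoul.meta.DBUtil;
    import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;

    class PartitionDeletePlanSketch {
        // Hypothetical helper mirroring LakeSoulTableSource#complementPartition above:
        // given all partitions of the table and the partitions the planner wants to keep
        // (those surviving the negated DELETE predicate), return the partitions to delete.
        static List<Map<String, String>> partitionsToDelete(List<PartitionInfo> allPartitions,
                                                            List<Map<String, String>> keptPartitions) {
            Set<String> keptDesc = keptPartitions.stream()
                    .map(DBUtil::formatPartitionDesc)
                    .collect(Collectors.toSet());
            List<Map<String, String>> toDelete = new ArrayList<>();
            for (PartitionInfo info : allPartitions) {
                String desc = info.getPartitionDesc();
                // Skip the sentinel desc used for non-partitioned tables; any other partition
                // that is not in the kept set is one the DELETE statement removes.
                if (!desc.equals(DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
                        && !keptDesc.contains(desc)) {
                    toDelete.add(DBUtil.parsePartitionDesc(desc));
                }
            }
            return toDelete;
        }
    }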