From 5e193e606ceae0855169997cb9bf67b8c954d4e1 Mon Sep 17 00:00:00 2001 From: Ceng <441651826@qq.com> Date: Tue, 26 Mar 2024 10:29:36 +0800 Subject: [PATCH] [Flink]fix readPartitionInfo on UpdateCommit (#458) * [Flink]fix readPartitionInfo on UpdateCommit Signed-off-by: zenghua * fix testLakeSoulTableSinkDeleteWithParallelismInBatch expected result Signed-off-by: zenghua * add todo memo Signed-off-by: zenghua --------- Signed-off-by: zenghua Co-authored-by: zenghua --- .../dmetasoul/lakesoul/meta/DBManager.java | 8 +- .../sink/committer/LakeSoulSinkCommitter.java | 18 ++-- .../LakeSoulMultiTableSinkCommittable.java | 51 ++++++++- .../LakeSoulSinkCommittableSerializer.java | 5 +- .../AbstractLakeSoulMultiTableSinkWriter.java | 5 +- .../sink/writer/LakeSoulWriterBucket.java | 8 +- .../flink/lakesoul/source/LakeSoulSource.java | 32 +++--- ...keSoulRowLevelModificationScanContext.java | 27 +++++ .../lakesoul/table/LakeSoulTableSink.java | 26 +++-- .../lakesoul/table/LakeSoulTableSource.java | 18 +++- .../lakesoul/tool/LakeSoulSinkOptions.java | 10 +- .../connector/sink/LakeSoulTableSinkCase.java | 102 ++++++++++++++++++ .../lakesoul/test/flinkSource/DMLSuite.java | 15 +-- 13 files changed, 270 insertions(+), 55 deletions(-) create mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java index 81f2fcc1c..f4a0dbd13 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java @@ -469,7 +469,8 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm Set middleCommitOps = partitionInfoDao.getCommitOpsBetweenVersions(tableId, partitionDesc, readPartitionVersion + 1, curVersion); if (commitOp.equals(CommitOp.UpdateCommit)) { - if (middleCommitOps.contains(CommitOp.UpdateCommit) || + // TODO: 2024/3/22 this UpdateCommit conflict case needs further consideration + if ( (middleCommitOps.size() > 1 && middleCommitOps.contains(CommitOp.CompactionCommit))) { throw new IllegalStateException( "current operation conflicts with other data writing tasks, table path: " + @@ -831,7 +832,7 @@ private String getNameSpaceDomain(String namespace) { return namespaceInfo.getDomain(); } - public void commitDataCommitInfo(DataCommitInfo dataCommitInfo) { + public void commitDataCommitInfo(DataCommitInfo dataCommitInfo, List readPartitionInfoList) { String tableId = dataCommitInfo.getTableId(); String partitionDesc = dataCommitInfo.getPartitionDesc().replaceAll("/", LAKESOUL_RANGE_PARTITION_SPLITTER); Uuid commitId = dataCommitInfo.getCommitId(); @@ -863,6 +864,9 @@ public void commitDataCommitInfo(DataCommitInfo dataCommitInfo) { metaInfo.setTableInfo(tableInfo); metaInfo.addAllListPartition(partitionInfoList); + if (readPartitionInfoList != null) { + metaInfo.addAllReadPartitionInfo(readPartitionInfoList); + } commitData(metaInfo.build(), false, commitOp); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java index 0ae7f13a7..a7329aad8 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java +++
b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java @@ -22,10 +22,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; import static com.dmetasoul.lakesoul.meta.DBConfig.*; @@ -96,6 +93,7 @@ public List commit(List readPartitionInfoList = null; TableNameId tableNameId = lakeSoulDBManager.shortTableName(identity.tableId.table(), identity.tableId.schema()); @@ -105,9 +103,15 @@ public List commit(List commit(List pendingFiles; @@ -47,9 +50,19 @@ public LakeSoulMultiTableSinkCommittable( String bucketId, List pendingFiles, long creationTime, - TableSchemaIdentity identity, long tsMs, String dmlType) { - this(bucketId, identity, pendingFiles, creationTime, - UUID.randomUUID().toString(), tsMs,dmlType + TableSchemaIdentity identity, + long tsMs, + String dmlType, + String sourcePartitionInfo + ) { + this(bucketId, + identity, + pendingFiles, + creationTime, + UUID.randomUUID().toString(), + tsMs, + dmlType, + sourcePartitionInfo ); } @@ -63,7 +76,10 @@ public LakeSoulMultiTableSinkCommittable( @Nullable List pendingFiles, long time, @Nullable String commitId, - long tsMs, String dmlType) { + long tsMs, + String dmlType, + String sourcePartitionInfo + ) { this.bucketId = bucketId; this.identity = identity; this.pendingFiles = pendingFiles; @@ -71,6 +87,7 @@ public LakeSoulMultiTableSinkCommittable( this.commitId = commitId; this.tsMs = tsMs; this.dmlType = dmlType; + this.sourcePartitionInfo = sourcePartitionInfo; } public long getTsMs() { @@ -128,9 +145,35 @@ public void merge(LakeSoulMultiTableSinkCommittable committable) { } else { if (committable.hasPendingFile()) pendingFiles = committable.getPendingFiles(); } + mergeSourcePartitionInfo(committable); + } + + private void mergeSourcePartitionInfo(LakeSoulMultiTableSinkCommittable committable) { + if (sourcePartitionInfo == null) { + sourcePartitionInfo = committable.getSourcePartitionInfo(); + } else { + try { + JniWrapper jniWrapper = JniWrapper + .parseFrom(sourcePartitionInfo.getBytes()) + .toBuilder() + .addAllPartitionInfo( + JniWrapper + .parseFrom(committable.getSourcePartitionInfo().getBytes()) + .getPartitionInfoList() + ) + .build(); + sourcePartitionInfo = new String(jniWrapper.toByteArray()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } } public String getDmlType() { return dmlType; } + + public String getSourcePartitionInfo() { + return sourcePartitionInfo; + } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java index 026f329dd..c846855b6 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java @@ -83,6 +83,7 @@ private void serializeV1(LakeSoulMultiTableSinkCommittable committable, DataOutp dataOutputView.writeUTF(committable.getCommitId()); dataOutputView.writeLong(committable.getTsMs()); dataOutputView.writeUTF(committable.getDmlType()); + dataOutputView.writeUTF(committable.getSourcePartitionInfo()); } else { dataOutputView.writeBoolean(false); } @@ -98,6 +99,7 @@ private LakeSoulMultiTableSinkCommittable 
deserializeV1(DataInputView dataInputV long time = Long.MIN_VALUE; long dataTsMs = Long.MAX_VALUE; String dmlType = null; + String sourcePartitionInfo = ""; if (dataInputView.readBoolean()) { int size = dataInputView.readInt(); if (size > 0) { @@ -111,6 +113,7 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView dataInputV commitId = dataInputView.readUTF(); dataTsMs = dataInputView.readLong(); dmlType = dataInputView.readUTF(); + sourcePartitionInfo = dataInputView.readUTF(); } } @@ -119,7 +122,7 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView dataInputV String bucketId = dataInputView.readUTF(); return new LakeSoulMultiTableSinkCommittable( - bucketId, identity, pendingFile, time, commitId, dataTsMs,dmlType); + bucketId, identity, pendingFile, time, commitId, dataTsMs, dmlType, sourcePartitionInfo); } private static void validateMagicNumber(DataInputView in) throws IOException { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java index baeada13d..17255c3a4 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java @@ -173,7 +173,8 @@ public void write(IN element, Context context) throws IOException { @Override public List prepareCommit(boolean flush) throws IOException { List committables = new ArrayList<>(); - String dmlType = this.conf.getString(LakeSoulSinkOptions.DMLTYPE); + String dmlType = this.conf.getString(LakeSoulSinkOptions.DML_TYPE); + String sourcePartitionInfo = this.conf.getString(LakeSoulSinkOptions.SOURCE_PARTITION_INFO); // Every time before we prepare commit, we first check and remove the inactive // buckets. Checking the activeness right before pre-committing avoid re-creating // the bucket every time if the bucket use OnCheckpointingRollingPolicy. 
@@ -184,7 +185,7 @@ public List prepareCommit(boolean flush) thro if (!entry.getValue().isActive()) { activeBucketIt.remove(); } else { - committables.addAll(entry.getValue().prepareCommit(flush,dmlType)); + committables.addAll(entry.getValue().prepareCommit(flush, dmlType, sourcePartitionInfo)); } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java index 8e53dbd8c..63484794b 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java @@ -149,7 +149,7 @@ void write(RowData element, long currentTime, long tsMs) throws IOException { inProgressPartWriter.write(element, currentTime); } - List prepareCommit(boolean flush,String dmlType) throws IOException { + List prepareCommit(boolean flush, String dmlType, String sourcePartitionInfo) throws IOException { // we always close part file and do not keep in-progress file // since the native parquet writer doesn't support resume if (inProgressPartWriter != null) { @@ -167,7 +167,11 @@ List prepareCommit(boolean flush,String dmlTy committables.add(new LakeSoulMultiTableSinkCommittable( bucketId, tmpPending, - time, tableId, tsMs, dmlType)); + time, + tableId, + tsMs, + dmlType, + sourcePartitionInfo)); pendingFiles.clear(); return committables; diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java index 6f10d9bee..edbba5a45 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java @@ -92,7 +92,7 @@ public SourceReader createReader(SourceReaderContext rea @Override public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { - TableInfo tif = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableId.table(), + TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableId.table(), tableId.schema()); List readStartTimestampWithTimeZone = Arrays.asList(optionParams.getOrDefault(LakeSoulOptions.READ_START_TIME(), ""), @@ -114,29 +114,29 @@ public SplitEnumerator createEnumerator( new LakeSoulDynSplitAssigner(optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")), Long.parseLong(optionParams.getOrDefault(LakeSoulOptions.DISCOVERY_INTERVAL(), "30000")), convertTimeFormatWithTimeZone(readStartTimestampWithTimeZone), - tif.getTableId(), + tableInfo.getTableId(), partDesc, optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")); } else { return staticSplitEnumerator(enumContext, - tif, + tableInfo, readStartTimestampWithTimeZone, readType); } } private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorContext enumContext, - TableInfo tif, + TableInfo tableInfo, List readStartTimestampWithTimeZone, String readType) { List readEndTimestampWithTimeZone = Arrays.asList(optionParams.getOrDefault(LakeSoulOptions.READ_END_TIME(), ""), optionParams.getOrDefault(LakeSoulOptions.TIME_ZONE(), "")); - List dfinfos; + List dataFileInfoList; if (readType.equals("") || readType.equals("fullread")) { - dfinfos = Arrays.asList(getTargetDataFileInfo(tif)); + dataFileInfoList = Arrays.asList(getTargetDataFileInfo(tableInfo)); } else { - dfinfos = new 
ArrayList<>(); + dataFileInfoList = new ArrayList<>(); List partDescs = new ArrayList<>(); String partitionDescOpt = optionParams.getOrDefault(LakeSoulOptions.PARTITION_DESC(), ""); if (partitionDescOpt.isEmpty() && remainingPartitions != null) { @@ -148,7 +148,7 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte partDescs.add(partitionDescOpt); } for (String desc : partDescs) { - dfinfos.addAll(Arrays.asList(DataOperation.getIncrementalPartitionDataInfo(tif.getTableId(), + dataFileInfoList.addAll(Arrays.asList(DataOperation.getIncrementalPartitionDataInfo(tableInfo.getTableId(), desc, convertTimeFormatWithTimeZone(readStartTimestampWithTimeZone), convertTimeFormatWithTimeZone(readEndTimestampWithTimeZone), @@ -157,18 +157,18 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte } int capacity = 100; ArrayList splits = new ArrayList<>(capacity); - if (!FlinkUtil.isExistHashPartition(tif)) { - for (DataFileInfo dfinfo : dfinfos) { + if (!FlinkUtil.isExistHashPartition(tableInfo)) { + for (DataFileInfo dataFileInfo : dataFileInfoList) { ArrayList tmp = new ArrayList<>(); - tmp.add(new Path(dfinfo.path())); - splits.add(new LakeSoulSplit(String.valueOf(dfinfo.hashCode()), + tmp.add(new Path(dataFileInfo.path())); + splits.add(new LakeSoulSplit(String.valueOf(dataFileInfo.hashCode()), tmp, 0)); } } else { Map>> splitByRangeAndHashPartition = - FlinkUtil.splitDataInfosToRangeAndHashPartition(tif.getTableId(), - dfinfos.toArray(new DataFileInfo[0])); + FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo.getTableId(), + dataFileInfoList.toArray(new DataFileInfo[0])); for (Map.Entry>> entry : splitByRangeAndHashPartition.entrySet()) { for (Map.Entry> split : entry.getValue().entrySet()) { splits.add(new LakeSoulSplit(String.valueOf(split.hashCode()), @@ -182,8 +182,8 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte } - private DataFileInfo[] getTargetDataFileInfo(TableInfo tif) { - return FlinkUtil.getTargetDataFileInfo(tif, + private DataFileInfo[] getTargetDataFileInfo(TableInfo tableInfo) { + return FlinkUtil.getTargetDataFileInfo(tableInfo, this.remainingPartitions); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java new file mode 100644 index 000000000..8199cfacc --- /dev/null +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulRowLevelModificationScanContext.java @@ -0,0 +1,27 @@ +package org.apache.flink.lakesoul.table; + +import com.dmetasoul.lakesoul.meta.entity.JniWrapper; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.flink.table.connector.RowLevelModificationScanContext; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.List; + +public class LakeSoulRowLevelModificationScanContext implements RowLevelModificationScanContext { + + private final JniWrapper sourcePartitionInfo; + + public LakeSoulRowLevelModificationScanContext(List listPartitionInfo) { + sourcePartitionInfo = JniWrapper.newBuilder().addAllPartitionInfo(listPartitionInfo).build(); + } + + public JniWrapper getSourcePartitionInfo() { + return sourcePartitionInfo; + } + + public String getBas64EncodedSourcePartitionInfo() { + return 
Base64.getEncoder().encodeToString(getSourcePartitionInfo().toByteArray()); + } +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java index d9262ce5c..f81fed287 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSink.java @@ -4,6 +4,7 @@ package org.apache.flink.lakesoul.table; +import com.dmetasoul.lakesoul.meta.entity.JniWrapper; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; @@ -35,6 +36,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Optional; @@ -160,20 +162,28 @@ public void applyStaticPartition(Map map) { @Override public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) { - if (flinkConf.getBoolean(USE_CDC, false)) { - flinkConf.set(DMLTYPE, DELETE_CDC); - } else { - flinkConf.set(DMLTYPE, DELETE); - } + if (context instanceof LakeSoulRowLevelModificationScanContext) { + flinkConf.set(SOURCE_PARTITION_INFO, ((LakeSoulRowLevelModificationScanContext) context).getBas64EncodedSourcePartitionInfo()); + if (flinkConf.getBoolean(USE_CDC, false)) { + flinkConf.set(DML_TYPE, DELETE_CDC); + } else { + flinkConf.set(DML_TYPE, DELETE); + } - return new LakeSoulRowLevelDelete(); + return new LakeSoulRowLevelDelete(); + } + throw new RuntimeException("LakeSoulTableSink.applyRowLevelDelete only supports LakeSoulRowLevelModificationScanContext"); } @Override public RowLevelUpdateInfo applyRowLevelUpdate(List updatedColumns, @Nullable RowLevelModificationScanContext context) { - flinkConf.set(DMLTYPE, UPDATE); - return new LakeSoulRowLevelUpdate(); + if (context instanceof LakeSoulRowLevelModificationScanContext) { + flinkConf.set(SOURCE_PARTITION_INFO, ((LakeSoulRowLevelModificationScanContext) context).getBas64EncodedSourcePartitionInfo()); + flinkConf.set(DML_TYPE, UPDATE); + return new LakeSoulRowLevelUpdate(); + } + throw new RuntimeException("LakeSoulTableSink.applyRowLevelUpdate only supports LakeSoulRowLevelModificationScanContext"); } private class LakeSoulRowLevelDelete implements RowLevelDeleteInfo { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java index 4e79afe84..2e2b0689b 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java @@ -122,10 +122,7 @@ public Result applyFilters(List filters) { @Override public Optional>> listPartitions() { - DBManager dbManager = new DBManager(); - TableInfo tableInfo = - dbManager.getTableInfoByNameAndNamespace(tableId.table(), tableId.schema()); - List allPartitionInfo = dbManager.getAllPartitionInfo(tableInfo.getTableId()); + List allPartitionInfo = listPartitionInfo(); List> partitions = new ArrayList<>(); for (PartitionInfo info : allPartitionInfo) { if (!info.getPartitionDesc().equals(DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC)) { @@ -151,6 +148,13 @@ public void applyProjection(int[][] projectedFields) { this.projectedFields = projectedFields; } + private List listPartitionInfo() { 
+ DBManager dbManager = new DBManager(); + TableInfo tableInfo = + dbManager.getTableInfoByNameAndNamespace(tableId.table(), tableId.schema()); + return dbManager.getAllPartitionInfo(tableInfo.getTableId()); + } + private int[] getFieldIndexs() { return (projectedFields == null || projectedFields.length == 0) ? IntStream.range(0, this.rowType.getFieldCount()).toArray() : @@ -237,6 +241,10 @@ public RowLevelModificationScanContext applyRowLevelModificationScan( RowLevelModificationType rowLevelModificationType, @Nullable RowLevelModificationScanContext previousContext) { - return null; + if (previousContext == null || previousContext instanceof LakeSoulRowLevelModificationScanContext) { + // TODO: 2024/3/22 partition pruning should be handled + return new LakeSoulRowLevelModificationScanContext(listPartitionInfo()); + } + throw new RuntimeException("LakeSoulTableSource.applyRowLevelModificationScan only supports LakeSoulRowLevelModificationScanContext"); + } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java index 87051d84e..f4268032b 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java @@ -113,11 +113,17 @@ public class LakeSoulSinkOptions { .stringType() .defaultValue("Asia/Shanghai") .withDescription("server time zone"); - public static final ConfigOption DMLTYPE = ConfigOptions + public static final ConfigOption DML_TYPE = ConfigOptions .key("dml_type") .stringType() .defaultValue(INSERT) - .withDescription("dmltype"); + .withDescription("DML type"); + + public static final ConfigOption SOURCE_PARTITION_INFO = ConfigOptions + .key("source_partition_info") + .stringType() + .defaultValue("") + .withDescription("Protobuf-encoded source partition info from Update/Delete DML"); public static final ConfigOption LAKESOUL_VIEW = ConfigOptions .key("lakesoul_view") .booleanType() diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java index 1e94537d0..2db71aab9 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulTableSinkCase.java @@ -219,6 +219,69 @@ public void testLakeSoulTableSinkWithParallelismInStreaming() { "}"); } + @Test + public void testLakeSoulTableSinkDeleteWithParallelismInBatch() { + final TableEnvironment tEnv = LakeSoulTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT); + testLakeSoulTableSinkDeleteWithParallelismBase( + tEnv, "== Abstract Syntax Tree ==\n" + + "LogicalSink(table=[lakesoul.db1.test_table], fields=[id, real_col])\n" + + "+- LogicalProject(id=[$0], real_col=[$1])\n" + + " +- LogicalFilter(condition=[false])\n" + + " +- LogicalTableScan(table=[[lakesoul, db1, test_table]])\n" + + "\n" + + "== Optimized Physical Plan ==\n" + + "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col])\n" + + "+- Values(tuples=[[]], values=[id, real_col])\n" + + "\n" + + "== Optimized Execution Plan ==\n" + + "Sink(table=[lakesoul.db1.test_table], fields=[id, real_col])\n" + + "+- Values(tuples=[[]], values=[id, real_col])\n" + + "\n" + + "== Physical Execution Plan ==\n" + + "{\n" + + " \"nodes\" :
[ {\n" + + " \"id\" : ,\n" + + " \"type\" : \"Source: Values[]\",\n" + + " \"pact\" : \"Data Source\",\n" + + " \"contents\" : \"[]:Values(tuples=[[]], values=[id, real_col])\",\n" + + " \"parallelism\" : 1\n" + + " }, {\n" + + " \"id\" : ,\n" + + " \"type\" : \"Sink: Writer\",\n" + + " \"pact\" : \"Operator\",\n" + + " \"contents\" : \"Sink: Writer\",\n" + + " \"parallelism\" : 2,\n" + + " \"predecessors\" : [ {\n" + + " \"id\" : ,\n" + + " \"ship_strategy\" : \"REBALANCE\",\n" + + " \"side\" : \"second\"\n" + + " } ]\n" + + " }, {\n" + + " \"id\" : ,\n" + + " \"type\" : \"Sink: Committer\",\n" + + " \"pact\" : \"Operator\",\n" + + " \"contents\" : \"Sink: Committer\",\n" + + " \"parallelism\" : 2,\n" + + " \"predecessors\" : [ {\n" + + " \"id\" : ,\n" + + " \"ship_strategy\" : \"FORWARD\",\n" + + " \"side\" : \"second\"\n" + + " } ]\n" + + " }, {\n" + + " \"id\" : ,\n" + + " \"type\" : \"Sink: Global Committer\",\n" + + " \"pact\" : \"Operator\",\n" + + " \"contents\" : \"Sink: Global Committer\",\n" + + " \"parallelism\" : 1,\n" + + " \"predecessors\" : [ {\n" + + " \"id\" : ,\n" + + " \"ship_strategy\" : \"GLOBAL\",\n" + + " \"side\" : \"second\"\n" + + " } ]\n" + + " } ]\n" + + "}"); + } + private void testLakeSoulTableSinkWithParallelismBase( final TableEnvironment tEnv, String expected) { tEnv.registerCatalog(lakeSoulCatalog.getName(), lakeSoulCatalog); @@ -256,6 +319,45 @@ private void testLakeSoulTableSinkWithParallelismBase( tEnv.executeSql("drop database db1 cascade"); } + private void testLakeSoulTableSinkDeleteWithParallelismBase( + final TableEnvironment tEnv, String expected) { + tEnv.registerCatalog(lakeSoulCatalog.getName(), lakeSoulCatalog); + tEnv.useCatalog(lakeSoulCatalog.getName()); + tEnv.executeSql("create database db1"); + tEnv.useDatabase("db1"); + tEnv.executeSql("DROP TABLE IF EXISTS test_table"); + + tEnv.executeSql( + String.format( + "CREATE TABLE test_table (" + + " id int," + + " real_col int" + + ") WITH (" + + "'" + + HASH_BUCKET_NUM.key() + + "'= '3'," + + "'" + + LAKESOUL_TABLE_PATH.key() + + "'='" + + getTempDirUri("/test_table") + + "'," + + "'" + + "connector" + + "'='lakesoul'" + + ")")); + tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + tEnv.executeSql( + "insert into test_table select 1, 1"); + final String actual = + tEnv.explainSql( + "delete from test_table", ExplainDetail.JSON_EXECUTION_PLAN); + String plan = replaceFlinkVersion(replaceNodeIdInOperator(replaceExecNodeId(replaceStreamNodeId(replaceStageId(actual))))); + System.out.println(plan); + assertEquals(expected, plan); + + tEnv.executeSql("drop database db1 cascade"); + } + // @Test // public void testPartStreamingWrite() throws Exception { // testStreamingWrite(true, false, "parquet", this::checkSuccessFiles); diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java index 2babb5484..68b2137f2 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java @@ -55,6 +55,7 @@ public void testUpdateNonPkAndPartitionSQL() throws ExecutionException, Interrup TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 100]", "+I[3, Jack, 100]", "+I[4, Mike, 70]"}); } + @Test public void testUpdatePkSQLNotSupported() throws ExecutionException, InterruptedException { TableEnvironment tEnv = 
TestUtils.createTableEnv(BATCH_TYPE); @@ -71,8 +72,9 @@ public void testUpdatePkSQLNotSupported() throws ExecutionException, Interrupted TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect); List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); TestUtils.checkEqualInAnyOrder(results, - new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[3, Jack, 75]", "+I[4, John, 70]","+I[4, Mike, 70]"}); + new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[3, Jack, 75]", "+I[4, John, 70]", "+I[4, Mike, 70]"}); } + @Test public void testUpdatePartitionSQLNotSupported() throws ExecutionException, InterruptedException { TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE); @@ -132,7 +134,7 @@ public void testDeleteNonPkAndPartitionSQL() throws ExecutionException, Interrup public void testDeletePkSQL() throws ExecutionException, InterruptedException { TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE); createLakeSoulSourceTableUser(tEnv); - tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95)").await(); + tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await(); try { tEnv.executeSql("DELETE FROM user_info where name = 'Jack'").await(); } catch (Throwable e) { @@ -141,14 +143,14 @@ public void testDeletePkSQL() throws ExecutionException, InterruptedException { String testSelect = "select * from user_info"; TableImpl flinkTable = (TableImpl) tEnv.sqlQuery(testSelect); List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); - TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]"}); + TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[4, Bob, 110]"}); } @Test public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException { TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE); createLakeSoulCDCSourceTableUser(tEnv); - tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95)").await(); + tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await(); try { tEnv.executeSql("DELETE FROM user_info where name = 'Jack'").await(); } catch (Throwable e) { @@ -158,7 +160,7 @@ public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException String testSelect = "select * from user_info"; TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect); List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); - TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]"}); + TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[4, Bob, 110]"}); } @Test @@ -175,7 +177,7 @@ public void testDeletePartitionSQLNotSupported() throws ExecutionException, Inte String testSelect = "select * from user_info_1"; TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect); List results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); - TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]","+I[3, Jack, 75]","+I[3, Amy, 95]"}); + TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]", "+I[3, Amy, 95]"}); } @@ -207,6 +209,7 @@ private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs) thro tEnvs.executeSql("DROP TABLE if exists user_info_1"); 
tEnvs.executeSql(createUserSql); } + private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException { String createUserSql = "create table user_info (" + " order_id INT," +