Skip to content

Commit

Permalink
[Flink] Support Delete statement on partition column (#459)
Browse files Browse the repository at this point in the history
* support delete on partition col

Signed-off-by: zenghua <[email protected]>

* fix npe

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Mar 29, 2024
1 parent a48c063 commit e4b21f4
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,8 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
newPartitionList.add(curPartitionInfo.build());
}
} else if (commitOp.equals(CommitOp.CompactionCommit) || commitOp.equals(CommitOp.UpdateCommit)) {
if (readPartitionInfo != null) {
for (PartitionInfo p : readPartitionInfo) {
readPartitionMap.put(p.getPartitionDesc(), p);
}
for (PartitionInfo p : readPartitionInfo) {
readPartitionMap.put(p.getPartitionDesc(), p);
}
for (PartitionInfo partitionInfo : listPartitionInfo) {
String partitionDesc = partitionInfo.getPartitionDesc();
Expand Down Expand Up @@ -507,6 +505,32 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
newMap.put(partitionDesc, curPartitionInfo.build());
newPartitionList.add(curPartitionInfo.build());
}
} else if (commitOp.equals(CommitOp.DeleteCommit)) {
for (PartitionInfo p : readPartitionInfo) {
readPartitionMap.put(p.getPartitionDesc(), p);
}

for (PartitionInfo partitionInfo : listPartitionInfo) {
String partitionDesc = partitionInfo.getPartitionDesc();
PartitionInfo.Builder curPartitionInfo = getOrCreateCurPartitionInfo(curMap, partitionDesc, tableId).toBuilder();
int curVersion = curPartitionInfo.getVersion();

PartitionInfo readPartition = readPartitionMap.get(partitionDesc);
if (readPartition == null) {
continue;
}

int newVersion = curVersion + 1;
curPartitionInfo
.setVersion(newVersion)
.setCommitOp(commitOp)
.setExpression(partitionInfo.getExpression())
.clearSnapshot();

newMap.put(partitionDesc, curPartitionInfo.build());
newPartitionList.add(curPartitionInfo.build());
}

} else {
throw new IllegalStateException("this operation is Illegal of the table:" + tableInfo.getTablePath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
dataCommitInfo.setPartitionDesc(partition.isEmpty() ? LAKESOUL_NON_PARTITION_TABLE_PART_DESC :
partition.replaceAll("/", LAKESOUL_RANGE_PARTITION_SPLITTER));
dataCommitInfo.addAllFileOps(dataFileOpList);

if (!committable.getSourcePartitionInfo().isEmpty()) {
readPartitionInfoList =
JniWrapper
.parseFrom(Base64.getDecoder().decode(committable.getSourcePartitionInfo()))
.getPartitionInfoList();
}

if (LakeSoulSinkOptions.DELETE.equals(committable.getDmlType())) {
dataCommitInfo.setCommitOp(CommitOp.UpdateCommit);
if (!committable.getSourcePartitionInfo().isEmpty()) {
readPartitionInfoList =
JniWrapper
.parseFrom(Base64.getDecoder().decode(committable.getSourcePartitionInfo()))
.getPartitionInfoList();
}
} else if (LakeSoulSinkOptions.PARTITION_DELETE.equals(committable.getDmlType())) {
dataCommitInfo.setCommitOp(CommitOp.DeleteCommit);
} else {
dataCommitInfo.setCommitOp(CommitOp.AppendCommit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public DynamicTableSource copy() {
lsts.projectedFields = this.projectedFields;
lsts.remainingPartitions = this.remainingPartitions;
lsts.filter = this.filter;
lsts.modificationContext = this.modificationContext;
return lsts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class LakeSoulRowLevelModificationScanContext implements RowLevelModificationScanContext {

private final JniWrapper sourcePartitionInfo;
private final SupportsRowLevelModificationScan.RowLevelModificationType type;

public LakeSoulRowLevelModificationScanContext(List<PartitionInfo> listPartitionInfo) {
private List<Map<String, String>> remainingPartitions;

/**
 * Creates a scan context for a row-level modification (e.g. DELETE) against a LakeSoul table.
 *
 * @param type              the kind of row-level modification that triggered this scan
 * @param listPartitionInfo snapshot of the table's partitions at scan time; wrapped into a
 *                          {@link JniWrapper} so it can be serialized and handed to the sink
 */
public LakeSoulRowLevelModificationScanContext(SupportsRowLevelModificationScan.RowLevelModificationType type, List<PartitionInfo> listPartitionInfo) {
    this.type = type;
    this.sourcePartitionInfo = JniWrapper.newBuilder().addAllPartitionInfo(listPartitionInfo).build();
    // No partition pruning has happened yet; applyPartitions() may fill this in later.
    this.remainingPartitions = null;
}

public JniWrapper getSourcePartitionInfo() {
Expand All @@ -24,4 +29,16 @@ public JniWrapper getSourcePartitionInfo() {
/**
 * Serializes the source partition snapshot and encodes it as a Base64 string,
 * suitable for passing through Flink configuration (see SOURCE_PARTITION_INFO).
 *
 * NOTE(review): "Bas64" in the method name looks like a typo for "Base64", but
 * renaming would break existing callers, so the name is kept as-is.
 */
public String getBas64EncodedSourcePartitionInfo() {
    final byte[] serialized = getSourcePartitionInfo().toByteArray();
    return Base64.getEncoder().encodeToString(serialized);
}

/**
 * Returns the row-level modification type (e.g. DELETE) that this scan
 * context was created for; set once in the constructor and never changed.
 */
public SupportsRowLevelModificationScan.RowLevelModificationType getType() {
    return type;
}

/**
 * Records the partitions remaining after partition pruning. Called from the
 * table source's applyPartitions() when handling a DELETE, so the sink can
 * detect that the statement can be executed as a partition-level delete.
 *
 * @param remainingPartitions partition specs (column name -> value) still targeted
 */
public void setRemainingPartitions(List<Map<String, String>> remainingPartitions) {
    this.remainingPartitions = remainingPartitions;
}

/**
 * Returns the pruned partition specs, or {@code null} if partition pruning
 * was never applied to this scan (the field is only set via
 * {@link #setRemainingPartitions(List)}).
 */
public List<Map<String, String>> getRemainingPartitions() {
    return remainingPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package org.apache.flink.lakesoul.table;

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -36,7 +35,6 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -54,6 +52,7 @@ public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning
private final List<String> primaryKeyList;
private final List<String> partitionKeyList;
private boolean overwrite;
private LakeSoulRowLevelModificationScanContext modificationContext;

public LakeSoulTableSink(String summaryName, String tableName, DataType dataType, List<String> primaryKeyList,
List<String> partitionKeyList, ReadableConfig flinkConf, ResolvedSchema schema) {
Expand Down Expand Up @@ -113,6 +112,12 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
*/
private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, Context sinkContext)
throws IOException {

if (modificationContext != null) {
if (modificationContext.getRemainingPartitions() != null) {
flinkConf.set(DML_TYPE, PARTITION_DELETE);
}
}
Path path = FlinkUtil.makeQualifiedPath(new Path(flinkConf.getString(CATALOG_PATH)));
int bucketParallelism = flinkConf.getInteger(HASH_BUCKET_NUM);
//rowData key tools
Expand Down Expand Up @@ -162,8 +167,10 @@ public void applyStaticPartition(Map<String, String> map) {

@Override
public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {

if (context instanceof LakeSoulRowLevelModificationScanContext) {
flinkConf.set(SOURCE_PARTITION_INFO, ((LakeSoulRowLevelModificationScanContext) context).getBas64EncodedSourcePartitionInfo());
this.modificationContext = (LakeSoulRowLevelModificationScanContext) context;
flinkConf.set(SOURCE_PARTITION_INFO, modificationContext.getBas64EncodedSourcePartitionInfo());
if (flinkConf.getBoolean(USE_CDC, false)) {
flinkConf.set(DML_TYPE, DELETE_CDC);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class LakeSoulTableSource
protected List<Map<String, String>> remainingPartitions;

protected FilterPredicate filter;
protected LakeSoulRowLevelModificationScanContext modificationContext;

public LakeSoulTableSource(TableId tableId,
RowType rowType,
Expand All @@ -72,6 +73,7 @@ public LakeSoulTableSource(TableId tableId,
this.isStreaming = isStreaming;
this.pkColumns = pkColumns;
this.optionParams = optionParams;
this.modificationContext = null;
}

@Override
Expand Down Expand Up @@ -134,10 +136,33 @@ public Optional<List<Map<String, String>>> listPartitions() {

@Override
public void applyPartitions(List<Map<String, String>> remainingPartitions) {
this.remainingPartitions = remainingPartitions;
if (isDelete()) {
this.remainingPartitions = complementPartition(remainingPartitions);
getModificationContext().setRemainingPartitions(this.remainingPartitions);
} else {
this.remainingPartitions = remainingPartitions;
}
LOG.info("Applied partitions to native io: {}", this.remainingPartitions);
}

/** Whether the current row-level modification scan (if any) is a DELETE. */
private boolean isDelete() {
    final LakeSoulRowLevelModificationScanContext ctx = getModificationContext();
    if (ctx == null) {
        // No row-level modification scan was applied to this source.
        return false;
    }
    return ctx.getType() == RowLevelModificationType.DELETE;
}

/**
 * Computes the complement of {@code remainingPartitions} against all partitions of the
 * table: the partitions that survive a DELETE on the pruned partitions. The sentinel
 * non-partitioned desc (LAKESOUL_NON_PARTITION_TABLE_PART_DESC) is always excluded.
 *
 * @param remainingPartitions partition specs matched by the DELETE predicate
 * @return specs of every real partition NOT matched by the predicate
 */
private List<Map<String, String>> complementPartition(List<Map<String, String>> remainingPartitions) {
    final Set<String> selectedDescs = remainingPartitions.stream()
            .map(DBUtil::formatPartitionDesc)
            .collect(Collectors.toSet());
    return listPartitionInfo().stream()
            .map(PartitionInfo::getPartitionDesc)
            .filter(desc -> !DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC.equals(desc))
            .filter(desc -> !selectedDescs.contains(desc))
            .map(DBUtil::parsePartitionDesc)
            .collect(Collectors.toList());
}

@Override
public boolean supportsNestedProjection() {
return false;
Expand Down Expand Up @@ -242,9 +267,14 @@ public RowLevelModificationScanContext applyRowLevelModificationScan(
@Nullable
RowLevelModificationScanContext previousContext) {
if (previousContext == null || previousContext instanceof LakeSoulRowLevelModificationScanContext) {
// TODO: 2024/3/22 partition pruning should be handled
return new LakeSoulRowLevelModificationScanContext(listPartitionInfo());
// TODO: 2024/3/22 partition pruning should be handled
this.modificationContext = new LakeSoulRowLevelModificationScanContext(rowLevelModificationType, listPartitionInfo());
return modificationContext;
}
throw new RuntimeException("LakeSoulTableSource.applyRowLevelModificationScan only supports LakeSoulRowLevelModificationScanContext");
}

/**
 * Returns the scan context created by the most recent
 * applyRowLevelModificationScan() call, or {@code null} when no row-level
 * modification scan (e.g. DELETE) has been applied to this source.
 */
public LakeSoulRowLevelModificationScanContext getModificationContext() {
    return modificationContext;
}
}
Loading

0 comments on commit e4b21f4

Please sign in to comment.