[Flink]fix readPartitionInfo on UpdateCommit (#458)
* [Flink]fix readPartitionInfo on UpdateCommit

Signed-off-by: zenghua <[email protected]>

* fix testLakeSoulTableSinkDeleteWithParallelismInBatch expected result

Signed-off-by: zenghua <[email protected]>

* add todo memo

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
Ceng23333 and zenghua authored Mar 26, 2024
1 parent c199b07 commit 5e193e6
Showing 13 changed files with 270 additions and 55 deletions.
@@ -469,7 +469,8 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
Set<CommitOp> middleCommitOps = partitionInfoDao.getCommitOpsBetweenVersions(tableId, partitionDesc,
readPartitionVersion + 1, curVersion);
if (commitOp.equals(CommitOp.UpdateCommit)) {
if (middleCommitOps.contains(CommitOp.UpdateCommit) ||
// TODO: 2024/3/22 further considering for this UpdateCommit conflict case
if (
(middleCommitOps.size() > 1 && middleCommitOps.contains(CommitOp.CompactionCommit))) {
throw new IllegalStateException(
"current operation conflicts with other data writing tasks, table path: " +
@@ -831,7 +832,7 @@ private String getNameSpaceDomain(String namespace) {
return namespaceInfo.getDomain();
}

public void commitDataCommitInfo(DataCommitInfo dataCommitInfo) {
public void commitDataCommitInfo(DataCommitInfo dataCommitInfo, List<PartitionInfo> readPartitionInfoList) {
String tableId = dataCommitInfo.getTableId();
String partitionDesc = dataCommitInfo.getPartitionDesc().replaceAll("/", LAKESOUL_RANGE_PARTITION_SPLITTER);
Uuid commitId = dataCommitInfo.getCommitId();
Expand Down Expand Up @@ -863,6 +864,9 @@ public void commitDataCommitInfo(DataCommitInfo dataCommitInfo) {

metaInfo.setTableInfo(tableInfo);
metaInfo.addAllListPartition(partitionInfoList);
if (readPartitionInfoList != null) {
metaInfo.addAllReadPartitionInfo(readPartitionInfoList);
}

commitData(metaInfo.build(), false, commitOp);
}
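
The extra readPartitionInfoList argument lets a writer attach the partition versions it read, so commitData can detect when a concurrent commit has moved those partitions forward. Below is a minimal caller sketch, not part of this commit; the DBManager class name, the DataCommitInfo.newBuilder() factory, and the import paths are assumptions inferred from the calls visible elsewhere in this diff, and passing null keeps the previous single-argument behaviour.

import java.util.List;

import com.dmetasoul.lakesoul.meta.DBManager;                 // assumed class/package
import com.dmetasoul.lakesoul.meta.entity.CommitOp;           // assumed package
import com.dmetasoul.lakesoul.meta.entity.DataCommitInfo;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;

public class CommitWithReadSnapshotSketch {
    // Commit an update together with the partitions that were read to produce it.
    static void commitUpdate(DBManager dbManager,
                             DataCommitInfo.Builder dataCommitInfo,
                             List<PartitionInfo> readPartitionInfoList) {
        dataCommitInfo.setCommitOp(CommitOp.UpdateCommit);
        dataCommitInfo.setTimestamp(System.currentTimeMillis());
        // A non-null list is added to MetaInfo as readPartitionInfo and checked for
        // conflicts in commitData(); null preserves the old behaviour.
        dbManager.commitDataCommitInfo(dataCommitInfo.build(), readPartitionInfoList);
    }
}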
@@ -22,10 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;

import static com.dmetasoul.lakesoul.meta.DBConfig.*;
@@ -96,6 +93,7 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
dataFileOpList.add(dataFileOp.build());
}
String partition = committable.getBucketId();
List<PartitionInfo> readPartitionInfoList = null;

TableNameId tableNameId =
lakeSoulDBManager.shortTableName(identity.tableId.table(), identity.tableId.schema());
@@ -105,9 +103,15 @@
dataCommitInfo.setPartitionDesc(partition.isEmpty() ? LAKESOUL_NON_PARTITION_TABLE_PART_DESC :
partition.replaceAll("/", LAKESOUL_RANGE_PARTITION_SPLITTER));
dataCommitInfo.addAllFileOps(dataFileOpList);
if(LakeSoulSinkOptions.DELETE.equals(committable.getDmlType())){
if (LakeSoulSinkOptions.DELETE.equals(committable.getDmlType())) {
dataCommitInfo.setCommitOp(CommitOp.UpdateCommit);
}else{
if (!committable.getSourcePartitionInfo().isEmpty()) {
readPartitionInfoList =
JniWrapper
.parseFrom(Base64.getDecoder().decode(committable.getSourcePartitionInfo()))
.getPartitionInfoList();
}
} else {
dataCommitInfo.setCommitOp(CommitOp.AppendCommit);
}
dataCommitInfo.setTimestamp(System.currentTimeMillis());
@@ -124,7 +128,7 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
dataCommitInfo.getTimestamp(), dataCommitInfo.getCommitId().toString());
}

lakeSoulDBManager.commitDataCommitInfo(dataCommitInfo.build());
lakeSoulDBManager.commitDataCommitInfo(dataCommitInfo.build(), readPartitionInfoList);
}
}

@@ -4,6 +4,8 @@

package org.apache.flink.lakesoul.sink.state;

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
@@ -27,6 +29,7 @@ public class LakeSoulMultiTableSinkCommittable implements Serializable, Comparab
private final String bucketId;

private final TableSchemaIdentity identity;
private String sourcePartitionInfo;

@Nullable
private List<InProgressFileWriter.PendingFileRecoverable> pendingFiles;
@@ -47,9 +50,19 @@ public LakeSoulMultiTableSinkCommittable(
String bucketId,
List<InProgressFileWriter.PendingFileRecoverable> pendingFiles,
long creationTime,
TableSchemaIdentity identity, long tsMs, String dmlType) {
this(bucketId, identity, pendingFiles, creationTime,
UUID.randomUUID().toString(), tsMs,dmlType
TableSchemaIdentity identity,
long tsMs,
String dmlType,
String sourcePartitionInfo
) {
this(bucketId,
identity,
pendingFiles,
creationTime,
UUID.randomUUID().toString(),
tsMs,
dmlType,
sourcePartitionInfo
);
}

@@ -63,14 +76,18 @@ public LakeSoulMultiTableSinkCommittable(
@Nullable List<InProgressFileWriter.PendingFileRecoverable> pendingFiles,
long time,
@Nullable String commitId,
long tsMs, String dmlType) {
long tsMs,
String dmlType,
String sourcePartitionInfo
) {
this.bucketId = bucketId;
this.identity = identity;
this.pendingFiles = pendingFiles;
this.creationTime = time;
this.commitId = commitId;
this.tsMs = tsMs;
this.dmlType = dmlType;
this.sourcePartitionInfo = sourcePartitionInfo;
}

public long getTsMs() {
@@ -128,9 +145,35 @@ public void merge(LakeSoulMultiTableSinkCommittable committable) {
} else {
if (committable.hasPendingFile()) pendingFiles = committable.getPendingFiles();
}
mergeSourcePartitionInfo(committable);
}

private void mergeSourcePartitionInfo(LakeSoulMultiTableSinkCommittable committable) {
if (sourcePartitionInfo == null) {
sourcePartitionInfo = committable.getSourcePartitionInfo();
} else {
try {
JniWrapper jniWrapper = JniWrapper
.parseFrom(sourcePartitionInfo.getBytes())
.toBuilder()
.addAllPartitionInfo(
JniWrapper
.parseFrom(committable.getSourcePartitionInfo().getBytes())
.getPartitionInfoList()
)
.build();
sourcePartitionInfo = new String(jniWrapper.toByteArray());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}

public String getDmlType() {
return dmlType;
}

public String getSourcePartitionInfo() {
return sourcePartitionInfo;
}
}
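
merge() now also unions the source-partition snapshots of the two committables. The following is a minimal sketch, not from the commit, of the same protobuf concatenation operating directly on JniWrapper messages; the committed mergeSourcePartitionInfo above performs it on the String-encoded bytes.

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.google.protobuf.InvalidProtocolBufferException;

public class MergePartitionSnapshotSketch {
    // Append every PartitionInfo from the right snapshot to the left one.
    static JniWrapper merge(JniWrapper left, JniWrapper right) {
        return left.toBuilder()
                .addAllPartitionInfo(right.getPartitionInfoList())
                .build();
    }

    // Equivalent to mergeSourcePartitionInfo, but keeping the snapshots as byte arrays.
    static byte[] mergeSerialized(byte[] left, byte[] right) throws InvalidProtocolBufferException {
        return merge(JniWrapper.parseFrom(left), JniWrapper.parseFrom(right)).toByteArray();
    }
}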
@@ -83,6 +83,7 @@ private void serializeV1(LakeSoulMultiTableSinkCommittable committable, DataOutp
dataOutputView.writeUTF(committable.getCommitId());
dataOutputView.writeLong(committable.getTsMs());
dataOutputView.writeUTF(committable.getDmlType());
dataOutputView.writeUTF(committable.getSourcePartitionInfo());
} else {
dataOutputView.writeBoolean(false);
}
@@ -98,6 +99,7 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView dataInputV
long time = Long.MIN_VALUE;
long dataTsMs = Long.MAX_VALUE;
String dmlType = null;
String sourcePartitionInfo = "";
if (dataInputView.readBoolean()) {
int size = dataInputView.readInt();
if (size > 0) {
@@ -111,6 +113,7 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView dataInputV
commitId = dataInputView.readUTF();
dataTsMs = dataInputView.readLong();
dmlType = dataInputView.readUTF();
sourcePartitionInfo = dataInputView.readUTF();
}
}

@@ -119,7 +122,7 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputV
String bucketId = dataInputView.readUTF();

return new LakeSoulMultiTableSinkCommittable(
bucketId, identity, pendingFile, time, commitId, dataTsMs,dmlType);
bucketId, identity, pendingFile, time, commitId, dataTsMs, dmlType, sourcePartitionInfo);
}

private static void validateMagicNumber(DataInputView in) throws IOException {
@@ -173,7 +173,8 @@ public void write(IN element, Context context) throws IOException {
@Override
public List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush) throws IOException {
List<LakeSoulMultiTableSinkCommittable> committables = new ArrayList<>();
String dmlType = this.conf.getString(LakeSoulSinkOptions.DMLTYPE);
String dmlType = this.conf.getString(LakeSoulSinkOptions.DML_TYPE);
String sourcePartitionInfo = this.conf.getString(LakeSoulSinkOptions.SOURCE_PARTITION_INFO);
// Every time before we prepare commit, we first check and remove the inactive
// buckets. Checking the activeness right before pre-committing avoid re-creating
// the bucket every time if the bucket use OnCheckpointingRollingPolicy.
@@ -184,7 +185,7 @@ public List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush) thro
if (!entry.getValue().isActive()) {
activeBucketIt.remove();
} else {
committables.addAll(entry.getValue().prepareCommit(flush,dmlType));
committables.addAll(entry.getValue().prepareCommit(flush, dmlType, sourcePartitionInfo));
}
}
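
prepareCommit now also pulls the Base64-encoded source partition snapshot from the sink configuration and hands it to every active bucket. A hedged sketch of the producing side follows; it assumes DML_TYPE and SOURCE_PARTITION_INFO are String-typed ConfigOptions on LakeSoulSinkOptions, that DELETE is the String constant compared against getDmlType(), that the option class lives in the package guessed below, and that the scan context introduced later in this commit supplies the encoded snapshot.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.table.LakeSoulRowLevelModificationScanContext;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;   // assumed package

public class DeleteSinkConfSketch {
    static Configuration deleteConf(LakeSoulRowLevelModificationScanContext scanContext) {
        Configuration conf = new Configuration();
        // The DML type drives the committer's choice between UpdateCommit and AppendCommit.
        conf.setString(LakeSoulSinkOptions.DML_TYPE, LakeSoulSinkOptions.DELETE);
        // Base64-encoded JniWrapper holding the partitions the DELETE read from.
        conf.setString(LakeSoulSinkOptions.SOURCE_PARTITION_INFO,
                scanContext.getBas64EncodedSourcePartitionInfo());
        return conf;
    }
}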

@@ -149,7 +149,7 @@ void write(RowData element, long currentTime, long tsMs) throws IOException {
inProgressPartWriter.write(element, currentTime);
}

List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush,String dmlType) throws IOException {
List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush, String dmlType, String sourcePartitionInfo) throws IOException {
// we always close part file and do not keep in-progress file
// since the native parquet writer doesn't support resume
if (inProgressPartWriter != null) {
@@ -167,7 +167,11 @@ List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush,String dmlTy
committables.add(new LakeSoulMultiTableSinkCommittable(
bucketId,
tmpPending,
time, tableId, tsMs, dmlType));
time,
tableId,
tsMs,
dmlType,
sourcePartitionInfo));
pendingFiles.clear();

return committables;
@@ -92,7 +92,7 @@ public SourceReader<RowData, LakeSoulSplit> createReader(SourceReaderContext rea
@Override
public SplitEnumerator<LakeSoulSplit, LakeSoulPendingSplits> createEnumerator(
SplitEnumeratorContext<LakeSoulSplit> enumContext) {
TableInfo tif = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableId.table(),
TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableId.table(),
tableId.schema());
List<String> readStartTimestampWithTimeZone =
Arrays.asList(optionParams.getOrDefault(LakeSoulOptions.READ_START_TIME(), ""),
@@ -114,29 +114,29 @@ public SplitEnumerator<LakeSoulSplit, LakeSoulPendingSplits> createEnumerator(
new LakeSoulDynSplitAssigner(optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1")),
Long.parseLong(optionParams.getOrDefault(LakeSoulOptions.DISCOVERY_INTERVAL(), "30000")),
convertTimeFormatWithTimeZone(readStartTimestampWithTimeZone),
tif.getTableId(),
tableInfo.getTableId(),
partDesc,
optionParams.getOrDefault(LakeSoulOptions.HASH_BUCKET_NUM(), "-1"));
} else {
return staticSplitEnumerator(enumContext,
tif,
tableInfo,
readStartTimestampWithTimeZone,
readType);
}
}

private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorContext<LakeSoulSplit> enumContext,
TableInfo tif,
TableInfo tableInfo,
List<String> readStartTimestampWithTimeZone,
String readType) {
List<String> readEndTimestampWithTimeZone =
Arrays.asList(optionParams.getOrDefault(LakeSoulOptions.READ_END_TIME(), ""),
optionParams.getOrDefault(LakeSoulOptions.TIME_ZONE(), ""));
List<DataFileInfo> dfinfos;
List<DataFileInfo> dataFileInfoList;
if (readType.equals("") || readType.equals("fullread")) {
dfinfos = Arrays.asList(getTargetDataFileInfo(tif));
dataFileInfoList = Arrays.asList(getTargetDataFileInfo(tableInfo));
} else {
dfinfos = new ArrayList<>();
dataFileInfoList = new ArrayList<>();
List<String> partDescs = new ArrayList<>();
String partitionDescOpt = optionParams.getOrDefault(LakeSoulOptions.PARTITION_DESC(), "");
if (partitionDescOpt.isEmpty() && remainingPartitions != null) {
@@ -148,7 +148,7 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte
partDescs.add(partitionDescOpt);
}
for (String desc : partDescs) {
dfinfos.addAll(Arrays.asList(DataOperation.getIncrementalPartitionDataInfo(tif.getTableId(),
dataFileInfoList.addAll(Arrays.asList(DataOperation.getIncrementalPartitionDataInfo(tableInfo.getTableId(),
desc,
convertTimeFormatWithTimeZone(readStartTimestampWithTimeZone),
convertTimeFormatWithTimeZone(readEndTimestampWithTimeZone),
@@ -157,18 +157,18 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte
}
int capacity = 100;
ArrayList<LakeSoulSplit> splits = new ArrayList<>(capacity);
if (!FlinkUtil.isExistHashPartition(tif)) {
for (DataFileInfo dfinfo : dfinfos) {
if (!FlinkUtil.isExistHashPartition(tableInfo)) {
for (DataFileInfo dataFileInfo : dataFileInfoList) {
ArrayList<Path> tmp = new ArrayList<>();
tmp.add(new Path(dfinfo.path()));
splits.add(new LakeSoulSplit(String.valueOf(dfinfo.hashCode()),
tmp.add(new Path(dataFileInfo.path()));
splits.add(new LakeSoulSplit(String.valueOf(dataFileInfo.hashCode()),
tmp,
0));
}
} else {
Map<String, Map<Integer, List<Path>>> splitByRangeAndHashPartition =
FlinkUtil.splitDataInfosToRangeAndHashPartition(tif.getTableId(),
dfinfos.toArray(new DataFileInfo[0]));
FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo.getTableId(),
dataFileInfoList.toArray(new DataFileInfo[0]));
for (Map.Entry<String, Map<Integer, List<Path>>> entry : splitByRangeAndHashPartition.entrySet()) {
for (Map.Entry<Integer, List<Path>> split : entry.getValue().entrySet()) {
splits.add(new LakeSoulSplit(String.valueOf(split.hashCode()),
@@ -182,8 +182,8 @@ private LakeSoulStaticSplitEnumerator staticSplitEnumerator(SplitEnumeratorConte
}


private DataFileInfo[] getTargetDataFileInfo(TableInfo tif) {
return FlinkUtil.getTargetDataFileInfo(tif,
private DataFileInfo[] getTargetDataFileInfo(TableInfo tableInfo) {
return FlinkUtil.getTargetDataFileInfo(tableInfo,
this.remainingPartitions);
}

@@ -0,0 +1,27 @@
package org.apache.flink.lakesoul.table;

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.table.connector.RowLevelModificationScanContext;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

public class LakeSoulRowLevelModificationScanContext implements RowLevelModificationScanContext {

private final JniWrapper sourcePartitionInfo;

public LakeSoulRowLevelModificationScanContext(List<PartitionInfo> listPartitionInfo) {
sourcePartitionInfo = JniWrapper.newBuilder().addAllPartitionInfo(listPartitionInfo).build();
}

public JniWrapper getSourcePartitionInfo() {
return sourcePartitionInfo;
}

public String getBas64EncodedSourcePartitionInfo() {
return Base64.getEncoder().encodeToString(getSourcePartitionInfo().toByteArray());
}
}
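
A short round-trip sketch for the new scan context, not part of the commit: the encode side is what a DELETE plan would hand to the sink configuration, and the decode side mirrors the LakeSoulSinkCommitter change earlier in this commit. The wrapper class below and the way the partition list is obtained are assumptions for illustration.

import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.lakesoul.table.LakeSoulRowLevelModificationScanContext;

import java.util.Base64;
import java.util.List;

public class ScanContextRoundTripSketch {
    // Encode the partitions a DELETE reads, for transport through the sink configuration.
    static String encode(List<PartitionInfo> readPartitions) {
        return new LakeSoulRowLevelModificationScanContext(readPartitions)
                .getBas64EncodedSourcePartitionInfo();
    }

    // Decode them back, exactly as LakeSoulSinkCommitter does for DELETE committables.
    static List<PartitionInfo> decode(String encoded) throws InvalidProtocolBufferException {
        return JniWrapper
                .parseFrom(Base64.getDecoder().decode(encoded))
                .getPartitionInfoList();
    }
}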
