fix case of schema migration
Signed-off-by: zenghua <[email protected]>
zenghua committed Jul 31, 2024
1 parent c71d1f1 commit 6b94ceb
Showing 4 changed files with 82 additions and 33 deletions.
@@ -83,8 +83,8 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
for (String file : files) {
DataFileOp.Builder dataFileOp = DataFileOp.newBuilder();
dataFileOp.setFileOp(FileOp.add);
dataFileOp.setPath(file);
Path path = new Path(file);
dataFileOp.setPath(path.toString());
FileStatus fileStatus = FileSystem.get(path.toUri()).getFileStatus(path);
dataFileOp.setSize(fileStatus.getLen());
dataFileOp.setFileExistCols(fileExistCols);
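
For context, a minimal standalone sketch of the pattern this hunk switches to: normalize the raw file string into a Flink Path, then stat it through the matching FileSystem to record the data file size. The class and helper names below are illustrative, not part of the commit.

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public class DataFileSizeSketch {
    // Hypothetical helper mirroring what the committer now does before filling DataFileOp:
    // build a Path from the raw file string and read the file length from its FileSystem.
    static long sizeOf(String rawFile) throws IOException {
        Path path = new Path(rawFile);                  // normalized form, used for setPath(path.toString())
        FileSystem fs = FileSystem.get(path.toUri());   // file system matching the path's scheme
        FileStatus status = fs.getFileStatus(path);     // stat the written data file
        return status.getLen();                         // size in bytes, used for setSize(...)
    }
}
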
@@ -155,9 +155,9 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
}
}
FileSystem fileSystem = new Path(identity.tableLocation).getFileSystem();
Path qp = new Path(identity.tableLocation).makeQualified(fileSystem);
FlinkUtil.createAndSetTableDirPermission(qp, true);
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.toJson(),
Path path = new Path(identity.tableLocation).makeQualified(fileSystem);
FlinkUtil.createAndSetTableDirPermission(path, true);
dbManager.createNewTable(tableId, tableNamespace, tableName, path.toString(), msgSchema.toJson(),
properties, partition);
} else {
DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
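
As a minimal sketch (illustrative names, not from the commit), the global committer change boils down to qualifying the table location before it is handed to createNewTable, so the persisted location carries an explicit scheme and authority:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public class QualifyTableLocationSketch {
    // Hypothetical helper: turn a possibly scheme-less table location into a fully
    // qualified path string, which the commit now stores instead of the raw string.
    static String qualify(String tableLocation) throws IOException {
        Path raw = new Path(tableLocation);
        FileSystem fs = raw.getFileSystem();       // file system derived from the location's scheme
        Path qualified = raw.makeQualified(fs);    // adds scheme/authority when they are missing
        return qualified.toString();               // e.g. a local dir becomes "file:/..." rather than "/..."
    }
}
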
@@ -145,6 +145,7 @@ protected long getDataDmlTsMs(IN element) {

@Override
public void write(IN element, Context context) throws IOException {
LOG.info("{}", element);
if (element == null) {
return;
}
@@ -161,6 +162,7 @@ public void write(IN element, Context context) throws IOException {
} catch (Exception e) {
throw new IOException(e);
}
LOG.info("{}", schemaAndRowDatas);
for (Tuple2<TableSchemaIdentity, RowData> schemaAndRowData : schemaAndRowDatas) {
TableSchemaIdentity identity = schemaAndRowData.f0;
RowData rowData = schemaAndRowData.f1;
@@ -9,13 +9,16 @@
import org.apache.flink.lakesoul.types.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@@ -36,16 +39,18 @@ public void testCollectionSource() throws Exception {

List<BinarySourceRecord> recordCollection = new ArrayList<>();
String topic = "";
List<String> primaryKeys = Collections.emptyList();
List<String> primaryKeys = Arrays.asList("id");
List<String> partitionKeys = Collections.emptyList();
String tableName = "TestBinarySourceRecordSink";
String path = getTempDirUri(tableName);
TableId tableId = new TableId(LakeSoulCatalog.CATALOG_NAME, "default", tableName);
LakeSoulRowDataWrapper data = mockLakeSoulRowDataWrapper(0, useCDC, tableId);

LakeSoulRowDataWrapper data = mockInsertLakeSoulRowDataWrapper(1, useCDC, tableId);
recordCollection.add(new BinarySourceRecord(topic, primaryKeys, tableId, path, partitionKeys, false, data, ""));

DataStreamSource<BinarySourceRecord> source = env.fromCollection(recordCollection);
LakeSoulRowDataWrapper data1 = mockUpdateLakeSoulRowDataWrapper(2, useCDC, tableId);
recordCollection.add(new BinarySourceRecord(topic, primaryKeys, tableId, path, partitionKeys, false, data1, ""));

DataStreamSource<BinarySourceRecord> source = env.fromCollection(recordCollection).setParallelism(1);
LakeSoulRecordConvert lakeSoulRecordConvert = new LakeSoulRecordConvert(conf, conf.getString(SERVER_TIME_ZONE));

MySqlSource<BinarySourceRecord> mySqlSource = MySqlSource.<BinarySourceRecord>builder()
@@ -62,44 +67,86 @@ env.execute("test BinarySourceRecord Sink with CollectionSource");
env.execute("test BinarySourceRecord Sink with CollectionSource");
}

private LakeSoulRowDataWrapper mockLakeSoulRowDataWrapper(int seed, boolean useCDC, TableId tableId) {
int arity = 1 + 1;
if (useCDC) ++arity;
private LakeSoulRowDataWrapper mockInsertLakeSoulRowDataWrapper(int seed, boolean useCDC, TableId tableId) {
LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper
.newBuilder()
.setTableId(tableId)
.setAfterType(genRowType(seed, useCDC))
.setAfterRowData(genRowData(seed, useCDC, RowKind.INSERT));
return builder.build();
}

// construct RowData
BinaryRowData rowdata = new BinaryRowData(arity);
BinaryRowWriter writer = new BinaryRowWriter(rowdata);
writer.writeInt(0, seed);
private LakeSoulRowDataWrapper mockUpdateLakeSoulRowDataWrapper(int seed, boolean useCDC, TableId tableId) {

RowKind rowKind = RowKind.INSERT;
long sortField = 1;
writer.writeLong(useCDC ? arity - 2 : arity - 1, sortField);
writer.writeRowKind(rowKind);
if (useCDC) {
setCDCRowKindField(writer, rowKind, arity - 1);
LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper
.newBuilder()
.setTableId(tableId)
.setBeforeRowType(genRowType(seed - 1, useCDC))
.setBeforeRowData(genRowData(seed - 1, useCDC, RowKind.UPDATE_BEFORE))
.setAfterType(genRowType(seed, useCDC))
.setAfterRowData(genRowData(seed, useCDC, RowKind.UPDATE_AFTER));
return builder.build();
}

public static RowType genRowType(int seed, boolean useCDC) {
int arity;
if (seed % 2 == 1) {
arity = 2;
} else {
arity = 3;
}
writer.complete();
arity += 1;
if (useCDC) ++arity;


// construct RowType
String[] colNames = new String[arity];
LogicalType[] colTypes = new LogicalType[arity];
colNames[0] = "int";
colTypes[0] = new IntType();
colNames[0] = "id";
colTypes[0] = new IntType(false);
if (seed % 2 == 1) {
colNames[1] = "int";
colTypes[1] = new IntType();
} else {
colNames[1] = "int";
colTypes[1] = new IntType();
colNames[2] = "str";
colTypes[2] = new VarCharType();
}

colNames[useCDC ? arity - 2 : arity - 1] = SORT_FIELD;
colTypes[useCDC ? arity - 2 : arity - 1] = new BigIntType();
if (useCDC) {
colNames[arity - 1] = CDC_CHANGE_COLUMN;
colNames[arity - 1] = CDC_CHANGE_COLUMN_DEFAULT;
colTypes[arity - 1] = new VarCharType(false, Integer.MAX_VALUE);
}
return RowType.of(colTypes, colNames);
}

RowType rowType = RowType.of(colTypes, colNames);
public static RowData genRowData(int seed, boolean useCDC, RowKind rowKind) {
int arity;
if (seed % 2 == 1) {
arity = 2;
} else {
arity = 3;
}
arity += 1;
if (useCDC) ++arity;
// construct RowData
BinaryRowData rowdata = new BinaryRowData(arity);
BinaryRowWriter writer = new BinaryRowWriter(rowdata);
writer.writeInt(0, 0);
writer.writeInt(1, seed);
if (seed % 2 == 0) {
writer.writeString(2, StringData.fromString(String.valueOf(seed)));
}
long sortField = seed;
writer.writeLong(useCDC ? arity - 2 : arity - 1, sortField);
writer.writeRowKind(rowKind);

LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper
.newBuilder()
.setTableId(tableId)
.setAfterType(rowType)
.setAfterRowData(rowdata);
return builder.build();
if (useCDC) {
setCDCRowKindField(writer, rowKind, arity - 1);
}
writer.complete();
return rowdata;
}
}
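
To make the schema-migration case in this test easier to see, here is an illustrative comparison of the two row shapes the mocks produce with useCDC disabled; the sort-field column name is a placeholder for the SORT_FIELD constant, and the snippet is not part of the commit.

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class SchemaMigrationShapeSketch {
    public static void main(String[] args) {
        // Shape of the first (insert) record: odd seed, no CDC column -> 3 fields.
        RowType insertType = RowType.of(
                new LogicalType[]{new IntType(false), new IntType(), new BigIntType()},
                new String[]{"id", "int", "sort_field"});
        // Shape of the second (update) record: even seed adds a "str" VARCHAR column -> 4 fields,
        // so the sink sees a wider schema for the same table and must migrate it.
        RowType updateType = RowType.of(
                new LogicalType[]{new IntType(false), new IntType(), new VarCharType(), new BigIntType()},
                new String[]{"id", "int", "str", "sort_field"});
        System.out.println(insertType.getFieldCount() + " -> " + updateType.getFieldCount()); // prints "3 -> 4"
    }
}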
