From 415f7148baa5d5ce26d1070fdd6e1856e1ebdc72 Mon Sep 17 00:00:00 2001 From: zenghua Date: Wed, 31 Jul 2024 11:28:46 +0800 Subject: [PATCH] fix case of schema migration Signed-off-by: zenghua --- .../sink/committer/LakeSoulSinkCommitter.java | 2 +- .../LakeSoulSinkGlobalCommitter.java | 6 +- .../AbstractLakeSoulMultiTableSinkWriter.java | 2 + .../sink/LakeSoulDataStreamSinkCase.java | 105 +++++++++++++----- 4 files changed, 82 insertions(+), 33 deletions(-) diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java index c5dd92ecb..2661643f3 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java @@ -83,8 +83,8 @@ public List commit(List commit( } } FileSystem fileSystem = new Path(identity.tableLocation).getFileSystem(); - Path qp = new Path(identity.tableLocation).makeQualified(fileSystem); - FlinkUtil.createAndSetTableDirPermission(qp, true); - dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.toJson(), + Path path = new Path(identity.tableLocation).makeQualified(fileSystem); + FlinkUtil.createAndSetTableDirPermission(path, true); + dbManager.createNewTable(tableId, tableNamespace, tableName, path.toString(), msgSchema.toJson(), properties, partition); } else { DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java index 3eb3a2a3d..48eea7ddd 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java @@ -145,6 +145,7 @@ protected long getDataDmlTsMs(IN element) { @Override public void write(IN element, Context context) throws IOException { + LOG.info("{}", element); if (element == null) { return; } @@ -161,6 +162,7 @@ public void write(IN element, Context context) throws IOException { } catch (Exception e) { throw new IOException(e); } + LOG.info("{}", schemaAndRowDatas); for (Tuple2 schemaAndRowData : schemaAndRowDatas) { TableSchemaIdentity identity = schemaAndRowData.f0; RowData rowData = schemaAndRowData.f1; diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulDataStreamSinkCase.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulDataStreamSinkCase.java index a50c7589d..2f4107a6e 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulDataStreamSinkCase.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/connector/sink/LakeSoulDataStreamSinkCase.java @@ -9,6 +9,8 @@ import org.apache.flink.lakesoul.types.*; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.data.writer.BinaryRowWriter; import org.apache.flink.table.types.logical.*; @@ -16,6 +18,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -36,16 +39,18 @@ public void testCollectionSource() throws Exception { List recordCollection = new ArrayList<>(); String topic = ""; - List primaryKeys = Collections.emptyList(); + List primaryKeys = Arrays.asList("id"); List partitionKeys = Collections.emptyList(); String tableName = "TestBinarySourceRecordSink"; String path = getTempDirUri(tableName); TableId tableId = new TableId(LakeSoulCatalog.CATALOG_NAME, "default", tableName); - LakeSoulRowDataWrapper data = mockLakeSoulRowDataWrapper(0, useCDC, tableId); - + LakeSoulRowDataWrapper data = mockInsertLakeSoulRowDataWrapper(1, useCDC, tableId); recordCollection.add(new BinarySourceRecord(topic, primaryKeys, tableId, path, partitionKeys, false, data, "")); - DataStreamSource source = env.fromCollection(recordCollection); + LakeSoulRowDataWrapper data1 = mockUpdateLakeSoulRowDataWrapper(2, useCDC, tableId); + recordCollection.add(new BinarySourceRecord(topic, primaryKeys, tableId, path, partitionKeys, false, data1, "")); + + DataStreamSource source = env.fromCollection(recordCollection).setParallelism(1); LakeSoulRecordConvert lakeSoulRecordConvert = new LakeSoulRecordConvert(conf, conf.getString(SERVER_TIME_ZONE)); MySqlSource mySqlSource = MySqlSource.builder() @@ -62,44 +67,86 @@ public void testCollectionSource() throws Exception { env.execute("test BinarySourceRecord Sink with CollectionSource"); } - private LakeSoulRowDataWrapper mockLakeSoulRowDataWrapper(int seed, boolean useCDC, TableId tableId) { - int arity = 1 + 1; - if (useCDC) ++arity; + private LakeSoulRowDataWrapper mockInsertLakeSoulRowDataWrapper(int seed, boolean useCDC, TableId tableId) { + LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper + .newBuilder() + .setTableId(tableId) + .setAfterType(genRowType(seed, useCDC)) + .setAfterRowData(genRowData(seed, useCDC, RowKind.INSERT)); + return builder.build(); + } - // construct RowData - BinaryRowData rowdata = new BinaryRowData(arity); - BinaryRowWriter writer = new BinaryRowWriter(rowdata); - writer.writeInt(0, seed); + private LakeSoulRowDataWrapper mockUpdateLakeSoulRowDataWrapper(int seed, boolean useCDC, TableId tableId) { - RowKind rowKind = RowKind.INSERT; - long sortField = 1; - writer.writeLong(useCDC ? arity - 2 : arity - 1, sortField); - writer.writeRowKind(rowKind); - if (useCDC) { - setCDCRowKindField(writer, rowKind, arity - 1); + LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper + .newBuilder() + .setTableId(tableId) + .setBeforeRowType(genRowType(seed - 1, useCDC)) + .setBeforeRowData(genRowData(seed - 1, useCDC, RowKind.UPDATE_BEFORE)) + .setAfterType(genRowType(seed, useCDC)) + .setAfterRowData(genRowData(seed, useCDC, RowKind.UPDATE_AFTER)); + return builder.build(); + } + + public static RowType genRowType(int seed, boolean useCDC) { + int arity; + if (seed % 2 == 1) { + arity = 2; + } else { + arity = 3; } - writer.complete(); + arity += 1; + if (useCDC) ++arity; + - // construct RowType String[] colNames = new String[arity]; LogicalType[] colTypes = new LogicalType[arity]; - colNames[0] = "int"; - colTypes[0] = new IntType(); + colNames[0] = "id"; + colTypes[0] = new IntType(false); + if (seed % 2 == 1) { + colNames[1] = "int"; + colTypes[1] = new IntType(); + } else { + colNames[1] = "int"; + colTypes[1] = new IntType(); + colNames[2] = "str"; + colTypes[2] = new VarCharType(); + } colNames[useCDC ? arity - 2 : arity - 1] = SORT_FIELD; colTypes[useCDC ? arity - 2 : arity - 1] = new BigIntType(); if (useCDC) { - colNames[arity - 1] = CDC_CHANGE_COLUMN; + colNames[arity - 1] = CDC_CHANGE_COLUMN_DEFAULT; colTypes[arity - 1] = new VarCharType(false, Integer.MAX_VALUE); } + return RowType.of(colTypes, colNames); + } - RowType rowType = RowType.of(colTypes, colNames); + public static RowData genRowData(int seed, boolean useCDC, RowKind rowKind) { + int arity; + if (seed % 2 == 1) { + arity = 2; + } else { + arity = 3; + } + arity += 1; + if (useCDC) ++arity; + // construct RowData + BinaryRowData rowdata = new BinaryRowData(arity); + BinaryRowWriter writer = new BinaryRowWriter(rowdata); + writer.writeInt(0, 0); + writer.writeInt(1, seed); + if (seed % 2 == 0) { + writer.writeString(2, StringData.fromString(String.valueOf(seed))); + } + long sortField = seed; + writer.writeLong(useCDC ? arity - 2 : arity - 1, sortField); + writer.writeRowKind(rowKind); - LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper - .newBuilder() - .setTableId(tableId) - .setAfterType(rowType) - .setAfterRowData(rowdata); - return builder.build(); + if (useCDC) { + setCDCRowKindField(writer, rowKind, arity - 1); + } + writer.complete(); + return rowdata; } }