[Flink] allow schema change when canCast=true
Signed-off-by: chenxu <[email protected]>
dmetasoul01 committed Jan 10, 2025
1 parent 15f6d21 commit d57618b
Showing 4 changed files with 251 additions and 73 deletions.
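At the core of the change, the global committer now evolves the table schema whenever the incoming schema differs only by castable column types (DataTypeCastUtils.CAN_CAST()), replacing the previous per-database schemaChangeFound flag. Below is a minimal sketch of that decision flow; SchemaCompatibility, MetaClient, and the method signatures are illustrative stand-ins, not the actual LakeSoul APIs.

// Illustrative sketch only: names and signatures are assumptions, not LakeSoul code.
import java.io.IOException;
import java.util.List;

public class SchemaEvolutionSketch {

    enum SchemaCompatibility { IS_EQUAL, CAN_CAST, INCOMPATIBLE }

    interface MetaClient {
        void updateTableSchema(String tableId, String newSchemaJson);
        void logicallyDropColumn(String tableId, List<String> droppedColumns);
    }

    static void commitSchema(SchemaCompatibility compat,
                             String tableId,
                             String mergedSchemaJson,
                             List<String> droppedColumns,
                             boolean logicallyDropColumn,
                             MetaClient meta) throws IOException {
        switch (compat) {
            case IS_EQUAL:
                // Schemas already match: nothing to do.
                return;
            case CAN_CAST:
                // Schema changed but every column is castable: evolve the table schema,
                // optionally recording logically dropped columns first.
                if (logicallyDropColumn && !droppedColumns.isEmpty()) {
                    meta.logicallyDropColumn(tableId, droppedColumns);
                }
                meta.updateTableSchema(tableId, mergedSchemaJson);
                return;
            default:
                // e.g. change of a partition or primary key column: fail the commit.
                throw new IOException("Incompatible schema change: " + compat);
        }
    }
}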
@@ -121,7 +121,6 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
isBounded);
LOG.info("Committing: {}", globalCommittable);

int index = 0;
String dbType = this.conf.getString(SOURCE_DB_TYPE, "");

for (Map.Entry<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> entry :
@@ -195,60 +194,56 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
boolean schemaChanged = (boolean) equalOrCanCastTuple3._2();
StructType mergeStructType = equalOrCanCastTuple3._3();

boolean schemaChangeFound = false;
if (dbType.equals("mongodb")) {
if (mergeStructType.length() > origSchema.size()) {
schemaChangeFound = schemaChanged;
}
} else {
schemaChangeFound = equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST());
}
if (schemaChangeFound) {
if (equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST())) {
LOG.warn("Schema change found, origin schema = {}, changed schema = {}",
origSchema.json(),
msgSchema.toJson());
if (logicallyDropColumn) {
List<String> droppedColumn = DataTypeCastUtils.getDroppedColumn(origSchema, sparkSchema);
if (droppedColumn.size() > 0) {
LOG.warn("Dropping Column {} Logically", droppedColumn);
dbManager.logicallyDropColumn(tableInfo.getTableId(), droppedColumn);
if (schemaChanged) {
dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
}
} else {
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
if (dbType.equals("mongodb")) {
if (schemaChanged) {
dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
}
} else {
LOG.info("Changing table schema: {}, {}, {}, {}, {}, {}",
tableNamespace,
tableName,
identity.tableLocation,
msgSchema,
identity.useCDC,
identity.cdcColumn);
if (dbType.equals("mongodb")) {
dbManager.updateTableSchema(tableInfo.getTableId(),
ArrowUtils.toArrowSchema(mergeStructType, "UTC").toJson());
if (logicallyDropColumn) {
List<String>
droppedColumn =
DataTypeCastUtils.getDroppedColumn(origSchema, sparkSchema);
if (droppedColumn.size() > 0) {
LOG.warn("Dropping Column {} Logically", droppedColumn);
dbManager.logicallyDropColumn(tableInfo.getTableId(), droppedColumn);
if (schemaChanged) {
dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
}
} else {
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
}
} else {
LOG.info("Changing table schema: {}, {}, {}, {}, {}, {}",
tableNamespace,
tableName,
identity.tableLocation,
msgSchema,
identity.useCDC,
identity.cdcColumn);
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
}
if (JSONObject.parseObject(tableInfo.getProperties())
.containsKey(DBConfig.TableInfoProperty.DROPPED_COLUMN)) {
dbManager.removeLogicallyDropColumn(tableInfo.getTableId());
if (JSONObject.parseObject(tableInfo.getProperties())
.containsKey(DBConfig.TableInfoProperty.DROPPED_COLUMN)) {
dbManager.removeLogicallyDropColumn(tableInfo.getTableId());
}
}
}
} else if (!equalOrCanCast.equals(DataTypeCastUtils.IS_EQUAL())) {
long
schemaLastChangeTime =
JSON.parseObject(tableInfo.getProperties())
.getLong(DBConfig.TableInfoProperty.LAST_TABLE_SCHEMA_CHANGE_TIME);
.getLongValue(DBConfig.TableInfoProperty.LAST_TABLE_SCHEMA_CHANGE_TIME);
if (equalOrCanCast.contains("Change of Partition Column") ||
equalOrCanCast.contains("Change of Primary Key Column")) {
throw new IOException(equalOrCanCast);
}
for (LakeSoulMultiTableSinkCommittable committable : lakeSoulMultiTableSinkCommittable) {
if (committable.getTsMs() > schemaLastChangeTime) {
LOG.error("incompatible cast data created and delayThreshold time: {}, dml create time: {}",
LOG.error("Incompatible cast data {} created and delayThreshold time: {}, dml create time: {}",
equalOrCanCast,
schemaLastChangeTime, committable.getTsMs());
throw new IOException(equalOrCanCast);
}
@@ -96,6 +96,9 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord
}
if (sourceField.schema().field("ts_ms") != null) {
tsMs = (Long) source.getWithoutDefault("ts_ms");
if (tsMs == 0) {
tsMs = System.currentTimeMillis();
}
}
}
long sortField = (binlogFileIndex << 32) + binlogPosition;
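BinarySourceRecord now falls back to the current processing time when the source metadata reports ts_ms = 0, presumably so that the committer's later comparison of the committable timestamp against LAST_TABLE_SCHEMA_CHANGE_TIME remains meaningful. A small sketch of that fallback under an assumed helper name (TsMsFallback and extractTsMs are not real LakeSoul identifiers):

// Illustrative helper mirroring the added branch in fromMysqlSourceRecord.
public final class TsMsFallback {
    static long extractTsMs(Long sourceTsMs) {
        long tsMs = (sourceTsMs == null) ? 0L : sourceTsMs;
        if (tsMs == 0L) {
            // A zero ts_ms from the source struct is replaced by wall-clock time.
            tsMs = System.currentTimeMillis();
        }
        return tsMs;
    }
}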
@@ -8,15 +8,14 @@
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
@@ -36,6 +35,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Test;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -74,6 +74,7 @@ public void test() throws Exception {

@Test
public void testManualArrowBatch() throws Exception {

Configuration conf = new Configuration();
conf.set(MAX_ROW_GROUP_SIZE, 2);
conf.set(FILE_ROLLING_SIZE, 10L);
@@ -185,6 +186,199 @@ public void testManualArrowBatch() throws Exception {
tEnv.registerCatalog("lakesoul", new LakeSoulCatalog());
tEnv.useCatalog("lakesoul");
tEnv.executeSql("select * from `default`.`qar_table`").print();
tEnv.executeSql("drop table `default`.`qar_table`").print();
}

@Test
public void testSchemaChange() throws Exception {
Configuration conf = new Configuration();
conf.set(MAX_ROW_GROUP_SIZE, 2);
conf.set(FILE_ROLLING_SIZE, 10L);
conf.set(AUTO_SCHEMA_CHANGE, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

{
Schema arrowSchema = new Schema(Arrays.asList(
// string column
new Field("field_string", FieldType.nullable(new ArrowType.Utf8()), null),
// signed 64 bit integer column
new Field("field_int64", FieldType.nullable(new ArrowType.Int(64, true)), null),
// float 32 column
new Field("field_float32",
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null),
// date partition column
new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), null)
));

// TableInfo object can be reused
TableInfo sinkTableInfo = TableInfo
.newBuilder()
.setTableId("NOT_USED")
.setTableNamespace("default")
.setTableName("qar_table")
.setTableSchema(arrowSchema.toJson())
.setTablePath("file:///tmp/test_arrow_sink")
.setPartitions(";")
.setProperties("{}")
.build();

byte[] sinkTableInfoEncoded = sinkTableInfo.toByteArray();
List<LakeSoulArrowWrapper> arrowBatches = new ArrayList<>();

try (
BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot arrowBatch = VectorSchemaRoot.create(arrowSchema, allocator)
) {
int batchSize = 3;

// create string vector
VarCharVector strVector = (VarCharVector) arrowBatch.getVector("field_string");
strVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
strVector.set(i, ("" + i).getBytes());
strVector.set(i, ("" + i).getBytes());
strVector.set(i, ("" + i).getBytes());
}
strVector.setValueCount(batchSize);

// create int64 vector
BigIntVector bigIntVector = (BigIntVector) arrowBatch.getVector("field_int64");
bigIntVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
bigIntVector.set(i, i + i);
}
bigIntVector.setValueCount(batchSize);

// create float32 vector
Float4Vector float4Vector = (Float4Vector) arrowBatch.getVector("field_float32");
float4Vector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
float4Vector.set(i, (float) i + 2.0f * (float) i);
}
float4Vector.setValueCount(batchSize);

// create time stamp vector
TimeStampMilliTZVector timestampVector = (TimeStampMilliTZVector) arrowBatch.getVector("timestamp");
timestampVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
timestampVector.set(i, Instant.now().toEpochMilli());
}
timestampVector.setValueCount(batchSize);

arrowBatch.setRowCount(batchSize);

arrowBatches.add(new LakeSoulArrowWrapper(sinkTableInfoEncoded, arrowBatch));
}

DataStreamSource<LakeSoulArrowWrapper> source = env.fromCollection(arrowBatches);
LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = (Configuration) env.getConfiguration();
context.conf.set(AUTO_SCHEMA_CHANGE, true);

LakeSoulMultiTableSinkStreamBuilder.buildArrowSink(context, source);

env.execute("Test Arrow Sink");

// read data
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
tEnv.registerCatalog("lakesoul", new LakeSoulCatalog());
tEnv.useCatalog("lakesoul");
tEnv.executeSql("desc `default`.`qar_table`").print();
tEnv.executeSql("select * from `default`.`qar_table`").print();
}

// auto change schema
{
Schema arrowSchema = new Schema(Arrays.asList(
// string column
new Field("field_string", FieldType.nullable(new ArrowType.Int(64, true)), null),
// signed 64 bit integer column
new Field("field_int64", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
// float 32 column
new Field("field_float32",
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null),
// date partition column
new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), null)
));

// TableInfo object can be reused
TableInfo sinkTableInfo = TableInfo
.newBuilder()
.setTableId("NOT_USED")
.setTableNamespace("default")
.setTableName("qar_table")
.setTableSchema(arrowSchema.toJson())
.setTablePath("file:///tmp/test_arrow_sink")
.setPartitions(";")
.setProperties("{}")
.build();

byte[] sinkTableInfoEncoded = sinkTableInfo.toByteArray();
List<LakeSoulArrowWrapper> arrowBatches = new ArrayList<>();

try (
BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot arrowBatch = VectorSchemaRoot.create(arrowSchema, allocator)
) {
int batchSize = 3;

// create string vector
BigIntVector strVector = (BigIntVector) arrowBatch.getVector("field_string");
strVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
strVector.set(i, i + 15);
strVector.set(i, i + 35);
strVector.set(i, i + 55);
}
strVector.setValueCount(batchSize);

// create int64 vector
VarCharVector bigIntVector = (VarCharVector) arrowBatch.getVector("field_int64");
bigIntVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
bigIntVector.set(i, ("" + (i + i)).getBytes());
}
bigIntVector.setValueCount(batchSize);

// create float32 vector
Float8Vector float4Vector = (Float8Vector) arrowBatch.getVector("field_float32");
float4Vector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
float4Vector.set(i, (double) i + 2.0f * (double) i);
}
float4Vector.setValueCount(batchSize);

// create time stamp vector
TimeStampMilliVector timestampVector = (TimeStampMilliVector) arrowBatch.getVector("timestamp");
timestampVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
timestampVector.set(i, Instant.now().toEpochMilli());
}
timestampVector.setValueCount(batchSize);

arrowBatch.setRowCount(batchSize);

arrowBatches.add(new LakeSoulArrowWrapper(sinkTableInfoEncoded, arrowBatch));
}

DataStreamSource<LakeSoulArrowWrapper> source = env.fromCollection(arrowBatches);
LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = (Configuration) env.getConfiguration();
context.conf.set(AUTO_SCHEMA_CHANGE, true);

LakeSoulMultiTableSinkStreamBuilder.buildArrowSink(context, source);

env.execute("Test Arrow Sink");
// read data
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
tEnv.registerCatalog("lakesoul", new LakeSoulCatalog());
tEnv.useCatalog("lakesoul");
tEnv.executeSql("desc `default`.`qar_table`").print();
tEnv.executeSql("select * from `default`.`qar_table`").print();
}
}

public static void main(String[] args) throws Exception {