Skip to content

Commit

Permalink
update schema with record of largest debezium ts_ms
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Aug 1, 2024
1 parent 26e43f8 commit ff9b3c1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
sparkSchema,
partitionKeys.rangeKeys,
partitionKeys.primaryKeys);
LOG.info("{}", equalOrCanCastTuple3);
String equalOrCanCast = equalOrCanCastTuple3._1();
boolean schemaChanged = (boolean) equalOrCanCastTuple3._2();
StructType mergeStructType = equalOrCanCastTuple3._3();
Expand Down Expand Up @@ -273,12 +274,10 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
if (equalOrCanCast.contains("Change of Partition Column") || equalOrCanCast.contains("Change of Primary Key Column")) {
throw new IOException(equalOrCanCast);
}
// for (LakeSoulMultiTableSinkCommittable committable : committableList) {
// if (committable.getTsMs() > schemaLastChangeTime) {
// LOG.error("incompatible cast data created and delayThreshold time: {}, dml create time: {}", schemaLastChangeTime, committable.getTsMs());
// throw new IOException(equalOrCanCast);
// }
// }
if (entry.getValue().f0 > schemaLastChangeTime) {
LOG.error("incompatible cast data created and delayThreshold time: {}, dml create time: {}", schemaLastChangeTime, entry.getValue().f0);
throw new IOException(equalOrCanCast);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils {
}
}

test("read") {
sql("show create table TestBinarySourceRecordSink").show(false)
sql("desc TestBinarySourceRecordSink").show()
sql("select * from TestBinarySourceRecordSink").show()
}
// test("read") {
// sql("show create table TestBinarySourceRecordSink").show(false)
// sql("desc TestBinarySourceRecordSink").show()
// sql("select * from TestBinarySourceRecordSink").show()
// }
}

0 comments on commit ff9b3c1

Please sign in to comment.