diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java index 47b1413bd73..9e5215203f5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Map; +import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isHeartbeatRecord; + @Slf4j public class DebeziumJsonDeserializeSchema extends AbstractDebeziumDeserializationSchema { @@ -60,9 +62,13 @@ public DebeziumJsonDeserializeSchema( @Override public void deserialize(SourceRecord record, Collector out) throws Exception { super.deserialize(record, out); + if (!isHeartbeatRecord(record)) { + SeaTunnelRow row = deserializationSchema.deserialize(record); + out.collect(row); + return; + } - SeaTunnelRow row = deserializationSchema.deserialize(record); - out.collect(row); + log.debug("Unsupported record {}, just skip.", record); } @Override