From efc6a4ffe88e004395a878e509e5fd8369f7511e Mon Sep 17 00:00:00 2001 From: hailin0 Date: Wed, 22 Jan 2025 10:53:18 +0800 Subject: [PATCH] [Improve][CDC] Filter heartbeat event --- .../debezium/row/DebeziumJsonDeserializeSchema.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java index 47b1413bd73..9e5215203f5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/DebeziumJsonDeserializeSchema.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Map; +import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isHeartbeatRecord; + @Slf4j public class DebeziumJsonDeserializeSchema extends AbstractDebeziumDeserializationSchema { @@ -60,9 +62,13 @@ public DebeziumJsonDeserializeSchema( @Override public void deserialize(SourceRecord record, Collector out) throws Exception { super.deserialize(record, out); + if (!isHeartbeatRecord(record)) { + SeaTunnelRow row = deserializationSchema.deserialize(record); + out.collect(row); + return; + } - SeaTunnelRow row = deserializationSchema.deserialize(record); - out.collect(row); + log.debug("Unsupported record {}, just skip.", record); } @Override