Skip to content

Commit

Permalink
[Improve][CDC] Filter heartbeat event
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Jan 22, 2025
1 parent cc22fb1 commit efc6a4f
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isHeartbeatRecord;

@Slf4j
public class DebeziumJsonDeserializeSchema
extends AbstractDebeziumDeserializationSchema<SeaTunnelRow> {
Expand All @@ -60,9 +62,13 @@ public DebeziumJsonDeserializeSchema(
@Override
public void deserialize(SourceRecord record, Collector<SeaTunnelRow> out) throws Exception {
super.deserialize(record, out);
if (!isHeartbeatRecord(record)) {
SeaTunnelRow row = deserializationSchema.deserialize(record);
out.collect(row);
return;
}

SeaTunnelRow row = deserializationSchema.deserialize(record);
out.collect(row);
log.debug("Unsupported record {}, just skip.", record);
}

@Override
Expand Down

0 comments on commit efc6a4f

Please sign in to comment.