From 00f0721662c2e9a3c4a1702bf13dbd937488295a Mon Sep 17 00:00:00 2001
From: Kartik Khare
Date: Thu, 26 Dec 2024 12:37:12 +0530
Subject: [PATCH] Fix false data loss alerts in case of read_committed Kafka
 isolation (#14716)

* Fix false data loss alerts in case of read_committed Kafka isolation

* Fix null handling

---------

Co-authored-by: Kartik Khare
---
 .../stream/kafka20/KafkaPartitionLevelConsumer.java | 11 ++++++++++-
 .../stream/kafka30/KafkaPartitionLevelConsumer.java | 11 ++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index c1d4873abf4c..251b378ab944 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
       }
     }

+    // When read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch the earliest offset from the topic and check whether it is greater
+    // than startOffset. However, this would require an additional call to Kafka, which we want to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || _config.getKafkaIsolationLevel()
+        .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)) {
+      hasDataLoss = firstOffset > startOffset;
+    }
     return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
-        firstOffset > startOffset);
+        hasDataLoss);
   }

   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
index 000320406724..2e0e910f7cf5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
       }
     }

+    // When read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch the earliest offset from the topic and check whether it is greater
+    // than startOffset. However, this would require an additional call to Kafka, which we want to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || _config.getKafkaIsolationLevel()
+        .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)) {
+      hasDataLoss = firstOffset > startOffset;
+    }
     return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
-        firstOffset > startOffset);
+        hasDataLoss);
   }

   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
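Why the gate on isolation level is correct: under read_committed, the Kafka consumer skips aborted records and transaction control markers, both of which still occupy offsets in the partition log. The first record returned can therefore sit beyond the requested start offset even though nothing was lost, so comparing firstOffset against startOffset is a valid data-loss signal only when offsets are guaranteed consecutive, i.e. under the default read_uncommitted isolation (a null configured level falls back to Kafka's default, hence the null check). Below is a minimal, self-contained sketch of that decision logic; the class, the hasDataLoss helper, and the inline offsets are hypothetical stand-ins for illustration, not Pinot or Kafka APIs.

// Minimal sketch of the patch's gating logic. The constant mirrors Kafka's
// "read_uncommitted" isolation-level string; all other names are illustrative.
public final class DataLossCheckSketch {

  static final String READ_UNCOMMITTED = "read_uncommitted";

  // Flag data loss only when offsets are guaranteed consecutive, i.e. when the
  // consumer is NOT configured to skip transaction markers (read_committed).
  static boolean hasDataLoss(String isolationLevel, long firstOffset, long startOffset) {
    boolean offsetsConsecutive = isolationLevel == null || isolationLevel.equals(READ_UNCOMMITTED);
    return offsetsConsecutive && firstOffset > startOffset;
  }

  public static void main(String[] args) {
    // read_committed: the gap is likely skipped transaction markers, not loss.
    System.out.println(hasDataLoss("read_committed", 105L, 100L)); // false
    // Null (Kafka defaults to read_uncommitted): the same gap is a real loss signal.
    System.out.println(hasDataLoss(null, 105L, 100L)); // true
    // No gap: never flagged, regardless of isolation level.
    System.out.println(hasDataLoss(READ_UNCOMMITTED, 100L, 100L)); // false
  }
}

The second example is why the follow-up "Fix null handling" commit matters: before it, an unset isolation level could bypass the check entirely and suppress legitimate data-loss alerts.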