Commit

Fix false data loss alerts in case of read_committed Kafka isolation (#14716)

* Fix false data loss alerts in case of read_committed Kafka isolation

* Fix null handling

---------

Co-authored-by: Kartik Khare <[email protected]>
KKcorps and Kartik Khare authored Dec 26, 2024
1 parent c50ed0d commit 00f0721
Showing 2 changed files with 20 additions and 2 deletions.
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
       }
     }
 
+    // When read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch earliest offset from topic and see if it is greater than startOffset.
+    // However, this would require an additional call to Kafka, which we want to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || _config.getKafkaIsolationLevel()
+        .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)) {
+      hasDataLoss = firstOffset > startOffset;
+    }
     return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
-        firstOffset > startOffset);
+        hasDataLoss);
   }
 
   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
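The gist of the change, applied identically to both changed files, is that an offset gap is only reported as data loss when the consumer is running with the default or read_uncommitted isolation level; under read_committed, the diff's own comment notes that consumed offsets are not guaranteed to be consecutive (transaction control records occupy offsets), so a gap is expected. A minimal standalone sketch of that check follows; the class, method, and constant names are illustrative, with only the "read_uncommitted" value mirroring Kafka's isolation-level setting.

public final class DataLossCheckSketch {
  private static final String READ_UNCOMMITTED = "read_uncommitted";

  private DataLossCheckSketch() {
  }

  // Flag data loss only when the isolation level is unset or read_uncommitted;
  // under read_committed an offset gap may just be skipped transaction markers.
  static boolean hasDataLoss(String isolationLevel, long firstOffset, long startOffset) {
    if (isolationLevel == null || READ_UNCOMMITTED.equals(isolationLevel)) {
      return firstOffset > startOffset;
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasDataLoss(null, 105, 100));               // true: default isolation keeps the strict check
    System.out.println(hasDataLoss("read_committed", 105, 100));   // false: gap no longer triggers an alert
    System.out.println(hasDataLoss("read_uncommitted", 100, 100)); // false: no gap at all
  }
}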
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
       }
     }
 
+    // When read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch earliest offset from topic and see if it is greater than startOffset.
+    // However, this would require an additional call to Kafka, which we want to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || _config.getKafkaIsolationLevel()
+        .equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)) {
+      hasDataLoss = firstOffset > startOffset;
+    }
     return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
-        firstOffset > startOffset);
+        hasDataLoss);
   }
 
   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
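The TODO in the added comment points to a stricter alternative: ask the broker for the partition's earliest retained offset and compare it against startOffset, independent of isolation level. A hypothetical sketch of that approach using the standard Kafka Consumer#beginningOffsets API is shown below; all names are illustrative, and the commit deliberately avoids this because it adds a broker round trip per fetch.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class EarliestOffsetDataLossCheck {
  private EarliestOffsetDataLossCheck() {
  }

  // Data loss has happened iff the earliest offset still retained by the broker is
  // already past the offset we were asked to resume from.
  static boolean hasDataLoss(Consumer<?, ?> consumer, TopicPartition partition, long startOffset) {
    Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singleton(partition));
    Long earliestOffset = earliest.get(partition);
    return earliestOffset != null && earliestOffset > startOffset;
  }
}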
