From e132294c96f0b68d56c4d5d5a30326df5286891f Mon Sep 17 00:00:00 2001 From: Danny Cranmer Date: Wed, 11 Nov 2020 11:21:56 +0000 Subject: [PATCH] Fix issue when Polling consumer using timestamp with empty shard (#6) Fix issue where Polling consumer would fail when starting consumption from timestamp and first record batch is empty --- .../polling/PollingRecordPublisher.java | 22 ++++++++++++-- .../flink/internals/ShardConsumerTest.java | 17 +++++++++++ .../FakeKinesisBehavioursFactory.java | 30 +++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/polling/PollingRecordPublisher.java b/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/polling/PollingRecordPublisher.java index ab47efc..abfbe6b 100644 --- a/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/polling/PollingRecordPublisher.java +++ b/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/polling/PollingRecordPublisher.java @@ -36,8 +36,10 @@ import javax.annotation.Nullable; +import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; import static software.amazon.kinesis.connectors.flink.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE; import static software.amazon.kinesis.connectors.flink.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE; +import static software.amazon.kinesis.connectors.flink.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM; /** * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber. @@ -48,6 +50,8 @@ public class PollingRecordPublisher implements RecordPublisher { private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class); + private static final SequenceNumber TIMESTAMP_SENTINEL_SEQUENCE_NUMBER = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get(); + private final PollingRecordPublisherMetricsReporter metricsReporter; private final KinesisProxyInterface kinesisProxy; @@ -108,13 +112,27 @@ public RecordPublisherRunResult run(final RecordBatchConsumer consumer, int maxN GetRecordsResult result = getRecords(nextShardItr, maxNumberOfRecords); RecordBatch recordBatch = new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest()); - SequenceNumber latestSeequenceNumber = consumer.accept(recordBatch); - nextStartingPosition = StartingPosition.continueFromSequenceNumber(latestSeequenceNumber); + SequenceNumber latestSequenceNumber = consumer.accept(recordBatch); + + nextStartingPosition = getNextStartingPosition(latestSequenceNumber); nextShardItr = result.getNextShardIterator(); return nextShardItr == null ? COMPLETE : INCOMPLETE; } + private StartingPosition getNextStartingPosition(final SequenceNumber latestSequenceNumber) { + // When consuming from a timestamp sentinel/AT_TIMESTAMP ShardIteratorType. + // If the first RecordBatch is empty, then the latestSequenceNumber would be the timestamp sentinel. + // This is because we have not yet received any real sequence numbers on this shard. + // In this condition we should retry from the previous starting position (AT_TIMESTAMP). + if (TIMESTAMP_SENTINEL_SEQUENCE_NUMBER.equals(latestSequenceNumber)) { + Preconditions.checkState(nextStartingPosition.getShardIteratorType() == AT_TIMESTAMP); + return nextStartingPosition; + } else { + return StartingPosition.continueFromSequenceNumber(latestSequenceNumber); + } + } + /** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on diff --git a/src/test/java/software/amazon/kinesis/connectors/flink/internals/ShardConsumerTest.java b/src/test/java/software/amazon/kinesis/connectors/flink/internals/ShardConsumerTest.java index a68c5eb..2e65f1d 100644 --- a/src/test/java/software/amazon/kinesis/connectors/flink/internals/ShardConsumerTest.java +++ b/src/test/java/software/amazon/kinesis/connectors/flink/internals/ShardConsumerTest.java @@ -33,8 +33,10 @@ import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static software.amazon.kinesis.connectors.flink.internals.ShardConsumerTestUtils.fakeSequenceNumber; import static software.amazon.kinesis.connectors.flink.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM; @@ -53,6 +55,21 @@ public void testMetricsReporting() throws Exception { assertEquals(500, metrics.getMillisBehindLatest()); } + @Test + public void testTimestampStartingPositionWithEmptyShard() throws Exception { + Properties consumerProperties = new Properties(); + consumerProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2020-11-11T09:14"); + consumerProperties.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd'T'HH:mm"); + SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get(); + + final int numberOfIterations = 3; + KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.emptyShard(numberOfIterations)); + + assertNumberOfMessagesReceivedFromKinesis(0, kinesis, sequenceNumber, consumerProperties); + verify(kinesis).getShardIterator(any(), eq("AT_TIMESTAMP"), any()); + verify(kinesis, times(numberOfIterations)).getRecords(any(), anyInt()); + } + @Test public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceNumber() throws Exception { KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L)); diff --git a/src/test/java/software/amazon/kinesis/connectors/flink/testutils/FakeKinesisBehavioursFactory.java b/src/test/java/software/amazon/kinesis/connectors/flink/testutils/FakeKinesisBehavioursFactory.java index 3f880a2..0d16f17 100644 --- a/src/test/java/software/amazon/kinesis/connectors/flink/testutils/FakeKinesisBehavioursFactory.java +++ b/src/test/java/software/amazon/kinesis/connectors/flink/testutils/FakeKinesisBehavioursFactory.java @@ -86,6 +86,10 @@ public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map streamNamesWithLastSeenShardIds) throws InterruptedException { + return null; + } + } + private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis { private final long millisBehindLatest;