Skip to content

Commit

Permalink
Fix issue when Polling consumer using timestamp with empty shard (#6)
Browse files Browse the repository at this point in the history
Fix issue where Polling consumer would fail when starting consumption from timestamp and first record batch is empty
  • Loading branch information
dannycranmer authored Nov 11, 2020
1 parent 33320fb commit e132294
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@

import javax.annotation.Nullable;

import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
import static software.amazon.kinesis.connectors.flink.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
import static software.amazon.kinesis.connectors.flink.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
import static software.amazon.kinesis.connectors.flink.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;

/**
* A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
Expand All @@ -48,6 +50,8 @@ public class PollingRecordPublisher implements RecordPublisher {

private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);

private static final SequenceNumber TIMESTAMP_SENTINEL_SEQUENCE_NUMBER = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();

private final PollingRecordPublisherMetricsReporter metricsReporter;

private final KinesisProxyInterface kinesisProxy;
Expand Down Expand Up @@ -108,13 +112,27 @@ public RecordPublisherRunResult run(final RecordBatchConsumer consumer, int maxN
GetRecordsResult result = getRecords(nextShardItr, maxNumberOfRecords);

RecordBatch recordBatch = new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest());
SequenceNumber latestSeequenceNumber = consumer.accept(recordBatch);

nextStartingPosition = StartingPosition.continueFromSequenceNumber(latestSeequenceNumber);
SequenceNumber latestSequenceNumber = consumer.accept(recordBatch);

nextStartingPosition = getNextStartingPosition(latestSequenceNumber);
nextShardItr = result.getNextShardIterator();
return nextShardItr == null ? COMPLETE : INCOMPLETE;
}

private StartingPosition getNextStartingPosition(final SequenceNumber latestSequenceNumber) {
// When consuming from a timestamp sentinel/AT_TIMESTAMP ShardIteratorType.
// If the first RecordBatch is empty, then the latestSequenceNumber would be the timestamp sentinel.
// This is because we have not yet received any real sequence numbers on this shard.
// In this condition we should retry from the previous starting position (AT_TIMESTAMP).
if (TIMESTAMP_SENTINEL_SEQUENCE_NUMBER.equals(latestSequenceNumber)) {
Preconditions.checkState(nextStartingPosition.getShardIteratorType() == AT_TIMESTAMP);
return nextStartingPosition;
} else {
return StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
}
}

/**
* Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
* AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static software.amazon.kinesis.connectors.flink.internals.ShardConsumerTestUtils.fakeSequenceNumber;
import static software.amazon.kinesis.connectors.flink.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
Expand All @@ -53,6 +55,21 @@ public void testMetricsReporting() throws Exception {
assertEquals(500, metrics.getMillisBehindLatest());
}

@Test
public void testTimestampStartingPositionWithEmptyShard() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2020-11-11T09:14");
consumerProperties.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd'T'HH:mm");
SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();

final int numberOfIterations = 3;
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.emptyShard(numberOfIterations));

assertNumberOfMessagesReceivedFromKinesis(0, kinesis, sequenceNumber, consumerProperties);
verify(kinesis).getShardIterator(any(), eq("AT_TIMESTAMP"), any());
verify(kinesis, times(numberOfIterations)).getRecords(any(), anyInt());
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceNumber() throws Exception {
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String, Int
// Behaviours related to fetching records, used mainly in ShardConsumerTest
// ------------------------------------------------------------------------

public static KinesisProxyInterface emptyShard(final int numberOfIterations) {
return new SingleShardEmittingZeroRecords(numberOfIterations);
}

public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(
final int numOfRecords,
final int numOfGetRecordsCalls,
Expand Down Expand Up @@ -133,6 +137,32 @@ public static KinesisProxyInterface blockingQueueGetRecords(Map<String, List<Blo
return new BlockingQueueKinesis(streamsToShardQueues);
}

private static class SingleShardEmittingZeroRecords implements KinesisProxyInterface {

private int remainingIterators;

private SingleShardEmittingZeroRecords(int remainingIterators) {
this.remainingIterators = remainingIterators;
}

@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException {
return String.valueOf(remainingIterators--);
}

@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
return new GetRecordsResult()
.withMillisBehindLatest(0L)
.withNextShardIterator(remainingIterators == 0 ? null : String.valueOf(remainingIterators--));
}

@Override
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
return null;
}
}

private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {

private final long millisBehindLatest;
Expand Down

0 comments on commit e132294

Please sign in to comment.