Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
benesch committed May 16, 2020
1 parent c0ea567 commit d11dea5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
10 changes: 8 additions & 2 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ impl<'a, C: ConsumerContext + 'static> MessageStream<'a, C> {
}

fn set_waker(&self, waker: Waker) {
self.context().waker.lock().expect("lock poisoned").replace(waker);
self.context()
.waker
.lock()
.expect("lock poisoned")
.replace(waker);
}

fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'a>>> {
Expand All @@ -140,7 +144,9 @@ impl<'a, C: ConsumerContext + 'a> Stream for MessageStream<'a, C> {
if let Some(err_interval) = self.err_interval {
// We've been asked to periodically report that there are no
// new messages. Check if it's time to do so.
let mut err_delay = self.err_delay.get_or_insert_with(|| time::delay_for(err_interval));
let mut err_delay = self
.err_delay
.get_or_insert_with(|| time::delay_for(err_interval));
ready!(Pin::new(&mut err_delay).poll(cx));
err_delay.reset(Instant::now() + err_interval);
Poll::Ready(Some(Err(KafkaError::NoMessageReceived)))
Expand Down
15 changes: 12 additions & 3 deletions tests/test_high_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,18 @@ async fn test_consume_with_no_message_error() {

// Assert that we receive a few NoMessageReceived errors after we see the
// one record.
assert!(matches!(message_stream.next().await, Some(Err(KafkaError::NoMessageReceived))));
assert!(matches!(message_stream.next().await, Some(Err(KafkaError::NoMessageReceived))));
assert!(matches!(message_stream.next().await, Some(Err(KafkaError::NoMessageReceived))));
assert!(matches!(
message_stream.next().await,
Some(Err(KafkaError::NoMessageReceived))
));
assert!(matches!(
message_stream.next().await,
Some(Err(KafkaError::NoMessageReceived))
));
assert!(matches!(
message_stream.next().await,
Some(Err(KafkaError::NoMessageReceived))
));
}

// TODO: add check that commit cb gets called correctly
Expand Down

0 comments on commit d11dea5

Please sign in to comment.