
Question about waiting for rebalance callbacks #701

Open
rhishikeshj opened this issue Jul 30, 2024 · 0 comments
rhishikeshj commented Jul 30, 2024

Our use case is as follows:

  • Create a consumer with a consumer group (this will be the only consumer in that group)
  • Subscribe to the topic
  • Get information about the committed offsets for this consumer group

As I understand it, getting any information about offsets from the consumer can only happen after the rebalance callbacks have been called. Here's a simplistic version of a consumer that uses a tokio::sync::Notify to signal when that happens.

use rdkafka::{
    consumer::{BaseConsumer, Consumer, ConsumerContext, Rebalance},
    error::{KafkaError, KafkaResult},
    ClientConfig, ClientContext, Offset, TopicPartitionList,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::Notify;

pub struct NotifierContext {
    notify: Arc<Notify>,
}

impl NotifierContext {
    fn new(notify: Arc<Notify>) -> Self {
        Self { notify }
    }
}

impl ClientContext for NotifierContext {}

impl ConsumerContext for NotifierContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        println!("Pre rebalance {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        println!("Post rebalance {:?}", rebalance);
        self.notify.notify_one();
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        println!("Committing offsets: {:?}", result);
    }
}

pub type NotifyingConsumer = BaseConsumer<NotifierContext>;

pub fn new_consumer<C>(
    brokers: &str,
    group_id: &str,
    config_overrides: Vec<(String, String)>,
) -> (C, Arc<Notify>)
where
    C: rdkafka::consumer::Consumer<NotifierContext>
        + rdkafka::config::FromClientConfigAndContext<NotifierContext>,
{
    let notify = Arc::new(Notify::new());
    let ret_notify = notify.clone();
    let context = NotifierContext::new(notify);
    let mut final_config = ClientConfig::new();
    final_config
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("auto.offset.reset", "earliest");

    final_config.extend(config_overrides);
    let consumer: C = final_config
        .create_with_context(context)
        .expect("Consumer creation failed");
    (consumer, ret_notify)
}

pub async fn get_offsets(
    consumer: &NotifyingConsumer,
) -> Result<HashMap<(String, i32), Offset>, KafkaError> {
    Ok(consumer.committed(Duration::from_secs(5))?.to_topic_map())
}

#[tokio::main]
async fn main() {
    let brokers = "some-url";
    let group_id = "krusty-consumer-3";
    let topic = "some-topic";

    let (consumer, notify): (NotifyingConsumer, Arc<Notify>) =
        new_consumer(brokers, group_id, vec![]);
    consumer
        .subscribe(&[topic])
        .expect("Can't subscribe to specified topics");
    println!("subscribed");

    let _ = consumer.poll(Duration::from_secs(5));
    println!("polled");

    let mut partitions = TopicPartitionList::new();
    partitions.add_partition(topic, 0);
    partitions.add_partition(topic, 1);
    let _ = consumer.pause(&partitions);
    println!("paused");

    // Wait for the rebalance callbacks to be called.
    // If the rebalance doesn't happen, we don't get offset info correctly.
    notify.notified().await; // this never returns


    println!("notified");

    // now get some information about consumer group's offsets
    let offsets = get_offsets(&consumer).await;

    println!("Received offsets as {:?}", offsets);
}

I read in the GitHub issues here that calling pause should trigger a rebalance, but that does not seem to work: when I await on the notify, everything deadlocks (my guess?).
If instead I use a StreamConsumer and await the first recv, that tends to trigger the rebalance just fine, and I can then get the offset information correctly.

But my use case dictates that I need to know the consumer group's offsets without having to read a message, because the committed offset might be the latest one and a new message may never arrive.

What I really want to do: if a group does not have valid committed offsets for a topic, I want to commit the current highest offsets as its committed offsets, so that the next time it runs it reads from there (offset management is manual).
Using auto.offset.reset = latest is not the same, because if the topic receives a new message while this consumer isn't running, that message will be missed.
