Skip to content

Commit

Permalink
[hack] separate kafka consumer for probe task
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Nov 14, 2024
1 parent f173d3a commit ee439b0
Showing 1 changed file with 52 additions and 12 deletions.
64 changes: 52 additions & 12 deletions src/storage/src/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,13 @@ impl SourceRender for KafkaSourceConnection {
let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
let health_status = Arc::new(Mutex::new(Default::default()));
let notificator = Arc::new(Notify::new());

let consumer: Result<BaseConsumer<_>, _> = connection
.create_with_context(
&config.config,
GlueConsumerContext {
notificator: Arc::clone(&notificator),
stats_tx,
stats_tx: stats_tx.clone(),
inner: MzClientContext::default(),
},
&btreemap! {
Expand Down Expand Up @@ -363,9 +364,48 @@ impl SourceRender for KafkaSourceConnection {
)
.await;

let consumer = match consumer {
Ok(consumer) => Arc::new(consumer),
Err(e) => {
let consumer2: Result<BaseConsumer<_>, _> = connection
.create_with_context(
&config.config,
GlueConsumerContext {
notificator: Arc::clone(&notificator),
stats_tx,
inner: MzClientContext::default(),
},
&btreemap! {
// Disable Kafka auto commit. We manually commit offsets
// to Kafka once we have reclocked those offsets, so
// that users can use standard Kafka tools for progress
// tracking.
"enable.auto.commit" => "false".into(),
// Always begin ingest at 0 when restarted, even if Kafka
// contains committed consumer read offsets
"auto.offset.reset" => "earliest".into(),
// Use the user-configured topic metadata refresh
// interval.
"topic.metadata.refresh.interval.ms" =>
topic_metadata_refresh_interval
.as_millis()
.to_string(),
// TODO: document the rationale for this.
"fetch.message.max.bytes" => "134217728".into(),
// Consumer group ID, which may have been overridden by
// the user. librdkafka requires this, and we use offset
// committing to provide a way for users to monitor
// ingest progress, though we do not rely on the
// committed offsets for any functionality.
"group.id" => group_id.clone(),
// Allow Kafka monitoring tools to identify this
// consumer.
"client.id" => format!("{client_id}-metadata"),
},
InTask::Yes,
)
.await;

let (consumer, consumer2) = match (consumer, consumer2) {
(Ok(consumer), Ok(consumer2)) => (Arc::new(consumer), consumer2),
(Err(e), _) | (_, Err(e)) => {
let update = HealthStatusUpdate::halting(
format!(
"failed creating kafka consumer: {}",
Expand Down Expand Up @@ -410,7 +450,6 @@ impl SourceRender for KafkaSourceConnection {
let metadata_thread_handle = {
let partition_info = Arc::downgrade(&partition_info);
let topic = topic.clone();
let consumer = Arc::clone(&consumer);

// We want a fairly low ceiling on our polling frequency, since we rely
// on this heartbeat to determine the health of our Kafka connection.
Expand Down Expand Up @@ -439,7 +478,7 @@ impl SourceRender for KafkaSourceConnection {
let probe_ts =
mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
let result = fetch_partition_info(
consumer.client(),
consumer2.client(),
&topic,
config
.config
Expand All @@ -458,11 +497,11 @@ impl SourceRender for KafkaSourceConnection {
Ok(info) => {
*partition_info.lock().unwrap() = Some((probe_ts, info));
trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: updated partition metadata info",
);
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: updated partition metadata info",
);

// Clear all the health namespaces we know about.
// Note that many kafka sources don't have an ssh tunnel, but
Expand All @@ -479,7 +518,7 @@ impl SourceRender for KafkaSourceConnection {
));

let ssh_status =
consumer.client().context().tunnel_status();
consumer2.client().context().tunnel_status();
let ssh_status = match ssh_status {
SshTunnelStatus::Running => {
Some(HealthStatusUpdate::running())
Expand All @@ -495,6 +534,7 @@ impl SourceRender for KafkaSourceConnection {
}
}
}
consumer2.poll(Duration::from_millis(0));
thread::park_timeout(poll_interval);
}
info!(
Expand Down

0 comments on commit ee439b0

Please sign in to comment.