fix(cymbal): work in batches (#26796)
oliverb123 authored Dec 11, 2024
1 parent 3d2b0b3 commit 48ee0e1
Showing 6 changed files with 111 additions and 27 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions rust/common/kafka/Cargo.toml
@@ -18,3 +18,4 @@ tracing = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
28 changes: 28 additions & 0 deletions rust/common/kafka/src/kafka_consumer.rs
@@ -6,6 +6,7 @@ use rdkafka::{
ClientConfig, Message,
};
use serde::de::DeserializeOwned;
use std::time::Duration;

use crate::config::{ConsumerConfig, KafkaConfig};

@@ -97,6 +98,33 @@ impl SingleTopicConsumer {

Ok((payload, offset))
}

pub async fn json_recv_batch<T>(
&self,
max: usize,
timeout: Duration,
) -> Vec<Result<(T, Offset), RecvErr>>
where
T: DeserializeOwned,
{
let mut results = Vec::with_capacity(max);

tokio::select! {
_ = tokio::time::sleep(timeout) => {},
_ = async {
while results.len() < max {
let result = self.json_recv::<T>().await;
let was_err = result.is_err();
results.push(result);
if was_err {
break; // Stop filling the batch on the first error, since it may indicate a Kafka-level failure the caller should handle
}
}
} => {}
}

results
}
}

pub struct Offset {
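
Usage note: json_recv_batch drains up to max messages, returning early if the timeout fires first or if any receive fails. A minimal caller sketch, assuming the crate is importable as common_kafka (matching the rust/common/kafka package) and using a made-up MyEvent payload type; neither name appears in this commit:

use std::time::Duration;

use common_kafka::kafka_consumer::SingleTopicConsumer; // assumed crate/module path

#[derive(serde::Deserialize)]
struct MyEvent {
    uuid: String, // hypothetical payload field
}

async fn drain_once(consumer: &SingleTopicConsumer) {
    // Pull up to 500 messages, or whatever arrives within 5 seconds.
    let batch = consumer
        .json_recv_batch::<MyEvent>(500, Duration::from_secs(5))
        .await;

    for item in batch {
        match item {
            Ok((event, offset)) => {
                // ... real per-event work goes here ...
                let _ = event.uuid;
                // Commit only after this message has been handled.
                offset.store().expect("failed to store offset");
            }
            Err(err) => {
                // A per-message deserialization/empty-payload error, or the Kafka
                // error that stopped the batch from filling further.
                tracing::error!("recv error: {:?}", err);
            }
        }
    }
}

Per-message errors (bad JSON, empty payload) come back as individual Err entries, so a caller can still process the rest of the batch.
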
6 changes: 6 additions & 0 deletions rust/cymbal/src/config.rs
@@ -86,6 +86,12 @@ pub struct Config {
// Maximum number of lines of pre and post context to get per frame
#[envconfig(default = "15")]
pub context_line_count: usize,

#[envconfig(default = "1000")]
pub max_events_per_batch: usize,

#[envconfig(default = "10")]
pub max_event_batch_wait_seconds: u64,
}

impl Config {
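
Configuration note: the two new fields cap how much one loop iteration buffers: at most max_events_per_batch events, flushed at least every max_event_batch_wait_seconds. With the envconfig derive used by this struct they should be overridable via MAX_EVENTS_PER_BATCH and MAX_EVENT_BATCH_WAIT_SECONDS environment variables (an assumption based on envconfig's default field-name-to-variable mapping). A small self-contained sketch of how the values become loop parameters, mirroring main.rs below:

use std::time::Duration;

// Stand-in for the relevant slice of the real Config struct.
struct BatchSettings {
    max_events_per_batch: usize,
    max_event_batch_wait_seconds: u64,
}

fn batch_params(cfg: &BatchSettings) -> (usize, Duration) {
    (
        cfg.max_events_per_batch,
        Duration::from_secs(cfg.max_event_batch_wait_seconds),
    )
}

fn main() {
    // With the defaults from this commit: batches of up to 1000 events,
    // flushed at least every 10 seconds.
    let cfg = BatchSettings {
        max_events_per_batch: 1000,
        max_event_batch_wait_seconds: 10,
    };
    let (batch_size, batch_wait) = batch_params(&cfg);
    assert_eq!(batch_size, 1000);
    assert_eq!(batch_wait, Duration::from_secs(10));
}
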
32 changes: 31 additions & 1 deletion rust/cymbal/src/hack/kafka.rs
@@ -9,7 +9,10 @@ use rdkafka::{
};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::error::Error as SerdeError;
use std::sync::{Arc, Weak};
use std::{
sync::{Arc, Weak},
time::Duration,
};
use thiserror::Error;
use tracing::{debug, error, info};

@@ -145,6 +148,33 @@ impl SingleTopicConsumer {

Ok((payload, offset))
}

pub async fn json_recv_batch<T>(
&self,
max: usize,
timeout: Duration,
) -> Vec<Result<(T, Offset), RecvErr>>
where
T: DeserializeOwned,
{
let mut results = Vec::with_capacity(max);

tokio::select! {
_ = tokio::time::sleep(timeout) => {},
_ = async {
while results.len() < max {
let result = self.json_recv::<T>().await;
let was_err = result.is_err();
results.push(result);
if was_err {
break; // Stop filling the batch on the first error, since it may indicate a Kafka-level failure the caller should handle
}
}
} => {}
}

results
}
}

pub struct Offset {
70 changes: 44 additions & 26 deletions rust/cymbal/src/main.rs
@@ -58,46 +58,64 @@ async fn main() {

start_health_liveness_server(&config, context.clone());

let batch_wait_time = std::time::Duration::from_secs(config.max_event_batch_wait_seconds);
let batch_size = config.max_events_per_batch;

loop {
let whole_loop = common_metrics::timing_guard(MAIN_LOOP_TIME, &[]);
context.worker_liveness.report_healthy().await;
// Just grab the event as a serde_json::Value and immediately drop it,
// we can work out a real type for it later (once we're deployed etc)
let (event, offset): (ClickHouseEvent, _) = match context.kafka_consumer.json_recv().await {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
panic!("Kafka error: {}", e)
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!(ERRORS, "cause" => "recv_err").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};
metrics::counter!(EVENT_RECEIVED).increment(1);

let event = match handle_event(context.clone(), event).await {
Ok(e) => e,
Err(e) => {
error!("Error handling event: {:?}", e);
// If we get an unhandled error, it means we have some logical error in the code, or a
// dependency is down, and we should just fall over.
panic!("Unhandled error: {:?}", e);
}
};
let received: Vec<Result<(ClickHouseEvent, _), _>> = context
.kafka_consumer
.json_recv_batch(batch_size, batch_wait_time)
.await;

let mut output = Vec::with_capacity(received.len());
let mut offsets = Vec::with_capacity(received.len());
for message in received {
let (event, offset) = match message {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
panic!("Kafka error: {}", e)
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!(ERRORS, "cause" => "recv_err").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};

metrics::counter!(EVENT_RECEIVED).increment(1);

let event = match handle_event(context.clone(), event).await {
Ok(e) => e,
Err(e) => {
error!("Error handling event: {:?}", e);
// If we get an unhandled error, it means we have some logical error in the code, or a
// dependency is down, and we should just fall over.
panic!("Unhandled error: {:?}", e);
}
};

output.push(event);
offsets.push(offset);
}

send_keyed_iter_to_kafka(
&context.kafka_producer,
&context.config.events_topic,
|ev| Some(ev.uuid.to_string()),
&[event],
&output,
)
.await
.expect("Failed to send event to Kafka");

offset.store().unwrap();
for offset in offsets {
offset.store().unwrap();
}

metrics::counter!(STACK_PROCESSED).increment(1);
whole_loop.label("finished", "true").fin();
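
Design note: the reworked loop keeps the previous at-least-once behaviour: every event in a batch is handled and produced to the events topic before any offset from that batch is stored. A generic sketch of that ordering; process_batch, publish, and commit are illustrative stand-ins, not APIs from this commit:

// Handle every message, publish the whole batch, and only then commit offsets.
fn process_batch<Msg, Out, E>(
    batch: Vec<(Msg, u64)>,                        // (message, offset) pairs
    mut handle: impl FnMut(Msg) -> Out,            // per-event work (handle_event)
    publish: impl FnOnce(&[Out]) -> Result<(), E>, // produce the batch to Kafka
    mut commit: impl FnMut(u64),                   // store a single offset
) -> Result<(), E> {
    let mut output = Vec::with_capacity(batch.len());
    let mut offsets = Vec::with_capacity(batch.len());
    for (msg, offset) in batch {
        output.push(handle(msg));
        offsets.push(offset);
    }
    // Publish first; if this fails, no offsets are stored and the whole batch
    // is redelivered on restart rather than silently dropped.
    publish(output.as_slice())?;
    for offset in offsets {
        commit(offset);
    }
    Ok(())
}

The real loop differs in that handle_event errors and Kafka produce failures are treated as fatal (panic/expect), but the publish-then-store-offsets ordering is the same.
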
