Skip to content

Commit

Permalink
Merge pull request #655 from trtt/fix/logs
Browse files Browse the repository at this point in the history
fix: forward log messages
  • Loading branch information
davidblewett authored Oct 8, 2024
2 parents 11790be + 36dc43c commit b2bcf19
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
7 changes: 7 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ impl<C: ClientContext> Client<C> {
Arc::as_ptr(&context) as *mut c_void,
)
};
native_config.set("log.queue", "true")?;

let client_ptr = unsafe {
let native_config = ManuallyDrop::new(native_config);
Expand All @@ -270,6 +271,12 @@ impl<C: ClientContext> Client<C> {
return Err(KafkaError::ClientCreation(err_buf.to_string()));
}

let ret = unsafe {
rdsys::rd_kafka_set_log_queue(client_ptr, rdsys::rd_kafka_queue_get_main(client_ptr))
};
if ret.is_error() {
return Err(KafkaError::Global(ret.into()));
}
unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };

Ok(Client {
Expand Down
45 changes: 25 additions & 20 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,30 @@ impl NativeClientConfig {
.trim_matches(char::from(0))
.to_string())
}

pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> {
let mut err_buf = ErrBuf::new();
let key_c = CString::new(key)?;
let value_c = CString::new(value)?;
let ret = unsafe {
rdsys::rd_kafka_conf_set(
self.ptr(),
key_c.as_ptr(),
value_c.as_ptr(),
err_buf.as_mut_ptr(),
err_buf.capacity(),
)
};
if ret.is_error() {
return Err(KafkaError::ClientConfig(
ret,
err_buf.to_string(),
key.to_string(),
value.to_string(),
));
}
Ok(())
}
}

/// Client configuration.
Expand Down Expand Up @@ -227,27 +251,8 @@ impl ClientConfig {
/// Builds a native librdkafka configuration.
pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
let mut err_buf = ErrBuf::new();
for (key, value) in &self.conf_map {
let key_c = CString::new(key.to_string())?;
let value_c = CString::new(value.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_conf_set(
conf.ptr(),
key_c.as_ptr(),
value_c.as_ptr(),
err_buf.as_mut_ptr(),
err_buf.capacity(),
)
};
if ret.is_error() {
return Err(KafkaError::ClientConfig(
ret,
err_buf.to_string(),
key.to_string(),
value.to_string(),
));
}
conf.set(key, value)?;
}
Ok(conf)
}
Expand Down

0 comments on commit b2bcf19

Please sign in to comment.