From 36dc43ce7a7c563dec84eca5ad1b6aab8c07c873 Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Wed, 14 Feb 2024 19:33:10 +0400 Subject: [PATCH] Enqueue log messages --- src/client.rs | 7 +++++++ src/config.rs | 45 +++++++++++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/client.rs b/src/client.rs index 1b9f6bd1c..4d8e6b561 100644 --- a/src/client.rs +++ b/src/client.rs @@ -239,6 +239,7 @@ impl Client { Arc::as_ptr(&context) as *mut c_void, ) }; + native_config.set("log.queue", "true")?; let client_ptr = unsafe { let native_config = ManuallyDrop::new(native_config); @@ -255,6 +256,12 @@ impl Client { return Err(KafkaError::ClientCreation(err_buf.to_string())); } + let ret = unsafe { + rdsys::rd_kafka_set_log_queue(client_ptr, rdsys::rd_kafka_queue_get_main(client_ptr)) + }; + if ret.is_error() { + return Err(KafkaError::Global(ret.into())); + } unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) }; Ok(Client { diff --git a/src/config.rs b/src/config.rs index da4885a78..4b302ac0e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -155,6 +155,30 @@ impl NativeClientConfig { .to_string_lossy() .into()) } + + pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> { + let mut err_buf = ErrBuf::new(); + let key_c = CString::new(key)?; + let value_c = CString::new(value)?; + let ret = unsafe { + rdsys::rd_kafka_conf_set( + self.ptr(), + key_c.as_ptr(), + value_c.as_ptr(), + err_buf.as_mut_ptr(), + err_buf.capacity(), + ) + }; + if ret.is_error() { + return Err(KafkaError::ClientConfig( + ret, + err_buf.to_string(), + key.to_string(), + value.to_string(), + )); + } + Ok(()) + } } /// Client configuration. @@ -228,27 +252,8 @@ impl ClientConfig { /// Builds a native librdkafka configuration. pub fn create_native_config(&self) -> KafkaResult { let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) }; - let mut err_buf = ErrBuf::new(); for (key, value) in &self.conf_map { - let key_c = CString::new(key.to_string())?; - let value_c = CString::new(value.to_string())?; - let ret = unsafe { - rdsys::rd_kafka_conf_set( - conf.ptr(), - key_c.as_ptr(), - value_c.as_ptr(), - err_buf.as_mut_ptr(), - err_buf.capacity(), - ) - }; - if ret.is_error() { - return Err(KafkaError::ClientConfig( - ret, - err_buf.to_string(), - key.to_string(), - value.to_string(), - )); - } + conf.set(key, value)?; } Ok(conf) }