diff --git a/src/client.rs b/src/client.rs index 08aa5b9dd..231bcf908 100644 --- a/src/client.rs +++ b/src/client.rs @@ -254,6 +254,7 @@ impl Client { Arc::as_ptr(&context) as *mut c_void, ) }; + native_config.set("log.queue", "true")?; let client_ptr = unsafe { let native_config = ManuallyDrop::new(native_config); @@ -270,6 +271,12 @@ impl Client { return Err(KafkaError::ClientCreation(err_buf.to_string())); } + let ret = unsafe { + rdsys::rd_kafka_set_log_queue(client_ptr, rdsys::rd_kafka_queue_get_main(client_ptr)) + }; + if ret.is_error() { + return Err(KafkaError::Global(ret.into())); + } unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) }; Ok(Client { diff --git a/src/config.rs b/src/config.rs index fc116d2a7..1f5e34247 100644 --- a/src/config.rs +++ b/src/config.rs @@ -154,6 +154,30 @@ impl NativeClientConfig { .trim_matches(char::from(0)) .to_string()) } + + pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> { + let mut err_buf = ErrBuf::new(); + let key_c = CString::new(key)?; + let value_c = CString::new(value)?; + let ret = unsafe { + rdsys::rd_kafka_conf_set( + self.ptr(), + key_c.as_ptr(), + value_c.as_ptr(), + err_buf.as_mut_ptr(), + err_buf.capacity(), + ) + }; + if ret.is_error() { + return Err(KafkaError::ClientConfig( + ret, + err_buf.to_string(), + key.to_string(), + value.to_string(), + )); + } + Ok(()) + } } /// Client configuration. @@ -227,27 +251,8 @@ impl ClientConfig { /// Builds a native librdkafka configuration. pub fn create_native_config(&self) -> KafkaResult { let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) }; - let mut err_buf = ErrBuf::new(); for (key, value) in &self.conf_map { - let key_c = CString::new(key.to_string())?; - let value_c = CString::new(value.to_string())?; - let ret = unsafe { - rdsys::rd_kafka_conf_set( - conf.ptr(), - key_c.as_ptr(), - value_c.as_ptr(), - err_buf.as_mut_ptr(), - err_buf.capacity(), - ) - }; - if ret.is_error() { - return Err(KafkaError::ClientConfig( - ret, - err_buf.to_string(), - key.to_string(), - value.to_string(), - )); - } + conf.set(key, value)?; } Ok(conf) }