diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8d388f2e2..a18f057f9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,7 @@ jobs: components: rustfmt, clippy - run: cargo fmt -- --check - run: cargo clippy -- -Dwarnings + - run: cargo clippy --tests -- -Dwarnings - run: cargo test --doc check: diff --git a/rdkafka-sys/build.rs b/rdkafka-sys/build.rs index d615744f3..b7c3e42ca 100644 --- a/rdkafka-sys/build.rs +++ b/rdkafka-sys/build.rs @@ -77,7 +77,7 @@ fn main() { // Ensure that we are in the right directory let rdkafkasys_root = Path::new("rdkafka-sys"); if rdkafkasys_root.exists() { - assert!(env::set_current_dir(&rdkafkasys_root).is_ok()); + assert!(env::set_current_dir(rdkafkasys_root).is_ok()); } if !Path::new("librdkafka/LICENSE").exists() { eprintln!("Setting up submodules"); diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 95e91ffb0..5ce8b05b1 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -100,12 +100,12 @@ pub trait ConsumerContext: ClientContext + Sized { /// Pre-rebalance callback. This method will run before the rebalance and /// should terminate its execution quickly. #[allow(unused_variables)] - fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'a>) {} + fn pre_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) {} /// Post-rebalance callback. This method will run after the rebalance and /// should terminate its execution quickly. #[allow(unused_variables)] - fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'a>) {} + fn post_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) {} // TODO: convert pointer to structure /// Post commit callback. This method will run after a group of offsets was diff --git a/src/message.rs b/src/message.rs index 76bac9c39..7a422608e 100644 --- a/src/message.rs +++ b/src/message.rs @@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> { type Headers = BorrowedHeaders; fn key(&self) -> Option<&[u8]> { - unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) } + unsafe { util::ptr_to_opt_slice(self.ptr.key, self.ptr.key_len) } } fn payload(&self) -> Option<&[u8]> { - unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) } + unsafe { util::ptr_to_opt_slice(self.ptr.payload, self.ptr.len) } } unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> { - util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len) + util::ptr_to_opt_mut_slice(self.ptr.payload, self.ptr.len) } fn topic(&self) -> &str { unsafe { - CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt)) + CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt)) .to_str() .expect("Topic name is not valid UTF-8") } diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 1cc6e05ce..7623869f2 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -425,6 +425,7 @@ where /// Note that this method will never block. // Simplifying the return type requires generic associated types, which are // unstable. + #[allow(clippy::result_large_err)] pub fn send<'a, K, P>( &self, mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>, @@ -701,6 +702,7 @@ where /// See the documentation for [`BaseProducer::send`] for details. // Simplifying the return type requires generic associated types, which are // unstable. + #[allow(clippy::result_large_err)] pub fn send<'a, K, P>( &self, record: BaseRecord<'a, K, P, C::DeliveryOpaque>, diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index 0769a16a8..baae2cc15 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -346,6 +346,7 @@ where /// Like [`FutureProducer::send`], but if enqueuing fails, an error will be /// returned immediately, alongside the [`FutureRecord`] provided. + #[allow(clippy::result_large_err)] pub fn send_result<'a, K, P>( &self, record: FutureRecord<'a, K, P>, diff --git a/src/statistics.rs b/src/statistics.rs index 8c3fc4c45..6496aa09b 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -367,7 +367,7 @@ mod tests { assert_eq!(stats.brokers.len(), 1); - let broker = stats.brokers.values().into_iter().collect::>()[0]; + let broker = stats.brokers.values().collect::>()[0]; assert_eq!( broker.req, @@ -391,7 +391,7 @@ mod tests { } // Example from https://github.com/edenhill/librdkafka/wiki/Statistics - const EXAMPLE: &'static str = r#" + const EXAMPLE: &str = r#" { "name": "rdkafka#producer-1", "client_id": "rdkafka", diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 1d8e77ce9..16063bfbc 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -317,7 +317,7 @@ impl TopicPartitionList { /// Sets all partitions in the list to the specified offset. pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) }; for elem_ptr in slice { let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr); elem.set_offset(offset)?; @@ -327,7 +327,7 @@ impl TopicPartitionList { /// Returns all the elements of the list. pub fn elements(&self) -> Vec> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) }; let mut vec = Vec::with_capacity(slice.len()); for elem_ptr in slice { vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr)); @@ -337,7 +337,7 @@ impl TopicPartitionList { /// Returns all the elements of the list that belong to the specified topic. pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) }; let mut vec = Vec::with_capacity(slice.len()); for elem_ptr in slice { let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr); diff --git a/tests/test_admin.rs b/tests/test_admin.rs index dc7177708..2e6034cb2 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -34,7 +34,7 @@ async fn create_consumer_group(consumer_group_name: &str) { let admin_client = create_admin_client(); let topic_name = &rand_test_topic(consumer_group_name); let consumer: BaseConsumer = create_config() - .set("group.id", consumer_group_name.clone()) + .set("group.id", consumer_group_name) .create() .expect("create consumer failed"); @@ -74,17 +74,19 @@ fn fetch_metadata(topic: &str) -> Metadata { create_config().create().expect("consumer creation failed"); let timeout = Some(Duration::from_secs(1)); - let mut backoff = ExponentialBackoff::default(); - backoff.max_elapsed_time = Some(Duration::from_secs(5)); + let mut backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(5)), + ..Default::default() + }; (|| { let metadata = consumer .fetch_metadata(Some(topic), timeout) .map_err(|e| e.to_string())?; - if metadata.topics().len() == 0 { + if metadata.topics().is_empty() { Err("metadata fetch returned no topics".to_string())? } let topic = &metadata.topics()[0]; - if topic.partitions().len() == 0 { + if topic.partitions().is_empty() { Err("metadata fetch returned a topic with no partitions".to_string())? } Ok(metadata) @@ -98,8 +100,10 @@ fn verify_delete(topic: &str) { create_config().create().expect("consumer creation failed"); let timeout = Some(Duration::from_secs(1)); - let mut backoff = ExponentialBackoff::default(); - backoff.max_elapsed_time = Some(Duration::from_secs(5)); + let mut backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(5)), + ..Default::default() + }; (|| { // Asking about the topic specifically will recreate it (under the // default Kafka configuration, at least) so we have to ask for the list @@ -107,7 +111,7 @@ fn verify_delete(topic: &str) { let metadata = consumer .fetch_metadata(None, timeout) .map_err(|e| e.to_string())?; - if let Some(_) = metadata.topics().iter().find(|t| t.name() == topic) { + if metadata.topics().iter().any(|t| t.name() == topic) { Err(format!("topic {} still exists", topic))? } Ok(()) @@ -416,7 +420,7 @@ async fn test_configs() { } } - let config = AlterConfig::new(broker).set("log.flush.interval.ms", &orig_val); + let config = AlterConfig::new(broker).set("log.flush.interval.ms", orig_val); let res = admin_client .alter_configs(&[config], &opts) .await diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs index bddc1beae..72b73919e 100644 --- a/tests/test_high_producers.rs +++ b/tests/test_high_producers.rs @@ -64,13 +64,13 @@ async fn test_future_producer_send_full() { // Fill up the queue. producer - .send_result(FutureRecord::to(&topic_name).payload("A").key("B")) + .send_result(FutureRecord::to(topic_name).payload("A").key("B")) .unwrap(); let send_message = |timeout| async move { let start = Instant::now(); let res = producer - .send(FutureRecord::to(&topic_name).payload("A").key("B"), timeout) + .send(FutureRecord::to(topic_name).payload("A").key("B"), timeout) .await; match res { Ok(_) => panic!("send unexpectedly succeeded"), diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index e6642b688..3b4cb19e8 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -385,11 +385,11 @@ async fn test_produce_consume_message_queue_nonempty_callback() { let timeout = Duration::from_secs(15); loop { let w = wakeups.load(Ordering::SeqCst); - if w == target { - break; - } else if w > target { - panic!("wakeups {} exceeds target {}", w, target); - } + match w.cmp(&target) { + std::cmp::Ordering::Equal => break, + std::cmp::Ordering::Greater => panic!("wakeups {} exceeds target {}", w, target), + std::cmp::Ordering::Less => (), + }; thread::sleep(Duration::from_millis(100)); if start.elapsed() > timeout { panic!("timeout exceeded while waiting for wakeup"); diff --git a/tests/test_low_producers.rs b/tests/test_low_producers.rs index 493642617..2a7c9f7a6 100644 --- a/tests/test_low_producers.rs +++ b/tests/test_low_producers.rs @@ -97,7 +97,7 @@ impl ProducerContext for CollectingContex fn get_custom_partitioner(&self) -> Option<&Part> { match &self.partitioner { None => None, - Some(p) => Some(&p), + Some(p) => Some(p), } } } @@ -144,7 +144,7 @@ impl Partitioner for PanicPartitioner { fn default_config(config_overrides: HashMap<&str, &str>) -> ClientConfig { let mut config = ClientConfig::new(); config - .set("bootstrap.servers", &get_bootstrap_server()) + .set("bootstrap.servers", get_bootstrap_server()) .set("message.timeout.ms", "5000"); for (key, value) in config_overrides { @@ -210,11 +210,13 @@ fn test_base_producer_queue_full() { let errors = results .iter() .filter(|&e| { - if let &Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) = e { - true - } else { - false - } + matches!( + e, + &Err(( + KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + _ + )) + ) }) .count(); @@ -496,7 +498,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() { assert_eq!(delivery_results.len(), 1); - for &(_, ref error, _) in &(*delivery_results) { + for (_, error, _) in &(*delivery_results) { assert_eq!(*error, None); } } @@ -523,7 +525,7 @@ fn test_custom_partitioner_base_producer() { let delivery_results = context.results.lock().unwrap(); - for &(ref message, ref error, _) in &(*delivery_results) { + for (message, error, _) in &(*delivery_results) { assert_eq!(error, &None); assert_eq!(message.partition(), 2); } @@ -551,7 +553,7 @@ fn test_custom_partitioner_threaded_producer() { let delivery_results = context.results.lock().unwrap(); - for &(ref message, ref error, _) in &(*delivery_results) { + for (message, error, _) in &(*delivery_results) { assert_eq!(error, &None); assert_eq!(message.partition(), 2); } diff --git a/tests/test_transactions.rs b/tests/test_transactions.rs index 1fe84a98b..91e8c835d 100644 --- a/tests/test_transactions.rs +++ b/tests/test_transactions.rs @@ -26,10 +26,10 @@ fn create_consumer( fn create_producer() -> Result { let mut config = ClientConfig::new(); config - .set("bootstrap.servers", &get_bootstrap_server()) + .set("bootstrap.servers", get_bootstrap_server()) .set("message.timeout.ms", "5000") .set("enable.idempotence", "true") - .set("transactional.id", &rand_test_transactional_id()) + .set("transactional.id", rand_test_transactional_id()) .set("debug", "eos"); config.set_log_level(RDKafkaLogLevel::Debug); config.create() diff --git a/tests/utils.rs b/tests/utils.rs index 447213672..3a8e0137e 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -67,9 +67,7 @@ pub fn get_broker_version() -> KafkaVersion { panic!("KAFKA_VERSION env var contained non-unicode characters") } // If the environment variable is unset, assume we're running the latest version. - Err(VarError::NotPresent) => { - KafkaVersion(std::u32::MAX, std::u32::MAX, std::u32::MAX, std::u32::MAX) - } + Err(VarError::NotPresent) => KafkaVersion(u32::MAX, u32::MAX, u32::MAX, u32::MAX), } } @@ -164,27 +162,6 @@ pub fn key_fn(id: i32) -> String { format!("Key {}", id) } -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_populate_topic() { - let topic_name = rand_test_topic("test_populate_topic"); - let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await; - - let total_messages = message_map - .iter() - .filter(|&(&(partition, _), _)| partition == 0) - .count(); - assert_eq!(total_messages, 100); - - let mut ids = message_map.iter().map(|(_, id)| *id).collect::>(); - ids.sort(); - assert_eq!(ids, (0..100).collect::>()); - } -} - pub struct ConsumerTestContext { pub _n: i64, // Add data for memory access validation } @@ -228,3 +205,24 @@ pub fn consumer_config( config } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_populate_topic() { + let topic_name = rand_test_topic("test_populate_topic"); + let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await; + + let total_messages = message_map + .iter() + .filter(|&(&(partition, _), _)| partition == 0) + .count(); + assert_eq!(total_messages, 100); + + let mut ids = message_map.values().copied().collect::>(); + ids.sort(); + assert_eq!(ids, (0..100).collect::>()); + } +}