Fix clippy warnings

Fix various clippy warnings in both source and tests

fede1024 committed Aug 4, 2024
1 parent 9be7eca commit 1f1a038
Showing 14 changed files with 72 additions and 64 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -20,6 +20,7 @@ jobs:
           components: rustfmt, clippy
       - run: cargo fmt -- --check
       - run: cargo clippy -- -Dwarnings
+      - run: cargo clippy --tests -- -Dwarnings
       - run: cargo test --doc

   check:
2 changes: 1 addition & 1 deletion rdkafka-sys/build.rs
@@ -77,7 +77,7 @@ fn main() {
     // Ensure that we are in the right directory
     let rdkafkasys_root = Path::new("rdkafka-sys");
     if rdkafkasys_root.exists() {
-        assert!(env::set_current_dir(&rdkafkasys_root).is_ok());
+        assert!(env::set_current_dir(rdkafkasys_root).is_ok());
     }
     if !Path::new("librdkafka/LICENSE").exists() {
         eprintln!("Setting up submodules");
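Most of the one-character removals in this commit (here, and again in the producer and transaction tests below) address clippy's `needless_borrow` lint: the value is already a reference, or the callee takes `impl AsRef<...>`, so an extra `&` buys nothing. A minimal sketch of the pattern, with a hypothetical directory name:

```rust
use std::env;
use std::path::Path;

fn main() {
    let dir = Path::new("rdkafka-sys"); // hypothetical directory

    // clippy::needless_borrow would flag `set_current_dir(&dir)`:
    // `dir` is already a `&Path`, and `set_current_dir` takes
    // `impl AsRef<Path>`, so borrowing the reference again is redundant.
    if dir.exists() {
        env::set_current_dir(dir).expect("failed to change directory");
    }
}
```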
4 changes: 2 additions & 2 deletions src/consumer/mod.rs
@@ -100,12 +100,12 @@ pub trait ConsumerContext: ClientContext + Sized {
     /// Pre-rebalance callback. This method will run before the rebalance and
     /// should terminate its execution quickly.
     #[allow(unused_variables)]
-    fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
+    fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}

     /// Post-rebalance callback. This method will run after the rebalance and
     /// should terminate its execution quickly.
     #[allow(unused_variables)]
-    fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
+    fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}

     // TODO: convert pointer to structure
     /// Post commit callback. This method will run after a group of offsets was
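The named lifetime `'a` in each callback appears in exactly one position, so it can be elided; that is what clippy's `needless_lifetimes` lint reports, and the fix is the anonymous lifetime `'_`. A standalone sketch (the `Rebalance` type here is a stand-in, not the crate's real enum):

```rust
struct Rebalance<'a> {
    topic: &'a str, // stand-in field for the example
}

// Before (flagged by clippy::needless_lifetimes):
//     fn pre_rebalance<'a>(rebalance: &Rebalance<'a>) {}
// After: the single-use lifetime is elided with `'_`.
fn pre_rebalance(rebalance: &Rebalance<'_>) {
    println!("rebalancing {}", rebalance.topic);
}

fn main() {
    pre_rebalance(&Rebalance { topic: "example-topic" });
}
```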
8 changes: 4 additions & 4 deletions src/message.rs
@@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> {
     type Headers = BorrowedHeaders;

     fn key(&self) -> Option<&[u8]> {
-        unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
+        unsafe { util::ptr_to_opt_slice(self.ptr.key, self.ptr.key_len) }
     }

     fn payload(&self) -> Option<&[u8]> {
-        unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
+        unsafe { util::ptr_to_opt_slice(self.ptr.payload, self.ptr.len) }
     }

     unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
-        util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
+        util::ptr_to_opt_mut_slice(self.ptr.payload, self.ptr.len)
     }

     fn topic(&self) -> &str {
         unsafe {
-            CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
+            CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt))
                 .to_str()
                 .expect("Topic name is not valid UTF-8")
         }
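`self.ptr` is a wrapper that implements `Deref`, so the explicit `(*self.ptr).field` spelling is unnecessary: the dot operator auto-dereferences. This is most likely clippy's `explicit_auto_deref` lint at work; the same rewrite recurs below in `topic_partition_list.rs`. A sketch with a hypothetical wrapper type:

```rust
use std::ops::Deref;

// Hypothetical smart pointer, standing in for the crate's native-pointer
// wrapper around the C message struct.
struct NativePtr<T>(Box<T>);

impl<T> Deref for NativePtr<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

struct RawMessage {
    len: usize,
}

fn main() {
    let ptr = NativePtr(Box::new(RawMessage { len: 42 }));

    // Explicit deref, which clippy flags as redundant:
    let a = (*ptr).len;
    // Equivalent: field access auto-derefs through `Deref`.
    let b = ptr.len;

    assert_eq!(a, b);
}
```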
2 changes: 2 additions & 0 deletions src/producer/base_producer.rs
@@ -425,6 +425,7 @@ where
     /// Note that this method will never block.
     // Simplifying the return type requires generic associated types, which are
     // unstable.
+    #[allow(clippy::result_large_err)]
     pub fn send<'a, K, P>(
         &self,
         mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
@@ -701,6 +702,7 @@ where
     /// See the documentation for [`BaseProducer::send`] for details.
     // Simplifying the return type requires generic associated types, which are
     // unstable.
+    #[allow(clippy::result_large_err)]
     pub fn send<'a, K, P>(
         &self,
         record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
1 change: 1 addition & 0 deletions src/producer/future_producer.rs
@@ -346,6 +346,7 @@ where

     /// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
     /// returned immediately, alongside the [`FutureRecord`] provided.
+    #[allow(clippy::result_large_err)]
     pub fn send_result<'a, K, P>(
         &self,
         record: FutureRecord<'a, K, P>,
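Both producers hand the failed record back inside `Err`, so the error type is deliberately large. clippy's `result_large_err` lint warns when the `Err` variant crosses its size threshold (128 bytes by default), since the whole `Result` moves by value; the commit silences it with `#[allow]` rather than boxing, because returning the record is the point of the API. A sketch of the trade-off with illustrative names:

```rust
// Deliberately fat payload, standing in for a record returned on failure.
struct Record {
    payload: [u8; 256],
}

// The Err variant carries the record (~256 bytes), which trips
// clippy::result_large_err; we opt out because giving the record back
// lets the caller retry without cloning.
#[allow(clippy::result_large_err)]
fn send(record: Record, queue_full: bool) -> Result<(), (String, Record)> {
    if queue_full {
        return Err(("queue full".to_string(), record));
    }
    Ok(())
}

fn main() {
    let record = Record { payload: [0; 256] };
    if let Err((reason, record)) = send(record, true) {
        println!("retry after `{}`: kept {} bytes", reason, record.payload.len());
    }
}
```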
4 changes: 2 additions & 2 deletions src/statistics.rs
@@ -367,7 +367,7 @@ mod tests {

         assert_eq!(stats.brokers.len(), 1);

-        let broker = stats.brokers.values().into_iter().collect::<Vec<_>>()[0];
+        let broker = stats.brokers.values().collect::<Vec<_>>()[0];

         assert_eq!(
             broker.req,
@@ -391,7 +391,7 @@
 }

 // Example from https://github.com/edenhill/librdkafka/wiki/Statistics
-const EXAMPLE: &'static str = r#"
+const EXAMPLE: &str = r#"
 {
     "name": "rdkafka#producer-1",
     "client_id": "rdkafka",
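Two independent lints in this file: `.values()` already returns an iterator, so the extra `.into_iter()` is flagged as `useless_conversion`, and a `const` has an implicit `'static` lifetime, so spelling it out is `redundant_static_lifetimes`. A compact sketch of both:

```rust
use std::collections::HashMap;

// clippy::redundant_static_lifetimes: writing `&'static str` here is
// redundant, because consts are always 'static.
const EXAMPLE: &str = "{}";

fn main() {
    let brokers: HashMap<String, u64> = HashMap::from([("broker0".to_string(), 1)]);

    // clippy::useless_conversion would flag
    //     brokers.values().into_iter().collect::<Vec<_>>()
    // because `values()` is already an iterator.
    let v = brokers.values().collect::<Vec<_>>();

    println!("{} broker(s), example stats = {}", v.len(), EXAMPLE);
}
```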
6 changes: 3 additions & 3 deletions src/topic_partition_list.rs
@@ -317,7 +317,7 @@ impl TopicPartitionList {

     /// Sets all partitions in the list to the specified offset.
     pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
-        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+        let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
         for elem_ptr in slice {
             let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
             elem.set_offset(offset)?;
@@ -327,7 +327,7 @@

     /// Returns all the elements of the list.
     pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
-        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+        let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
         let mut vec = Vec::with_capacity(slice.len());
         for elem_ptr in slice {
             vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
@@ -337,7 +337,7 @@

     /// Returns all the elements of the list that belong to the specified topic.
     pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
-        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+        let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
         let mut vec = Vec::with_capacity(slice.len());
         for elem_ptr in slice {
             let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
22 changes: 13 additions & 9 deletions tests/test_admin.rs
@@ -34,7 +34,7 @@ async fn create_consumer_group(consumer_group_name: &str) {
     let admin_client = create_admin_client();
     let topic_name = &rand_test_topic(consumer_group_name);
     let consumer: BaseConsumer = create_config()
-        .set("group.id", consumer_group_name.clone())
+        .set("group.id", consumer_group_name)
         .create()
         .expect("create consumer failed");

@@ -74,17 +74,19 @@ fn fetch_metadata(topic: &str) -> Metadata {
         create_config().create().expect("consumer creation failed");
     let timeout = Some(Duration::from_secs(1));

-    let mut backoff = ExponentialBackoff::default();
-    backoff.max_elapsed_time = Some(Duration::from_secs(5));
+    let mut backoff = ExponentialBackoff {
+        max_elapsed_time: Some(Duration::from_secs(5)),
+        ..Default::default()
+    };
     (|| {
         let metadata = consumer
             .fetch_metadata(Some(topic), timeout)
             .map_err(|e| e.to_string())?;
-        if metadata.topics().len() == 0 {
+        if metadata.topics().is_empty() {
             Err("metadata fetch returned no topics".to_string())?
         }
         let topic = &metadata.topics()[0];
-        if topic.partitions().len() == 0 {
+        if topic.partitions().is_empty() {
             Err("metadata fetch returned a topic with no partitions".to_string())?
         }
         Ok(metadata)
@@ -98,16 +100,18 @@ fn verify_delete(topic: &str) {
         create_config().create().expect("consumer creation failed");
     let timeout = Some(Duration::from_secs(1));

-    let mut backoff = ExponentialBackoff::default();
-    backoff.max_elapsed_time = Some(Duration::from_secs(5));
+    let mut backoff = ExponentialBackoff {
+        max_elapsed_time: Some(Duration::from_secs(5)),
+        ..Default::default()
+    };
     (|| {
         // Asking about the topic specifically will recreate it (under the
         // default Kafka configuration, at least) so we have to ask for the list
         // of all topics and search through it.
         let metadata = consumer
             .fetch_metadata(None, timeout)
             .map_err(|e| e.to_string())?;
-        if let Some(_) = metadata.topics().iter().find(|t| t.name() == topic) {
+        if metadata.topics().iter().any(|t| t.name() == topic) {
             Err(format!("topic {} still exists", topic))?
         }
         Ok(())
@@ -416,7 +420,7 @@ async fn test_configs() {
         }
     }

-    let config = AlterConfig::new(broker).set("log.flush.interval.ms", &orig_val);
+    let config = AlterConfig::new(broker).set("log.flush.interval.ms", orig_val);
     let res = admin_client
         .alter_configs(&[config], &opts)
         .await
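Three lints meet in this file: `field_reassign_with_default` (construct the value with struct-update syntax instead of mutating a fresh `Default::default()`), `len_zero` (`.len() == 0` is `.is_empty()`), and the `if let Some(_) = iter.find(..)` test, which collapses to `iter.any(..)` (the territory of `redundant_pattern_matching`/`search_is_some`). A self-contained sketch using a stand-in struct rather than the `backoff` crate's `ExponentialBackoff`:

```rust
use std::time::Duration;

// Stand-in for backoff::ExponentialBackoff.
#[derive(Default)]
struct ExponentialBackoff {
    max_elapsed_time: Option<Duration>,
    multiplier: f64,
}

fn main() {
    // clippy::field_reassign_with_default flags:
    //     let mut backoff = ExponentialBackoff::default();
    //     backoff.max_elapsed_time = Some(Duration::from_secs(5));
    // Struct-update syntax sets the field at construction time instead.
    let backoff = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(5)),
        ..Default::default()
    };
    println!("multiplier defaults to {}", backoff.multiplier);

    let topics = vec!["alpha", "beta"];

    // clippy::len_zero: `is_empty` states the intent directly.
    assert!(!topics.is_empty());

    // `if let Some(_) = topics.iter().find(..)` asks "does any element
    // match?", which is exactly `any`.
    assert!(topics.iter().any(|t| *t == "beta"));
}
```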
4 changes: 2 additions & 2 deletions tests/test_high_producers.rs
@@ -64,13 +64,13 @@ async fn test_future_producer_send_full() {

     // Fill up the queue.
     producer
-        .send_result(FutureRecord::to(&topic_name).payload("A").key("B"))
+        .send_result(FutureRecord::to(topic_name).payload("A").key("B"))
         .unwrap();

     let send_message = |timeout| async move {
         let start = Instant::now();
         let res = producer
-            .send(FutureRecord::to(&topic_name).payload("A").key("B"), timeout)
+            .send(FutureRecord::to(topic_name).payload("A").key("B"), timeout)
             .await;
         match res {
             Ok(_) => panic!("send unexpectedly succeeded"),
10 changes: 5 additions & 5 deletions tests/test_low_consumers.rs
@@ -385,11 +385,11 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     let timeout = Duration::from_secs(15);
     loop {
         let w = wakeups.load(Ordering::SeqCst);
-        if w == target {
-            break;
-        } else if w > target {
-            panic!("wakeups {} exceeds target {}", w, target);
-        }
+        match w.cmp(&target) {
+            std::cmp::Ordering::Equal => break,
+            std::cmp::Ordering::Greater => panic!("wakeups {} exceeds target {}", w, target),
+            std::cmp::Ordering::Less => (),
+        };
         thread::sleep(Duration::from_millis(100));
         if start.elapsed() > timeout {
             panic!("timeout exceeded while waiting for wakeup");
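The `if`/`else if` chain compares the same pair of values twice, which clippy's `comparison_chain` lint rewrites as a single `match` on `Ord::cmp` so all three orderings are handled explicitly. A minimal sketch of the rewrite:

```rust
use std::cmp::Ordering;

fn classify(w: usize, target: usize) -> &'static str {
    // Before (flagged by clippy::comparison_chain):
    //     if w == target { "done" } else if w > target { "overshot" }
    //     else { "waiting" }
    match w.cmp(&target) {
        Ordering::Equal => "done",
        Ordering::Greater => "overshot",
        Ordering::Less => "waiting",
    }
}

fn main() {
    assert_eq!(classify(3, 3), "done");
    assert_eq!(classify(5, 3), "overshot");
    assert_eq!(classify(1, 3), "waiting");
}
```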
22 changes: 12 additions & 10 deletions tests/test_low_producers.rs
@@ -97,7 +97,7 @@ impl<Part: Partitioner + Send + Sync> ProducerContext<Part> for CollectingContex
     fn get_custom_partitioner(&self) -> Option<&Part> {
         match &self.partitioner {
             None => None,
-            Some(p) => Some(&p),
+            Some(p) => Some(p),
         }
     }
 }
@@ -144,7 +144,7 @@ impl Partitioner for PanicPartitioner {
 fn default_config(config_overrides: HashMap<&str, &str>) -> ClientConfig {
     let mut config = ClientConfig::new();
     config
-        .set("bootstrap.servers", &get_bootstrap_server())
+        .set("bootstrap.servers", get_bootstrap_server())
         .set("message.timeout.ms", "5000");

     for (key, value) in config_overrides {
@@ -210,11 +210,13 @@ fn test_base_producer_queue_full() {
     let errors = results
         .iter()
         .filter(|&e| {
-            if let &Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) = e {
-                true
-            } else {
-                false
-            }
+            matches!(
+                e,
+                &Err((
+                    KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
+                    _
+                ))
+            )
         })
         .count();

@@ -496,7 +498,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {

     assert_eq!(delivery_results.len(), 1);

-    for &(_, ref error, _) in &(*delivery_results) {
+    for (_, error, _) in &(*delivery_results) {
         assert_eq!(*error, None);
     }
 }
@@ -523,7 +525,7 @@ fn test_custom_partitioner_base_producer() {

     let delivery_results = context.results.lock().unwrap();

-    for &(ref message, ref error, _) in &(*delivery_results) {
+    for (message, error, _) in &(*delivery_results) {
         assert_eq!(error, &None);
         assert_eq!(message.partition(), 2);
     }
@@ -551,7 +553,7 @@ fn test_custom_partitioner_threaded_producer() {

     let delivery_results = context.results.lock().unwrap();

-    for &(ref message, ref error, _) in &(*delivery_results) {
+    for (message, error, _) in &(*delivery_results) {
         assert_eq!(error, &None);
         assert_eq!(message.partition(), 2);
     }
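Two patterns here: an `if let … { true } else { false }` block is what the `matches!` macro expands to (`match_like_matches_macro`), and `&(ref a, ref b, _)` loop patterns are redundant because match ergonomics already bind by reference (`needless_borrowed_reference`). A sketch of both rewrites with simplified types:

```rust
// Simplified stand-in for the crate's error enum.
enum KafkaError {
    QueueFull,
    Other,
}

fn main() {
    let results: Vec<Result<(), (KafkaError, u32)>> =
        vec![Err((KafkaError::QueueFull, 1)), Err((KafkaError::Other, 2))];

    // clippy::match_like_matches_macro: the old
    //     if let Err((KafkaError::QueueFull, _)) = e { true } else { false }
    // is exactly what `matches!` does.
    let errors = results
        .iter()
        .filter(|e| matches!(e, Err((KafkaError::QueueFull, _))))
        .count();
    assert_eq!(errors, 1);

    // clippy::needless_borrowed_reference: `&(ref message, ref error, _)`
    // binds the same way as the plain tuple pattern.
    let deliveries = vec![("msg-1", None::<String>, 0), ("msg-2", None, 1)];
    for (message, error, _) in &deliveries {
        assert!(error.is_none());
        println!("{message} delivered");
    }
}
```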
4 changes: 2 additions & 2 deletions tests/test_transactions.rs
@@ -26,10 +26,10 @@ fn create_consumer(
 fn create_producer() -> Result<BaseProducer, KafkaError> {
     let mut config = ClientConfig::new();
     config
-        .set("bootstrap.servers", &get_bootstrap_server())
+        .set("bootstrap.servers", get_bootstrap_server())
         .set("message.timeout.ms", "5000")
         .set("enable.idempotence", "true")
-        .set("transactional.id", &rand_test_transactional_id())
+        .set("transactional.id", rand_test_transactional_id())
         .set("debug", "eos");
     config.set_log_level(RDKafkaLogLevel::Debug);
     config.create()
46 changes: 22 additions & 24 deletions tests/utils.rs
@@ -67,9 +67,7 @@ pub fn get_broker_version() -> KafkaVersion {
             panic!("KAFKA_VERSION env var contained non-unicode characters")
         }
         // If the environment variable is unset, assume we're running the latest version.
-        Err(VarError::NotPresent) => {
-            KafkaVersion(std::u32::MAX, std::u32::MAX, std::u32::MAX, std::u32::MAX)
-        }
+        Err(VarError::NotPresent) => KafkaVersion(u32::MAX, u32::MAX, u32::MAX, u32::MAX),
     }
 }

@@ -164,27 +162,6 @@ pub fn key_fn(id: i32) -> String {
     format!("Key {}", id)
 }

-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_populate_topic() {
-        let topic_name = rand_test_topic("test_populate_topic");
-        let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;
-
-        let total_messages = message_map
-            .iter()
-            .filter(|&(&(partition, _), _)| partition == 0)
-            .count();
-        assert_eq!(total_messages, 100);
-
-        let mut ids = message_map.iter().map(|(_, id)| *id).collect::<Vec<_>>();
-        ids.sort();
-        assert_eq!(ids, (0..100).collect::<Vec<_>>());
-    }
-}
-
 pub struct ConsumerTestContext {
     pub _n: i64, // Add data for memory access validation
 }
@@ -228,3 +205,24 @@ pub fn consumer_config(

     config
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_populate_topic() {
+        let topic_name = rand_test_topic("test_populate_topic");
+        let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;
+
+        let total_messages = message_map
+            .iter()
+            .filter(|&(&(partition, _), _)| partition == 0)
+            .count();
+        assert_eq!(total_messages, 100);
+
+        let mut ids = message_map.values().copied().collect::<Vec<_>>();
+        ids.sort();
+        assert_eq!(ids, (0..100).collect::<Vec<_>>());
+    }
+}
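The remaining cleanups: `std::u32::MAX` becomes the associated constant `u32::MAX` (recent clippy releases call this `legacy_numeric_constants`), `iter().map(|(_, id)| *id)` over a map becomes `values().copied()` (the `iter_kv_map` family of suggestions), and the `#[cfg(test)] mod tests` block moves after the last regular item, as `items_after_test_module` requires. A sketch of the iterator rewrite with a toy map in place of the real message map:

```rust
use std::collections::HashMap;

fn main() {
    // Associated constant instead of the legacy module path std::u32::MAX.
    assert_eq!(u32::MAX, 4_294_967_295);

    // Toy (partition, offset) -> id map, standing in for the test's
    // message map.
    let message_map: HashMap<(i32, i64), i32> =
        HashMap::from([((0, 0), 0), ((0, 1), 1), ((0, 2), 2)]);

    // Iterating entries only to throw the key away is spelled directly
    // with values(); copied() replaces map(|id| *id) for Copy types.
    let mut ids = message_map.values().copied().collect::<Vec<_>>();
    ids.sort();
    assert_eq!(ids, vec![0, 1, 2]);
}
```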
