diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 56c99406d..6ec23af48 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -629,6 +629,16 @@ async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) { let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::cluster_test_suite(&connection_builder).await; + #[allow(irrefutable_let_patterns)] + if let KafkaDriver::Java = driver { + // new consumer group protocol is only on java driver + test_cases::produce_consume_partitions_new_consumer_group_protocol( + &connection_builder, + "partitions3_new_consumer_group_protocol", + ) + .await; + } + for shotover in shotovers { tokio::time::timeout( Duration::from_secs(10), diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index b4fc70d5b..a89f5d4bd 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -3,11 +3,12 @@ use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, - ConsumerGroupDescription, DescribeReplicaLogDirInfo, ExpectedResponse, IsolationLevel, - KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, - ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, - OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, - TopicPartition, TopicPartitionReplica, TransactionDescription, + ConsumerGroupDescription, ConsumerProtocol, DescribeReplicaLogDirInfo, ExpectedResponse, + IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, + KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, + OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType, + ResourceSpecifier, ResourceType, TopicPartition, TopicPartitionReplica, + TransactionDescription, }, docker_compose::DockerCompose, }; @@ -47,6 +48,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { num_partitions: 3, replication_factor: 1, }, + NewTopic { + name: "partitions3_new_consumer_group_protocol", + num_partitions: 3, + replication_factor: 1, + }, NewTopic { name: "acks0", num_partitions: 1, @@ -1091,6 +1097,64 @@ pub async fn produce_consume_partitions3( } } +/// The new consumer protocol must be specifically enabled in the broker config. +/// We only do this for the kafka 3.9 docker-compose.yaml so this test case is +/// manually called for that test and not included in the standard test suite. +pub async fn produce_consume_partitions_new_consumer_group_protocol( + connection_builder: &KafkaConnectionBuilder, + topic_name: &str, +) { + let producer = connection_builder.connect_producer("1", 0).await; + let mut consumer = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) + .with_group("some_group") + .with_protocol(ConsumerProtocol::Consumer), + ) + .await; + + for _ in 0..5 { + producer + .assert_produce( + Record { + payload: "Message1", + topic_name, + key: Some("Key".into()), + }, + // We cant predict the offsets since that will depend on which partition the keyless record ends up in + None, + ) + .await; + producer + .assert_produce( + Record { + payload: "Message2", + topic_name, + key: None, + }, + None, + ) + .await; + + consumer + .assert_consume_in_any_order(vec![ + ExpectedResponse { + message: "Message1".to_owned(), + key: Some("Key".to_owned()), + topic_name: topic_name.to_owned(), + offset: None, + }, + ExpectedResponse { + message: "Message2".to_owned(), + key: None, + topic_name: topic_name.to_owned(), + offset: None, + }, + ]) + .await; + } +} + pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) { let producer = connection_builder.connect_producer("1", 0).await; let mut consumer = connection_builder diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml index 430d8e1c0..317e247bd 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml @@ -36,6 +36,8 @@ services: # # However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run. KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + + KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "consumer, classic" volumes: &volumes - type: tmpfs target: /bitnami/kafka diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 9d29fa88b..8ffa86493 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -32,17 +32,17 @@ use kafka_protocol::messages::produce_response::{ }; use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, - BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse, - DescribeLogDirsResponse, DescribeProducersRequest, DescribeProducersResponse, - DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest, - FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, - InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, - ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, - MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, - OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, - SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, - TopicName, TransactionalId, TxnOffsetCommitRequest, + BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse, + DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, + DescribeGroupsResponse, DescribeLogDirsResponse, DescribeProducersRequest, + DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse, + EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, + GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, + ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, + MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, + OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, + RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, + SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -736,6 +736,10 @@ impl KafkaSinkCluster { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Heartbeat(HeartbeatRequest { group_id, .. }) + | RequestBody::ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest { + group_id, + .. + }) | RequestBody::SyncGroup(SyncGroupRequest { group_id, .. }) | RequestBody::JoinGroup(JoinGroupRequest { group_id, .. }) | RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }) @@ -955,6 +959,13 @@ impl KafkaSinkCluster { let group_id = heartbeat.group_id.clone(); self.route_to_group_coordinator(request, group_id); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ConsumerGroupHeartbeat(heartbeat), + .. + })) => { + let group_id = heartbeat.group_id.clone(); + self.route_to_group_coordinator(request, group_id); + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::SyncGroup(sync_group), .. @@ -2999,6 +3010,10 @@ The connection to the client has been closed." body: ResponseBody::Heartbeat(heartbeat), .. })) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code), + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ConsumerGroupHeartbeat(heartbeat), + .. + })) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code), Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::SyncGroup(sync_group), .. diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 718fd1399..06c6fe6ac 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -3,8 +3,8 @@ use std::collections::{HashMap, HashSet}; pub use rdkafka; use super::{ - ConsumerConfig, ExpectedResponse, NewPartition, OffsetAndMetadata, ProduceResult, Record, - TopicPartition, + ConsumerConfig, ConsumerProtocol, ExpectedResponse, NewPartition, OffsetAndMetadata, + ProduceResult, Record, TopicPartition, }; use anyhow::Result; use pretty_assertions::assert_eq; @@ -96,6 +96,10 @@ impl KafkaConnectionBuilderCpp { .create() .unwrap(); + if let ConsumerProtocol::Consumer = config.protocol { + panic!("New consumer protocol not support by rdkafka driver"); + } + let topic_names: Vec<&str> = config.topic_names.iter().map(|x| x.as_str()).collect(); consumer.subscribe(&topic_names).unwrap(); KafkaConsumerCpp { consumer } diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 7a9460905..52ed88554 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -173,6 +173,14 @@ impl KafkaConnectionBuilderJava { "org.apache.kafka.common.serialization.StringDeserializer".to_owned(), ); + config.insert( + "group.protocol".to_owned(), + match consumer_config.protocol { + super::ConsumerProtocol::Classic => "CLASSIC".to_owned(), + super::ConsumerProtocol::Consumer => "CONSUMER".to_owned(), + }, + ); + let consumer = self.jvm.construct( "org.apache.kafka.clients.consumer.KafkaConsumer", vec![properties(&self.jvm, &config)], diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index e901433b0..bbfa5562e 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -736,6 +736,7 @@ pub struct ConsumerConfig { fetch_min_bytes: i32, fetch_max_wait_ms: i32, isolation_level: IsolationLevel, + protocol: ConsumerProtocol, } impl ConsumerConfig { @@ -746,6 +747,7 @@ impl ConsumerConfig { fetch_min_bytes: 1, fetch_max_wait_ms: 500, isolation_level: IsolationLevel::ReadUncommitted, + protocol: ConsumerProtocol::Classic, } } @@ -768,6 +770,11 @@ impl ConsumerConfig { self.isolation_level = isolation_level; self } + + pub fn with_protocol(mut self, protocol: ConsumerProtocol) -> Self { + self.protocol = protocol; + self + } } pub enum IsolationLevel { @@ -784,6 +791,11 @@ impl IsolationLevel { } } +pub enum ConsumerProtocol { + Classic, + Consumer, +} + #[derive(PartialEq, Debug)] pub struct ConsumerGroupDescription { pub is_simple_consumer: bool,