Skip to content

Commit

Permalink
KafkaSinkCluster: Support new consumer protocol (#1825)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 21, 2024
1 parent 229c9ce commit 6d73da1
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 18 deletions.
10 changes: 10 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,16 @@ async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

#[allow(irrefutable_let_patterns)]
if let KafkaDriver::Java = driver {
// new consumer group protocol is only on java driver
test_cases::produce_consume_partitions_new_consumer_group_protocol(
&connection_builder,
"partitions3_new_consumer_group_protocol",
)
.await;
}

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
Expand Down
74 changes: 69 additions & 5 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ConsumerGroupDescription, DescribeReplicaLogDirInfo, ExpectedResponse, IsolationLevel,
KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicPartition, TopicPartitionReplica, TransactionDescription,
ConsumerGroupDescription, ConsumerProtocol, DescribeReplicaLogDirInfo, ExpectedResponse,
IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition, TopicPartitionReplica,
TransactionDescription,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -47,6 +48,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_new_consumer_group_protocol",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "acks0",
num_partitions: 1,
Expand Down Expand Up @@ -1091,6 +1097,64 @@ pub async fn produce_consume_partitions3(
}
}

/// The new consumer protocol must be specifically enabled in the broker config.
/// We only do this for the kafka 3.9 docker-compose.yaml so this test case is
/// manually called for that test and not included in the standard test suite.
pub async fn produce_consume_partitions_new_consumer_group_protocol(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("some_group")
.with_protocol(ConsumerProtocol::Consumer),
)
.await;

for _ in 0..5 {
producer
.assert_produce(
Record {
payload: "Message1",
topic_name,
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
)
.await;
producer
.assert_produce(
Record {
payload: "Message2",
topic_name,
key: None,
},
None,
)
.await;

consumer
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: None,
},
ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: None,
},
])
.await;
}
}

pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ services:
#
# However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"

KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "consumer, classic"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
Expand Down
37 changes: 26 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
DescribeLogDirsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse,
DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest,
DescribeGroupsResponse, DescribeLogDirsResponse, DescribeProducersRequest,
DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand Down Expand Up @@ -736,6 +736,10 @@ impl KafkaSinkCluster {
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::Heartbeat(HeartbeatRequest { group_id, .. })
| RequestBody::ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest {
group_id,
..
})
| RequestBody::SyncGroup(SyncGroupRequest { group_id, .. })
| RequestBody::JoinGroup(JoinGroupRequest { group_id, .. })
| RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. })
Expand Down Expand Up @@ -955,6 +959,13 @@ impl KafkaSinkCluster {
let group_id = heartbeat.group_id.clone();
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupHeartbeat(heartbeat),
..
})) => {
let group_id = heartbeat.group_id.clone();
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SyncGroup(sync_group),
..
Expand Down Expand Up @@ -2999,6 +3010,10 @@ The connection to the client has been closed."
body: ResponseBody::Heartbeat(heartbeat),
..
})) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ConsumerGroupHeartbeat(heartbeat),
..
})) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::SyncGroup(sync_group),
..
Expand Down
8 changes: 6 additions & 2 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::collections::{HashMap, HashSet};
pub use rdkafka;

use super::{
ConsumerConfig, ExpectedResponse, NewPartition, OffsetAndMetadata, ProduceResult, Record,
TopicPartition,
ConsumerConfig, ConsumerProtocol, ExpectedResponse, NewPartition, OffsetAndMetadata,
ProduceResult, Record, TopicPartition,
};
use anyhow::Result;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -96,6 +96,10 @@ impl KafkaConnectionBuilderCpp {
.create()
.unwrap();

if let ConsumerProtocol::Consumer = config.protocol {
panic!("New consumer protocol not support by rdkafka driver");
}

let topic_names: Vec<&str> = config.topic_names.iter().map(|x| x.as_str()).collect();
consumer.subscribe(&topic_names).unwrap();
KafkaConsumerCpp { consumer }
Expand Down
8 changes: 8 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ impl KafkaConnectionBuilderJava {
"org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
);

config.insert(
"group.protocol".to_owned(),
match consumer_config.protocol {
super::ConsumerProtocol::Classic => "CLASSIC".to_owned(),
super::ConsumerProtocol::Consumer => "CONSUMER".to_owned(),
},
);

let consumer = self.jvm.construct(
"org.apache.kafka.clients.consumer.KafkaConsumer",
vec![properties(&self.jvm, &config)],
Expand Down
12 changes: 12 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ pub struct ConsumerConfig {
fetch_min_bytes: i32,
fetch_max_wait_ms: i32,
isolation_level: IsolationLevel,
protocol: ConsumerProtocol,
}

impl ConsumerConfig {
Expand All @@ -746,6 +747,7 @@ impl ConsumerConfig {
fetch_min_bytes: 1,
fetch_max_wait_ms: 500,
isolation_level: IsolationLevel::ReadUncommitted,
protocol: ConsumerProtocol::Classic,
}
}

Expand All @@ -768,6 +770,11 @@ impl ConsumerConfig {
self.isolation_level = isolation_level;
self
}

pub fn with_protocol(mut self, protocol: ConsumerProtocol) -> Self {
self.protocol = protocol;
self
}
}

pub enum IsolationLevel {
Expand All @@ -784,6 +791,11 @@ impl IsolationLevel {
}
}

pub enum ConsumerProtocol {
Classic,
Consumer,
}

#[derive(PartialEq, Debug)]
pub struct ConsumerGroupDescription {
pub is_simple_consumer: bool,
Expand Down

0 comments on commit 6d73da1

Please sign in to comment.