From 8045ecd76f8e5d680fff413a3e97b9ec2216a3fa Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 18 Nov 2024 13:53:22 +1100 Subject: [PATCH] Support kafka java driver 3.8 (#1819) --- .../src/transforms/kafka/sink_cluster/mod.rs | 107 ++++++++++++++---- test-helpers/src/connection/kafka/java.rs | 2 +- 2 files changed, 83 insertions(+), 26 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 1bfed727a..c684b1e39 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -16,6 +16,7 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}; use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction; use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic; use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult; +use kafka_protocol::messages::describe_cluster_response::DescribeClusterBroker; use kafka_protocol::messages::describe_producers_request::TopicRequest; use kafka_protocol::messages::describe_producers_response::TopicResponse; use kafka_protocol::messages::fetch_request::FetchTopic; @@ -32,15 +33,16 @@ use kafka_protocol::messages::produce_response::{ use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, DescribeGroupsRequest, DescribeGroupsResponse, DescribeProducersRequest, - DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse, - EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, - GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, - ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, - MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, - OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, - RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, - SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse, + DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest, + DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse, + FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, + InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, + ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, + MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, + OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, + SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, + TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -1131,6 +1133,7 @@ The connection to the client has been closed." Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Metadata(_) + | RequestBody::DescribeCluster(_) | RequestBody::DescribeConfigs(_) | RequestBody::AlterConfigs(_) | RequestBody::CreatePartitions(_) @@ -3066,14 +3069,13 @@ The connection to the client has been closed." response.invalidate_cache(); } Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::DescribeCluster(_), + body: ResponseBody::DescribeCluster(describe_cluster), .. })) => { - // If clients were to send this we would need to rewrite the broker information. - // However I dont think clients actually send this, so just error to ensure we dont break invariants. - return Err(anyhow!( - "I think this is a raft specific message and never sent by clients" - )); + self.process_describe_cluster_response(describe_cluster) + .await; + self.rewrite_describe_cluster_response(describe_cluster)?; + response.invalidate_cache(); } _ => {} } @@ -3387,6 +3389,26 @@ The connection to the client has been closed." } } + async fn process_describe_cluster_response( + &mut self, + describe_cluster: &DescribeClusterResponse, + ) { + for broker in &describe_cluster.brokers { + let node = KafkaNode::new( + broker.broker_id, + KafkaAddress::new(broker.host.clone(), broker.port), + broker.rack.clone(), + ); + self.add_node_if_new(node).await; + } + + tracing::debug!( + "Storing controller metadata, controller is now broker {}", + describe_cluster.controller_id.0 + ); + self.controller_broker.set(describe_cluster.controller_id); + } + fn process_find_coordinator_response( &mut self, version: i16, @@ -3593,34 +3615,69 @@ The connection to the client has been closed." } } + self.rewrite_controller_id(&mut metadata.controller_id, &up_shotover_nodes); + + Ok(()) + } + + /// Rewrite DescribeCluster response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist + fn rewrite_describe_cluster_response( + &self, + describe_cluster: &mut DescribeClusterResponse, + ) -> Result<()> { + // This should never be empty since the local shotover node always considers itself UP + let up_shotover_nodes: Vec<_> = self + .shotover_nodes + .iter() + .filter(|shotover_node| shotover_node.is_up()) + .collect(); + + // Overwrite list of brokers with the list of UP shotover nodes + describe_cluster.brokers = up_shotover_nodes + .iter() + .map(|shotover_node| { + DescribeClusterBroker::default() + .with_broker_id(shotover_node.broker_id) + .with_host(shotover_node.address_for_clients.host.clone()) + .with_port(shotover_node.address_for_clients.port) + .with_rack(Some(shotover_node.rack.clone())) + }) + .collect(); + + self.rewrite_controller_id(&mut describe_cluster.controller_id, &up_shotover_nodes); + + Ok(()) + } + + fn rewrite_controller_id( + &self, + controller_id_field: &mut BrokerId, + up_shotover_nodes: &[&ShotoverNode], + ) { if let Some(controller_node_rack) = self .nodes .iter() - .find(|node| node.broker_id == metadata.controller_id) - .map(|x| x.rack.clone()) + .find(|node| node.broker_id == *controller_id_field) + .map(|x| &x.rack) { // If broker has no rack - use the first UP shotover node. // If broker has rack - use the first UP shotover node with the same rack if available, // and fall back to use the first UP shotover node out of the rack otherwise. // This is deterministic because the list of UP shotover nodes is sorted and partitioning does not change the order. - let shotover_nodes_by_rack = partition_shotover_nodes_by_rack( - &up_shotover_nodes, - &controller_node_rack.as_ref(), - ); + let shotover_nodes_by_rack = + partition_shotover_nodes_by_rack(up_shotover_nodes, &controller_node_rack.as_ref()); let shotover_node = shotover_nodes_by_rack .nodes_in_rack .first() .or_else(|| shotover_nodes_by_rack.nodes_out_of_rack.first()) .expect("There will always be at least one up shotover node"); - metadata.controller_id = shotover_node.broker_id; + *controller_id_field = shotover_node.broker_id; } else { // controller is either -1 or an unknown broker // In both cases it is reasonable to set to -1 to indicate the controller is unknown. - metadata.controller_id = BrokerId(-1); + *controller_id_field = BrokerId(-1); } - - Ok(()) } async fn add_node_if_new(&mut self, new_node: KafkaNode) { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 59b36cb0c..996010b92 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -35,7 +35,7 @@ impl KafkaConnectionBuilderJava { // The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom // These are deployed to and loaded from a path like target/debug/jassets let jvm = Jvm::new(&[ - "org.apache.kafka:kafka-clients:3.7.0", + "org.apache.kafka:kafka-clients:3.8.1", "org.slf4j:slf4j-api:1.7.36", "org.slf4j:slf4j-simple:1.7.36", ]);