Skip to content

Commit

Permalink
Support kafka java driver 3.8 (#1819)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 18, 2024
1 parent e50d528 commit 8045ecd
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 26 deletions.
107 changes: 82 additions & 25 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic;
use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult;
use kafka_protocol::messages::describe_cluster_response::DescribeClusterBroker;
use kafka_protocol::messages::describe_producers_request::TopicRequest;
use kafka_protocol::messages::describe_producers_response::TopicResponse;
use kafka_protocol::messages::fetch_request::FetchTopic;
Expand All @@ -32,15 +33,16 @@ use kafka_protocol::messages::produce_response::{
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeGroupsRequest, DescribeGroupsResponse, DescribeProducersRequest,
DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest,
DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse,
FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand Down Expand Up @@ -1131,6 +1133,7 @@ The connection to the client has been closed."
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::Metadata(_)
| RequestBody::DescribeCluster(_)
| RequestBody::DescribeConfigs(_)
| RequestBody::AlterConfigs(_)
| RequestBody::CreatePartitions(_)
Expand Down Expand Up @@ -3066,14 +3069,13 @@ The connection to the client has been closed."
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeCluster(_),
body: ResponseBody::DescribeCluster(describe_cluster),
..
})) => {
// If clients were to send this we would need to rewrite the broker information.
// However I dont think clients actually send this, so just error to ensure we dont break invariants.
return Err(anyhow!(
"I think this is a raft specific message and never sent by clients"
));
self.process_describe_cluster_response(describe_cluster)
.await;
self.rewrite_describe_cluster_response(describe_cluster)?;
response.invalidate_cache();
}
_ => {}
}
Expand Down Expand Up @@ -3387,6 +3389,26 @@ The connection to the client has been closed."
}
}

async fn process_describe_cluster_response(
&mut self,
describe_cluster: &DescribeClusterResponse,
) {
for broker in &describe_cluster.brokers {
let node = KafkaNode::new(
broker.broker_id,
KafkaAddress::new(broker.host.clone(), broker.port),
broker.rack.clone(),
);
self.add_node_if_new(node).await;
}

tracing::debug!(
"Storing controller metadata, controller is now broker {}",
describe_cluster.controller_id.0
);
self.controller_broker.set(describe_cluster.controller_id);
}

fn process_find_coordinator_response(
&mut self,
version: i16,
Expand Down Expand Up @@ -3593,34 +3615,69 @@ The connection to the client has been closed."
}
}

self.rewrite_controller_id(&mut metadata.controller_id, &up_shotover_nodes);

Ok(())
}

/// Rewrite DescribeCluster response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist
fn rewrite_describe_cluster_response(
&self,
describe_cluster: &mut DescribeClusterResponse,
) -> Result<()> {
// This should never be empty since the local shotover node always considers itself UP
let up_shotover_nodes: Vec<_> = self
.shotover_nodes
.iter()
.filter(|shotover_node| shotover_node.is_up())
.collect();

// Overwrite list of brokers with the list of UP shotover nodes
describe_cluster.brokers = up_shotover_nodes
.iter()
.map(|shotover_node| {
DescribeClusterBroker::default()
.with_broker_id(shotover_node.broker_id)
.with_host(shotover_node.address_for_clients.host.clone())
.with_port(shotover_node.address_for_clients.port)
.with_rack(Some(shotover_node.rack.clone()))
})
.collect();

self.rewrite_controller_id(&mut describe_cluster.controller_id, &up_shotover_nodes);

Ok(())
}

fn rewrite_controller_id(
&self,
controller_id_field: &mut BrokerId,
up_shotover_nodes: &[&ShotoverNode],
) {
if let Some(controller_node_rack) = self
.nodes
.iter()
.find(|node| node.broker_id == metadata.controller_id)
.map(|x| x.rack.clone())
.find(|node| node.broker_id == *controller_id_field)
.map(|x| &x.rack)
{
// If broker has no rack - use the first UP shotover node.
// If broker has rack - use the first UP shotover node with the same rack if available,
// and fall back to use the first UP shotover node out of the rack otherwise.
// This is deterministic because the list of UP shotover nodes is sorted and partitioning does not change the order.
let shotover_nodes_by_rack = partition_shotover_nodes_by_rack(
&up_shotover_nodes,
&controller_node_rack.as_ref(),
);
let shotover_nodes_by_rack =
partition_shotover_nodes_by_rack(up_shotover_nodes, &controller_node_rack.as_ref());
let shotover_node = shotover_nodes_by_rack
.nodes_in_rack
.first()
.or_else(|| shotover_nodes_by_rack.nodes_out_of_rack.first())
.expect("There will always be at least one up shotover node");

metadata.controller_id = shotover_node.broker_id;
*controller_id_field = shotover_node.broker_id;
} else {
// controller is either -1 or an unknown broker
// In both cases it is reasonable to set to -1 to indicate the controller is unknown.
metadata.controller_id = BrokerId(-1);
*controller_id_field = BrokerId(-1);
}

Ok(())
}

async fn add_node_if_new(&mut self, new_node: KafkaNode) {
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl KafkaConnectionBuilderJava {
// The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom
// These are deployed to and loaded from a path like target/debug/jassets
let jvm = Jvm::new(&[
"org.apache.kafka:kafka-clients:3.7.0",
"org.apache.kafka:kafka-clients:3.8.1",
"org.slf4j:slf4j-api:1.7.36",
"org.slf4j:slf4j-simple:1.7.36",
]);
Expand Down

0 comments on commit 8045ecd

Please sign in to comment.