Skip to content

Commit

Permalink
Routing fixes (#1815)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 15, 2024
1 parent 65ce7cb commit 819f88c
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 82 deletions.
114 changes: 63 additions & 51 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,14 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(body),
..
})) => {
for topic in &body.topics {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
Expand Down Expand Up @@ -740,6 +748,12 @@ impl KafkaSinkCluster {
self.store_group(&mut groups, group_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetDelete(offset_delete),
..
})) => {
self.store_group(&mut groups, offset_delete.group_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::InitProducerId(InitProducerIdRequest {
Expand Down Expand Up @@ -1129,70 +1143,68 @@ The connection to the client has been closed."
&mut self,
mut request: Message,
) -> Result<()> {
if let Some(request_frame) = T::get_request_frame(&mut request) {
let routing = T::split_by_destination(self, request_frame);
let request_frame = T::get_request_frame(&mut request);
let routing = T::split_by_destination(self, request_frame);

if routing.is_empty() {
// Produce contains no topics, so we can just pick a random destination.
// The request is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);
if routing.is_empty() {
// Produce contains no topics, so we can just pick a random destination.
// The request is unchanged so we can just send as is.
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original request as is,
// act like this never happened 😎,
// we dont even need to invalidate the request's cache.
let (destination, topic_data) = routing.into_iter().next().unwrap();
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
"Routing request to random broker {} due to being empty",
destination.0
);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original request as is,
// act like this never happened 😎,
// we dont even need to invalidate the request's cache.
let (destination, topic_data) = routing.into_iter().next().unwrap();
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!("Routing request to single broker {:?}", destination.0);
} else {
// The request has been split so it may be delivered to multiple destinations.
// We must generate a unique request for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};

let mut request = if i == 0 {
// First request acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
let request_frame = T::get_request_frame(&mut request);
T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
combine_responses,
});
tracing::debug!("Routing request to single broker {:?}", destination.0);
} else {
// The request has been split so it may be delivered to multiple destinations.
// We must generate a unique request for each destination.
let combine_responses = routing.len();
request.invalidate_cache();
for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
destination
};
let mut request = if i == 0 {
// First request acts as base and retains message id
request.clone()
} else {
request.clone_with_new_id()
};
if let Some(request_frame) = T::get_request_frame(&mut request) {
T::reassemble(request_frame, topic_data)
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses,
});
}
tracing::debug!("Routing request to multiple brokers");
}
tracing::debug!("Routing request to multiple brokers");
}
Ok(())
}
Expand Down
62 changes: 31 additions & 31 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::HashMap;
pub trait RequestSplitAndRouter {
type SubRequests;
type Request;
fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>;
fn get_request_frame(request: &mut Message) -> &mut Self::Request;
fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
Expand All @@ -42,13 +42,13 @@ impl RequestSplitAndRouter for ProduceRequestSplitAndRouter {
transform.split_produce_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Produce(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -70,13 +70,13 @@ impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter {
transform.split_add_partition_to_txn_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -98,13 +98,13 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
transform.split_list_offsets_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListOffsets(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -126,13 +126,13 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
transform.split_offset_for_leader_epoch_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -154,13 +154,13 @@ impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter {
transform.split_delete_records_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -182,13 +182,13 @@ impl RequestSplitAndRouter for DescribeProducersRequestSplitAndRouter {
transform.split_describe_producers_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeProducers(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -210,13 +210,13 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
transform.split_delete_groups_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -238,13 +238,13 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -266,13 +266,13 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand All @@ -294,13 +294,13 @@ impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
transform.split_offset_fetch_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetFetch(request),
..
})) => Some(request),
_ => None,
})) => request,
_ => unreachable!(),
}
}

Expand Down

0 comments on commit 819f88c

Please sign in to comment.