From 395ca0216dd79a3271b0f2af1f21eda8a931e1bc Mon Sep 17 00:00:00 2001 From: Sher Afgan <6375481+alleyshairu@users.noreply.github.com> Date: Fri, 22 Nov 2024 09:45:42 +1100 Subject: [PATCH] KafkaSinkCluster - move destination field from PendingRequestState into PendingRequest (#1828) Co-authored-by: Lucas Kent --- .../src/transforms/kafka/sink_cluster/mod.rs | 126 ++++++++---------- 1 file changed, 57 insertions(+), 69 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 743378deb..22108b9c1 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -343,13 +343,9 @@ pub(crate) struct KafkaSinkCluster { #[derive(Debug)] enum PendingRequestState { /// A route has been determined for this request but it has not yet been sent. - Routed { - destination: Destination, - request: Message, - }, + Routed { request: Message }, /// The request has been sent to the specified broker and we are now awaiting a response from that broker. Sent { - destination: Destination, /// How many responses must be received before this response is received. /// When this is 0 the next response from the broker will be for this request. /// This field must be manually decremented when another response for this broker comes through. @@ -360,8 +356,6 @@ enum PendingRequestState { /// The broker has returned a Response to this request. /// Returning this response may be delayed until a response to an earlier request comes back from another broker. Received { - // TODO: move this into the parent type - destination: Destination, response: Message, /// Some message types store the request here in case they need to resend it. // TODO: if we ever turn the Message into a CoW type we will be able to @@ -371,11 +365,8 @@ enum PendingRequestState { } impl PendingRequestState { - fn routed(broker_id: BrokerId, request: Message) -> Self { - Self::Routed { - destination: Destination::Id(broker_id), - request, - } + fn routed(request: Message) -> Self { + Self::Routed { request } } } @@ -396,6 +387,9 @@ enum PendingRequestTy { struct PendingRequest { state: PendingRequestState, + + destination: Destination, + /// Type of the request sent ty: PendingRequestTy, /// Combine the next N responses into a single response @@ -1209,7 +1203,8 @@ The connection to the client has been closed." let destination = random_broker_id(&self.nodes, &mut self.rng); tracing::debug!("Routing request to random broker {}", destination.0); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::Other, combine_responses: 1, }); @@ -1228,7 +1223,8 @@ The connection to the client has been closed." let destination = random_broker_id(&self.nodes, &mut self.rng); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::Other, combine_responses: 1, }); @@ -1250,7 +1246,8 @@ The connection to the client has been closed." T::reassemble(request_frame, topic_data); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::Other, combine_responses: 1, }); @@ -1275,7 +1272,8 @@ The connection to the client has been closed." let request_frame = T::get_request_frame(&mut request); T::reassemble(request_frame, topic_data); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::Other, combine_responses, }); @@ -1432,7 +1430,8 @@ The connection to the client has been closed." let destination = random_broker_id(&self.nodes, &mut self.rng); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), // we dont need special handling for fetch, so just use Other ty: PendingRequestTy::Other, combine_responses: 1, @@ -1455,7 +1454,8 @@ The connection to the client has been closed." fetch.topics = topics; self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), // we dont need special handling for fetch, so just use Other ty: PendingRequestTy::Other, combine_responses: 1, @@ -1494,7 +1494,8 @@ The connection to the client has been closed." fetch.topics = topics; } self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::Fetch { originally_sent_at: Instant::now(), max_wait_ms, @@ -2017,28 +2018,26 @@ The connection to the client has been closed." let mut broker_to_routed_requests: HashMap = HashMap::new(); for i in 0..self.pending_requests.len() { - if let PendingRequestState::Routed { destination, .. } = &self.pending_requests[i].state - { - let routed_requests = broker_to_routed_requests - .entry(*destination) - .or_insert_with(|| RoutedRequests { - requests: vec![], - already_pending: self - .pending_requests - .iter() - .filter(|pending_request| { - if let PendingRequestState::Sent { - destination: check_destination, - .. - } = &pending_request.state - { - check_destination == destination - } else { - false - } - }) - .count(), - }); + if let PendingRequestState::Routed { .. } = &self.pending_requests[i].state { + let destination = self.pending_requests[i].destination; + let routed_requests = + broker_to_routed_requests + .entry(destination) + .or_insert_with(|| RoutedRequests { + requests: vec![], + already_pending: self + .pending_requests + .iter() + .filter(|pending_request| { + if let PendingRequestState::Sent { .. } = &pending_request.state + { + pending_request.destination == destination + } else { + false + } + }) + .count(), + }); let request = match self.pending_requests[i].ty { PendingRequestTy::Fetch { .. } => { @@ -2056,7 +2055,6 @@ The connection to the client has been closed." PendingRequestTy::Other => None, }; let mut value = PendingRequestState::Sent { - destination: *destination, index: routed_requests.requests.len() + routed_requests.already_pending, request, }; @@ -2163,19 +2161,15 @@ The connection to the client has been closed." for response in self.temp_responses_buffer.drain(..) { let mut response = Some(response); for pending_request in &mut self.pending_requests { - if let PendingRequestState::Sent { - destination, - index, - request, - } = &mut pending_request.state + if let PendingRequestState::Sent { index, request } = + &mut pending_request.state { - if destination == connection_destination { + if &pending_request.destination == connection_destination { // Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent // All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent // to be used next time, and the time after that, and ... if *index == 0 { pending_request.state = PendingRequestState::Received { - destination: *destination, response: response.take().unwrap(), request: request.take(), }; @@ -2242,14 +2236,10 @@ The connection to the client has been closed." for i in 0..combine_responses { let pending_request = &mut self.pending_requests[i]; - if let PendingRequestState::Received { - destination, - request, - .. - } = &mut pending_request.state + if let PendingRequestState::Received { request, .. } = + &mut pending_request.state { pending_request.state = PendingRequestState::Routed { - destination: *destination, request: request.take().unwrap(), } } else { @@ -2278,12 +2268,8 @@ The connection to the client has been closed." } else { let drain = self.pending_requests.drain(..combine_responses).map(|x| { if let PendingRequest { - state: - PendingRequestState::Received { - response, - destination, - .. - }, + state: PendingRequestState::Received { response, .. }, + destination, .. } = x { @@ -3353,10 +3339,8 @@ The connection to the client has been closed." "route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive" ); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::Routed { - destination: Destination::ControlConnection, - request, - }, + state: PendingRequestState::Routed { request }, + destination: Destination::ControlConnection, ty: PendingRequestTy::Other, combine_responses: 1, }); @@ -3378,7 +3362,8 @@ The connection to the client has been closed." }; self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::Other, combine_responses: 1, }); @@ -3405,7 +3390,8 @@ The connection to the client has been closed." ); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::RoutedToGroup(group_id), combine_responses: 1, }); @@ -3432,7 +3418,8 @@ The connection to the client has been closed." ); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty: PendingRequestTy::RoutedToTransaction(transaction_id), combine_responses: 1, }); @@ -3452,7 +3439,8 @@ The connection to the client has been closed." tracing::debug!("Routing FindCoordinator to random broker {}", destination.0); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), + destination: Destination::Id(destination), ty, combine_responses: 1, });