From 87f79e5ee1c0cc55fc69b786864c3d44e140f299 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 19 Nov 2024 15:39:45 +1100 Subject: [PATCH] include broker id in log string --- shotover-proxy/tests/kafka_int_tests/mod.rs | 7 +- .../tests/kafka_int_tests/test_cases.rs | 80 ++++++----- .../src/transforms/kafka/sink_cluster/mod.rs | 133 ++++++++++++------ 3 files changed, 142 insertions(+), 78 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 4287b92b7..56c99406d 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -585,7 +585,12 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::cluster_test_suite(&connection_builder).await; - test_cases::describe_log_dirs(&connection_builder).await; + + #[allow(irrefutable_let_patterns)] + if let KafkaDriver::Java = driver { + // describeLogDirs is only on java driver + test_cases::describe_log_dirs(&connection_builder).await; + } for shotover in shotovers { tokio::time::timeout( diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 2e775d73a..1c45d1e70 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1853,40 +1853,52 @@ pub async fn describe_log_dirs(connection_builder: &KafkaConnectionBuilder) { }, ]) .await; - assert_eq!( - result, - HashMap::from([ - ( - TopicPartitionReplica { - topic_name: "describe_logs_test".to_owned(), - partition: 0, - broker_id: 0, - }, - DescribeReplicaLogDirInfo { - path: Some("/bitnami/kafka/data".to_owned()) - } - ), - ( - TopicPartitionReplica { - topic_name: "describe_logs_test".to_owned(), - partition: 0, - broker_id: 1, - }, - DescribeReplicaLogDirInfo { - path: Some("/bitnami/kafka/data".to_owned()) - } - ), - ( - TopicPartitionReplica { - topic_name: "describe_logs_test2".to_owned(), - partition: 0, - broker_id: 0, - }, - DescribeReplicaLogDirInfo { - path: Some("/bitnami/kafka/data".to_owned()) - } - ) - ]) + + /// Assert that the path in the DescribeLogsDir response matches the custom format used by shotover. + /// This format looks like: actual-kafka-broker-id3:/original/kafka/path/here + fn assert_valid_path(info: &DescribeReplicaLogDirInfo) { + let id = info + .path + .as_ref() + .unwrap() + .strip_prefix("actual-kafka-broker-id") + .unwrap() + .strip_suffix(":/bitnami/kafka/data") + .unwrap(); + let id: i32 = id.parse().unwrap(); + assert!( + id < 6, + "There are only 6 brokers so the broker id must be between 0-5 inclusive" + ); + } + + assert_eq!(result.len(), 3); + assert_valid_path( + result + .get(&TopicPartitionReplica { + topic_name: "describe_logs_test".to_owned(), + partition: 0, + broker_id: 0, + }) + .unwrap(), + ); + assert_valid_path( + result + .get(&TopicPartitionReplica { + topic_name: "describe_logs_test".to_owned(), + partition: 0, + broker_id: 1, + }) + .unwrap(), + ); + assert_valid_path( + result + .get(&TopicPartitionReplica { + topic_name: "describe_logs_test2".to_owned(), + partition: 0, + broker_id: 0, + }) + .unwrap(), ); } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index a727b23cd..9d29fa88b 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -2229,11 +2229,19 @@ The connection to the client has been closed." } else { let drain = self.pending_requests.drain(..combine_responses).map(|x| { if let PendingRequest { - state: PendingRequestState::Received { response, .. }, + state: + PendingRequestState::Received { + response, + destination, + .. + }, .. } = x { - response + ResponseToBeCombined { + response, + destination, + } } else { unreachable!("Guaranteed by all_combined_received") } @@ -2297,12 +2305,12 @@ The connection to the client has been closed." result } - fn combine_responses(mut drain: impl Iterator) -> Result { + fn combine_responses(mut drain: impl Iterator) -> Result { // Take this response as base. // Then iterate over all remaining combined responses and integrate them into the base. let mut base = drain.next().unwrap(); - match base.frame() { + match base.response.frame() { Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Fetch(base), .. @@ -2352,9 +2360,9 @@ The connection to the client has been closed." .. })) => Self::combine_describe_groups(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::DescribeLogDirs(base), + body: ResponseBody::DescribeLogDirs(base_body), .. - })) => Self::combine_describe_log_dirs(base, drain)?, + })) => Self::combine_describe_log_dirs(base.destination, base_body, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::AddPartitionsToTxn(base), version, @@ -2370,20 +2378,20 @@ The connection to the client has been closed." } } - base.invalidate_cache(); + base.response.invalidate_cache(); - Ok(base) + Ok(base.response) } fn combine_fetch_responses( base_fetch: &mut FetchResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Fetch(next_fetch), .. - })) = next.frame() + })) = next.response.frame() { for next_response in std::mem::take(&mut next_fetch.responses) { if let Some(base_response) = base_fetch.responses.iter_mut().find(|response| { @@ -2416,13 +2424,13 @@ The connection to the client has been closed." fn combine_list_offsets_responses( base_list_offsets: &mut ListOffsetsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::ListOffsets(next_list_offsets), .. - })) = next.frame() + })) = next.response.frame() { for next_topic in std::mem::take(&mut next_list_offsets.topics) { if let Some(base_topic) = base_list_offsets @@ -2456,13 +2464,13 @@ The connection to the client has been closed." fn combine_offset_for_leader_epoch_responses( base_list_offsets: &mut OffsetForLeaderEpochResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::OffsetForLeaderEpoch(next_body), .. - })) = next.frame() + })) = next.response.frame() { for next_topic in std::mem::take(&mut next_body.topics) { if let Some(base_topic) = base_list_offsets @@ -2495,7 +2503,7 @@ The connection to the client has been closed." fn combine_produce_responses( base_produce: &mut ProduceResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { let mut base_responses: HashMap = std::mem::take(&mut base_produce.responses) @@ -2506,7 +2514,7 @@ The connection to the client has been closed." if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Produce(next_produce), .. - })) = next.frame() + })) = next.response.frame() { for next_response in std::mem::take(&mut next_produce.responses) { if let Some(base_response) = base_responses.get_mut(&next_response.name) { @@ -2539,7 +2547,7 @@ The connection to the client has been closed." fn combine_delete_records( base_body: &mut DeleteRecordsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { let mut base_topics: HashMap = std::mem::take(&mut base_body.topics) @@ -2550,7 +2558,7 @@ The connection to the client has been closed." if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DeleteRecords(next_body), .. - })) = next.frame() + })) = next.response.frame() { for next_response in std::mem::take(&mut next_body.topics) { if let Some(base_response) = base_topics.get_mut(&next_response.name) { @@ -2582,13 +2590,13 @@ The connection to the client has been closed." fn combine_delete_groups_responses( base_delete_groups: &mut DeleteGroupsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DeleteGroups(next_delete_groups), .. - })) = next.frame() + })) = next.response.frame() { base_delete_groups .results @@ -2601,13 +2609,13 @@ The connection to the client has been closed." fn combine_offset_fetch( base_offset_fetch: &mut OffsetFetchResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::OffsetFetch(next_offset_fetch), .. - })) = next.frame() + })) = next.response.frame() { base_offset_fetch .groups @@ -2620,13 +2628,13 @@ The connection to the client has been closed." fn combine_list_groups( base_list_groups: &mut ListGroupsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::ListGroups(next_list_groups), .. - })) = next.frame() + })) = next.response.frame() { base_list_groups .groups @@ -2639,7 +2647,7 @@ The connection to the client has been closed." fn combine_describe_producers( base: &mut DescribeProducersResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { let mut base_responses: HashMap = std::mem::take(&mut base.topics) @@ -2650,7 +2658,7 @@ The connection to the client has been closed." if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DescribeProducers(next), .. - })) = next.frame() + })) = next.response.frame() { for next_response in std::mem::take(&mut next.topics) { if let Some(base_response) = base_responses.get_mut(&next_response.name) { @@ -2682,13 +2690,13 @@ The connection to the client has been closed." fn combine_describe_transactions( base: &mut DescribeTransactionsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DescribeTransactions(next), .. - })) = next.frame() + })) = next.response.frame() { base.transaction_states .extend(std::mem::take(&mut next.transaction_states)); @@ -2700,13 +2708,13 @@ The connection to the client has been closed." fn combine_list_transactions( base_list_transactions: &mut ListTransactionsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::ListTransactions(next_list_transactions), .. - })) = next.frame() + })) = next.response.frame() { base_list_transactions .transaction_states @@ -2720,35 +2728,69 @@ The connection to the client has been closed." } fn combine_describe_log_dirs( - base: &mut DescribeLogDirsResponse, - drain: impl Iterator, + base_destination: Destination, + base_body: &mut DescribeLogDirsResponse, + drain: impl Iterator, ) -> Result<()> { + Self::prepend_destination_to_log_dir(base_destination, base_body); + for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::DescribeLogDirs(next), + body: ResponseBody::DescribeLogDirs(next_body), .. - })) = next.frame() + })) = next.response.frame() { - for result in &mut next.results { - let log_dir = result.log_dir.as_str(); - result.log_dir = StrBytes::from_string(format!("{}:{log_dir}", 0)); - } - base.results.extend(std::mem::take(&mut next.results)); + Self::prepend_destination_to_log_dir(next.destination, next_body); + base_body + .results + .extend(std::mem::take(&mut next_body.results)); } } Ok(()) } + /// Rewrite the log dir paths to a custom format to allow results to be disambiguated. + /// Usually only 1 file system path can exist on a single broker machine. + /// However since shotover represents many different brokers, there can be different log dirs with identical paths associated with a single shotover instance. + /// This would be extremely confusing to a user and the client driver could assume that paths are unique. + /// So we need to alter the path to include details of which broker the path resides on. + /// + /// The downsides of this are: + /// * This leaks details of the actual kafka cluster to the user + /// * The path is no longer a valid path + /// + /// The only other possible solution I see would be to have shotover error on this request type instead. + /// But I think its reasonable to instead take these downsides and provide most of the value of this message type to the user. + /// + /// For example: + /// If the path starts as: /original/log/dir/path + /// It will become something like: actual-kafka-broker-id3:/original/log/dir/path + fn prepend_destination_to_log_dir( + destination: Destination, + body: &mut DescribeLogDirsResponse, + ) { + for result in &mut body.results { + let log_dir = result.log_dir.as_str(); + let altered_log_dir = match destination { + Destination::Id(id) => format!("actual-kafka-broker-id{id:?}:{log_dir}"), + Destination::ControlConnection => { + unreachable!("DescribeLogDirs are not sent as control connections") + } + }; + result.log_dir = StrBytes::from_string(altered_log_dir); + } + } + fn combine_describe_groups( base: &mut DescribeGroupsResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DescribeGroups(next), .. - })) = next.frame() + })) = next.response.frame() { base.groups.extend(std::mem::take(&mut next.groups)); } @@ -2759,13 +2801,13 @@ The connection to the client has been closed." fn combine_add_partitions_to_txn( base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse, - drain: impl Iterator, + drain: impl Iterator, ) -> Result<()> { for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::AddPartitionsToTxn(next_add_partitions_to_txn), .. - })) = next.frame() + })) = next.response.frame() { base_add_partitions_to_txn .results_by_transaction @@ -3908,3 +3950,8 @@ fn collect_broker_ids(shotover_nodes_by_rack: ShotoverNodesByRack) -> Vec