diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 22108b9c1..e821bd57b 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -887,8 +887,11 @@ impl KafkaSinkCluster { if !topic_names.is_empty() || !topic_ids.is_empty() || self.controller_broker.get().is_none() + || self.nodes.is_empty() { - let mut metadata = self.get_metadata_of_topics(topic_names, topic_ids).await?; + let mut metadata = self + .get_metadata_of_topics_with_retry(topic_names, topic_ids) + .await?; match metadata.frame() { Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Metadata(metadata), @@ -1978,10 +1981,43 @@ The connection to the client has been closed." } } - async fn get_metadata_of_topics( + /// Retry if we get an empty brokers list + /// We dont actually retry on failure since thats not a known failure mode for this request. + async fn get_metadata_of_topics_with_retry( &mut self, topic_names: Vec, topic_ids: Vec, + ) -> Result { + for _ in 0..3 { + let mut response = self + .get_metadata_of_topics(&topic_names, &topic_ids) + .await?; + + match response.frame() { + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::Metadata(metadata), + .. + })) => { + if metadata.brokers.is_empty() { + // Cluster is probably still starting up, retry after a delay. + tokio::time::sleep(Duration::from_millis(200)).await; + continue; + } else { + // cluster is ready, return the response + return Ok(response); + } + } + response => return Err(anyhow!("Expected metadata response but was {response:?}")), + } + } + + Err(anyhow!("Broker returned empty list of brokers")) + } + + async fn get_metadata_of_topics( + &mut self, + topic_names: &[TopicName], + topic_ids: &[Uuid], ) -> Result { let api_version = if topic_ids.is_empty() { 4 } else { 12 }; let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { @@ -1992,12 +2028,12 @@ The connection to the client has been closed." body: RequestBody::Metadata( MetadataRequest::default().with_topics(Some( topic_names - .into_iter() - .map(|name| MetadataRequestTopic::default().with_name(Some(name))) - .chain(topic_ids.into_iter().map(|id| { + .iter() + .map(|name| MetadataRequestTopic::default().with_name(Some(name.clone()))) + .chain(topic_ids.iter().map(|id| { MetadataRequestTopic::default() .with_name(None) - .with_topic_id(id) + .with_topic_id(*id) })) .collect(), )),