KafkaSinkCluster: describe_log_dirs
rukai committed Nov 19, 2024
1 parent 3f76b54 commit 39f99df
Showing 7 changed files with 246 additions and 19 deletions.
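
In short: KafkaSinkCluster gains support for the DescribeLogDirs request. The request is broadcast to every broker in the cluster and the per-broker responses are combined into a single response for the client. An integration test exercises the new path through the Java admin client's describeReplicaLogDirs, backed by new TopicPartitionReplica and DescribeReplicaLogDirInfo test-helper types (the cpp driver does not support this operation).
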
1 change: 1 addition & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -585,6 +585,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;
test_cases::describe_log_dirs(&connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
99 changes: 94 additions & 5 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,11 +3,11 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
TransactionDescription,
ConsumerGroupDescription, DescribeReplicaLogDirInfo, ExpectedResponse, IsolationLevel,
KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicPartition, TopicPartitionReplica, TransactionDescription,
},
docker_compose::DockerCompose,
};
@@ -1801,6 +1801,95 @@ async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) {
);
}

// Because this test specifies broker ids to query directly, it is specialized to a 2 shotover node, 6 kafka node cluster.
// So we call it directly from such a test, instead of including it in the shared test suite.
pub async fn describe_log_dirs(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

// Create topics that are replicated to every node in the cluster
admin
.create_topics_and_wait(&[
NewTopic {
name: "describe_logs_test",
num_partitions: 1,
replication_factor: 6,
},
NewTopic {
name: "describe_logs_test2",
num_partitions: 1,
replication_factor: 6,
},
])
.await;
let producer = connection_builder.connect_producer("all", 100).await;
producer
.assert_produce(
Record {
payload: "initial",
topic_name: "describe_logs_test",
key: None,
},
Some(0),
)
.await;

// Describe replica log dirs for both topics and assert that each replica reports the expected path
let result = admin
.describe_replica_log_dirs(&[
TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 0,
},
TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 1,
},
TopicPartitionReplica {
topic_name: "describe_logs_test2".to_owned(),
partition: 0,
broker_id: 0,
},
])
.await;
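// All three queried replicas report the same log dir path used by the kafka containers.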
assert_eq!(
result,
HashMap::from([
(
TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 0,
},
DescribeReplicaLogDirInfo {
path: Some("/bitnami/kafka/data".to_owned())
}
),
(
TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 1,
},
DescribeReplicaLogDirInfo {
path: Some("/bitnami/kafka/data".to_owned())
}
),
(
TopicPartitionReplica {
topic_name: "describe_logs_test2".to_owned(),
partition: 0,
broker_id: 0,
},
DescribeReplicaLogDirInfo {
path: Some("/bitnami/kafka/data".to_owned())
}
)
])
);
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
43 changes: 36 additions & 7 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -34,9 +34,9 @@ use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest,
DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse,
FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
DescribeLogDirsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
@@ -59,10 +59,10 @@ use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
DescribeProducersRequestSplitAndRouter, DescribeTransactionsSplitAndRouter,
ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
DescribeLogDirsSplitAndRouter, DescribeProducersRequestSplitAndRouter,
DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
@@ -1139,6 +1139,10 @@ The connection to the client has been closed."
body: RequestBody::ListTransactions(_),
..
})) => self.split_and_route_request::<ListTransactionsSplitAndRouter>(request)?,
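// route to all brokers, then combine the responses into a single response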
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeLogDirs(_),
..
})) => self.split_and_route_request::<DescribeLogDirsSplitAndRouter>(request)?,

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
@@ -2347,6 +2351,10 @@ The connection to the client has been closed."
body: ResponseBody::DescribeGroups(base),
..
})) => Self::combine_describe_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeLogDirs(base),
..
})) => Self::combine_describe_log_dirs(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
@@ -2711,6 +2719,27 @@ The connection to the client has been closed."
Ok(())
}

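/// Merge the DescribeLogDirs responses returned by each broker into `base`.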
fn combine_describe_log_dirs(
base: &mut DescribeLogDirsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeLogDirs(next),
..
})) = next.frame()
{
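// Prefix each log dir name from this response with `0:` before merging it into the base results.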
for result in &mut next.results {
let log_dir = result.log_dir.as_str();
result.log_dir = StrBytes::from_string(format!("0:{log_dir}"));
}
base.results.extend(std::mem::take(&mut next.results));
}
}

Ok(())
}

fn combine_describe_groups(
base: &mut DescribeGroupsResponse,
drain: impl Iterator<Item = Message>,
35 changes: 32 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -12,9 +12,10 @@ use kafka_protocol::messages::{
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeGroupsRequest, DescribeProducersRequest, DescribeTransactionsRequest, GroupId,
ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName, TransactionalId,
DescribeGroupsRequest, DescribeLogDirsRequest, DescribeProducersRequest,
DescribeTransactionsRequest, GroupId, ListGroupsRequest, ListOffsetsRequest,
ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest,
TopicName, TransactionalId,
};
use std::collections::HashMap;

@@ -281,6 +282,34 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
}
}

pub struct DescribeLogDirsSplitAndRouter;

impl RequestSplitAndRouter for DescribeLogDirsSplitAndRouter {
type Request = DescribeLogDirsRequest;
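// There is nothing to split: every broker receives an identical copy of the request.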
type SubRequests = ();

fn split_by_destination(
transform: &mut KafkaSinkCluster,
_request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeLogDirs(request),
..
})) => request,
_ => unreachable!(),
}
}

fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
// No need to reassemble, each DescribeLogDirs is an exact clone of the original
}
}

pub struct DescribeTransactionsSplitAndRouter;

impl RequestSplitAndRouter for DescribeTransactionsSplitAndRouter {
1 change: 1 addition & 0 deletions test-helpers/src/connection/java.rs
@@ -363,6 +363,7 @@ impl Value {
}

/// Convert this java value into a native rust type.
/// When `T` is `Option<U>`, a java null becomes `None` and any other value becomes `Some`.
pub(crate) fn into_rust<T>(self) -> T
where
T: DeserializeOwned + Any,
62 changes: 58 additions & 4 deletions test-helpers/src/connection/kafka/java.rs
@@ -1,9 +1,10 @@
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ConsumerGroupDescription,
ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic,
OffsetAndMetadata, OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription,
TopicPartition, TopicPartitionInfo, TransactionDescription,
DescribeReplicaLogDirInfo, ExpectedResponse, ListOffsetsResultInfo, NewPartition,
NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, PartitionReassignment,
ProduceResult, ProducerState, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier,
ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo, TopicPartitionReplica,
TransactionDescription,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
@@ -871,6 +872,39 @@ impl KafkaAdminJava {
.await;
}

pub async fn describe_replica_log_dirs(
&self,
topic_partitions: &[TopicPartitionReplica],
) -> HashMap<TopicPartitionReplica, DescribeReplicaLogDirInfo> {
let topic_partitions_java = self.jvm.new_set(
"org.apache.kafka.common.TopicPartitionReplica",
topic_partitions
.iter()
.map(|topic_partition| topic_partition_replica_to_java(&self.jvm, topic_partition))
.collect(),
);

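// describeReplicaLogDirs(...).all() resolves to a java Map<TopicPartitionReplica, ReplicaLogDirInfo>.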
let results = self
.admin
.call("describeReplicaLogDirs", vec![topic_partitions_java])
.call_async("all", vec![])
.await;

map_iterator(results)
.map(|(topic_partition, log_dir_info)| {
(
topic_partition_replica_to_rust(topic_partition),
DescribeReplicaLogDirInfo {
path: log_dir_info
.cast("org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult$ReplicaLogDirInfo")
.call("getCurrentReplicaLogDir", vec!())
.into_rust()
},
)
})
.collect()
}

pub async fn elect_leaders(&self, topic_partitions: &[TopicPartition]) -> Result<()> {
let election_type = self
.jvm
@@ -1052,6 +1086,26 @@ fn topic_partition_to_rust(tp: Value) -> TopicPartition {
}
}

fn topic_partition_replica_to_java(jvm: &Jvm, tp: &TopicPartitionReplica) -> Value {
jvm.construct(
"org.apache.kafka.common.TopicPartitionReplica",
vec![
jvm.new_string(&tp.topic_name),
jvm.new_int(tp.partition),
jvm.new_int(tp.broker_id),
],
)
}

fn topic_partition_replica_to_rust(tp: Value) -> TopicPartitionReplica {
let tp = tp.cast("org.apache.kafka.common.TopicPartitionReplica");
TopicPartitionReplica {
topic_name: tp.call("topic", vec![]).into_rust(),
partition: tp.call("partition", vec![]).into_rust(),
broker_id: tp.call("brokerId", vec![]).into_rust(),
}
}

fn offset_and_metadata_to_rust(offset_and_metadata: Value) -> OffsetAndMetadata {
let offset_and_metadata =
offset_and_metadata.cast("org.apache.kafka.clients.consumer.OffsetAndMetadata");
24 changes: 24 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -539,6 +539,18 @@
}
}
}
pub async fn describe_replica_log_dirs(
&self,
topic_partitions: &[TopicPartitionReplica],
) -> HashMap<TopicPartitionReplica, DescribeReplicaLogDirInfo> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support describe_replica_log_dirs")
}
Self::Java(java) => java.describe_replica_log_dirs(topic_partitions).await,
}
}

pub async fn elect_leaders(&self, topic_partitions: &[TopicPartition]) -> Result<()> {
match self {
@@ -630,6 +642,18 @@ pub struct TopicPartition {
pub partition: i32,
}

#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct TopicPartitionReplica {
pub topic_name: String,
pub partition: i32,
pub broker_id: i32,
}

#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct DescribeReplicaLogDirInfo {
pub path: Option<String>,
}

pub enum ResourceSpecifier<'a> {
Topic(&'a str),
}
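
For reference, a minimal usage sketch of the new test-helper API. The topic name and broker id below are hypothetical, and the java driver is required since the cpp driver panics on this call:

use test_helpers::connection::kafka::{KafkaConnectionBuilder, TopicPartitionReplica};

// Print which log dir (if any) holds each queried replica.
async fn print_replica_log_dirs(connection_builder: &KafkaConnectionBuilder) {
    let admin = connection_builder.connect_admin().await;
    let dirs = admin
        .describe_replica_log_dirs(&[TopicPartitionReplica {
            topic_name: "example_topic".to_owned(),
            partition: 0,
            broker_id: 0,
        }])
        .await;
    for (replica, info) in dirs {
        // `path` is `None` when the broker does not host this replica.
        println!(
            "{} partition {} on broker {}: {:?}",
            replica.topic_name, replica.partition, replica.broker_id, info.path
        );
    }
}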
