diff --git a/Cargo.lock b/Cargo.lock index 3dd821617..3680d1699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2795,8 +2795,7 @@ dependencies = [ [[package]] name = "kafka-protocol" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" +source = "git+https://github.com/rukai/kafka-protocol-rs?branch=shotover_fork#6990104ee29b915df702700fd85ed498d4312bb8" dependencies = [ "anyhow", "bytes", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 12498d7d9..031278b19 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } +kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/rukai/kafka-protocol-rs", branch = "shotover_fork" } rustls = { version = "0.23.0", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs new file mode 100644 index 000000000..5bb28fe70 --- /dev/null +++ b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs @@ -0,0 +1,130 @@ +use kafka_protocol::{messages::ApiKey, protocol::VersionRange}; + +// This table defines which api versions KafkaSinkCluster supports. +// * When adding a new message type: +// + Make sure you have implemented routing logic for the message type +// + Make sure any fields referring to internal cluster details such as broker ids or addresses are rewritten to refer to shotover nodes +// * When adding a new message type version: +// + Make sure any new fields do not break any of the requirements listed above +pub(crate) fn versions_supported_by_key(api_key: i16) -> Option { + match ApiKey::try_from(api_key) { + Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }), + Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }), + Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }), + Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }), + Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }), + Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }), + Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }), + // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION + // So its not clear at all how to implement this and its not even possible to test it. + // Instead lets just ask the client to not send it at all. + // We can consider supporting it when kafka itself starts to support it but we will need to be very + // careful to correctly implement the pagination/cursor logic. + Ok(ApiKey::DescribeTopicPartitionsKey) => None, + Ok(_) => None, + Err(_) => None, + } +} + +// This test gives visibility into the api versions that shotover doesnt support yet. +// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions. +// The actual upgrade can be done later. +#[test] +fn check_api_version_backlog() { + use std::fmt::Write; + const EXPECTED_ERROR_MESSAGE: &str = r#" +LeaderAndIsrKey kafka-protocol=0..7 shotover=NotSupported +StopReplicaKey kafka-protocol=0..4 shotover=NotSupported +UpdateMetadataKey kafka-protocol=0..8 shotover=NotSupported +ControlledShutdownKey kafka-protocol=0..3 shotover=NotSupported +WriteTxnMarkersKey kafka-protocol=0..1 shotover=NotSupported +DescribeAclsKey kafka-protocol=0..3 shotover=NotSupported +DeleteAclsKey kafka-protocol=0..3 shotover=NotSupported +AlterReplicaLogDirsKey kafka-protocol=0..2 shotover=NotSupported +CreateDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +RenewDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +ExpireDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +DescribeDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +IncrementalAlterConfigsKey kafka-protocol=0..1 shotover=NotSupported +DescribeClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +AlterClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +DescribeUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +AlterUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +VoteKey kafka-protocol=0..0 shotover=NotSupported +BeginQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +EndQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +DescribeQuorumKey kafka-protocol=0..1 shotover=NotSupported +UpdateFeaturesKey kafka-protocol=0..1 shotover=NotSupported +EnvelopeKey kafka-protocol=0..0 shotover=NotSupported +FetchSnapshotKey kafka-protocol=0..0 shotover=NotSupported +BrokerRegistrationKey kafka-protocol=0..3 shotover=NotSupported +BrokerHeartbeatKey kafka-protocol=0..1 shotover=NotSupported +UnregisterBrokerKey kafka-protocol=0..0 shotover=NotSupported +AllocateProducerIdsKey kafka-protocol=0..0 shotover=NotSupported +ControllerRegistrationKey kafka-protocol=0..0 shotover=NotSupported +GetTelemetrySubscriptionsKey kafka-protocol=0..0 shotover=NotSupported +PushTelemetryKey kafka-protocol=0..0 shotover=NotSupported +AssignReplicasToDirsKey kafka-protocol=0..0 shotover=NotSupported +ListClientMetricsResourcesKey kafka-protocol=0..0 shotover=NotSupported +DescribeTopicPartitionsKey kafka-protocol=0..0 shotover=NotSupported +"#; + + let mut error_message = String::new(); + for api_key in ApiKey::iterate_all() { + let shotover_version = versions_supported_by_key(api_key as i16); + + let kafka_protocol_version = api_key.valid_versions(); + if shotover_version != Some(kafka_protocol_version) { + let shotover_version = match shotover_version { + Some(version) => format!("{version}"), + None => "NotSupported".to_owned(), + }; + writeln!( + error_message, + "{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}" + ) + .unwrap(); + } + } + + pretty_assertions::assert_eq!( + error_message.trim(), + EXPECTED_ERROR_MESSAGE.trim(), + "The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE", + ); +} diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 22108b9c1..d809ad29c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -73,6 +73,7 @@ use std::time::{Duration, Instant}; use tokio::sync::RwLock; use uuid::Uuid; +mod api_versions; mod connections; mod kafka_node; mod scram_over_mtls; @@ -3223,25 +3224,22 @@ The connection to the client has been closed." body: ResponseBody::ApiVersions(api_versions), .. })) => { - let original_size = api_versions.api_keys.len(); - - // List of keys that shotover doesnt support and so should be removed from supported keys list - let disable_keys = [ - // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION - // So its not clear at all how to implement this and its not even possible to test it. - // Instead lets just ask the client to not send it at all. - // We can consider supporting it when kafka itself starts to support it but we will need to be very - // careful to correctly implement the pagination/cursor logic. - ApiKey::DescribeTopicPartitionsKey as i16, - ]; - api_versions - .api_keys - .retain(|x| !disable_keys.contains(&x.api_key)); - - if original_size != api_versions.api_keys.len() { - // only invalidate the cache if we actually removed anything - response.invalidate_cache(); - } + api_versions.api_keys.retain_mut(|api_key| { + match api_versions::versions_supported_by_key(api_key.api_key) { + Some(version) => { + if api_key.max_version > version.max { + api_key.max_version = version.max; + } + if api_key.min_version < version.min { + api_key.min_version = version.min; + } + true + } + None => false, + } + }); + + response.invalidate_cache(); } _ => {} }