-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
152 additions
and
21 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
use kafka_protocol::{messages::ApiKey, protocol::VersionRange}; | ||
|
||
// This table defines which api versions shotover supports. | ||
// This applies to both KafkaSinkSingle and KafkaSinkCluster | ||
// * When adding a new message type: | ||
// + Make sure you have implemented routing logic for it in KafkaSinkCluster | ||
// + Make sure any fields referring to internal cluster details such as broker ids or addresses are rewritten to refer to shotover nodes | ||
// * When adding a new message type version: | ||
// + Make sure any new fields do not break any of the requirements listed above | ||
pub(crate) fn api_version_support(api_key: i16) -> Option<VersionRange> { | ||
match ApiKey::try_from(api_key) { | ||
Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }), | ||
Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }), | ||
Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }), | ||
Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }), | ||
Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }), | ||
Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }), | ||
Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }), | ||
Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }), | ||
Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }), | ||
Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }), | ||
Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }), | ||
Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }), | ||
Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }), | ||
Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }), | ||
Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }), | ||
Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }), | ||
Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }), | ||
Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }), | ||
Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }), | ||
Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }), | ||
Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), | ||
Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), | ||
Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }), | ||
Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }), | ||
Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }), | ||
Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }), | ||
Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }), | ||
Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }), | ||
Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }), | ||
Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }), | ||
// This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION | ||
// So its not clear at all how to implement this and its not even possible to test it. | ||
// Instead lets just ask the client to not send it at all. | ||
// We can consider supporting it when kafka itself starts to support it but we will need to be very | ||
// careful to correctly implement the pagination/cursor logic. | ||
Ok(ApiKey::DescribeTopicPartitionsKey) => None, | ||
Ok(_) => None, | ||
Err(_) => None, | ||
} | ||
} | ||
|
||
// This test gives visibility into the api versions that shotover doesnt support yet. | ||
// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions. | ||
// The actual upgrade can be done later. | ||
#[test] | ||
fn check_api_version_backlog() { | ||
use std::fmt::Write; | ||
const EXPECTED_ERROR_MESSAGE: &str = r#" | ||
LeaderAndIsrKey kafka-protocol=0..7 shotover=NotSupported | ||
StopReplicaKey kafka-protocol=0..4 shotover=NotSupported | ||
UpdateMetadataKey kafka-protocol=0..8 shotover=NotSupported | ||
ControlledShutdownKey kafka-protocol=0..3 shotover=NotSupported | ||
WriteTxnMarkersKey kafka-protocol=0..1 shotover=NotSupported | ||
DescribeAclsKey kafka-protocol=0..3 shotover=NotSupported | ||
DeleteAclsKey kafka-protocol=0..3 shotover=NotSupported | ||
AlterReplicaLogDirsKey kafka-protocol=0..2 shotover=NotSupported | ||
CreateDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported | ||
RenewDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported | ||
ExpireDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported | ||
DescribeDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported | ||
IncrementalAlterConfigsKey kafka-protocol=0..1 shotover=NotSupported | ||
DescribeClientQuotasKey kafka-protocol=0..1 shotover=NotSupported | ||
AlterClientQuotasKey kafka-protocol=0..1 shotover=NotSupported | ||
DescribeUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported | ||
AlterUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported | ||
VoteKey kafka-protocol=0..0 shotover=NotSupported | ||
BeginQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported | ||
EndQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported | ||
DescribeQuorumKey kafka-protocol=0..1 shotover=NotSupported | ||
UpdateFeaturesKey kafka-protocol=0..1 shotover=NotSupported | ||
EnvelopeKey kafka-protocol=0..0 shotover=NotSupported | ||
FetchSnapshotKey kafka-protocol=0..0 shotover=NotSupported | ||
BrokerRegistrationKey kafka-protocol=0..3 shotover=NotSupported | ||
BrokerHeartbeatKey kafka-protocol=0..1 shotover=NotSupported | ||
UnregisterBrokerKey kafka-protocol=0..0 shotover=NotSupported | ||
AllocateProducerIdsKey kafka-protocol=0..0 shotover=NotSupported | ||
ControllerRegistrationKey kafka-protocol=0..0 shotover=NotSupported | ||
GetTelemetrySubscriptionsKey kafka-protocol=0..0 shotover=NotSupported | ||
PushTelemetryKey kafka-protocol=0..0 shotover=NotSupported | ||
AssignReplicasToDirsKey kafka-protocol=0..0 shotover=NotSupported | ||
ListClientMetricsResourcesKey kafka-protocol=0..0 shotover=NotSupported | ||
DescribeTopicPartitionsKey kafka-protocol=0..0 shotover=NotSupported | ||
"#; | ||
|
||
let mut error_message = String::new(); | ||
for api_key in ApiKey::iterate_all() { | ||
let shotover_version = api_version_support(api_key as i16); | ||
|
||
let kafka_protocol_version = api_key.valid_versions(); | ||
if shotover_version != Some(kafka_protocol_version) { | ||
let shotover_version = match shotover_version { | ||
Some(version) => format!("{version}"), | ||
None => "NotSupported".to_owned(), | ||
}; | ||
writeln!( | ||
error_message, | ||
"{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}" | ||
) | ||
.unwrap(); | ||
} | ||
} | ||
|
||
pretty_assertions::assert_eq!( | ||
error_message.trim(), | ||
EXPECTED_ERROR_MESSAGE.trim(), | ||
"The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE", | ||
); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
pub(crate) mod api_versions; | ||
pub mod sink_cluster; | ||
pub mod sink_single; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters