From c7654f773121f7c9b5c5ca7ecc019ff30a2d3771 Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Wed, 15 Jan 2025 04:53:03 +0800 Subject: [PATCH] KAFKA-18399 Remove ZooKeeper from KafkaApis (8/N): ELECT_LEADERS , ALTER_PARTITION, UPDATE_FEATURES (#18453) Reviewers: Chia-Ping Tsai --- .../main/scala/kafka/server/KafkaApis.scala | 118 ------------------ .../unit/kafka/server/KafkaApisTest.scala | 27 ---- 2 files changed, 145 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 085df39c8865b..136abf1f42587 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -37,7 +37,6 @@ import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection} import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult} -import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult} import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} @@ -217,7 +216,6 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request) case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request) case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request) - case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request) case ApiKeys.UPDATE_FEATURES => forwardToController(request) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) @@ -2399,77 +2397,6 @@ class KafkaApis(val requestChannel: RequestChannel, true } - def handleElectLeaders(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val electionRequest = request.body[ElectLeadersRequest] - - def sendResponseCallback( - error: ApiError - )( - results: Map[TopicPartition, ApiError] - ): Unit = { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - val adjustedResults = if (electionRequest.data.topicPartitions == null) { - /* When performing elections across all of the partitions we should only return - * partitions for which there was an election or resulted in an error. In other - * words, partitions that didn't need election because they ready have the correct - * leader are not returned to the client. - */ - results.filter { case (_, error) => - error.error != Errors.ELECTION_NOT_NEEDED - } - } else results - - val electionResults = new util.ArrayList[ReplicaElectionResult]() - adjustedResults - .groupBy { case (tp, _) => tp.topic } - .foreachEntry { (topic, ps) => - val electionResult = new ReplicaElectionResult() - - electionResult.setTopic(topic) - ps.foreachEntry { (topicPartition, error) => - val partitionResult = new PartitionResult() - partitionResult.setPartitionId(topicPartition.partition) - partitionResult.setErrorCode(error.error.code) - partitionResult.setErrorMessage(error.message) - electionResult.partitionResult.add(partitionResult) - } - - electionResults.add(electionResult) - } - - new ElectLeadersResponse( - requestThrottleMs, - error.error.code, - electionResults, - electionRequest.version - ) - }) - } - - if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { - val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null) - val partitionErrors: Map[TopicPartition, ApiError] = - electionRequest.topicPartitions.asScala.iterator.map(partition => partition -> error).toMap - - sendResponseCallback(error)(partitionErrors) - } else { - val partitions = if (electionRequest.data.topicPartitions == null) { - metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions) - } else { - electionRequest.topicPartitions.asScala - } - - replicaManager.electLeaders( - zkSupport.controller, - partitions, - electionRequest.electionType, - sendResponseCallback(ApiError.NONE), - electionRequest.data.timeoutMs - ) - } - } - def handleOffsetDeleteRequest( request: RequestChannel.Request, requestLocal: RequestLocal @@ -2628,51 +2555,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) - val alterPartitionRequest = request.body[AlterPartitionRequest] - authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) - - if (!zkSupport.controller.isActive) - requestHelper.sendResponseExemptThrottle(request, alterPartitionRequest.getErrorResponse( - AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception)) - else - zkSupport.controller.alterPartitions(alterPartitionRequest.data, request.context.apiVersion, alterPartitionResp => - requestHelper.sendResponseExemptThrottle(request, new AlterPartitionResponse(alterPartitionResp))) - } - - def handleUpdateFeatures(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val updateFeaturesRequest = request.body[UpdateFeaturesRequest] - - def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = { - def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = { - errors match { - case Left(topLevelError) => - UpdateFeaturesResponse.createWithErrors( - topLevelError, - Collections.emptySet(), - throttleTimeMs) - case Right(featureUpdateErrors) => - // This response is not correct, but since this is ZK specific code it will be removed in 4.0 - UpdateFeaturesResponse.createWithErrors( - ApiError.NONE, - featureUpdateErrors.asJava.keySet(), - throttleTimeMs) - } - } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs)) - } - - if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { - sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED))) - } else if (!zkSupport.controller.isActive) { - sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER))) - } else { - zkSupport.controller.updateFeatures(updateFeaturesRequest, sendResponseCallback) - } - } - def handleDescribeCluster(request: RequestChannel.Request): Unit = { val response = authHelper.computeDescribeClusterResponse( request, diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index cdc77a8213e9d..8b91603dbc851 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -9932,25 +9932,12 @@ class KafkaApisTest extends Logging { request } - private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = { - val request = createMockRequest() - val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request)) - assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage) - } - private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = { val request = createMockRequest() val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request)) assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage) } - @Test - def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest) - } - @Test def testRaftShouldAlwaysForwardCreateAcls(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) @@ -10048,20 +10035,6 @@ class KafkaApisTest extends Logging { verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest) } - @Test - def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures) - } - - @Test - def testRaftShouldAlwaysForwardElectLeaders(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders) - } - @Test def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")