-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18399 Remove ZooKeeper from KafkaApis (6/N): CREATE_ACLS, DELETE_ACLS #18454
base: trunk
Are you sure you want to change the base?
KAFKA-18399 Remove ZooKeeper from KafkaApis (6/N): CREATE_ACLS, DELETE_ACLS #18454
Conversation
case ApiKeys.CREATE_ACLS => maybeForwardToController(request, forwardToControllerOrFail) | ||
case ApiKeys.DELETE_ACLS => maybeForwardToController(request, forwardToControllerOrFail) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert these change.
def handleCreateAcls(request: RequestChannel.Request): Unit = { | ||
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) | ||
aclApis.handleCreateAcls(request) | ||
} | ||
|
||
def handleDeleteAcls(request: RequestChannel.Request): Unit = { | ||
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) | ||
aclApis.handleDeleteAcls(request) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just throw exception within the two method.
@Test | ||
def testCreateAclWithForwarding(): Unit = { | ||
val requestBuilder = new CreateAclsRequest.Builder(new CreateAclsRequestData()) | ||
testForwardableApi(ApiKeys.CREATE_ACLS, requestBuilder) | ||
} | ||
|
||
@Test | ||
def testDeleteAclWithForwarding(): Unit = { | ||
val requestBuilder = new DeleteAclsRequest.Builder(new DeleteAclsRequestData()) | ||
testForwardableApi(ApiKeys.DELETE_ACLS, requestBuilder) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert this change. These tests should be retained.
@Test | ||
def testRaftShouldAlwaysForwardCreateAcls(): Unit = { | ||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||
kafkaApis = createKafkaApis(raftSupport = true) | ||
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateAcls) | ||
} | ||
|
||
@Test | ||
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = { | ||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||
kafkaApis = createKafkaApis(raftSupport = true) | ||
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteAcls) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
looks like very simple fix, but If just throw exception, I don't know whether it can pass the unit test. |
In zk mode, we do these operations on local but in Kraft mode we just forward requests to controller. |
@@ -2620,13 +2620,11 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
} | |||
|
|||
def handleCreateAcls(request: RequestChannel.Request): Unit = { | |||
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) | |||
aclApis.handleCreateAcls(request) | |||
throw KafkaApis.shouldNeverReceive(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be throw KafkaApis.shouldAlwaysForward(request)
instead? cc. @TaiJuWu
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right, I reverted it because this comment: #18454 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can throw exception here and keep #18454 (comment)
Here is an example https://github.com/apache/kafka/pull/18447/files
Sorry, throw throw KafkaApis.shouldAlwaysForward(request)
like @mingdaoy said.
} | ||
|
||
def handleDeleteAcls(request: RequestChannel.Request): Unit = { | ||
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) | ||
aclApis.handleDeleteAcls(request) | ||
throw KafkaApis.shouldNeverReceive(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be throw KafkaApis.shouldAlwaysForward(request)
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right, I reverted it because this comment: #18454 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
*More detailed description of your change,
Delete follow KafkaApi handler
handleCreateAcls
handleDeleteAcls
In Kraft mode, all requests are forwarded to controller and all authorization is not work so I delete some tests.
Forwarding test already cover by
testRaftShouldAlwaysForwardCreateAcls
testRaftShouldAlwaysForwardDeleteAcls
Committer Checklist (excluded from commit message)
Verify design and implementation
Verify test coverage and CI build status
Verify documentation (including upgrade notes)