Skip to content

Commit

Permalink
Make all AdminClient timeouts configurable (#498)
Browse files Browse the repository at this point in the history
* Add acl synchronisation timeout parameter

* Handle all AdminClient timeouts

* Fix default property

* Add config test

* Fix checkstyle

* Fix Sonar

* Fix README

---------

Co-authored-by: Loïc Greffier <[email protected]>
  • Loading branch information
ThomasCAI-mlv and loicgreffier authored Jan 2, 2025
1 parent b37432f commit 0822de2
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 107 deletions.
43 changes: 27 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,22 +323,33 @@ ns4kafka:
The name for each managed cluster has to be unique. This is this name you have to set in the field **metadata.cluster**
of your namespace descriptors.

| Property | type | description |
|-----------------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------|
| manage-users | boolean | Does the cluster manages users ? |
| manage-acls | boolean | Does the cluster manages access control entries ? |
| manage-topics | boolean | Does the cluster manages topics ? |
| manage-connectors | boolean | Does the cluster manages connects ? |
| drop-unsync-acls | boolean | Should Ns4Kafka drop unsynchronized ACLs |
| provider | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
| config.bootstrap.servers | string | The location of the clusters servers |
| config.cluster.id | string | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). In this case, [Stream Catalog properties](#stream-catalog) must be set. |
| schema-registry.url | string | The location of the Schema Registry |
| schema-registry.basicAuthUsername | string | Basic authentication username to the Schema Registry |
| schema-registry.basicAuthPassword | string | Basic authentication password to the Schema Registry |
| connects.connect-name.url | string | The location of the kafka connect |
| connects.connect-name.basicAuthUsername | string | Basic authentication username to the Kafka Connect |
| connects.connect-name.basicAuthPassword | string | Basic authentication password to the Kafka Connect |
| Property | Type | Required | Description |
|--------------------------------------|---------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| manage-acls | boolean | No | Does the cluster manages access control entries (Default: false) |
| manage-connectors | boolean | No | Does the cluster manages connects (Default: false) |
| manage-topics | boolean | No | Does the cluster manages topics (Default: false) |
| manage-users | boolean | No | Does the cluster manages users (Default: false) |
| drop-unsync-acls | boolean | No | Should unsynchronized acls be dropped (Default: true) |
| timeout.acl.create | int | No | The timeout in milliseconds used by the AdminClient to create acls (Default: 30000ms) |
| timeout.acl.describe | int | No | The timeout in milliseconds used by the AdminClient to describe acls (Default: 30000ms) |
| timeout.acl.delete | int | No | The timeout in milliseconds used by the AdminClient to delete acls (Default: 30000ms) |
| timeout.topic.alter-configs | int | No | The timeout in milliseconds used by the AdminClient to alter topic configs (Default: 30000ms) |
| timeout.topic.create | int | No | The timeout in milliseconds used by the AdminClient to create topics (Default: 30000ms) |
| timeout.topic.describe-configs | int | No | The timeout in milliseconds used by the AdminClient to describe topic configs (Default: 30000ms) |
| timeout.topic.delete | int | No | The timeout in milliseconds used by the AdminClient to delete topics (Default: 30000ms) |
| timeout.topic.list | int | No | The timeout in milliseconds used by the AdminClient to list topics (Default: 30000ms) |
| timeout.user.alter-quotas | int | No | The timeout in milliseconds used by the AdminClient to alter client quotas (Default: 30000ms) |
| timeout.user.alter-scram-credentials | int | No | The timeout in milliseconds used by the AdminClient to alter scram credentials (Default: 30000ms) |
| timeout.user.describe-quotas | int | No | The timeout in milliseconds used by the AdminClient to describe client quotas (Default: 30000ms) |
| provider | boolean | Yes | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
| config.bootstrap.servers | string | Yes | The location of the clusters servers |
| config.cluster.id | string | No | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). In this case, [Stream Catalog properties](#stream-catalog) must be set. |
| schema-registry.url | string | No | The location of the Schema Registry |
| schema-registry.basicAuthUsername | string | No | Basic authentication username to the Schema Registry |
| schema-registry.basicAuthPassword | string | No | Basic authentication password to the Schema Registry |
| connects.<name>.url | string | No | The location of the kafka connect |
| connects.<name>.basicAuthUsername | string | No | Basic authentication username to the Kafka Connect |
| connects.<name>.basicAuthPassword | string | No | Basic authentication password to the Kafka Connect |

The configuration will depend on the authentication method selected for your broker, schema registry and Kafka Connect.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
@EachProperty("ns4kafka.managed-clusters")
public class ManagedClusterProperties {
private String name;
private boolean manageTopics;
private boolean manageAcls;
private boolean dropUnsyncAcls = true;
private boolean manageUsers;
private boolean manageConnectors;
private boolean manageTopics;
private boolean manageUsers;
private boolean dropUnsyncAcls = true;
private TimeoutProperties timeout = new TimeoutProperties();
private KafkaProvider provider;
private Properties config;
private Map<String, ConnectProperties> connects;
Expand Down Expand Up @@ -66,9 +67,9 @@ public enum KafkaProvider {
@Setter
@Introspected
public static class ConnectProperties {
String url;
String basicAuthUsername;
String basicAuthPassword;
private String url;
private String basicAuthUsername;
private String basicAuthPassword;
}

/**
Expand All @@ -78,9 +79,60 @@ public static class ConnectProperties {
@Setter
@ConfigurationProperties("schema-registry")
public static class SchemaRegistryProperties {
String url;
String basicAuthUsername;
String basicAuthPassword;
private String url;
private String basicAuthUsername;
private String basicAuthPassword;
}

/**
* Timeout properties.
*/
@Getter
@Setter
@ConfigurationProperties("timeout")
public static class TimeoutProperties {
private static final int DEFAULT_TIMEOUT_MS = 30000;
private AclProperties acl = new AclProperties();
private TopicProperties topic = new TopicProperties();
private UserProperties user = new UserProperties();

/**
* ACL properties.
*/
@Getter
@Setter
@ConfigurationProperties("acl")
public static class AclProperties {
private int describe = DEFAULT_TIMEOUT_MS;
private int create = DEFAULT_TIMEOUT_MS;
private int delete = DEFAULT_TIMEOUT_MS;
}

/**
* Topic properties.
*/
@Getter
@Setter
@ConfigurationProperties("topic")
public static class TopicProperties {
private int alterConfigs = DEFAULT_TIMEOUT_MS;
private int create = DEFAULT_TIMEOUT_MS;
private int describeConfigs = DEFAULT_TIMEOUT_MS;
private int delete = DEFAULT_TIMEOUT_MS;
private int list = DEFAULT_TIMEOUT_MS;
}

/**
* User properties.
*/
@Getter
@Setter
@ConfigurationProperties("user")
public static class UserProperties {
private int alterQuotas = DEFAULT_TIMEOUT_MS;
private int alterScramCredentials = DEFAULT_TIMEOUT_MS;
private int describeQuotas = DEFAULT_TIMEOUT_MS;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,25 @@ private List<AclBinding> collectNs4KafkaAcls() {
*/
private List<AclBinding> collectBrokerAcls(boolean managedUsersOnly)
throws ExecutionException, InterruptedException, TimeoutException {
List<ResourceType> validResourceTypes =
List.of(org.apache.kafka.common.resource.ResourceType.TOPIC,
org.apache.kafka.common.resource.ResourceType.GROUP,
org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID);
List<ResourceType> validResourceTypes = List.of(
org.apache.kafka.common.resource.ResourceType.TOPIC,
org.apache.kafka.common.resource.ResourceType.GROUP,
org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID
);

AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter(
managedClusterProperties.getProvider()
.equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD) ? "UserV2:*" : null,
null, AclOperation.ANY, AclPermissionType.ANY);
null,
AclOperation.ANY,
AclPermissionType.ANY
);
AclBindingFilter aclBindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY, accessControlEntryFilter);

List<AclBinding> userAcls = getAdminClient()
.describeAcls(aclBindingFilter)
.values().get(10, TimeUnit.SECONDS)
.values()
.get(managedClusterProperties.getTimeout().getAcl().getDescribe(), TimeUnit.MILLISECONDS)
.stream()
.filter(aclBinding -> validResourceTypes.contains(aclBinding.pattern().resourceType()))
.toList();
Expand Down Expand Up @@ -320,7 +325,8 @@ private AclBinding convertConnectorAccessControlEntryToAclBinding(AccessControlE
ResourcePattern resourcePattern = new ResourcePattern(
org.apache.kafka.common.resource.ResourceType.GROUP,
"connect-" + accessControlEntry.getSpec().getResource(),
patternType);
patternType
);

String kafkaUser = namespaceRepository.findByName(accessControlEntry.getSpec().getGrantedTo())
.orElseThrow()
Expand Down Expand Up @@ -359,20 +365,23 @@ private List<AclOperation> computeAclOperationForOwner(ResourceType resourceType
*/
private void deleteAcls(List<AclBinding> toDelete) {
getAdminClient()
.deleteAcls(toDelete.stream()
.deleteAcls(toDelete
.stream()
.map(AclBinding::toFilter)
.toList())
.values().forEach((key, value) -> {
.values()
.forEach((key, value) -> {
try {
value.get(10, TimeUnit.SECONDS);
value.get(managedClusterProperties.getTimeout().getAcl().getDelete(), TimeUnit.MILLISECONDS);
log.info("Success deleting ACL {} on {}", key, managedClusterProperties.getName());
} catch (InterruptedException e) {
log.error("Error", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(
String.format("Error while deleting ACL %s on %s", key,
managedClusterProperties.getName()), e);
String.format("Error while deleting ACL %s on %s", key, managedClusterProperties.getName()),
e
);
}
});
}
Expand Down Expand Up @@ -408,8 +417,10 @@ public void deleteAcl(AccessControlEntry accessControlEntry) {
*/
public void deleteKafkaStreams(Namespace namespace, KafkaStream kafkaStream) {
if (managedClusterProperties.isManageAcls()) {
List<AclBinding> results =
new ArrayList<>(buildAclBindingsFromKafkaStream(kafkaStream, namespace.getSpec().getKafkaUser()));
List<AclBinding> results = new ArrayList<>(buildAclBindingsFromKafkaStream(
kafkaStream,
namespace.getSpec().getKafkaUser())
);
deleteAcls(results);
}
}
Expand All @@ -420,18 +431,21 @@ public void deleteKafkaStreams(Namespace namespace, KafkaStream kafkaStream) {
* @param toCreate The list of ACLs to create
*/
private void createAcls(List<AclBinding> toCreate) {
getAdminClient().createAcls(toCreate)
getAdminClient()
.createAcls(toCreate)
.values()
.forEach((key, value) -> {
try {
value.get(10, TimeUnit.SECONDS);
log.info("Success creating ACL {} on {}", key, this.managedClusterProperties.getName());
value.get(managedClusterProperties.getTimeout().getAcl().getCreate(), TimeUnit.MILLISECONDS);
log.info("Success creating ACL {} on {}", key, managedClusterProperties.getName());
} catch (InterruptedException e) {
log.error("Error", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(String.format("Error while creating ACL %s on %s", key,
this.managedClusterProperties.getName()), e);
log.error(
String.format("Error while creating ACL %s on %s", key, managedClusterProperties.getName()),
e
);
}
});
}
Expand Down
Loading

0 comments on commit 0822de2

Please sign in to comment.