From 0822de2786b2e21683c43cea592822d3478bbc39 Mon Sep 17 00:00:00 2001
From: Thomas CAI <92149044+ThomasCAI-mlv@users.noreply.github.com>
Date: Thu, 2 Jan 2025 19:53:08 +0100
Subject: [PATCH] Make all AdminClient timeouts configurable (#498)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Add ACL synchronisation timeout parameter

* Handle all AdminClient timeouts

* Fix default property

* Add config test

* Fix checkstyle

* Fix Sonar

* Fix README

---------

Co-authored-by: Loïc Greffier
---
 README.md                                     | 43 +++++++-----
 .../property/ManagedClusterProperties.java    | 70 ++++++++++++++++---
 .../AccessControlEntryAsyncExecutor.java      | 52 +++++++++-----
 .../executor/ConsumerGroupAsyncExecutor.java  | 29 ++++++--
 .../service/executor/TopicAsyncExecutor.java  | 63 ++++++++---------
 .../service/executor/UserAsyncExecutor.java   | 59 ++++++++------
 .../integration/ConfigIntegrationTest.java    | 37 ++++++++++
 .../executor/TopicAsyncExecutorTest.java      | 27 +++++++
 src/test/resources/application-test.yml       | 21 +++++-
 9 files changed, 294 insertions(+), 107 deletions(-)
 create mode 100644 src/test/java/com/michelin/ns4kafka/integration/ConfigIntegrationTest.java

diff --git a/README.md b/README.md
index b536281a..beaa875f 100644
--- a/README.md
+++ b/README.md
@@ -323,22 +323,33 @@ ns4kafka:
 The name for each managed cluster has to be unique. This is the name you have to set in the field
 **metadata.cluster** of your namespace descriptors.
 
-| Property                                 | type    | description                                                 |
-|------------------------------------------|---------|-------------------------------------------------------------|
-| manage-users                             | boolean | Does the cluster manages users ?                            |
-| manage-acls                              | boolean | Does the cluster manages access control entries ?           |
-| manage-topics                            | boolean | Does the cluster manages topics ?                           |
-| manage-connectors                        | boolean | Does the cluster manages connects ?                         |
-| drop-unsync-acls                         | boolean | Should Ns4Kafka drop unsynchronized ACLs                    |
-| provider                                 | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
-| config.bootstrap.servers                 | string  | The location of the clusters servers                        |
-| config.cluster.id                        | string  | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). In this case, [Stream Catalog properties](#stream-catalog) must be set. |
-| schema-registry.url                      | string  | The location of the Schema Registry                         |
-| schema-registry.basicAuthUsername        | string  | Basic authentication username to the Schema Registry        |
-| schema-registry.basicAuthPassword        | string  | Basic authentication password to the Schema Registry        |
-| connects.connect-name.url                | string  | The location of the kafka connect                           |
-| connects.connect-name.basicAuthUsername  | string  | Basic authentication username to the Kafka Connect          |
-| connects.connect-name.basicAuthPassword  | string  | Basic authentication password to the Kafka Connect          |
+| Property                                   | Type    | Required | Description                                                                                    |
+|--------------------------------------------|---------|----------|------------------------------------------------------------------------------------------------|
+| manage-acls                                | boolean | No       | Whether the cluster manages access control entries (Default: false)                             |
+| manage-connectors                          | boolean | No       | Whether the cluster manages Kafka Connect connectors (Default: false)                           |
+| manage-topics                              | boolean | No       | Whether the cluster manages topics (Default: false)                                             |
+| manage-users                               | boolean | No       | Whether the cluster manages users (Default: false)                                              |
+| drop-unsync-acls                           | boolean | No       | Should unsynchronized ACLs be dropped (Default: true)                                           |
+| timeout.acl.create                         | int     | No       | The timeout in milliseconds used by the AdminClient to create ACLs (Default: 30000)             |
+| timeout.acl.describe                       | int     | No       | The timeout in milliseconds used by the AdminClient to describe ACLs (Default: 30000)           |
+| timeout.acl.delete                         | int     | No       | The timeout in milliseconds used by the AdminClient to delete ACLs (Default: 30000)             |
+| timeout.topic.alter-configs                | int     | No       | The timeout in milliseconds used by the AdminClient to alter topic configs (Default: 30000)     |
+| timeout.topic.create                       | int     | No       | The timeout in milliseconds used by the AdminClient to create topics (Default: 30000)           |
+| timeout.topic.describe-configs             | int     | No       | The timeout in milliseconds used by the AdminClient to describe topic configs (Default: 30000)  |
+| timeout.topic.delete                       | int     | No       | The timeout in milliseconds used by the AdminClient to delete topics (Default: 30000)           |
+| timeout.topic.list                         | int     | No       | The timeout in milliseconds used by the AdminClient to list topics (Default: 30000)             |
+| timeout.user.alter-quotas                  | int     | No       | The timeout in milliseconds used by the AdminClient to alter client quotas (Default: 30000)     |
+| timeout.user.alter-scram-credentials       | int     | No       | The timeout in milliseconds used by the AdminClient to alter SCRAM credentials (Default: 30000) |
+| timeout.user.describe-quotas               | int     | No       | The timeout in milliseconds used by the AdminClient to describe client quotas (Default: 30000)  |
+| provider                                   | string  | Yes      | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD                                     |
+| config.bootstrap.servers                   | string  | Yes      | The location of the cluster's servers                                                           |
+| config.cluster.id                          | string  | No       | The cluster ID. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). In this case, [Stream Catalog properties](#stream-catalog) must be set. |
+| schema-registry.url                        | string  | No       | The location of the Schema Registry                                                             |
+| schema-registry.basicAuthUsername          | string  | No       | Basic authentication username to the Schema Registry                                            |
+| schema-registry.basicAuthPassword          | string  | No       | Basic authentication password to the Schema Registry                                            |
+| connects.<connect-name>.url                | string  | No       | The location of the Kafka Connect                                                               |
+| connects.<connect-name>.basicAuthUsername  | string  | No       | Basic authentication username to the Kafka Connect                                              |
+| connects.<connect-name>.basicAuthPassword  | string  | No       | Basic authentication password to the Kafka Connect                                              |
 
 The configuration will depend on the authentication method selected for your broker, schema registry and Kafka
 Connect.
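For instance, a managed cluster that overrides only the topic timeouts and keeps every other AdminClient timeout at its 30-second default could be declared like this (a minimal sketch; the cluster name `my-cluster` and the values are illustrative):

```yaml
ns4kafka:
  managed-clusters:
    my-cluster:                 # illustrative name
      provider: SELF_MANAGED
      manage-topics: true
      timeout:
        topic:
          create: 60000         # wait up to 60 s for topic creation
          delete: 60000         # wait up to 60 s for topic deletion
        # acl.* and user.* keys are omitted and fall back to 30000 ms
      config:
        bootstrap.servers: "localhost:9092"
```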
diff --git a/src/main/java/com/michelin/ns4kafka/property/ManagedClusterProperties.java b/src/main/java/com/michelin/ns4kafka/property/ManagedClusterProperties.java
index 5e6ba373..12e99f74 100644
--- a/src/main/java/com/michelin/ns4kafka/property/ManagedClusterProperties.java
+++ b/src/main/java/com/michelin/ns4kafka/property/ManagedClusterProperties.java
@@ -18,11 +18,12 @@
 @EachProperty("ns4kafka.managed-clusters")
 public class ManagedClusterProperties {
     private String name;
-    private boolean manageTopics;
     private boolean manageAcls;
-    private boolean dropUnsyncAcls = true;
-    private boolean manageUsers;
     private boolean manageConnectors;
+    private boolean manageTopics;
+    private boolean manageUsers;
+    private boolean dropUnsyncAcls = true;
+    private TimeoutProperties timeout = new TimeoutProperties();
     private KafkaProvider provider;
     private Properties config;
     private Map<String, ConnectProperties> connects;
@@ -66,9 +67,9 @@ public enum KafkaProvider {
     @Setter
     @Introspected
     public static class ConnectProperties {
-        String url;
-        String basicAuthUsername;
-        String basicAuthPassword;
+        private String url;
+        private String basicAuthUsername;
+        private String basicAuthPassword;
     }
 
     /**
@@ -78,9 +79,60 @@ public static class ConnectProperties {
     @Setter
     @ConfigurationProperties("schema-registry")
     public static class SchemaRegistryProperties {
-        String url;
-        String basicAuthUsername;
-        String basicAuthPassword;
+        private String url;
+        private String basicAuthUsername;
+        private String basicAuthPassword;
+    }
+
+    /**
+     * Timeout properties.
+     */
+    @Getter
+    @Setter
+    @ConfigurationProperties("timeout")
+    public static class TimeoutProperties {
+        private static final int DEFAULT_TIMEOUT_MS = 30000;
+        private AclProperties acl = new AclProperties();
+        private TopicProperties topic = new TopicProperties();
+        private UserProperties user = new UserProperties();
+
+        /**
+         * ACL properties.
+         */
+        @Getter
+        @Setter
+        @ConfigurationProperties("acl")
+        public static class AclProperties {
+            private int describe = DEFAULT_TIMEOUT_MS;
+            private int create = DEFAULT_TIMEOUT_MS;
+            private int delete = DEFAULT_TIMEOUT_MS;
+        }
+
+        /**
+         * Topic properties.
+         */
+        @Getter
+        @Setter
+        @ConfigurationProperties("topic")
+        public static class TopicProperties {
+            private int alterConfigs = DEFAULT_TIMEOUT_MS;
+            private int create = DEFAULT_TIMEOUT_MS;
+            private int describeConfigs = DEFAULT_TIMEOUT_MS;
+            private int delete = DEFAULT_TIMEOUT_MS;
+            private int list = DEFAULT_TIMEOUT_MS;
+        }
+
+        /**
+         * User properties.
+         */
+        @Getter
+        @Setter
+        @ConfigurationProperties("user")
+        public static class UserProperties {
+            private int alterQuotas = DEFAULT_TIMEOUT_MS;
+            private int alterScramCredentials = DEFAULT_TIMEOUT_MS;
+            private int describeQuotas = DEFAULT_TIMEOUT_MS;
+        }
     }
 
     /**
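The nested classes give every operation an independent, pre-initialized timeout, so binding only replaces what the configuration actually sets. A quick sketch of what callers can rely on, mirroring how the unit tests below construct these objects by hand:

```java
import com.michelin.ns4kafka.property.ManagedClusterProperties;

// Outside any Micronaut context, the nested defaults are already wired:
ManagedClusterProperties.TimeoutProperties timeout =
    new ManagedClusterProperties.TimeoutProperties();

// Every operation falls back to DEFAULT_TIMEOUT_MS (30000 ms) until overridden
assert timeout.getAcl().getDescribe() == 30000;
assert timeout.getTopic().getCreate() == 30000;
assert timeout.getUser().getAlterQuotas() == 30000;

// Micronaut binds ns4kafka.managed-clusters.<name>.timeout.* onto the Lombok setters
timeout.getTopic().setDelete(60000);
```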
+ */ + @Getter + @Setter + @ConfigurationProperties("user") + public static class UserProperties { + private int alterQuotas = DEFAULT_TIMEOUT_MS; + private int alterScramCredentials = DEFAULT_TIMEOUT_MS; + private int describeQuotas = DEFAULT_TIMEOUT_MS; + } } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java index d2a69839..bb50f8d5 100644 --- a/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java @@ -174,20 +174,25 @@ private List collectNs4KafkaAcls() { */ private List collectBrokerAcls(boolean managedUsersOnly) throws ExecutionException, InterruptedException, TimeoutException { - List validResourceTypes = - List.of(org.apache.kafka.common.resource.ResourceType.TOPIC, - org.apache.kafka.common.resource.ResourceType.GROUP, - org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID); + List validResourceTypes = List.of( + org.apache.kafka.common.resource.ResourceType.TOPIC, + org.apache.kafka.common.resource.ResourceType.GROUP, + org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID + ); AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter( managedClusterProperties.getProvider() .equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD) ? "UserV2:*" : null, - null, AclOperation.ANY, AclPermissionType.ANY); + null, + AclOperation.ANY, + AclPermissionType.ANY + ); AclBindingFilter aclBindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY, accessControlEntryFilter); List userAcls = getAdminClient() .describeAcls(aclBindingFilter) - .values().get(10, TimeUnit.SECONDS) + .values() + .get(managedClusterProperties.getTimeout().getAcl().getDescribe(), TimeUnit.MILLISECONDS) .stream() .filter(aclBinding -> validResourceTypes.contains(aclBinding.pattern().resourceType())) .toList(); @@ -320,7 +325,8 @@ private AclBinding convertConnectorAccessControlEntryToAclBinding(AccessControlE ResourcePattern resourcePattern = new ResourcePattern( org.apache.kafka.common.resource.ResourceType.GROUP, "connect-" + accessControlEntry.getSpec().getResource(), - patternType); + patternType + ); String kafkaUser = namespaceRepository.findByName(accessControlEntry.getSpec().getGrantedTo()) .orElseThrow() @@ -359,20 +365,23 @@ private List computeAclOperationForOwner(ResourceType resourceType */ private void deleteAcls(List toDelete) { getAdminClient() - .deleteAcls(toDelete.stream() + .deleteAcls(toDelete + .stream() .map(AclBinding::toFilter) .toList()) - .values().forEach((key, value) -> { + .values() + .forEach((key, value) -> { try { - value.get(10, TimeUnit.SECONDS); + value.get(managedClusterProperties.getTimeout().getAcl().getDelete(), TimeUnit.MILLISECONDS); log.info("Success deleting ACL {} on {}", key, managedClusterProperties.getName()); } catch (InterruptedException e) { log.error("Error", e); Thread.currentThread().interrupt(); } catch (Exception e) { log.error( - String.format("Error while deleting ACL %s on %s", key, - managedClusterProperties.getName()), e); + String.format("Error while deleting ACL %s on %s", key, managedClusterProperties.getName()), + e + ); } }); } @@ -408,8 +417,10 @@ public void deleteAcl(AccessControlEntry accessControlEntry) { */ public void deleteKafkaStreams(Namespace namespace, KafkaStream kafkaStream) { if 
-            List<AclBinding> results =
-                new ArrayList<>(buildAclBindingsFromKafkaStream(kafkaStream, namespace.getSpec().getKafkaUser()));
+            List<AclBinding> results = new ArrayList<>(buildAclBindingsFromKafkaStream(
+                kafkaStream,
+                namespace.getSpec().getKafkaUser())
+            );
             deleteAcls(results);
         }
     }
 
     /**
@@ -420,18 +431,21 @@ public void deleteKafkaStreams(Namespace namespace, KafkaStream kafkaStream) {
      * @param toCreate The list of ACLs to create
      */
     private void createAcls(List<AclBinding> toCreate) {
-        getAdminClient().createAcls(toCreate)
+        getAdminClient()
+            .createAcls(toCreate)
             .values()
             .forEach((key, value) -> {
                 try {
-                    value.get(10, TimeUnit.SECONDS);
-                    log.info("Success creating ACL {} on {}", key, this.managedClusterProperties.getName());
+                    value.get(managedClusterProperties.getTimeout().getAcl().getCreate(), TimeUnit.MILLISECONDS);
+                    log.info("Success creating ACL {} on {}", key, managedClusterProperties.getName());
                 } catch (InterruptedException e) {
                     log.error("Error", e);
                     Thread.currentThread().interrupt();
                 } catch (Exception e) {
-                    log.error(String.format("Error while creating ACL %s on %s", key,
-                        this.managedClusterProperties.getName()), e);
+                    log.error(
+                        String.format("Error while creating ACL %s on %s", key, managedClusterProperties.getName()),
+                        e
+                    );
                 }
             });
     }
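Each call site this patch touches repeats one shape: block on the `KafkaFuture` with the configured bound, restore the interrupt flag on interruption, and log any other failure. A condensed sketch of that pattern (the helper name `awaitAdminResult` is illustrative, not part of the patch):

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaFuture;

/** Hypothetical helper condensing the wait-log-interrupt pattern the executors repeat. */
static <T> T awaitAdminResult(KafkaFuture<T> future, int timeoutMs) throws Exception {
    try {
        // Client-side bound: throws java.util.concurrent.TimeoutException when exceeded
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Re-assert the interrupt flag before propagating, exactly as the executors do
        Thread.currentThread().interrupt();
        throw e;
    }
}
```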
+ * + * @param groupIds The consumer group ids + * @return A map of consumer group id and consumer group description + * @throws ExecutionException Any execution exception during consumer groups description + * @throws InterruptedException Any interrupted exception during consumer groups description + */ public Map describeConsumerGroups(List groupIds) throws ExecutionException, InterruptedException { - return getAdminClient().describeConsumerGroups(groupIds).all().get(); + return getAdminClient() + .describeConsumerGroups(groupIds) + .all() + .get(); } /** @@ -52,11 +63,15 @@ public Map describeConsumerGroups(List */ public void alterConsumerGroupOffsets(String consumerGroupId, Map preparedOffsets) throws InterruptedException, ExecutionException { - getAdminClient().alterConsumerGroupOffsets(consumerGroupId, - preparedOffsets.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))) - ).all().get(); + getAdminClient() + .alterConsumerGroupOffsets( + consumerGroupId, + preparedOffsets + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())))) + .all() + .get(); log.info("Consumer group {} changed offset", consumerGroupId); if (log.isDebugEnabled()) { preparedOffsets.forEach( @@ -111,7 +126,7 @@ public Map getCommittedOffsets(String groupId) */ public List getTopicPartitions(String topicName) throws ExecutionException, InterruptedException { return getAdminClient().describeTopics(Collections.singletonList(topicName)) - .all() + .allTopicNames() .get() .get(topicName) .partitions() diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java index 4e12a88a..72b100fd 100644 --- a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java @@ -189,20 +189,6 @@ public void alterTags(List ns4kafkaTopics, Map brokerTopic } } - /** - * Delete a topic. - * - * @param topic The topic to delete - */ - public void deleteTopic(Topic topic) throws InterruptedException, ExecutionException, TimeoutException { - getAdminClient().deleteTopics(List.of(topic.getMetadata().getName())) - .all() - .get(30, TimeUnit.SECONDS); - - log.info("Success deleting topic {} on {}", topic.getMetadata().getName(), - managedClusterProperties.getName()); - } - /** * Delete a list of topics. 
     *
     * @param topics The topics to delete
     */
@@ -214,9 +200,10 @@ public void deleteTopics(List<Topic> topics) throws InterruptedException, Execut
             .map(topic -> topic.getMetadata().getName())
             .toList();
 
-        getAdminClient().deleteTopics(topicsNames)
+        getAdminClient()
+            .deleteTopics(topicsNames)
             .all()
-            .get(30, TimeUnit.SECONDS);
+            .get(managedClusterProperties.getTimeout().getTopic().getDelete(), TimeUnit.MILLISECONDS);
 
         log.info("Success deleting topics {} on {}", String.join(", ", topicsNames),
             managedClusterProperties.getName());
@@ -237,8 +224,10 @@ public Map<String, Topic> collectBrokerTopics() throws ExecutionException, Inter
      * @return All topic names
      */
     public List<String> listBrokerTopicNames() throws InterruptedException, ExecutionException, TimeoutException {
-        return getAdminClient().listTopics().listings()
-            .get(30, TimeUnit.SECONDS)
+        return getAdminClient()
+            .listTopics()
+            .listings()
+            .get(managedClusterProperties.getTimeout().getTopic().getList(), TimeUnit.MILLISECONDS)
             .stream()
             .map(TopicListing::name)
             .toList();
@@ -331,15 +320,18 @@ public void enrichWithCatalogInfo(Map<String, Topic> topics) {
      */
     public Map<String, Topic> collectBrokerTopicsFromNames(List<String> topicNames)
         throws InterruptedException, ExecutionException, TimeoutException {
-        Map<String, TopicDescription> topicDescriptions = getAdminClient().describeTopics(topicNames)
-            .allTopicNames().get();
+        Map<String, TopicDescription> topicDescriptions = getAdminClient()
+            .describeTopics(topicNames)
+            .allTopicNames()
+            .get();
 
         Map<String, Topic> topics = getAdminClient()
-            .describeConfigs(topicNames.stream()
-                .map(s -> new ConfigResource(ConfigResource.Type.TOPIC, s))
+            .describeConfigs(topicNames
+                .stream()
+                .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
                 .toList())
             .all()
-            .get(30, TimeUnit.SECONDS)
+            .get(managedClusterProperties.getTimeout().getTopic().getDescribeConfigs(), TimeUnit.MILLISECONDS)
             .entrySet()
             .stream()
             .collect(Collectors.toMap(
@@ -382,12 +374,12 @@ private void alterTopics(Map<ConfigResource, Collection<AlterConfigOp>> toUpdate
         alterConfigsResult.values().forEach((key, value) -> {
             Topic updatedTopic = topics
                 .stream()
-                .filter(t -> t.getMetadata().getName().equals(key.name()))
+                .filter(topic -> topic.getMetadata().getName().equals(key.name()))
                 .findFirst()
                 .get();
 
             try {
-                value.get(10, TimeUnit.SECONDS);
+                value.get(managedClusterProperties.getTimeout().getTopic().getAlterConfigs(), TimeUnit.MILLISECONDS);
                 updatedTopic.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
                 updatedTopic.getMetadata().setGeneration(updatedTopic.getMetadata().getGeneration() + 1);
                 updatedTopic.setStatus(Topic.TopicStatus.ofSuccess("Topic configs updated"));
 
                 log.info("Success updating topic configs {} on {}: [{}]",
                     key.name(),
                     managedClusterProperties.getName(),
-                    toUpdate.get(key).stream().map(AlterConfigOp::toString).collect(Collectors.joining(", ")));
+                    toUpdate.get(key).stream().map(AlterConfigOp::toString).collect(Collectors.joining(", "))
+                );
             } catch (InterruptedException e) {
                 log.error("Error", e);
                 Thread.currentThread().interrupt();
@@ -434,7 +427,7 @@ private void createTopics(List<Topic> topics) {
                 .get();
 
             try {
-                value.get(10, TimeUnit.SECONDS);
+                value.get(managedClusterProperties.getTimeout().getTopic().getCreate(), TimeUnit.MILLISECONDS);
                 createdTopic.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
                 createdTopic.getMetadata().setGeneration(1);
                 createdTopic.setStatus(Topic.TopicStatus.ofSuccess("Topic created"));
@@ -575,8 +568,10 @@ private Collection<AlterConfigOp> computeConfigChanges(Map<String, String> expec
                 }
                 return false;
             })
-            .map(expectedEntry -> new AlterConfigOp(new ConfigEntry(expectedEntry.getKey(), expectedEntry.getValue()),
-                AlterConfigOp.OpType.SET))
+            .map(expectedEntry -> new AlterConfigOp(
+                new ConfigEntry(expectedEntry.getKey(), expectedEntry.getValue()),
+                AlterConfigOp.OpType.SET)
+            )
             .toList();
 
         List<AlterConfigOp> total = new ArrayList<>();
@@ -600,7 +595,9 @@ public Map<TopicPartition, RecordsToDelete> prepareRecordsToDelete(String topic)
         throws ExecutionException, InterruptedException {
         // List all partitions for topic and prepare a listOffsets call
         Map<TopicPartition, OffsetSpec> topicsPartitionsToDelete =
-            getAdminClient().describeTopics(List.of(topic)).allTopicNames().get()
+            getAdminClient().describeTopics(List.of(topic))
+                .allTopicNames()
+                .get()
                 .entrySet()
                 .stream()
                 .flatMap(topicDescriptionEntry -> topicDescriptionEntry.getValue().partitions().stream())
                 .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                 .collect(Collectors.toMap(Function.identity(), v -> OffsetSpec.latest()));
 
         // list all latest offsets for each partitions
-        return getAdminClient().listOffsets(topicsPartitionsToDelete).all().get()
+        return getAdminClient()
+            .listOffsets(topicsPartitionsToDelete)
+            .all()
+            .get()
             .entrySet()
             .stream()
             .collect(Collectors.toMap(Map.Entry::getKey, kv -> RecordsToDelete.beforeOffset(kv.getValue().offset())));
@@ -642,6 +642,5 @@ public Map<TopicPartition, Long> deleteRecords(Map<TopicPartition, RecordsToDele
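Note that all of these are client-side waits on the returned `KafkaFuture`s: if the bound is exceeded, the future's `get` throws, but the request itself may still complete on the broker. The AdminClient also supports a per-request bound through its `*Options` classes, which this patch leaves untouched; a sketch of the two knobs side by side (topic name and values illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;

static TopicDescription describeWithBothBounds(Admin admin) throws Exception {
    // Request-level bound, enforced by the AdminClient machinery itself
    DescribeTopicsOptions options = new DescribeTopicsOptions().timeoutMs(30000);

    // Future-level bound, the knob this patch makes configurable per operation
    Map<String, TopicDescription> result = admin
        .describeTopics(List.of("my-topic"), options)
        .allTopicNames()
        .get(30000, TimeUnit.MILLISECONDS);

    return result.get("my-topic");
}
```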
diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/UserAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/UserAsyncExecutor.java
--- a/src/main/java/com/michelin/ns4kafka/service/executor/UserAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/service/executor/UserAsyncExecutor.java
         Map<String, Map<String, Double>> toUpdate = ns4kafkaUserQuotas.entrySet()
             .stream()
             .filter(entry -> brokerUserQuotas.containsKey(entry.getKey()))
-            .filter(
-                entry -> !entry.getValue().isEmpty() && !entry.getValue().equals(brokerUserQuotas.get(entry.getKey())))
+            .filter(entry -> !entry.getValue().isEmpty()
+                && !entry.getValue().equals(brokerUserQuotas.get(entry.getKey())))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         if (!toCreate.isEmpty()) {
@@ -162,13 +160,12 @@ interface AbstractUserSynchronizer {
     }
 
     static class Scram512UserSynchronizer implements AbstractUserSynchronizer {
-
         private final ScramCredentialInfo info = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 4096);
         private final SecureRandom secureRandom = new SecureRandom();
-        private final Admin admin;
+        private final ManagedClusterProperties managedClusterProperties;
 
-        public Scram512UserSynchronizer(Admin admin) {
-            this.admin = admin;
+        public Scram512UserSynchronizer(ManagedClusterProperties managedClusterProperties) {
+            this.managedClusterProperties = managedClusterProperties;
         }
 
         @Override
@@ -187,8 +184,15 @@ public String resetPassword(String user) {
             secureRandom.nextBytes(randomBytes);
             String password = Base64.getEncoder().encodeToString(randomBytes);
             UserScramCredentialUpsertion update = new UserScramCredentialUpsertion(user, info, password);
+
             try {
-                admin.alterUserScramCredentials(List.of(update)).all().get(10, TimeUnit.SECONDS);
+                managedClusterProperties.getAdminClient()
+                    .alterUserScramCredentials(List.of(update))
+                    .all()
+                    .get(
+                        managedClusterProperties.getTimeout().getUser().getAlterScramCredentials(),
+                        TimeUnit.MILLISECONDS
+                    );
                 log.info("Success resetting password for user {}", user);
             } catch (InterruptedException e) {
                 log.error("Error", e);
@@ -205,7 +209,10 @@ public Map<String, Map<String, Double>> listQuotas() {
             ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
                 List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)));
             try {
-                return admin.describeClientQuotas(filter).entities().get(10, TimeUnit.SECONDS)
+                return managedClusterProperties.getAdminClient()
+                    .describeClientQuotas(filter)
+                    .entities()
+                    .get(managedClusterProperties.getTimeout().getUser().getDescribeQuotas(), TimeUnit.MILLISECONDS)
                     .entrySet()
                     .stream()
                     .map(entry -> Map.entry(entry.getKey().entries().get(ClientQuotaEntity.USER), entry.getValue()))
@@ -223,28 +230,38 @@ public Map<String, Map<String, Double>> listQuotas() {
         @Override
         public void applyQuotas(String user, Map<String, Double> quotas) {
             ClientQuotaEntity client = new ClientQuotaEntity(Map.of("user", user));
-            ClientQuotaAlteration.Op producerQuota = new ClientQuotaAlteration.Op("producer_byte_rate",
-                quotas.getOrDefault("producer_byte_rate", BYTE_RATE_DEFAULT_VALUE));
-            ClientQuotaAlteration.Op consumerQuota = new ClientQuotaAlteration.Op("consumer_byte_rate",
-                quotas.getOrDefault("consumer_byte_rate", BYTE_RATE_DEFAULT_VALUE));
-            ClientQuotaAlteration clientQuota =
-                new ClientQuotaAlteration(client, List.of(producerQuota, consumerQuota));
+
+            ClientQuotaAlteration.Op producerQuota = new ClientQuotaAlteration.Op(
+                "producer_byte_rate",
+                quotas.getOrDefault("producer_byte_rate", BYTE_RATE_DEFAULT_VALUE)
+            );
+
+            ClientQuotaAlteration.Op consumerQuota = new ClientQuotaAlteration.Op(
+                "consumer_byte_rate",
+                quotas.getOrDefault("consumer_byte_rate", BYTE_RATE_DEFAULT_VALUE)
+            );
+
+            ClientQuotaAlteration clientQuota = new ClientQuotaAlteration(
+                client,
+                List.of(producerQuota, consumerQuota)
+            );
+
             try {
-                admin.alterClientQuotas(List.of(clientQuota)).all().get(10, TimeUnit.SECONDS);
+                managedClusterProperties.getAdminClient()
+                    .alterClientQuotas(List.of(clientQuota))
+                    .all()
+                    .get(managedClusterProperties.getTimeout().getUser().getAlterQuotas(), TimeUnit.MILLISECONDS);
                 log.info("Success applying quotas {} for user {}", clientQuota.ops(), user);
             } catch (InterruptedException e) {
                 log.error("Error", e);
                 Thread.currentThread().interrupt();
             } catch (Exception e) {
                 log.error(String.format("Error while applying quotas for user %s", user), e);
-            }
-        }
+            }
+        }
     }
 
     static class UnimplementedUserSynchronizer implements AbstractUserSynchronizer {
-        private final UnsupportedOperationException exception =
-            new UnsupportedOperationException("This cluster provider doesn't support User operations.");
diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConfigIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConfigIntegrationTest.java
new file mode 100644
index 00000000..26163b65
--- /dev/null
+++ b/src/test/java/com/michelin/ns4kafka/integration/ConfigIntegrationTest.java
@@ -0,0 +1,37 @@
+package com.michelin.ns4kafka.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest;
+import com.michelin.ns4kafka.property.ManagedClusterProperties;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+
+@MicronautTest
+class ConfigIntegrationTest extends KafkaIntegrationTest {
+
+    @Inject
+    List<ManagedClusterProperties> managedClusterProperties;
+
+    @Test
+    void shouldHaveConfiguredTimeouts() {
+        assertNotNull(managedClusterProperties.getFirst());
+
+        assertEquals(15000, managedClusterProperties.getFirst().getTimeout().getAcl().getCreate());
+        assertEquals(15001, managedClusterProperties.getFirst().getTimeout().getAcl().getDelete());
+        assertEquals(15002, managedClusterProperties.getFirst().getTimeout().getAcl().getDescribe());
+
+        assertEquals(15003, managedClusterProperties.getFirst().getTimeout().getTopic().getAlterConfigs());
+        assertEquals(15004, managedClusterProperties.getFirst().getTimeout().getTopic().getCreate());
+        assertEquals(15005, managedClusterProperties.getFirst().getTimeout().getTopic().getDescribeConfigs());
+        assertEquals(15006, managedClusterProperties.getFirst().getTimeout().getTopic().getDelete());
+        assertEquals(15007, managedClusterProperties.getFirst().getTimeout().getTopic().getList());
+
+        assertEquals(15008, managedClusterProperties.getFirst().getTimeout().getUser().getAlterQuotas());
+        assertEquals(15009, managedClusterProperties.getFirst().getTimeout().getUser().getAlterScramCredentials());
+        assertEquals(15010, managedClusterProperties.getFirst().getTimeout().getUser().getDescribeQuotas());
+    }
+}
diff --git a/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java
index 0a999173..c70f544c 100644
--- a/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java
+++ b/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java
@@ -290,6 +290,15 @@ void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException,
         when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
         when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
 
+        ManagedClusterProperties.TimeoutProperties.TopicProperties topicProperties = new ManagedClusterProperties
+            .TimeoutProperties.TopicProperties();
+        topicProperties.setDelete(1000);
+
+        ManagedClusterProperties.TimeoutProperties timeoutProperties = new ManagedClusterProperties.TimeoutProperties();
+        timeoutProperties.setTopic(topicProperties);
+
+        when(managedClusterProperties.getTimeout()).thenReturn(timeoutProperties);
+
         Topic topic = Topic.builder()
             .metadata(Metadata.builder()
                 .name(TOPIC_NAME)
@@ -309,6 +318,15 @@ void shouldDeleteMultipleTopics() throws ExecutionException, InterruptedExceptio
         when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
         when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
 
+        ManagedClusterProperties.TimeoutProperties.TopicProperties topicProperties = new ManagedClusterProperties
+            .TimeoutProperties.TopicProperties();
+        topicProperties.setDelete(1000);
+
+        ManagedClusterProperties.TimeoutProperties timeoutProperties = new ManagedClusterProperties.TimeoutProperties();
+        timeoutProperties.setTopic(topicProperties);
+
+        when(managedClusterProperties.getTimeout()).thenReturn(timeoutProperties);
+
         Topic topic1 = Topic.builder()
             .metadata(Metadata.builder()
                 .name("topic1")
@@ -339,6 +357,15 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException,
         when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
         when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
 
+        ManagedClusterProperties.TimeoutProperties.TopicProperties topicProperties = new ManagedClusterProperties
+            .TimeoutProperties.TopicProperties();
+        topicProperties.setDelete(1000);
+
+        ManagedClusterProperties.TimeoutProperties timeoutProperties = new ManagedClusterProperties.TimeoutProperties();
+        timeoutProperties.setTopic(topicProperties);
+
+        when(managedClusterProperties.getTimeout()).thenReturn(timeoutProperties);
+
         Topic topic = Topic.builder()
             .metadata(Metadata.builder()
                 .name(TOPIC_NAME)
diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml
index 48e9fcf3..0725322a 100644
--- a/src/test/resources/application-test.yml
+++ b/src/test/resources/application-test.yml
@@ -23,11 +23,26 @@ ns4kafka:
       - "userGroup"
   managed-clusters:
     test-cluster:
-      provider: SELF_MANAGED
-      manage-users: true
       manage-acls: true
-      manage-topics: true
       manage-connectors: true
+      manage-users: true
+      manage-topics: true
+      timeout:
+        acl:
+          create: 15000
+          delete: 15001
+          describe: 15002
+        topic:
+          alter-configs: 15003
+          create: 15004
+          describe-configs: 15005
+          delete: 15006
+          list: 15007
+        user:
+          alter-quotas: 15008
+          alter-scram-credentials: 15009
+          describe-quotas: 15010
+      provider: SELF_MANAGED
       config:
         bootstrap.servers: "localhost:9092" # Replaced by Testcontainers
         sasl.mechanism: "PLAIN"
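Because every field ships with a default, deployments can override any subset of keys and leave the rest alone; a final sketch (cluster name and values illustrative):

```yaml
ns4kafka:
  managed-clusters:
    prod-cluster:                          # illustrative name
      timeout:
        user:
          alter-scram-credentials: 45000   # e.g. slow SCRAM credential propagation
        # all acl.*, topic.*, and remaining user.* keys keep the 30000 ms default
```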