From e954b22687bf175b25a712dfef17ba27efb80197 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 9 Nov 2023 12:12:05 +0000 Subject: [PATCH 1/4] Update session settings --- .../impl/CoordinationSessionImpl.java | 2 +- .../settings/CoordinationSessionSettings.java | 29 ++++++++----------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java index a51f17389..dd4a05171 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java @@ -48,7 +48,7 @@ public static CompletableFuture newSession(CoordinationRpc final CoordinationRetryableStreamImpl stream = new CoordinationRetryableStreamImpl(rpc, executor, nodePath); final CoordinationSessionImpl session = new CoordinationSessionImpl(stream); final CompletableFuture sessionStartFuture = CompletableFuture.completedFuture(session); - return session.start(settings.getCreateTimeout()) + return session.start(settings.getConnectTimeout()) .thenAccept(session.sessionId::set) .thenCompose(ignored -> sessionStartFuture); } diff --git a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java index e3e4e67d6..095797fc2 100644 --- a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java +++ b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java @@ -8,26 +8,26 @@ * @author Aleksandr Gorshenin */ public class CoordinationSessionSettings { - private final Duration createTimeout; private final Executor executor; - private final int retriesCount; + private final Duration connectTimeout; + private final Duration reconnectBackoffDelay; private CoordinationSessionSettings(Builder builder) { - this.createTimeout = builder.createTimeout; + this.connectTimeout = builder.connectTimeout; this.executor = builder.executor; - this.retriesCount = builder.retriesCount; + this.reconnectBackoffDelay = builder.reconnectBackoffDelay; } - public Duration getCreateTimeout() { - return createTimeout; + public Duration getConnectTimeout() { + return connectTimeout; } public Executor getExecutor() { return executor; } - public int getRetriesCount() { - return retriesCount; + public Duration getReconnectBackoffDelay() { + return reconnectBackoffDelay; } public static Builder newBuilder() { @@ -35,22 +35,17 @@ public static Builder newBuilder() { } public static class Builder { - private Duration createTimeout = Duration.ofSeconds(5); private Executor executor = null; - private int retriesCount = 3; - - public Builder withCreateTimeout(Duration duration) { - this.createTimeout = duration; - return this; - } + private Duration connectTimeout = Duration.ofSeconds(5); + private Duration reconnectBackoffDelay = Duration.ofMillis(250); public Builder withExecutor(Executor executor) { this.executor = executor; return this; } - public Builder withRetriesCount(int retriesCount) { - this.retriesCount = retriesCount; + public Builder withConnectTimeout(Duration timeout) { + this.connectTimeout = timeout; return this; } From 5e9e8840d71ac492ba1428606c59e47a19def958 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 22 Nov 2023 10:58:08 +0000 Subject: [PATCH 2/4] Update CoordinationNodeSettings --- .../impl/CoordinationClientImpl.java | 125 ++++++++---------- .../settings/CoordinationNodeSettings.java | 118 +++++------------ .../settings/NodeConsistenteMode.java | 14 ++ .../settings/NodeRateLimiterCountersMode.java | 14 ++ 4 files changed, 120 insertions(+), 151 deletions(-) create mode 100644 coordination/src/main/java/tech/ydb/coordination/settings/NodeConsistenteMode.java create mode 100644 coordination/src/main/java/tech/ydb/coordination/settings/NodeRateLimiterCountersMode.java diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java index 3f1f0ba44..691f46d0f 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java @@ -9,7 +9,10 @@ import tech.ydb.coordination.settings.CoordinationSessionSettings; import tech.ydb.coordination.settings.DescribeCoordinationNodeSettings; import tech.ydb.coordination.settings.DropCoordinationNodeSettings; +import tech.ydb.coordination.settings.NodeConsistenteMode; +import tech.ydb.coordination.settings.NodeRateLimiterCountersMode; import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.OperationUtils; import tech.ydb.proto.coordination.AlterNodeRequest; import tech.ydb.proto.coordination.Config; @@ -36,61 +39,49 @@ public CompletableFuture createSession(String path, Coordin } @Override - public CompletableFuture createNode( - String path, - CoordinationNodeSettings coordinationNodeSettings - ) { - return coordinationRpc.createNode( - CreateNodeRequest.newBuilder() - .setPath(path) - .setOperationParams(OperationUtils.createParams(coordinationNodeSettings)) - .setConfig(createConfig(coordinationNodeSettings)) - .build(), - OperationUtils.createGrpcRequestSettings(coordinationNodeSettings) - ); + public CompletableFuture createNode(String path, CoordinationNodeSettings settings) { + CreateNodeRequest request = CreateNodeRequest.newBuilder() + .setPath(path) + .setOperationParams(OperationUtils.createParams(settings)) + .setConfig(createConfig(settings)) + .build(); + + GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + return coordinationRpc.createNode(request, grpcSettings); } @Override - public CompletableFuture alterNode( - String path, - CoordinationNodeSettings coordinationNodeSettings - ) { - return coordinationRpc.alterNode( - AlterNodeRequest.newBuilder() - .setPath(path) - .setOperationParams(OperationUtils.createParams(coordinationNodeSettings)) - .setConfig(createConfig(coordinationNodeSettings)) - .build(), - OperationUtils.createGrpcRequestSettings(coordinationNodeSettings) - ); + public CompletableFuture alterNode(String path, CoordinationNodeSettings settings) { + AlterNodeRequest request = AlterNodeRequest.newBuilder() + .setPath(path) + .setOperationParams(OperationUtils.createParams(settings)) + .setConfig(createConfig(settings)) + .build(); + + GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + return coordinationRpc.alterNode(request, grpcSettings); } @Override - public CompletableFuture dropNode( - String path, - DropCoordinationNodeSettings dropCoordinationNodeSettings - ) { - return coordinationRpc.dropNode( - DropNodeRequest.newBuilder() - .setPath(path) - .setOperationParams(OperationUtils.createParams(dropCoordinationNodeSettings)) - .build(), - OperationUtils.createGrpcRequestSettings(dropCoordinationNodeSettings) - ); + public CompletableFuture dropNode(String path, DropCoordinationNodeSettings settings) { + DropNodeRequest request = DropNodeRequest.newBuilder() + .setPath(path) + .setOperationParams(OperationUtils.createParams(settings)) + .build(); + + GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + return coordinationRpc.dropNode(request, grpcSettings); } @Override - public CompletableFuture describeNode( - String path, - DescribeCoordinationNodeSettings describeCoordinationNodeSettings - ) { - return coordinationRpc.describeNode( - DescribeNodeRequest.newBuilder() - .setPath(path) - .setOperationParams(OperationUtils.createParams(describeCoordinationNodeSettings)) - .build(), - OperationUtils.createGrpcRequestSettings(describeCoordinationNodeSettings) - ); + public CompletableFuture describeNode(String path,DescribeCoordinationNodeSettings settings) { + DescribeNodeRequest request = DescribeNodeRequest.newBuilder() + .setPath(path) + .setOperationParams(OperationUtils.createParams(settings)) + .build(); + + GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings); + return coordinationRpc.describeNode(request, grpcSettings); } @Override @@ -98,41 +89,39 @@ public String getDatabase() { return coordinationRpc.getDatabase(); } - private static ConsistencyMode toProto(CoordinationNodeSettings.ConsistencyMode consistencyMode) { - switch (consistencyMode) { - case CONSISTENCY_MODE_STRICT: + private static ConsistencyMode toProto(NodeConsistenteMode mode) { + switch (mode) { + case UNSET: + return ConsistencyMode.CONSISTENCY_MODE_UNSET; + case STRICT: return ConsistencyMode.CONSISTENCY_MODE_STRICT; - case CONSISTENCY_MODE_RELAXED: + case RELAXED: return ConsistencyMode.CONSISTENCY_MODE_RELAXED; default: - throw new RuntimeException("Unknown consistency mode: " + consistencyMode); + throw new RuntimeException("Unknown consistency mode: " + mode); } } - private static RateLimiterCountersMode toProto( - CoordinationNodeSettings.RateLimiterCountersMode rateLimiterCountersMode - ) { - switch (rateLimiterCountersMode) { - case RATE_LIMITER_COUNTERS_MODE_DETAILED: + private static RateLimiterCountersMode toProto(NodeRateLimiterCountersMode mode) { + switch (mode) { + case UNSET: + return RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_UNSET; + case DETAILED: return RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_DETAILED; - case RATE_LIMITER_COUNTERS_MODE_AGGREGATED: + case AGGREGATED: return RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_AGGREGATED; default: - throw new RuntimeException("Unknown rate limiter counters mode: " + rateLimiterCountersMode); + throw new RuntimeException("Unknown rate limiter counters mode: " + mode); } } private static Config createConfig(CoordinationNodeSettings coordinationNodeSettings) { - Config.Builder configBuilder = Config.newBuilder() - .setSelfCheckPeriodMillis(coordinationNodeSettings.getSelfCheckPeriodMillis()) - .setSessionGracePeriodMillis(coordinationNodeSettings.getSessionGracePeriodMillis()) + return Config.newBuilder() + .setSelfCheckPeriodMillis((int) coordinationNodeSettings.getSelfCheckPeriod().toMillis()) + .setSessionGracePeriodMillis((int) coordinationNodeSettings.getSessionGracePeriod().toMillis()) .setReadConsistencyMode(toProto(coordinationNodeSettings.getReadConsistencyMode())) - .setAttachConsistencyMode(toProto(coordinationNodeSettings.getAttachConsistencyMode())); - - if (coordinationNodeSettings.getRateLimiterCountersMode() != null) { - configBuilder.setRateLimiterCountersMode(toProto(coordinationNodeSettings.getRateLimiterCountersMode())); - } - - return configBuilder.build(); + .setAttachConsistencyMode(toProto(coordinationNodeSettings.getAttachConsistencyMode())) + .setRateLimiterCountersMode(toProto(coordinationNodeSettings.getRateLimiterCountersMode())) + .build(); } } diff --git a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java index d03d8c5a9..9cddf3f46 100644 --- a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java +++ b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java @@ -1,5 +1,7 @@ package tech.ydb.coordination.settings; +import java.time.Duration; + import com.google.common.base.Preconditions; import tech.ydb.core.settings.OperationSettings; @@ -8,140 +10,90 @@ * @author Kirill Kurdyukov */ public class CoordinationNodeSettings extends OperationSettings { + private final Duration selfCheckPeriod; + private final Duration sessionGracePeriod; + private final NodeConsistenteMode readConsistencyMode; + private final NodeConsistenteMode attachConsistencyMode; + private final NodeRateLimiterCountersMode rateLimiterCountersMode; - /** - * Period in milliseconds for self-checks (default 1 second) - */ - private final int selfCheckPeriodMillis; - - /** - * Grace period for sessions on leader change (default 10 seconds) - */ - private final int sessionGracePeriodMillis; - - /** - * Consistency mode for read operations - */ - private final ConsistencyMode readConsistencyMode; - - /** - * Consistency mode for attach operations - */ - private final ConsistencyMode attachConsistencyMode; - - /** - * Rate limiter counters mode - */ - private final RateLimiterCountersMode rateLimiterCountersMode; - - private CoordinationNodeSettings( - Builder builder - ) { + private CoordinationNodeSettings(Builder builder) { super(builder); Preconditions.checkArgument( - builder.selfCheckPeriodMillis < builder.sessionGracePeriodMillis, + builder.selfCheckPeriod.compareTo(builder.sessionGracePeriod) < 0, "SessionGracePeriod must be strictly more than SelfCheckPeriod" ); - this.selfCheckPeriodMillis = builder.selfCheckPeriodMillis; - this.sessionGracePeriodMillis = builder.sessionGracePeriodMillis; + this.selfCheckPeriod = builder.selfCheckPeriod; + this.sessionGracePeriod = builder.sessionGracePeriod; this.readConsistencyMode = builder.readConsistencyMode; this.attachConsistencyMode = builder.attachConsistencyMode; this.rateLimiterCountersMode = builder.rateLimiterCountersMode; } - public int getSelfCheckPeriodMillis() { - return selfCheckPeriodMillis; + public Duration getSelfCheckPeriod() { + return selfCheckPeriod; } - public int getSessionGracePeriodMillis() { - return sessionGracePeriodMillis; + public Duration getSessionGracePeriod() { + return sessionGracePeriod; } - public ConsistencyMode getReadConsistencyMode() { + public NodeConsistenteMode getReadConsistencyMode() { return readConsistencyMode; } - public ConsistencyMode getAttachConsistencyMode() { + public NodeConsistenteMode getAttachConsistencyMode() { return attachConsistencyMode; } - public RateLimiterCountersMode getRateLimiterCountersMode() { + public NodeRateLimiterCountersMode getRateLimiterCountersMode() { return rateLimiterCountersMode; } - public enum ConsistencyMode { - /** - * Strict mode makes sure operations may only complete on current leader - */ - CONSISTENCY_MODE_STRICT, - - /** - * Relaxed mode allows operations to complete on stale masters - */ - CONSISTENCY_MODE_RELAXED - } - - public enum RateLimiterCountersMode { - /** - * Aggregated counters for resource tree - */ - RATE_LIMITER_COUNTERS_MODE_AGGREGATED, - - /** - * Counters on every resource - */ - RATE_LIMITER_COUNTERS_MODE_DETAILED - } public static Builder newBuilder() { return new Builder(); } public static class Builder extends OperationSettings.OperationBuilder { - private int selfCheckPeriodMillis = 1_000; - private int sessionGracePeriodMillis = 10_000; + private Duration selfCheckPeriod = Duration.ofSeconds(1); + private Duration sessionGracePeriod = Duration.ofSeconds(10); - private ConsistencyMode readConsistencyMode = ConsistencyMode.CONSISTENCY_MODE_RELAXED; - private ConsistencyMode attachConsistencyMode = ConsistencyMode.CONSISTENCY_MODE_STRICT; + private NodeConsistenteMode readConsistencyMode = NodeConsistenteMode.UNSET; + private NodeConsistenteMode attachConsistencyMode = NodeConsistenteMode.UNSET; - private RateLimiterCountersMode rateLimiterCountersMode = null; + private NodeRateLimiterCountersMode rateLimiterCountersMode = NodeRateLimiterCountersMode.UNSET; - public Builder setSelfCheckPeriodMillis(int selfCheckPeriodMillis) { + public Builder withSelfCheckPeriod(Duration period) { Preconditions.checkArgument( - selfCheckPeriodMillis > 0, + period.isNegative() || period.isZero(), "SelfCheckPeriod must be strictly greater than zero" ); - this.selfCheckPeriodMillis = selfCheckPeriodMillis; - + this.selfCheckPeriod = period; return this; } - public Builder setSessionGracePeriodMillis(int sessionGracePeriodMillis) { + public Builder withSessionGracePeriod(Duration period) { Preconditions.checkArgument( - sessionGracePeriodMillis > 0, + period.isNegative() || period.isZero(), "SessionGracePeriod must be strictly greater than zero" ); - this.sessionGracePeriodMillis = sessionGracePeriodMillis; - + this.sessionGracePeriod = period; return this; } - public Builder setReadConsistencyMode(ConsistencyMode readConsistencyMode) { - this.readConsistencyMode = readConsistencyMode; - + public Builder withReadConsistencyMode(NodeConsistenteMode mode) { + this.readConsistencyMode = mode; return this; } - public Builder setAttachConsistencyMode(ConsistencyMode attachConsistencyMode) { - this.attachConsistencyMode = attachConsistencyMode; - + public Builder withAttachConsistencyMode(NodeConsistenteMode mode) { + this.attachConsistencyMode = mode; return this; } - public Builder setRateLimiterCountersMode(RateLimiterCountersMode rateLimiterCountersMode) { - this.rateLimiterCountersMode = rateLimiterCountersMode; - + public Builder withRateLimiterCountersMode(NodeRateLimiterCountersMode mode) { + this.rateLimiterCountersMode = mode; return this; } diff --git a/coordination/src/main/java/tech/ydb/coordination/settings/NodeConsistenteMode.java b/coordination/src/main/java/tech/ydb/coordination/settings/NodeConsistenteMode.java new file mode 100644 index 000000000..e314346fc --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/settings/NodeConsistenteMode.java @@ -0,0 +1,14 @@ +package tech.ydb.coordination.settings; + +/** + * + * @author Aleksandr Gorshenin + */ +public enum NodeConsistenteMode { + /** The default or current value */ + UNSET, + /** Strict mode makes sure operations may only complete on current leader */ + STRICT, + /** Relaxed mode allows operations to complete on stale masters */ + RELAXED +} diff --git a/coordination/src/main/java/tech/ydb/coordination/settings/NodeRateLimiterCountersMode.java b/coordination/src/main/java/tech/ydb/coordination/settings/NodeRateLimiterCountersMode.java new file mode 100644 index 000000000..ca5949127 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/settings/NodeRateLimiterCountersMode.java @@ -0,0 +1,14 @@ +package tech.ydb.coordination.settings; + +/** + * + * @author Aleksandr Gorshenin + */ +public enum NodeRateLimiterCountersMode { + /** The default or current value */ + UNSET, + /** Aggregated counters for resource tree */ + AGGREGATED, + /** Counters on every resource */ + DETAILED +} From e1b8a98c42b1b8da94d71f8503c16ea51d5c51ea Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 22 Nov 2023 11:54:44 +0000 Subject: [PATCH 3/4] Update api for Session and SemaphoreLease --- .../ydb/coordination/CoordinationClient.java | 19 +++++++- .../ydb/coordination/CoordinationSession.java | 22 +++++----- .../tech/ydb/coordination/SemaphoreLease.java | 16 +++---- .../impl/CoordinationClientImpl.java | 2 +- .../impl/CoordinationSessionImpl.java | 44 ++++++------------- .../coordination/impl/SemaphoreLeaseImpl.java | 14 +++--- .../leader_election/LeaderElection.java | 26 +++++++---- .../scenario/service_discovery/Worker.java | 16 +++---- .../settings/CoordinationNodeSettings.java | 4 +- .../settings/CoordinationSessionSettings.java | 5 +++ .../ConfigurationScenarioTest.java | 14 +++--- .../coordination/CoordinationClientTest.java | 25 ++++++----- .../LeaderElectionScenarioTest.java | 8 ++-- .../ServiceDiscoveryScenarioTest.java | 5 +-- 14 files changed, 118 insertions(+), 102 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/CoordinationClient.java b/coordination/src/main/java/tech/ydb/coordination/CoordinationClient.java index 16125ebff..b879d8f85 100644 --- a/coordination/src/main/java/tech/ydb/coordination/CoordinationClient.java +++ b/coordination/src/main/java/tech/ydb/coordination/CoordinationClient.java @@ -1,6 +1,8 @@ package tech.ydb.coordination; +import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import javax.annotation.WillNotClose; @@ -10,6 +12,8 @@ import tech.ydb.coordination.settings.CoordinationSessionSettings; import tech.ydb.coordination.settings.DescribeCoordinationNodeSettings; import tech.ydb.coordination.settings.DropCoordinationNodeSettings; +import tech.ydb.coordination.settings.NodeConsistenteMode; +import tech.ydb.coordination.settings.NodeRateLimiterCountersMode; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; @@ -91,7 +95,11 @@ static CoordinationClient newClient(@WillNotClose GrpcTransport transport) { * @return future with instance of coordination session */ default CompletableFuture createSession(String path) { - return createSession(path, CoordinationSessionSettings.newBuilder().build()); + return createSession(path, CoordinationSessionSettings.newBuilder() + .withConnectTimeout(Duration.ofSeconds(5)) + .withReconnectBackoffDelay(Duration.ofMillis(250)) + .withExecutor(ForkJoinPool.commonPool()) + .build()); } /** @@ -101,7 +109,14 @@ default CompletableFuture createSession(String path) { * @return status of request */ default CompletableFuture createNode(String path) { - return createNode(path, CoordinationNodeSettings.newBuilder().build()); + return createNode(path, CoordinationNodeSettings.newBuilder() + .withSelfCheckPeriod(Duration.ofSeconds(1)) + .withSessionGracePeriod(Duration.ofSeconds(10)) + .withReadConsistencyMode(NodeConsistenteMode.RELAXED) + .withAttachConsistencyMode(NodeConsistenteMode.STRICT) + .withRateLimiterCountersMode(NodeRateLimiterCountersMode.UNSET) + .build() + ); } /** diff --git a/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java b/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java index 55368071b..f71c59613 100644 --- a/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java +++ b/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java @@ -71,7 +71,7 @@ public interface CoordinationSession extends AutoCloseable { * If there is no such a semaphore, future will complete exceptionally * with {@link tech.ydb.core.UnexpectedResultException}. */ - CompletableFuture acquireSemaphore(String name, long count, byte[] data, Duration timeout); + CompletableFuture> acquireSemaphore(String name, long count, byte[] data, Duration timeout); /** * Acquire an ephemeral semaphore. @@ -81,14 +81,14 @@ public interface CoordinationSession extends AutoCloseable { * Later requests override previous operations with the same semaphore, * e.g. to reduce acquired count, change timeout or attached data * - * @param name Name of the semaphore to acquire - * @param timeout Duration after which operation will fail if it's still waiting in the waiters queue - * @param data User-defined binary data that may be attached to the operation + * @param name Name of the semaphore to acquire + * @param exclusive Flag of exclusive acquiring + * @param timeout Duration after which operation will fail if it's still waiting in the waiters queue + * @param data User-defined binary data that may be attached to the operation * @return future with a semaphore lease object */ - CompletableFuture acquireEphemeralSemaphore(String name, byte[] data, Duration timeout); - - + CompletableFuture> acquireEphemeralSemaphore(String name, boolean exclusive, byte[] data, + Duration timeout); CompletableFuture> describeSemaphore(String name, DescribeSemaphoreMode mode); @@ -121,7 +121,7 @@ default CompletableFuture createSemaphore(String name, long limit) { * @param timeout Duration after which operation will fail if it's still waiting in the waiters queue * @return future with a semaphore lease object */ - default CompletableFuture acquireSemaphore(String name, long count, Duration timeout) { + default CompletableFuture> acquireSemaphore(String name, long count, Duration timeout) { return acquireSemaphore(name, count, null, timeout); } @@ -134,10 +134,12 @@ default CompletableFuture acquireSemaphore(String name, long cou * e.g. to reduce acquired count, change timeout or attached data * * @param name Name of the semaphore to acquire + * @param exclusive Flag of exclusive acquiring * @param timeout Duration after which operation will fail if it's still waiting in the waiters queue * @return future with a semaphore lease object */ - default CompletableFuture acquireEphemeralSemaphore(String name, Duration timeout) { - return acquireEphemeralSemaphore(name, null, timeout); + default CompletableFuture> acquireEphemeralSemaphore(String name, boolean exclusive, + Duration timeout) { + return acquireEphemeralSemaphore(name, exclusive, null, timeout); } } diff --git a/coordination/src/main/java/tech/ydb/coordination/SemaphoreLease.java b/coordination/src/main/java/tech/ydb/coordination/SemaphoreLease.java index 93e275bdd..98360bf91 100644 --- a/coordination/src/main/java/tech/ydb/coordination/SemaphoreLease.java +++ b/coordination/src/main/java/tech/ydb/coordination/SemaphoreLease.java @@ -2,23 +2,23 @@ import java.util.concurrent.CompletableFuture; -import tech.ydb.core.Status; /** * * @author Aleksandr Gorshenin */ -public interface SemaphoreLease { - - CoordinationSession getSession(); +public interface SemaphoreLease extends AutoCloseable { String getSemaphoreName(); - CompletableFuture getStatusFuture(); + CoordinationSession getSession(); + + boolean isActive(); - CompletableFuture release(); + CompletableFuture release(); - default boolean isValid() { - return !getStatusFuture().isDone(); + @Override + default void close() { + release().join(); } } diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java index 691f46d0f..b20e30166 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationClientImpl.java @@ -74,7 +74,7 @@ public CompletableFuture dropNode(String path, DropCoordinationNodeSetti } @Override - public CompletableFuture describeNode(String path,DescribeCoordinationNodeSettings settings) { + public CompletableFuture describeNode(String path, DescribeCoordinationNodeSettings settings) { DescribeNodeRequest request = DescribeNodeRequest.newBuilder() .setPath(path) .setOperationParams(OperationUtils.createParams(settings)) diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java index dd4a05171..ff7da2102 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java @@ -24,7 +24,6 @@ import tech.ydb.coordination.settings.WatchSemaphoreMode; import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; public class CoordinationSessionImpl implements CoordinationSession { private static final Logger logger = LoggerFactory.getLogger(CoordinationSession.class); @@ -69,45 +68,30 @@ public CompletableFuture createSemaphore(String semaphoreName, long limi } @Override - public CompletableFuture acquireSemaphore(String name, long count, byte[] data, Duration timeout) { - byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB; + public CompletableFuture> acquireSemaphore(String name, long count, byte[] data, + Duration timeout) { + byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB; final int reqId = lastId.getAndIncrement(); logger.trace("Send acquireSemaphore {} with count {}", name, count); - return stream.sendAcquireSemaphore(name, count, timeout, false, semaphoreData, reqId) - .thenApply(result -> { - SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name); - if (!result.getValue()) { // timeout expired - lease.completeLease(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED)); - } - return lease; - }); + return stream.sendAcquireSemaphore(name, count, timeout, false, sepamhoreData, reqId) + .thenApply(r -> r.map(v -> new SemaphoreLeaseImpl(this, name))); } @Override - public CompletableFuture acquireEphemeralSemaphore(String name, byte[] data, Duration timeout) { - byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB; + public CompletableFuture> acquireEphemeralSemaphore(String name, boolean exclusive, + byte[] data, Duration timeout) { + byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB; final int reqId = lastId.getAndIncrement(); logger.trace("Send acquireEphemeralSemaphore {}", name); - return stream.sendAcquireSemaphore(name, -1L, timeout, true, semaphoreData, reqId) - .thenApply(result -> { - SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name); - if (!result.getValue()) { // timeout expired - lease.completeLease(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED)); - } - return lease; - }); + long limit = exclusive ? -1L : 1L; + return stream.sendAcquireSemaphore(name, limit, timeout, true, sepamhoreData, reqId) + .thenApply(r -> r.map(v -> new SemaphoreLeaseImpl(this, name))); } - - CompletableFuture releaseSemaphore(SemaphoreLeaseImpl lease) { + CompletableFuture releaseSemaphore(String name) { final int semaphoreReleaseId = lastId.getAndIncrement(); - logger.trace("Send releaseSemaphore {}", lease.getSemaphoreName()); - return stream.sendReleaseSemaphore(lease.getSemaphoreName(), semaphoreReleaseId).thenApply(result -> { - if (result.getValue()) { - lease.completeLease(Status.SUCCESS); - } - return result.getValue(); - }); + logger.trace("Send releaseSemaphore {}", name); + return stream.sendReleaseSemaphore(name, semaphoreReleaseId).thenApply(Result::getValue); } @Override diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/SemaphoreLeaseImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/SemaphoreLeaseImpl.java index ba2f93455..ac3c4e66c 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/SemaphoreLeaseImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/SemaphoreLeaseImpl.java @@ -27,16 +27,16 @@ public String getSemaphoreName() { } @Override - public CompletableFuture getStatusFuture() { - return statusFuture; - } - - @Override - public CompletableFuture release() { - return session.releaseSemaphore(this); + public CompletableFuture release() { + return session.releaseSemaphore(name).thenApply(r -> null); } void completeLease(Status status) { statusFuture.complete(status); } + + @Override + public boolean isActive() { + return statusFuture.isDone(); + } } diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java b/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java index f53ba20a4..9c85ca7bf 100644 --- a/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java @@ -178,19 +178,27 @@ private boolean isSemaphoreExistenceException(Throwable th, Runnable callback) { private void recursiveAcquire() { session.acquireSemaphore(name, 1, data, Duration.ofHours(1)).whenComplete( - (lease, throwable) -> { - if (isSemaphoreExistenceException(throwable, this::recursiveAcquire)) { + (res, throwable) -> { + if (!isElecting.get()) { + if (res != null && res.isSuccess()) { + res.getValue().close(); + } return; } - if ((lease == null || throwable != null) && isElecting.get()) { + if (throwable != null) { recursiveAcquire(); - } else if (lease != null) { - if (!isElecting.get()) { - lease.release(); - } else { - acquireFuture.complete(lease); - } + return; + } + if (res.getStatus().getCode() == StatusCode.NOT_FOUND) { + createSemaphore(this::recursiveAcquire); + return; } + if (!res.isSuccess()) { + recursiveAcquire(); + return; + } + + acquireFuture.complete(res.getValue()); }); } diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/service_discovery/Worker.java b/coordination/src/main/java/tech/ydb/coordination/scenario/service_discovery/Worker.java index 30dc4e6ca..5fad699e6 100644 --- a/coordination/src/main/java/tech/ydb/coordination/scenario/service_discovery/Worker.java +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/service_discovery/Worker.java @@ -31,12 +31,12 @@ public static CompletableFuture newWorkerAsync(CoordinationClient client Duration maxAttemptTimeout) { return client.createSession(fullPath).thenCompose(session -> { byte[] data = endpoint.getBytes(StandardCharsets.UTF_8); - return session.acquireSemaphore(SEMAPHORE_NAME, 1, data, maxAttemptTimeout).thenApply(lease -> { - if (lease.isValid()) { - return new Worker(session, lease); + return session.acquireSemaphore(SEMAPHORE_NAME, 1, data, maxAttemptTimeout).thenApply(res -> { + if (res.isSuccess()) { + return new Worker(session, res.getValue()); } else { throw new UnexpectedResultException("The semaphore for Worker wasn't acquired.", - lease.getStatusFuture().join()); + res.getStatus()); } }); }); @@ -54,8 +54,8 @@ public static Worker newWorker(CoordinationClient client, String fullPath, Strin * Stop showing the Worker * @return Completable future with true if semaphore release was success otherwise false */ - public CompletableFuture stopAsync() { - CompletableFuture releaseFuture = semaphore.release(); + public CompletableFuture stopAsync() { + CompletableFuture releaseFuture = semaphore.release(); releaseFuture.thenRun(session::close); return releaseFuture; } @@ -63,7 +63,7 @@ public CompletableFuture stopAsync() { /** * {@link Worker#stopAsync()} */ - public boolean stop() { - return stopAsync().join(); + public void stop() { + stopAsync().join(); } } diff --git a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java index 9cddf3f46..ef6a9b4c8 100644 --- a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java +++ b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationNodeSettings.java @@ -66,7 +66,7 @@ public static class Builder extends OperationSettings.OperationBuilder public Builder withSelfCheckPeriod(Duration period) { Preconditions.checkArgument( - period.isNegative() || period.isZero(), + !period.isNegative() && !period.isZero(), "SelfCheckPeriod must be strictly greater than zero" ); this.selfCheckPeriod = period; @@ -75,7 +75,7 @@ public Builder withSelfCheckPeriod(Duration period) { public Builder withSessionGracePeriod(Duration period) { Preconditions.checkArgument( - period.isNegative() || period.isZero(), + !period.isNegative() && !period.isZero(), "SessionGracePeriod must be strictly greater than zero" ); this.sessionGracePeriod = period; diff --git a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java index 095797fc2..876d70b66 100644 --- a/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java +++ b/coordination/src/main/java/tech/ydb/coordination/settings/CoordinationSessionSettings.java @@ -49,6 +49,11 @@ public Builder withConnectTimeout(Duration timeout) { return this; } + public Builder withReconnectBackoffDelay(Duration delay) { + this.reconnectBackoffDelay = delay; + return this; + } + public CoordinationSessionSettings build() { return new CoordinationSessionSettings(this); } diff --git a/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java index f2d4b35a3..83c671517 100644 --- a/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java @@ -58,12 +58,14 @@ public void configurationScenarioBaseTest() { if (Arrays.equals(data, dataNow.get().getBytes())) { subscriberApproveCounter.get().countDown(); } - }); - Subscriber subscriber2 = Subscriber.newSubscriber(client, path, semaphoreName, data -> { - if (Arrays.equals(data, dataNow.get().getBytes())) { - subscriberApproveCounter.get().countDown(); - } - })) { + }); Subscriber subscriber2 = Subscriber.newSubscriber(client, path, semaphoreName, data -> { + if (Arrays.equals(data, dataNow.get().getBytes())) { + subscriberApproveCounter.get().countDown(); + } + })) { + subscriber1.hashCode(); + subscriber2.hashCode(); + subscriberApproveCounter.get().await(); subscriberApproveCounter.set(new CountDownLatch(2)); diff --git a/coordination/src/test/java/tech/ydb/coordination/CoordinationClientTest.java b/coordination/src/test/java/tech/ydb/coordination/CoordinationClientTest.java index 8e722f266..2b42401e2 100644 --- a/coordination/src/test/java/tech/ydb/coordination/CoordinationClientTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/CoordinationClientTest.java @@ -27,7 +27,9 @@ import tech.ydb.coordination.settings.CoordinationNodeSettings; import tech.ydb.coordination.settings.DescribeSemaphoreMode; import tech.ydb.coordination.settings.DropCoordinationNodeSettings; +import tech.ydb.coordination.settings.NodeConsistenteMode; import tech.ydb.coordination.settings.WatchSemaphoreMode; +import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcRequestSettings; @@ -63,8 +65,8 @@ public void alterNodeTest() { CompletableFuture result = client.alterNode( path, CoordinationNodeSettings.newBuilder() - .setReadConsistencyMode(CoordinationNodeSettings.ConsistencyMode.CONSISTENCY_MODE_STRICT) - .setSelfCheckPeriodMillis(2_000) + .withReadConsistencyMode(NodeConsistenteMode.STRICT) + .withSelfCheckPeriod(Duration.ofSeconds(2)) .build() ); @@ -77,7 +79,7 @@ public void coordinationSessionFullCycleTest() { try (CoordinationSession session = client.createSession(path).join()) { session.createSemaphore(semaphoreName, 100).get(20, TimeUnit.SECONDS); SemaphoreLease semaphore = session.acquireSemaphore(semaphoreName, 70, timeout) - .join(); + .join().getValue(); SemaphoreWatcher watch = session.describeAndWatchSemaphore(semaphoreName, DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS, @@ -93,7 +95,7 @@ public void coordinationSessionFullCycleTest() { final byte[] data = "Hello".getBytes(StandardCharsets.UTF_8); session.updateSemaphore(semaphoreName, data).join(); Assert.assertTrue(watch.getChangedFuture().get(1, TimeUnit.MINUTES).isDataChanged()); - Assert.assertTrue(semaphore.release().join()); + semaphore.close(); } catch (Exception e) { Assert.fail("There have to be no exceptions. [exception]: " + e); throw new RuntimeException(e); @@ -104,8 +106,8 @@ public void coordinationSessionFullCycleTest() { public void ephemeralSemaphoreBaseTest() { final String semaphoreName = "coordination-client-ephemeral-semaphore-base-test"; try (CoordinationSession session = client.createSession(path).join()) { - session.acquireEphemeralSemaphore(semaphoreName, timeout) - .join(); + session.acquireEphemeralSemaphore(semaphoreName, true, timeout) + .join().getValue(); final SemaphoreDescription description = session.describeSemaphore(semaphoreName, DescribeSemaphoreMode.DATA_ONLY) .join() @@ -128,9 +130,8 @@ public void retryCoordinationSessionTest() { try (CoordinationSession session = mockClient.createSession(path).join()) { session.createSemaphore(semaphoreName, 90 + sessionNum + 1).join(); - SemaphoreLease lease = session.acquireSemaphore(semaphoreName, 90, timeout).join(); + SemaphoreLease lease = session.acquireSemaphore(semaphoreName, 90, timeout).join().getValue(); - Assert.assertTrue(lease.isValid()); Assert.assertEquals(session.updateSemaphore(semaphoreName, "data".getBytes(StandardCharsets.UTF_8)).join(), Status.SUCCESS); @@ -143,9 +144,9 @@ public void retryCoordinationSessionTest() { ProxyStream.IS_STOPPED.set(true); // ------------------------ - List> acquireFutures = new ArrayList<>(); + List>> acquireFutures = new ArrayList<>(); sessions.forEach(otherSession -> { - final CompletableFuture acquireFuture = new CompletableFuture<>(); + final CompletableFuture> acquireFuture = new CompletableFuture<>(); acquireFutures.add(acquireFuture); otherSession.createSemaphore(semaphoreName, 1) .whenComplete((result, thSem) -> { @@ -160,8 +161,8 @@ public void retryCoordinationSessionTest() { // ------------------------ ProxyStream.IS_STOPPED.set(false); - for (CompletableFuture future : acquireFutures) { - Assert.assertTrue(future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).isValid()); + for (CompletableFuture> future : acquireFutures) { + Assert.assertTrue(future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).isSuccess()); } final SemaphoreDescription desc = session.describeSemaphore(semaphoreName, DescribeSemaphoreMode.DATA_ONLY) .get(timeout.toMillis(), TimeUnit.MILLISECONDS) diff --git a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java index 009ee224c..83b0584dc 100644 --- a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java @@ -296,11 +296,11 @@ public void leaderElectionOnPureSessionsTest() throws InterruptedException { final CountDownLatch latch2 = new CountDownLatch(sessionCount); sessions.forEach(session -> session - .acquireSemaphore(semaphoreName, 1, String.valueOf(session.getId()).getBytes(), Duration.ZERO) - .whenComplete((lease, acquireSemaphoreTh) -> { + .acquireSemaphore(semaphoreName, 1, String.valueOf(session.getId()).getBytes(), Duration.ofSeconds(2)) + .whenComplete((res, acquireSemaphoreTh) -> { threadWorkAssert(assertChecker, acquireSemaphoreTh == null); - if (lease.isValid()) { - semaphore.complete(lease); + if (res.isSuccess()) { + semaphore.complete(res.getValue()); leader.complete(session); } latch2.countDown(); diff --git a/coordination/src/test/java/tech/ydb/coordination/ServiceDiscoveryScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/ServiceDiscoveryScenarioTest.java index 025b2dcee..dd23dc1bc 100644 --- a/coordination/src/test/java/tech/ydb/coordination/ServiceDiscoveryScenarioTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/ServiceDiscoveryScenarioTest.java @@ -80,8 +80,7 @@ public void serviceDiscoveryTest() { final CountDownLatch stopFirstWorkerLatch = new CountDownLatch(1); subscriber2.setUpdateWaiter(stopFirstWorkerLatch::countDown); - final boolean stoppedWorker1 = worker1.stop(); - Assert.assertTrue(stoppedWorker1); + worker1.stop(); Assert.assertTrue(stopFirstWorkerLatch.await(60, TimeUnit.SECONDS)); final SemaphoreDescription removeDescription = subscriber2.getDescription(); @@ -92,7 +91,7 @@ public void serviceDiscoveryTest() { .join() .getValue()); - Assert.assertTrue(worker2.stop()); + worker2.stop(); Status remove = checkSession.deleteSemaphore(Worker.SEMAPHORE_NAME, true).join(); Assert.assertTrue(remove.isSuccess()); From 55b88d1d2c0d7761f85c75e574881aed0f87eaba Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 23 Nov 2023 09:36:52 +0000 Subject: [PATCH 4/4] Small fix --- .../java/tech/ydb/coordination/LeaderElectionScenarioTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java index 83b0584dc..e491f49a1 100644 --- a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java @@ -108,6 +108,7 @@ public void leaderElectionBaseTest() { break; case "endpoint3": participant3.interruptLeadership(); + break; default: throw new RuntimeException("No leader was elected."); }