Skip to content

Commit

Permalink
Merge pull request #8 from alex268/coord_session
Browse files Browse the repository at this point in the history
Update api of coordination service
  • Loading branch information
piece-of-tart authored Nov 24, 2023
2 parents 6b1e9eb + 55b88d1 commit 1600d27
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 267 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package tech.ydb.coordination;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

import javax.annotation.WillNotClose;

Expand All @@ -10,6 +12,8 @@
import tech.ydb.coordination.settings.CoordinationSessionSettings;
import tech.ydb.coordination.settings.DescribeCoordinationNodeSettings;
import tech.ydb.coordination.settings.DropCoordinationNodeSettings;
import tech.ydb.coordination.settings.NodeConsistenteMode;
import tech.ydb.coordination.settings.NodeRateLimiterCountersMode;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;

Expand Down Expand Up @@ -91,7 +95,11 @@ static CoordinationClient newClient(@WillNotClose GrpcTransport transport) {
* @return future with instance of coordination session
*/
default CompletableFuture<CoordinationSession> createSession(String path) {
return createSession(path, CoordinationSessionSettings.newBuilder().build());
return createSession(path, CoordinationSessionSettings.newBuilder()
.withConnectTimeout(Duration.ofSeconds(5))
.withReconnectBackoffDelay(Duration.ofMillis(250))
.withExecutor(ForkJoinPool.commonPool())
.build());
}

/**
Expand All @@ -101,7 +109,14 @@ default CompletableFuture<CoordinationSession> createSession(String path) {
* @return status of request
*/
default CompletableFuture<Status> createNode(String path) {
return createNode(path, CoordinationNodeSettings.newBuilder().build());
return createNode(path, CoordinationNodeSettings.newBuilder()
.withSelfCheckPeriod(Duration.ofSeconds(1))
.withSessionGracePeriod(Duration.ofSeconds(10))
.withReadConsistencyMode(NodeConsistenteMode.RELAXED)
.withAttachConsistencyMode(NodeConsistenteMode.STRICT)
.withRateLimiterCountersMode(NodeRateLimiterCountersMode.UNSET)
.build()
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public interface CoordinationSession extends AutoCloseable {
* If there is no such a semaphore, future will complete exceptionally
* with {@link tech.ydb.core.UnexpectedResultException}.
*/
CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long count, byte[] data, Duration timeout);
CompletableFuture<Result<SemaphoreLease>> acquireSemaphore(String name, long count, byte[] data, Duration timeout);

/**
* Acquire an ephemeral semaphore.
Expand All @@ -81,14 +81,14 @@ public interface CoordinationSession extends AutoCloseable {
* Later requests override previous operations with the same semaphore,
* e.g. to reduce acquired count, change timeout or attached data
*
* @param name Name of the semaphore to acquire
* @param timeout Duration after which operation will fail if it's still waiting in the waiters queue
* @param data User-defined binary data that may be attached to the operation
* @param name Name of the semaphore to acquire
* @param exclusive Flag of exclusive acquiring
* @param timeout Duration after which operation will fail if it's still waiting in the waiters queue
* @param data User-defined binary data that may be attached to the operation
* @return future with a semaphore lease object
*/
CompletableFuture<SemaphoreLease> acquireEphemeralSemaphore(String name, byte[] data, Duration timeout);


CompletableFuture<Result<SemaphoreLease>> acquireEphemeralSemaphore(String name, boolean exclusive, byte[] data,
Duration timeout);

CompletableFuture<Result<SemaphoreDescription>> describeSemaphore(String name, DescribeSemaphoreMode mode);

Expand Down Expand Up @@ -121,7 +121,7 @@ default CompletableFuture<Status> createSemaphore(String name, long limit) {
* @param timeout Duration after which operation will fail if it's still waiting in the waiters queue
* @return future with a semaphore lease object
*/
default CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long count, Duration timeout) {
default CompletableFuture<Result<SemaphoreLease>> acquireSemaphore(String name, long count, Duration timeout) {
return acquireSemaphore(name, count, null, timeout);
}

Expand All @@ -134,10 +134,12 @@ default CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long cou
* e.g. to reduce acquired count, change timeout or attached data
*
* @param name Name of the semaphore to acquire
* @param exclusive Flag of exclusive acquiring
* @param timeout Duration after which operation will fail if it's still waiting in the waiters queue
* @return future with a semaphore lease object
*/
default CompletableFuture<SemaphoreLease> acquireEphemeralSemaphore(String name, Duration timeout) {
return acquireEphemeralSemaphore(name, null, timeout);
default CompletableFuture<Result<SemaphoreLease>> acquireEphemeralSemaphore(String name, boolean exclusive,
Duration timeout) {
return acquireEphemeralSemaphore(name, exclusive, null, timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@

import java.util.concurrent.CompletableFuture;

import tech.ydb.core.Status;

/**
*
* @author Aleksandr Gorshenin
*/
public interface SemaphoreLease {

CoordinationSession getSession();
public interface SemaphoreLease extends AutoCloseable {

String getSemaphoreName();

CompletableFuture<Status> getStatusFuture();
CoordinationSession getSession();

boolean isActive();

CompletableFuture<Boolean> release();
CompletableFuture<Void> release();

default boolean isValid() {
return !getStatusFuture().isDone();
@Override
default void close() {
release().join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import tech.ydb.coordination.settings.CoordinationSessionSettings;
import tech.ydb.coordination.settings.DescribeCoordinationNodeSettings;
import tech.ydb.coordination.settings.DropCoordinationNodeSettings;
import tech.ydb.coordination.settings.NodeConsistenteMode;
import tech.ydb.coordination.settings.NodeRateLimiterCountersMode;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.operation.OperationUtils;
import tech.ydb.proto.coordination.AlterNodeRequest;
import tech.ydb.proto.coordination.Config;
Expand All @@ -36,103 +39,89 @@ public CompletableFuture<CoordinationSession> createSession(String path, Coordin
}

@Override
public CompletableFuture<Status> createNode(
String path,
CoordinationNodeSettings coordinationNodeSettings
) {
return coordinationRpc.createNode(
CreateNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(coordinationNodeSettings))
.setConfig(createConfig(coordinationNodeSettings))
.build(),
OperationUtils.createGrpcRequestSettings(coordinationNodeSettings)
);
public CompletableFuture<Status> createNode(String path, CoordinationNodeSettings settings) {
CreateNodeRequest request = CreateNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(settings))
.setConfig(createConfig(settings))
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
return coordinationRpc.createNode(request, grpcSettings);
}

@Override
public CompletableFuture<Status> alterNode(
String path,
CoordinationNodeSettings coordinationNodeSettings
) {
return coordinationRpc.alterNode(
AlterNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(coordinationNodeSettings))
.setConfig(createConfig(coordinationNodeSettings))
.build(),
OperationUtils.createGrpcRequestSettings(coordinationNodeSettings)
);
public CompletableFuture<Status> alterNode(String path, CoordinationNodeSettings settings) {
AlterNodeRequest request = AlterNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(settings))
.setConfig(createConfig(settings))
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
return coordinationRpc.alterNode(request, grpcSettings);
}

@Override
public CompletableFuture<Status> dropNode(
String path,
DropCoordinationNodeSettings dropCoordinationNodeSettings
) {
return coordinationRpc.dropNode(
DropNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(dropCoordinationNodeSettings))
.build(),
OperationUtils.createGrpcRequestSettings(dropCoordinationNodeSettings)
);
public CompletableFuture<Status> dropNode(String path, DropCoordinationNodeSettings settings) {
DropNodeRequest request = DropNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(settings))
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
return coordinationRpc.dropNode(request, grpcSettings);
}

@Override
public CompletableFuture<Status> describeNode(
String path,
DescribeCoordinationNodeSettings describeCoordinationNodeSettings
) {
return coordinationRpc.describeNode(
DescribeNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(describeCoordinationNodeSettings))
.build(),
OperationUtils.createGrpcRequestSettings(describeCoordinationNodeSettings)
);
public CompletableFuture<Status> describeNode(String path, DescribeCoordinationNodeSettings settings) {
DescribeNodeRequest request = DescribeNodeRequest.newBuilder()
.setPath(path)
.setOperationParams(OperationUtils.createParams(settings))
.build();

GrpcRequestSettings grpcSettings = OperationUtils.createGrpcRequestSettings(settings);
return coordinationRpc.describeNode(request, grpcSettings);
}

@Override
public String getDatabase() {
return coordinationRpc.getDatabase();
}

private static ConsistencyMode toProto(CoordinationNodeSettings.ConsistencyMode consistencyMode) {
switch (consistencyMode) {
case CONSISTENCY_MODE_STRICT:
private static ConsistencyMode toProto(NodeConsistenteMode mode) {
switch (mode) {
case UNSET:
return ConsistencyMode.CONSISTENCY_MODE_UNSET;
case STRICT:
return ConsistencyMode.CONSISTENCY_MODE_STRICT;
case CONSISTENCY_MODE_RELAXED:
case RELAXED:
return ConsistencyMode.CONSISTENCY_MODE_RELAXED;
default:
throw new RuntimeException("Unknown consistency mode: " + consistencyMode);
throw new RuntimeException("Unknown consistency mode: " + mode);
}
}

private static RateLimiterCountersMode toProto(
CoordinationNodeSettings.RateLimiterCountersMode rateLimiterCountersMode
) {
switch (rateLimiterCountersMode) {
case RATE_LIMITER_COUNTERS_MODE_DETAILED:
private static RateLimiterCountersMode toProto(NodeRateLimiterCountersMode mode) {
switch (mode) {
case UNSET:
return RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_UNSET;
case DETAILED:
return RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_DETAILED;
case RATE_LIMITER_COUNTERS_MODE_AGGREGATED:
case AGGREGATED:
return RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_AGGREGATED;
default:
throw new RuntimeException("Unknown rate limiter counters mode: " + rateLimiterCountersMode);
throw new RuntimeException("Unknown rate limiter counters mode: " + mode);
}
}

private static Config createConfig(CoordinationNodeSettings coordinationNodeSettings) {
Config.Builder configBuilder = Config.newBuilder()
.setSelfCheckPeriodMillis(coordinationNodeSettings.getSelfCheckPeriodMillis())
.setSessionGracePeriodMillis(coordinationNodeSettings.getSessionGracePeriodMillis())
return Config.newBuilder()
.setSelfCheckPeriodMillis((int) coordinationNodeSettings.getSelfCheckPeriod().toMillis())
.setSessionGracePeriodMillis((int) coordinationNodeSettings.getSessionGracePeriod().toMillis())
.setReadConsistencyMode(toProto(coordinationNodeSettings.getReadConsistencyMode()))
.setAttachConsistencyMode(toProto(coordinationNodeSettings.getAttachConsistencyMode()));

if (coordinationNodeSettings.getRateLimiterCountersMode() != null) {
configBuilder.setRateLimiterCountersMode(toProto(coordinationNodeSettings.getRateLimiterCountersMode()));
}

return configBuilder.build();
.setAttachConsistencyMode(toProto(coordinationNodeSettings.getAttachConsistencyMode()))
.setRateLimiterCountersMode(toProto(coordinationNodeSettings.getRateLimiterCountersMode()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import tech.ydb.coordination.settings.WatchSemaphoreMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;

public class CoordinationSessionImpl implements CoordinationSession {
private static final Logger logger = LoggerFactory.getLogger(CoordinationSession.class);
Expand All @@ -48,7 +47,7 @@ public static CompletableFuture<CoordinationSession> newSession(CoordinationRpc
final CoordinationRetryableStreamImpl stream = new CoordinationRetryableStreamImpl(rpc, executor, nodePath);
final CoordinationSessionImpl session = new CoordinationSessionImpl(stream);
final CompletableFuture<CoordinationSession> sessionStartFuture = CompletableFuture.completedFuture(session);
return session.start(settings.getCreateTimeout())
return session.start(settings.getConnectTimeout())
.thenAccept(session.sessionId::set)
.thenCompose(ignored -> sessionStartFuture);
}
Expand All @@ -69,45 +68,30 @@ public CompletableFuture<Status> createSemaphore(String semaphoreName, long limi
}

@Override
public CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long count, byte[] data, Duration timeout) {
byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB;
public CompletableFuture<Result<SemaphoreLease>> acquireSemaphore(String name, long count, byte[] data,
Duration timeout) {
byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB;
final int reqId = lastId.getAndIncrement();
logger.trace("Send acquireSemaphore {} with count {}", name, count);
return stream.sendAcquireSemaphore(name, count, timeout, false, semaphoreData, reqId)
.thenApply(result -> {
SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name);
if (!result.getValue()) { // timeout expired
lease.completeLease(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED));
}
return lease;
});
return stream.sendAcquireSemaphore(name, count, timeout, false, sepamhoreData, reqId)
.thenApply(r -> r.map(v -> new SemaphoreLeaseImpl(this, name)));
}

@Override
public CompletableFuture<SemaphoreLease> acquireEphemeralSemaphore(String name, byte[] data, Duration timeout) {
byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB;
public CompletableFuture<Result<SemaphoreLease>> acquireEphemeralSemaphore(String name, boolean exclusive,
byte[] data, Duration timeout) {
byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB;
final int reqId = lastId.getAndIncrement();
logger.trace("Send acquireEphemeralSemaphore {}", name);
return stream.sendAcquireSemaphore(name, -1L, timeout, true, semaphoreData, reqId)
.thenApply(result -> {
SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name);
if (!result.getValue()) { // timeout expired
lease.completeLease(Status.of(StatusCode.CLIENT_DEADLINE_EXPIRED));
}
return lease;
});
long limit = exclusive ? -1L : 1L;
return stream.sendAcquireSemaphore(name, limit, timeout, true, sepamhoreData, reqId)
.thenApply(r -> r.map(v -> new SemaphoreLeaseImpl(this, name)));
}


CompletableFuture<Boolean> releaseSemaphore(SemaphoreLeaseImpl lease) {
CompletableFuture<Boolean> releaseSemaphore(String name) {
final int semaphoreReleaseId = lastId.getAndIncrement();
logger.trace("Send releaseSemaphore {}", lease.getSemaphoreName());
return stream.sendReleaseSemaphore(lease.getSemaphoreName(), semaphoreReleaseId).thenApply(result -> {
if (result.getValue()) {
lease.completeLease(Status.SUCCESS);
}
return result.getValue();
});
logger.trace("Send releaseSemaphore {}", name);
return stream.sendReleaseSemaphore(name, semaphoreReleaseId).thenApply(Result::getValue);
}

@Override
Expand Down
Loading

0 comments on commit 1600d27

Please sign in to comment.