Skip to content

Commit

Permalink
update: Leader election service
Browse files Browse the repository at this point in the history
  • Loading branch information
piece-of-tart committed Nov 18, 2023
1 parent c65ca92 commit ffbf6d2
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public interface CoordinationSession extends AutoCloseable {
* @param name Name of the semaphore to create
* @param limit Number of tokens that may be acquired by sessions
* @param data User-defined data that will be attached to the semaphore
* @return future with status of operation
* @return Future with status of operation.
* If there already was a semaphore with such a name, you get
* {@code ALREADY_EXISTS} status.
*/
CompletableFuture<Status> createSemaphore(String name, long limit, byte[] data);

Expand All @@ -51,7 +53,7 @@ public interface CoordinationSession extends AutoCloseable {
*
* @param name Name of the semaphore to remove
* @param force Will delete semaphore even if it's currently acquired by sessions
* @return future with status of operation
* @return Future with status of operation.
*/
CompletableFuture<Status> deleteSemaphore(String name, boolean force);

Expand All @@ -65,7 +67,9 @@ public interface CoordinationSession extends AutoCloseable {
* @param count Number of tokens to acquire on the semaphore
* @param timeout Duration after which operation will fail if it's still waiting in the waiters queue
* @param data User-defined binary data that may be attached to the operation
* @return future with a semaphore lease object
* @return If there is a semaphore with {@code name}, future will return a semaphore lease object.
* If there is no such a semaphore, future will complete exceptionally
* with {@link tech.ydb.core.UnexpectedResultException}.
*/
CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long count, byte[] data, Duration timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public CompletableFuture<Status> createSemaphore(String semaphoreName, long limi

@Override
public CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long count, byte[] data, Duration timeout) {
byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB;
byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB;
final int reqId = lastId.getAndIncrement();
logger.trace("Send acquireSemaphore {} with count {}", name, count);
return stream.sendAcquireSemaphore(name, count, timeout, false, sepamhoreData, reqId)
return stream.sendAcquireSemaphore(name, count, timeout, false, semaphoreData, reqId)
.thenApply(result -> {
SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name);
if (!result.getValue()) { // timeout expired
Expand All @@ -85,10 +85,10 @@ public CompletableFuture<SemaphoreLease> acquireSemaphore(String name, long coun

@Override
public CompletableFuture<SemaphoreLease> acquireEphemeralSemaphore(String name, byte[] data, Duration timeout) {
byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB;
byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB;
final int reqId = lastId.getAndIncrement();
logger.trace("Send acquireEphemeralSemaphore {}", name);
return stream.sendAcquireSemaphore(name, -1L, timeout, true, sepamhoreData, reqId)
return stream.sendAcquireSemaphore(name, -1L, timeout, true, semaphoreData, reqId)
.thenApply(result -> {
SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name);
if (!result.getValue()) { // timeout expired
Expand Down Expand Up @@ -130,7 +130,7 @@ public CompletableFuture<Result<SemaphoreWatcher>> describeAndWatchSemaphore(Str
return stream.sendDescribeSemaphore(name,
describeMode.includeOwners(), describeMode.includeWaiters(),
watchMode.watchData(), watchMode.watchOwners(),
(ev) -> changeFuture.complete(ev)
changeFuture::complete
).thenApply(r -> r.map(desc -> new SemaphoreWatcher(desc, changeFuture)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -27,16 +30,17 @@
public class LeaderElection implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);
private final AtomicBoolean isElecting = new AtomicBoolean(true);
private CompletableFuture<SemaphoreLease> acquireFuture;
private CompletableFuture<Session> describeFuture;
private final Consumer<LeaderElection> takeLeadershipObserver;
private final Consumer<LeaderElection> changeLeaderObserver;
private CompletableFuture<SemaphoreChangedEvent> changedEventFuture;
private final CoordinationSession session;
private final String name;
private final byte[] data;
private CompletableFuture<SemaphoreLease> acquireFuture;
private CompletableFuture<Optional<Session>> describeFuture;
private CompletableFuture<SemaphoreChangedEvent> changedEventFuture;

private LeaderElection(CoordinationSession session, String name, byte[] data,
LeadershipPolicy policy,
Consumer<LeaderElection> takeLeadershipObserver,
Consumer<LeaderElection> changeLeaderObserver) {
this.session = session;
Expand All @@ -45,15 +49,12 @@ private LeaderElection(CoordinationSession session, String name, byte[] data,
this.takeLeadershipObserver = takeLeadershipObserver;
this.changeLeaderObserver = changeLeaderObserver;
initializeAcquireFuture();
recursiveAcquire();
if (policy == LeadershipPolicy.TAKE_LEADERSHIP) {
proposeLeadershipAsync();
}
recursiveDescribe();
}

private void initializeAcquireFuture() {
acquireFuture = new CompletableFuture<>();
acquireFuture.thenRun(() -> takeLeadershipObserver.accept(this));
}

@Nonnull
private static RuntimeException getSemaphoreWatcherException(Result<SemaphoreWatcher> semaphoreWatcherResult) {
final RuntimeException e;
Expand All @@ -72,62 +73,108 @@ private static RuntimeException getSemaphoreWatcherException(Result<SemaphoreWat
* Join an election.
* When you only start an election, you should use this method as well as when you join an already
* existing election.
* @param client - Coordination client
* @param fullPath - full path to the coordination node
* @param endpoint - Leader's identifier. All participants see leader's endpoint
*
* @param client - Coordination client
* @param fullPath - full path to the coordination node
* @param endpoint - Leader's identifier. All participants see leader's endpoint
* @param semaphoreName - All participants try to acquire the same semaphore. This semaphore will be deleted after
* election, hence you shouldn't create semaphore with this name before election.
* @return LeaderElectionBuilder where you can add event's observer
*/
public static LeaderElectionBuilder joinElection(CoordinationClient client, String fullPath,
String endpoint, String semaphoreName) {
String endpoint, String semaphoreName) {
return new LeaderElectionBuilder(client, fullPath, endpoint, semaphoreName);
}

private void initializeAcquireFuture() {
acquireFuture = new CompletableFuture<>();
acquireFuture.thenRun(() -> takeLeadershipObserver.accept(this));
}

private CompletableFuture<SemaphoreChangedEvent> recursiveDescribeDetail() {
return session.describeAndWatchSemaphore(name,
DescribeSemaphoreMode.WITH_OWNERS,
WatchSemaphoreMode.WATCH_OWNERS)
.thenCompose((semaphoreWatcherResult -> {
if (semaphoreWatcherResult != null && semaphoreWatcherResult.isSuccess()) {
describeFuture.complete(
semaphoreWatcherResult
.getValue()
.getDescription()
.getOwnersList()
.get(0)
);
return semaphoreWatcherResult.getValue().getChangedFuture();
} else if (semaphoreWatcherResult != null &&
semaphoreWatcherResult.getStatus().getCode() == StatusCode.NOT_FOUND) {
return CompletableFuture.completedFuture(new SemaphoreChangedEvent(false, false,
false));
} else {
logger.debug("session.describeAndWatchSemaphore.whenComplete() {}",
semaphoreWatcherResult);
final RuntimeException e = getSemaphoreWatcherException(semaphoreWatcherResult);
close();
describeFuture.completeExceptionally(e);
throw e;
}
})
);
.handle((semaphoreWatcherResult, th) -> {
if (th != null) {
CompletableFuture<Boolean> callback = new CompletableFuture<>();
if (isSemaphoreExistenceException(th, () -> callback.complete(true))) {
callback.join();
return recursiveDescribeDetail();
}
logger.warn("Exception when trying to describe the semaphore:" +
" (semaphoreWatcherResult = {}, throwable = {})", semaphoreWatcherResult, th);
isElecting.set(false);
}
if (semaphoreWatcherResult != null && semaphoreWatcherResult.isSuccess()) {
final List<Session> leader = semaphoreWatcherResult.getValue().getDescription().getOwnersList();
describeFuture.complete(leader.isEmpty() ? Optional.empty() : Optional.of(leader.get(0)));
return semaphoreWatcherResult.getValue().getChangedFuture();
} else if (semaphoreWatcherResult != null &&
semaphoreWatcherResult.getStatus().getCode() == StatusCode.NOT_FOUND) {
return CompletableFuture.completedFuture(new SemaphoreChangedEvent(false, false,
false));
} else {
logger.debug("session.describeAndWatchSemaphore.whenComplete() {}",
semaphoreWatcherResult);
final RuntimeException e = getSemaphoreWatcherException(semaphoreWatcherResult);
close();
describeFuture.completeExceptionally(e);
throw e;
}
}).thenCompose(Function.identity());
}

private void recursiveDescribe() {
describeFuture = new CompletableFuture<>();
describeFuture.thenRun(() -> changeLeaderObserver.accept(this));
changedEventFuture = recursiveDescribeDetail();
changedEventFuture.whenComplete((semaphoreChangedEvent, th) -> {
if (isSemaphoreExistenceException(th, this::recursiveDescribe)) {
return;
}
if (semaphoreChangedEvent != null && th == null && isElecting.get()) {
recursiveDescribe();
return;
}
logger.warn("Exception when trying to check changes at the semaphore:" +
" (semaphore changed event: {}, throwable: {})", semaphoreChangedEvent, th);
isElecting.set(false);
});
}

private void createSemaphore(Runnable callback) {
session.createSemaphore(name, 1).whenComplete(((status, throwable) -> {
if (status == null || throwable != null ||
(status.getCode() != StatusCode.ALREADY_EXISTS && !status.isSuccess())) {
logger.warn("Exception when trying to create the semaphore: (status: {}, throwable: {})",
status, throwable);
isElecting.set(false);
}
callback.run();
}));
}

private boolean isSemaphoreExistenceException(Throwable th, Runnable callback) {
if (th instanceof CompletionException) {
th = th.getCause();
}
if (th instanceof UnexpectedResultException) {
UnexpectedResultException e = (UnexpectedResultException) th;
if (e.getStatus().getCode() == StatusCode.NOT_FOUND) {
createSemaphore(callback);
return true;
}
}
return false;
}

private void recursiveAcquire() {
session.acquireEphemeralSemaphore(name, data, Duration.ofHours(1)).whenComplete(
session.acquireSemaphore(name, 1, data, Duration.ofHours(1)).whenComplete(
(lease, throwable) -> {
if (isSemaphoreExistenceException(throwable, this::recursiveAcquire)) {
return;
}
if ((lease == null || throwable != null) && isElecting.get()) {
recursiveAcquire();
} else if (lease != null) {
Expand All @@ -140,35 +187,34 @@ private void recursiveAcquire() {
});
}

public void proposeLeadershipAsync() {
recursiveAcquire();
}

/**
* Don't wait until the node notifies session about change the leader and require information about current leader
*
* @return Completable future of leader's endpoint.
*/
public synchronized CompletableFuture<String> forceUpdateLeaderAsync() {
public synchronized CompletableFuture<Optional<String>> forceUpdateLeaderAsync() {
changedEventFuture.complete(new SemaphoreChangedEvent(false, false, false));
return getLeaderAsync();
}

/**
* {@link LeaderElection#forceUpdateLeaderAsync()}
*/
public String forceUpdateLeader() {
public Optional<String> forceUpdateLeader() {
return forceUpdateLeaderAsync().join();
}

/**
* When your participant know the leader, you can see its endpoint
* @return Completable future of leader's endpoint.
*/
public CompletableFuture<String> getLeaderAsync() {
return describeFuture.thenApply(session -> new String(session.getData(), StandardCharsets.UTF_8));
}

/**
* {@link LeaderElection#getLeaderAsync()}
* When your participant know the leader, you can see its endpoint.
* @return Completable future of leader's endpoint if leader is present.
*/
public String getLeader() {
return getLeaderAsync().join();
private CompletableFuture<Optional<String>> getLeaderAsync() {
return describeFuture.thenApply(optional -> optional.map(session ->
new String(session.getData(), StandardCharsets.UTF_8)));
}

/**
Expand Down Expand Up @@ -219,9 +265,10 @@ public static class LeaderElectionBuilder {
private final String semaphoreName;
private Consumer<LeaderElection> takeLeadershipObserver;
private Consumer<LeaderElection> leaderChangeObserver;
private LeadershipPolicy policy = LeadershipPolicy.ONLY_WATCH_LEADER;

LeaderElectionBuilder(CoordinationClient client, String fullPath, String endpoint,
String semaphoreName) {
String semaphoreName) {
this.client = client;
this.fullPath = fullPath;
this.endpoint = endpoint;
Expand All @@ -230,6 +277,7 @@ public static class LeaderElectionBuilder {

/**
* Add observer for taking leadership. It will be called when your participant becomes a leader
*
* @param takeLeadershipObserver - callback for taking leadership observing
* @return LeaderElectionBuilder
*/
Expand All @@ -240,8 +288,9 @@ public LeaderElectionBuilder withTakeLeadershipObserver(Consumer<LeaderElection>

/**
* Add observer for changing leader. It will be called when participant joins the election,
* when participant is notified about leader change
* when participant calls forceUpdateLeader
* when participant is notified about leader change
* when participant calls forceUpdateLeader
*
* @param leaderChangeObserver - callback for leader change observing
* @return LeaderElectionBuilder
*/
Expand All @@ -250,8 +299,14 @@ public LeaderElectionBuilder withChangeLeaderObserver(Consumer<LeaderElection> l
return this;
}

public LeaderElectionBuilder withLeadershipPolicy(LeadershipPolicy policy) {
this.policy = policy;
return this;
}

/**
* Build Leader Election participant in asynchronous way
*
* @return Completable future with Leader Election
*/
public CompletableFuture<LeaderElection> buildAsync() {
Expand All @@ -261,6 +316,7 @@ public CompletableFuture<LeaderElection> buildAsync() {
session,
semaphoreName,
endpoint.getBytes(StandardCharsets.UTF_8),
policy,
takeLeadershipObserver == null ? Function.identity()::apply :
takeLeadershipObserver,
leaderChangeObserver == null ? Function.identity()::apply : leaderChangeObserver
Expand All @@ -274,6 +330,10 @@ public CompletableFuture<LeaderElection> buildAsync() {
public LeaderElection build() {
return buildAsync().join();
}
}

public enum LeadershipPolicy {
TAKE_LEADERSHIP,
ONLY_WATCH_LEADER
}
}
Loading

0 comments on commit ffbf6d2

Please sign in to comment.