From ffbf6d27da32ecaef832cc7e7e8dff2b1a704873 Mon Sep 17 00:00:00 2001 From: piece-of-tart Date: Sun, 19 Nov 2023 02:38:30 +0300 Subject: [PATCH] update: Leader election service --- .../ydb/coordination/CoordinationSession.java | 10 +- .../impl/CoordinationSessionImpl.java | 10 +- .../leader_election/LeaderElection.java | 168 ++++++++++++------ .../LeaderElectionScenarioTest.java | 158 ++++++++++++---- 4 files changed, 246 insertions(+), 100 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java b/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java index 9254bcf2d..55368071b 100644 --- a/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java +++ b/coordination/src/main/java/tech/ydb/coordination/CoordinationSession.java @@ -31,7 +31,9 @@ public interface CoordinationSession extends AutoCloseable { * @param name Name of the semaphore to create * @param limit Number of tokens that may be acquired by sessions * @param data User-defined data that will be attached to the semaphore - * @return future with status of operation + * @return Future with status of operation. + * If there already was a semaphore with such a name, you get + * {@code ALREADY_EXISTS} status. */ CompletableFuture createSemaphore(String name, long limit, byte[] data); @@ -51,7 +53,7 @@ public interface CoordinationSession extends AutoCloseable { * * @param name Name of the semaphore to remove * @param force Will delete semaphore even if it's currently acquired by sessions - * @return future with status of operation + * @return Future with status of operation. */ CompletableFuture deleteSemaphore(String name, boolean force); @@ -65,7 +67,9 @@ public interface CoordinationSession extends AutoCloseable { * @param count Number of tokens to acquire on the semaphore * @param timeout Duration after which operation will fail if it's still waiting in the waiters queue * @param data User-defined binary data that may be attached to the operation - * @return future with a semaphore lease object + * @return If there is a semaphore with {@code name}, future will return a semaphore lease object. + * If there is no such a semaphore, future will complete exceptionally + * with {@link tech.ydb.core.UnexpectedResultException}. */ CompletableFuture acquireSemaphore(String name, long count, byte[] data, Duration timeout); diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java index 06706241b..a51f17389 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/CoordinationSessionImpl.java @@ -70,10 +70,10 @@ public CompletableFuture createSemaphore(String semaphoreName, long limi @Override public CompletableFuture acquireSemaphore(String name, long count, byte[] data, Duration timeout) { - byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB; + byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB; final int reqId = lastId.getAndIncrement(); logger.trace("Send acquireSemaphore {} with count {}", name, count); - return stream.sendAcquireSemaphore(name, count, timeout, false, sepamhoreData, reqId) + return stream.sendAcquireSemaphore(name, count, timeout, false, semaphoreData, reqId) .thenApply(result -> { SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name); if (!result.getValue()) { // timeout expired @@ -85,10 +85,10 @@ public CompletableFuture acquireSemaphore(String name, long coun @Override public CompletableFuture acquireEphemeralSemaphore(String name, byte[] data, Duration timeout) { - byte[] sepamhoreData = data != null ? data : BYTE_ARRAY_STUB; + byte[] semaphoreData = data != null ? data : BYTE_ARRAY_STUB; final int reqId = lastId.getAndIncrement(); logger.trace("Send acquireEphemeralSemaphore {}", name); - return stream.sendAcquireSemaphore(name, -1L, timeout, true, sepamhoreData, reqId) + return stream.sendAcquireSemaphore(name, -1L, timeout, true, semaphoreData, reqId) .thenApply(result -> { SemaphoreLeaseImpl lease = new SemaphoreLeaseImpl(this, name); if (!result.getValue()) { // timeout expired @@ -130,7 +130,7 @@ public CompletableFuture> describeAndWatchSemaphore(Str return stream.sendDescribeSemaphore(name, describeMode.includeOwners(), describeMode.includeWaiters(), watchMode.watchData(), watchMode.watchOwners(), - (ev) -> changeFuture.complete(ev) + changeFuture::complete ).thenApply(r -> r.map(desc -> new SemaphoreWatcher(desc, changeFuture))); } diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java b/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java index d4dcc7713..ac8954e32 100644 --- a/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java @@ -2,7 +2,10 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -27,16 +30,17 @@ public class LeaderElection implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class); private final AtomicBoolean isElecting = new AtomicBoolean(true); - private CompletableFuture acquireFuture; - private CompletableFuture describeFuture; private final Consumer takeLeadershipObserver; private final Consumer changeLeaderObserver; - private CompletableFuture changedEventFuture; private final CoordinationSession session; private final String name; private final byte[] data; + private CompletableFuture acquireFuture; + private CompletableFuture> describeFuture; + private CompletableFuture changedEventFuture; private LeaderElection(CoordinationSession session, String name, byte[] data, + LeadershipPolicy policy, Consumer takeLeadershipObserver, Consumer changeLeaderObserver) { this.session = session; @@ -45,15 +49,12 @@ private LeaderElection(CoordinationSession session, String name, byte[] data, this.takeLeadershipObserver = takeLeadershipObserver; this.changeLeaderObserver = changeLeaderObserver; initializeAcquireFuture(); - recursiveAcquire(); + if (policy == LeadershipPolicy.TAKE_LEADERSHIP) { + proposeLeadershipAsync(); + } recursiveDescribe(); } - private void initializeAcquireFuture() { - acquireFuture = new CompletableFuture<>(); - acquireFuture.thenRun(() -> takeLeadershipObserver.accept(this)); - } - @Nonnull private static RuntimeException getSemaphoreWatcherException(Result semaphoreWatcherResult) { final RuntimeException e; @@ -72,46 +73,56 @@ private static RuntimeException getSemaphoreWatcherException(Result(); + acquireFuture.thenRun(() -> takeLeadershipObserver.accept(this)); + } + private CompletableFuture recursiveDescribeDetail() { return session.describeAndWatchSemaphore(name, DescribeSemaphoreMode.WITH_OWNERS, WatchSemaphoreMode.WATCH_OWNERS) - .thenCompose((semaphoreWatcherResult -> { - if (semaphoreWatcherResult != null && semaphoreWatcherResult.isSuccess()) { - describeFuture.complete( - semaphoreWatcherResult - .getValue() - .getDescription() - .getOwnersList() - .get(0) - ); - return semaphoreWatcherResult.getValue().getChangedFuture(); - } else if (semaphoreWatcherResult != null && - semaphoreWatcherResult.getStatus().getCode() == StatusCode.NOT_FOUND) { - return CompletableFuture.completedFuture(new SemaphoreChangedEvent(false, false, - false)); - } else { - logger.debug("session.describeAndWatchSemaphore.whenComplete() {}", - semaphoreWatcherResult); - final RuntimeException e = getSemaphoreWatcherException(semaphoreWatcherResult); - close(); - describeFuture.completeExceptionally(e); - throw e; - } - }) - ); + .handle((semaphoreWatcherResult, th) -> { + if (th != null) { + CompletableFuture callback = new CompletableFuture<>(); + if (isSemaphoreExistenceException(th, () -> callback.complete(true))) { + callback.join(); + return recursiveDescribeDetail(); + } + logger.warn("Exception when trying to describe the semaphore:" + + " (semaphoreWatcherResult = {}, throwable = {})", semaphoreWatcherResult, th); + isElecting.set(false); + } + if (semaphoreWatcherResult != null && semaphoreWatcherResult.isSuccess()) { + final List leader = semaphoreWatcherResult.getValue().getDescription().getOwnersList(); + describeFuture.complete(leader.isEmpty() ? Optional.empty() : Optional.of(leader.get(0))); + return semaphoreWatcherResult.getValue().getChangedFuture(); + } else if (semaphoreWatcherResult != null && + semaphoreWatcherResult.getStatus().getCode() == StatusCode.NOT_FOUND) { + return CompletableFuture.completedFuture(new SemaphoreChangedEvent(false, false, + false)); + } else { + logger.debug("session.describeAndWatchSemaphore.whenComplete() {}", + semaphoreWatcherResult); + final RuntimeException e = getSemaphoreWatcherException(semaphoreWatcherResult); + close(); + describeFuture.completeExceptionally(e); + throw e; + } + }).thenCompose(Function.identity()); } private void recursiveDescribe() { @@ -119,15 +130,51 @@ private void recursiveDescribe() { describeFuture.thenRun(() -> changeLeaderObserver.accept(this)); changedEventFuture = recursiveDescribeDetail(); changedEventFuture.whenComplete((semaphoreChangedEvent, th) -> { + if (isSemaphoreExistenceException(th, this::recursiveDescribe)) { + return; + } if (semaphoreChangedEvent != null && th == null && isElecting.get()) { recursiveDescribe(); + return; } + logger.warn("Exception when trying to check changes at the semaphore:" + + " (semaphore changed event: {}, throwable: {})", semaphoreChangedEvent, th); + isElecting.set(false); }); } + private void createSemaphore(Runnable callback) { + session.createSemaphore(name, 1).whenComplete(((status, throwable) -> { + if (status == null || throwable != null || + (status.getCode() != StatusCode.ALREADY_EXISTS && !status.isSuccess())) { + logger.warn("Exception when trying to create the semaphore: (status: {}, throwable: {})", + status, throwable); + isElecting.set(false); + } + callback.run(); + })); + } + + private boolean isSemaphoreExistenceException(Throwable th, Runnable callback) { + if (th instanceof CompletionException) { + th = th.getCause(); + } + if (th instanceof UnexpectedResultException) { + UnexpectedResultException e = (UnexpectedResultException) th; + if (e.getStatus().getCode() == StatusCode.NOT_FOUND) { + createSemaphore(callback); + return true; + } + } + return false; + } + private void recursiveAcquire() { - session.acquireEphemeralSemaphore(name, data, Duration.ofHours(1)).whenComplete( + session.acquireSemaphore(name, 1, data, Duration.ofHours(1)).whenComplete( (lease, throwable) -> { + if (isSemaphoreExistenceException(throwable, this::recursiveAcquire)) { + return; + } if ((lease == null || throwable != null) && isElecting.get()) { recursiveAcquire(); } else if (lease != null) { @@ -140,11 +187,16 @@ private void recursiveAcquire() { }); } + public void proposeLeadershipAsync() { + recursiveAcquire(); + } + /** * Don't wait until the node notifies session about change the leader and require information about current leader + * * @return Completable future of leader's endpoint. */ - public synchronized CompletableFuture forceUpdateLeaderAsync() { + public synchronized CompletableFuture> forceUpdateLeaderAsync() { changedEventFuture.complete(new SemaphoreChangedEvent(false, false, false)); return getLeaderAsync(); } @@ -152,23 +204,17 @@ public synchronized CompletableFuture forceUpdateLeaderAsync() { /** * {@link LeaderElection#forceUpdateLeaderAsync()} */ - public String forceUpdateLeader() { + public Optional forceUpdateLeader() { return forceUpdateLeaderAsync().join(); } /** - * When your participant know the leader, you can see its endpoint - * @return Completable future of leader's endpoint. - */ - public CompletableFuture getLeaderAsync() { - return describeFuture.thenApply(session -> new String(session.getData(), StandardCharsets.UTF_8)); - } - - /** - * {@link LeaderElection#getLeaderAsync()} + * When your participant know the leader, you can see its endpoint. + * @return Completable future of leader's endpoint if leader is present. */ - public String getLeader() { - return getLeaderAsync().join(); + private CompletableFuture> getLeaderAsync() { + return describeFuture.thenApply(optional -> optional.map(session -> + new String(session.getData(), StandardCharsets.UTF_8))); } /** @@ -219,9 +265,10 @@ public static class LeaderElectionBuilder { private final String semaphoreName; private Consumer takeLeadershipObserver; private Consumer leaderChangeObserver; + private LeadershipPolicy policy = LeadershipPolicy.ONLY_WATCH_LEADER; LeaderElectionBuilder(CoordinationClient client, String fullPath, String endpoint, - String semaphoreName) { + String semaphoreName) { this.client = client; this.fullPath = fullPath; this.endpoint = endpoint; @@ -230,6 +277,7 @@ public static class LeaderElectionBuilder { /** * Add observer for taking leadership. It will be called when your participant becomes a leader + * * @param takeLeadershipObserver - callback for taking leadership observing * @return LeaderElectionBuilder */ @@ -240,8 +288,9 @@ public LeaderElectionBuilder withTakeLeadershipObserver(Consumer /** * Add observer for changing leader. It will be called when participant joins the election, - * when participant is notified about leader change - * when participant calls forceUpdateLeader + * when participant is notified about leader change + * when participant calls forceUpdateLeader + * * @param leaderChangeObserver - callback for leader change observing * @return LeaderElectionBuilder */ @@ -250,8 +299,14 @@ public LeaderElectionBuilder withChangeLeaderObserver(Consumer l return this; } + public LeaderElectionBuilder withLeadershipPolicy(LeadershipPolicy policy) { + this.policy = policy; + return this; + } + /** * Build Leader Election participant in asynchronous way + * * @return Completable future with Leader Election */ public CompletableFuture buildAsync() { @@ -261,6 +316,7 @@ public CompletableFuture buildAsync() { session, semaphoreName, endpoint.getBytes(StandardCharsets.UTF_8), + policy, takeLeadershipObserver == null ? Function.identity()::apply : takeLeadershipObserver, leaderChangeObserver == null ? Function.identity()::apply : leaderChangeObserver @@ -274,6 +330,10 @@ public CompletableFuture buildAsync() { public LeaderElection build() { return buildAsync().join(); } + } + public enum LeadershipPolicy { + TAKE_LEADERSHIP, + ONLY_WATCH_LEADER } } diff --git a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java index 6ca5b010c..b0effcb32 100644 --- a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java @@ -2,9 +2,12 @@ import java.time.Duration; import java.util.List; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -19,6 +22,7 @@ import org.slf4j.LoggerFactory; import tech.ydb.coordination.scenario.leader_election.LeaderElection; +import tech.ydb.coordination.scenario.leader_election.LeaderElection.LeadershipPolicy; import tech.ydb.coordination.settings.CoordinationNodeSettings; import tech.ydb.coordination.settings.DescribeSemaphoreMode; import tech.ydb.coordination.settings.DropCoordinationNodeSettings; @@ -27,10 +31,10 @@ import tech.ydb.test.junit4.GrpcTransportRule; public class LeaderElectionScenarioTest { - private static final Logger logger = LoggerFactory.getLogger(LeaderElectionScenarioTest.class); - private static final String SEMAPHORE_PREFIX = "leader-election-"; @ClassRule public static final GrpcTransportRule YDB_TRANSPORT = new GrpcTransportRule(); + private static final Logger logger = LoggerFactory.getLogger(LeaderElectionScenarioTest.class); + private static final String SEMAPHORE_PREFIX = "leader-election-"; private final String path = YDB_TRANSPORT.getDatabase() + "/coordination-node"; private final CoordinationClient client = CoordinationClient.newClient(YDB_TRANSPORT); @@ -47,22 +51,34 @@ public void createNode() { @Test(timeout = 20_000) public void leaderElectionBaseTest() { - final String semaphoreName = SEMAPHORE_PREFIX + 1_000_001; + final String semaphoreName = "leader-election-base-test"; final AtomicReference leader = new AtomicReference<>(); - + final CyclicBarrier barrier = new CyclicBarrier(2); try (LeaderElection participant1 = LeaderElection .joinElection(client, path, "endpoint-1", semaphoreName) - .withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-1")).build(); + .withLeadershipPolicy(LeadershipPolicy.TAKE_LEADERSHIP) + .withTakeLeadershipObserver(leaderElection -> { + leader.set("endpoint-1"); + awaitBarrier(barrier); + }).build(); + LeaderElection participant2 = LeaderElection .joinElection(client, path, "endpoint-2", semaphoreName) - .withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-2")).build()) { - String leaderFromParticipant1 = participant1.forceUpdateLeader(); - String leaderFromParticipant2 = participant2.forceUpdateLeader(); + .withLeadershipPolicy(LeadershipPolicy.TAKE_LEADERSHIP) + .withTakeLeadershipObserver(leaderElection -> { + leader.set("endpoint-2"); + awaitBarrier(barrier); + }).build() + ) { + awaitBarrier(barrier); + String leaderFromParticipant1 = participant1.forceUpdateLeader().orElse("none"); + String leaderFromParticipant2 = participant2.forceUpdateLeader().orElse("none"); Assert.assertEquals(leader.get(), leaderFromParticipant1); Assert.assertEquals(leader.get(), leaderFromParticipant2); logger.info("The first leader: " + leader.get()); + barrier.reset(); /* Leader change observer will be call 3 times: first - after asking election who is a leader now @@ -72,8 +88,13 @@ public void leaderElectionBaseTest() { CountDownLatch counter = new CountDownLatch(3); try (LeaderElection participant3 = LeaderElection .joinElection(client, path, "endpoint-3", semaphoreName) - .withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-3")) - .withChangeLeaderObserver(leaderElection -> counter.countDown()).build()) { + .withLeadershipPolicy(LeadershipPolicy.TAKE_LEADERSHIP) + .withTakeLeadershipObserver(leaderElection -> { + leader.set("endpoint-3"); + awaitBarrier(barrier); + }) + .withChangeLeaderObserver(leaderElection -> counter.countDown()).build() + ) { final String previousLeader = leader.get(); switch (leader.get()) { case "endpoint-1": @@ -82,36 +103,81 @@ public void leaderElectionBaseTest() { case "endpoint-2": participant2.interruptLeadership(); break; - default: + case "endpoint3": participant3.interruptLeadership(); + default: + throw new RuntimeException("No leader was elected."); } + + awaitBarrier(barrier); Assert.assertNotEquals(previousLeader, leader.get()); - Assert.assertEquals(leader.get(), participant1.forceUpdateLeader()); - Assert.assertEquals(leader.get(), participant2.forceUpdateLeader()); - Assert.assertEquals(leader.get(), participant3.forceUpdateLeader()); + Assert.assertEquals(leader.get(), participant1.forceUpdateLeader().orElse("none")); + Assert.assertEquals(leader.get(), participant2.forceUpdateLeader().orElse("none")); + Assert.assertEquals(leader.get(), participant3.forceUpdateLeader().orElse("none")); Assert.assertTrue(counter.await(20_000, TimeUnit.MILLISECONDS)); } catch (Exception e) { - Assert.fail("Exception in leader election test."); + Assert.fail("Exception in leader election test."); } } catch (Exception e) { Assert.fail("Exception in leader election test."); } } + @Test(timeout = 20_000) + public void leaderElectionOneLeaderSeveralFollowerTest() { + final String name = "leader-election-one-leader-several-followers"; + final AtomicBoolean isFollowerALeader = new AtomicBoolean(false); + /* Check that after leader.interruptLeadership() this leader will be chosen again */ + final CountDownLatch latch = new CountDownLatch(1); + + try (LeaderElection follower1 = LeaderElection.joinElection(client, path, "endpoint-1", name) + .withTakeLeadershipObserver(leaderElection -> isFollowerALeader.set(true)).build(); + + LeaderElection follower2 = LeaderElection.joinElection(client, path, "endpoint-2", name) + .withTakeLeadershipObserver(leaderElection -> isFollowerALeader.set(true)).build(); + + LeaderElection leader = LeaderElection.joinElection(client, path, "endpoint-3", name) + .withTakeLeadershipObserver(leaderElection -> { + latch.countDown(); + leaderElection.interruptLeadership(); + }).build() + ) { + + leader.proposeLeadershipAsync(); + + Assert.assertTrue(latch.await(20_000, TimeUnit.MILLISECONDS)); + + Assert.assertFalse(follower1.isLeader()); + Assert.assertFalse(follower2.isLeader()); + Assert.assertFalse(isFollowerALeader.get()); + } catch (Exception e) { + Assert.fail("Exception in leader election test: " + e.getMessage()); + } + } + @Test(timeout = 20_000) public void leaderElectionBaseTest2() { final String name = "leader-election-base-test-2"; - try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name).build(); + CyclicBarrier barrier = new CyclicBarrier(2); + + try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name) + .withLeadershipPolicy(LeadershipPolicy.TAKE_LEADERSHIP) + .withTakeLeadershipObserver(leaderElection -> awaitBarrier(barrier)) + .build(); + LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name).build() ) { - String firstLeader = participant1.getLeader(); - if (firstLeader.equals("endpoint-1")) { - participant1.interruptLeadership(); - } else { - participant2.interruptLeadership(); - } - Assert.assertNotEquals(firstLeader, participant1.forceUpdateLeader()); - Assert.assertNotEquals(firstLeader, participant2.forceUpdateLeader()); + awaitBarrier(barrier); + String firstLeader = participant1.forceUpdateLeader().orElse("none"); + Assert.assertEquals("endpoint-1", firstLeader); + + barrier.reset(); + + participant1.interruptLeadership(); + awaitBarrier(barrier); + + Assert.assertEquals(firstLeader, participant1.forceUpdateLeader().orElse("none")); + Assert.assertEquals(firstLeader, participant2.forceUpdateLeader().orElse("none")); } catch (Exception e) { Assert.fail("Exception while testing leader election scenario."); } @@ -120,34 +186,35 @@ public void leaderElectionBaseTest2() { @Test(timeout = 20_000) public void leaderElectionBaseTest3() { final String name = "leader-election-base-test-3"; - CountDownLatch counter = new CountDownLatch(2); + CyclicBarrier barrier = new CyclicBarrier(2); try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name) .withTakeLeadershipObserver(leaderElection -> { logger.info("Endpoint-1 is a leader now!"); - counter.countDown(); + awaitBarrier(barrier); }).build(); LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name) .withTakeLeadershipObserver(leaderElection -> { logger.info("Endpoint-2 is a leader now!"); - counter.countDown(); + awaitBarrier(barrier); }).build() ) { + participant1.proposeLeadershipAsync(); + participant2.proposeLeadershipAsync(); + awaitBarrier(barrier); + barrier.reset(); if (participant1.isLeader()) { participant1.interruptLeadership(); } else { participant2.interruptLeadership(); } - - participant1.forceUpdateLeader(); - participant2.forceUpdateLeader(); - + awaitBarrier(barrier); + barrier.reset(); if (participant1.isLeader()) { participant1.interruptLeadership(); } else { participant2.interruptLeadership(); } - - Assert.assertTrue(counter.await(20_000, TimeUnit.MILLISECONDS)); + awaitBarrier(barrier); } catch (Exception e) { Assert.fail("Exception while testing leader election scenario."); } @@ -156,20 +223,26 @@ public void leaderElectionBaseTest3() { @Test(timeout = 60_000) public void leaderElectionStressTest1() { final int sessionCount = 20; - /* ID for definition Leader Election. Every session has to point the same token. */ - final String semaphoreName = SEMAPHORE_PREFIX + 1_000_000; + final String semaphoreName = "leader-election-stress-test-1"; + CyclicBarrier barrier = new CyclicBarrier(2); List participants = IntStream.range(0, sessionCount).mapToObj(id -> LeaderElection - .joinElection(client, path, "endpoint-" + id, semaphoreName).build()) + .joinElection(client, path, "endpoint-" + id, semaphoreName) + .withTakeLeadershipObserver(leaderElection -> awaitBarrier(barrier)) + .withLeadershipPolicy(LeadershipPolicy.TAKE_LEADERSHIP) + .build()) .collect(Collectors.toList()); + awaitBarrier(barrier); final AtomicReference leader = new AtomicReference<>(); for (LeaderElection participant : participants) { - String localLeader = participant.getLeader(); + String localLeader = participant.forceUpdateLeader().orElse("none"); leader.updateAndGet(currLeader -> currLeader == null ? localLeader : currLeader); Assert.assertEquals(leader.get(), localLeader); } + barrier.reset(); + /* The leader is not a leader anymore */ for (int i = 0; i < sessionCount; i++) { if (participants.get(i).isLeader()) { @@ -178,10 +251,11 @@ public void leaderElectionStressTest1() { } } + awaitBarrier(barrier); final AtomicReference newLeader = new AtomicReference<>(); for (LeaderElection participant : participants) { participant.forceUpdateLeader(); - String localLeader = participant.getLeader(); + String localLeader = participant.forceUpdateLeader().orElse("none"); newLeader.updateAndGet(currLeader -> currLeader == null ? localLeader : currLeader); Assert.assertEquals(newLeader.get(), localLeader); } @@ -254,4 +328,12 @@ public void deleteNode() { ); Assert.assertTrue(result.join().isSuccess()); } + + private static void awaitBarrier(CyclicBarrier barrier) { + try { + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } } \ No newline at end of file