From c65ca92d5881287f7478a9aeb3ab900ef6b54a57 Mon Sep 17 00:00:00 2001 From: piece-of-tart Date: Thu, 16 Nov 2023 01:39:29 +0300 Subject: [PATCH] update: LeaderElection observers --- .../leader_election/LeaderElection.java | 116 ++++++++++++------ .../LeaderElectionScenarioTest.java | 91 +++++++------- 2 files changed, 130 insertions(+), 77 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java b/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java index f17212d44..d4dcc7713 100644 --- a/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/leader_election/LeaderElection.java @@ -2,10 +2,10 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayDeque; -import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; import javax.annotation.Nonnull; @@ -29,17 +29,21 @@ public class LeaderElection implements AutoCloseable { private final AtomicBoolean isElecting = new AtomicBoolean(true); private CompletableFuture acquireFuture; private CompletableFuture describeFuture; - private final Queue afterAcquireFuture; + private final Consumer takeLeadershipObserver; + private final Consumer changeLeaderObserver; private CompletableFuture changedEventFuture; private final CoordinationSession session; private final String name; private final byte[] data; - private LeaderElection(CoordinationSession session, String name, byte[] data) { + private LeaderElection(CoordinationSession session, String name, byte[] data, + Consumer takeLeadershipObserver, + Consumer changeLeaderObserver) { this.session = session; this.name = name; this.data = data; - afterAcquireFuture = new ArrayDeque<>(); + this.takeLeadershipObserver = takeLeadershipObserver; + this.changeLeaderObserver = changeLeaderObserver; initializeAcquireFuture(); recursiveAcquire(); recursiveDescribe(); @@ -47,9 +51,7 @@ private LeaderElection(CoordinationSession session, String name, byte[] data) { private void initializeAcquireFuture() { acquireFuture = new CompletableFuture<>(); - for (final Runnable r : afterAcquireFuture) { - acquireFuture.thenRun(r); - } + acquireFuture.thenRun(() -> takeLeadershipObserver.accept(this)); } @Nonnull @@ -75,24 +77,14 @@ private static RuntimeException getSemaphoreWatcherException(Result joinElectionAsync(CoordinationClient client, String fullPath, + public static LeaderElectionBuilder joinElection(CoordinationClient client, String fullPath, String endpoint, String semaphoreName) { - return client.createSession(fullPath) - .thenApply(session -> - new LeaderElection(session, semaphoreName, endpoint.getBytes(StandardCharsets.UTF_8))); - } - - /** - * {@link LeaderElection#joinElectionAsync(CoordinationClient, String, String, String)} - */ - public static LeaderElection joinElection(CoordinationClient client, String fullPath, String endpoint, - String semaphoreName) { - return joinElectionAsync(client, fullPath, endpoint, semaphoreName).join(); + return new LeaderElectionBuilder(client, fullPath, endpoint, semaphoreName); } - private CompletableFuture recursiveDescribeDetail(CoordinationSession session, String name) { + private CompletableFuture recursiveDescribeDetail() { return session.describeAndWatchSemaphore(name, DescribeSemaphoreMode.WITH_OWNERS, WatchSemaphoreMode.WATCH_OWNERS) @@ -124,7 +116,8 @@ private CompletableFuture recursiveDescribeDetail(Coordin private void recursiveDescribe() { describeFuture = new CompletableFuture<>(); - changedEventFuture = recursiveDescribeDetail(session, name); + describeFuture.thenRun(() -> changeLeaderObserver.accept(this)); + changedEventFuture = recursiveDescribeDetail(); changedEventFuture.whenComplete((semaphoreChangedEvent, th) -> { if (semaphoreChangedEvent != null && th == null && isElecting.get()) { recursiveDescribe(); @@ -199,18 +192,6 @@ public synchronized void interruptLeadership() { interruptLeadershipAsync().join(); } - /** - * Attention: using this method multiple times save all previous runnable arguments, - * and all of them will be executed. - * @param runnable - after acquiring leadership, execute this functional interface - */ - public synchronized void whenTakeLead(final Runnable runnable) { - afterAcquireFuture.add(runnable); - if (acquireFuture != null && !acquireFuture.isDone()) { - acquireFuture.thenRun(runnable); - } - } - /** * @return true, if you are a leader at this moment otherwise false */ @@ -230,4 +211,69 @@ public void close() { session.close(); } } + + public static class LeaderElectionBuilder { + private final CoordinationClient client; + private final String fullPath; + private final String endpoint; + private final String semaphoreName; + private Consumer takeLeadershipObserver; + private Consumer leaderChangeObserver; + + LeaderElectionBuilder(CoordinationClient client, String fullPath, String endpoint, + String semaphoreName) { + this.client = client; + this.fullPath = fullPath; + this.endpoint = endpoint; + this.semaphoreName = semaphoreName; + } + + /** + * Add observer for taking leadership. It will be called when your participant becomes a leader + * @param takeLeadershipObserver - callback for taking leadership observing + * @return LeaderElectionBuilder + */ + public LeaderElectionBuilder withTakeLeadershipObserver(Consumer takeLeadershipObserver) { + this.takeLeadershipObserver = takeLeadershipObserver; + return this; + } + + /** + * Add observer for changing leader. It will be called when participant joins the election, + * when participant is notified about leader change + * when participant calls forceUpdateLeader + * @param leaderChangeObserver - callback for leader change observing + * @return LeaderElectionBuilder + */ + public LeaderElectionBuilder withChangeLeaderObserver(Consumer leaderChangeObserver) { + this.leaderChangeObserver = leaderChangeObserver; + return this; + } + + /** + * Build Leader Election participant in asynchronous way + * @return Completable future with Leader Election + */ + public CompletableFuture buildAsync() { + return client.createSession(fullPath) + .thenApply(session -> + new LeaderElection( + session, + semaphoreName, + endpoint.getBytes(StandardCharsets.UTF_8), + takeLeadershipObserver == null ? Function.identity()::apply : + takeLeadershipObserver, + leaderChangeObserver == null ? Function.identity()::apply : leaderChangeObserver + ) + ); + } + + /** + * {@link LeaderElectionBuilder#buildAsync()} + */ + public LeaderElection build() { + return buildAsync().join(); + } + + } } diff --git a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java index a8d0f51cf..6ca5b010c 100644 --- a/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/LeaderElectionScenarioTest.java @@ -49,40 +49,47 @@ public void createNode() { public void leaderElectionBaseTest() { final String semaphoreName = SEMAPHORE_PREFIX + 1_000_001; + final AtomicReference leader = new AtomicReference<>(); + try (LeaderElection participant1 = LeaderElection - .joinElection(client, path, "endpoint-1", semaphoreName); + .joinElection(client, path, "endpoint-1", semaphoreName) + .withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-1")).build(); LeaderElection participant2 = LeaderElection - .joinElection(client, path, "endpoint-2", semaphoreName)) { - final String leader = participant1.getLeader(); - Assert.assertEquals(leader, participant2.getLeader()); - logger.info("The first leader: " + leader); + .joinElection(client, path, "endpoint-2", semaphoreName) + .withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-2")).build()) { + String leaderFromParticipant1 = participant1.forceUpdateLeader(); + String leaderFromParticipant2 = participant2.forceUpdateLeader(); + Assert.assertEquals(leader.get(), leaderFromParticipant1); + Assert.assertEquals(leader.get(), leaderFromParticipant2); + logger.info("The first leader: " + leader.get()); + + /* Leader change observer will be call 3 times: + first - after asking election who is a leader now + second - after leader call interruptLeadership() + third - after call forceUpdateLeader() on participant3 + */ + CountDownLatch counter = new CountDownLatch(3); try (LeaderElection participant3 = LeaderElection - .joinElection(client, path, "endpoint-3", semaphoreName)) { - Assert.assertEquals(leader, participant3.getLeader()); - - /* The leader is not a leader anymore */ - final String newLeader; - if (participant1.isLeader()) { - participant1.close(); - newLeader = participant2.forceUpdateLeader(); - Assert.assertEquals(newLeader, participant3.forceUpdateLeader()); - Assert.assertNotEquals(newLeader, leader); - } else if (participant2.isLeader()) { - participant2.close(); - newLeader = participant1.forceUpdateLeader(); - Assert.assertEquals(newLeader, participant3.forceUpdateLeader()); - Assert.assertNotEquals(newLeader, leader); - } else if (participant3.isLeader()) { - participant3.close(); - newLeader = participant1.forceUpdateLeader(); - Assert.assertEquals(newLeader, participant2.forceUpdateLeader()); - Assert.assertNotEquals(newLeader, leader); - } else { - newLeader = null; - Assert.fail("None of participants is a leader."); + .joinElection(client, path, "endpoint-3", semaphoreName) + .withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-3")) + .withChangeLeaderObserver(leaderElection -> counter.countDown()).build()) { + final String previousLeader = leader.get(); + switch (leader.get()) { + case "endpoint-1": + participant1.interruptLeadership(); + break; + case "endpoint-2": + participant2.interruptLeadership(); + break; + default: + participant3.interruptLeadership(); } - logger.info("The second leader: " + newLeader); + Assert.assertNotEquals(previousLeader, leader.get()); + Assert.assertEquals(leader.get(), participant1.forceUpdateLeader()); + Assert.assertEquals(leader.get(), participant2.forceUpdateLeader()); + Assert.assertEquals(leader.get(), participant3.forceUpdateLeader()); + Assert.assertTrue(counter.await(20_000, TimeUnit.MILLISECONDS)); } catch (Exception e) { Assert.fail("Exception in leader election test."); } @@ -94,8 +101,8 @@ public void leaderElectionBaseTest() { @Test(timeout = 20_000) public void leaderElectionBaseTest2() { final String name = "leader-election-base-test-2"; - try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name); - LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name) + try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name).build(); + LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name).build() ) { String firstLeader = participant1.getLeader(); if (firstLeader.equals("endpoint-1")) { @@ -113,18 +120,18 @@ public void leaderElectionBaseTest2() { @Test(timeout = 20_000) public void leaderElectionBaseTest3() { final String name = "leader-election-base-test-3"; - try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name); + CountDownLatch counter = new CountDownLatch(2); + try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name) + .withTakeLeadershipObserver(leaderElection -> { + logger.info("Endpoint-1 is a leader now!"); + counter.countDown(); + }).build(); LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name) + .withTakeLeadershipObserver(leaderElection -> { + logger.info("Endpoint-2 is a leader now!"); + counter.countDown(); + }).build() ) { - CountDownLatch counter = new CountDownLatch(2); - participant1.whenTakeLead(() -> { - logger.info("Endpoint-1 is a leader now!"); - counter.countDown(); - }); - participant2.whenTakeLead(() -> { - logger.info("Endpoint-2 is a leader now!"); - counter.countDown(); - }); if (participant1.isLeader()) { participant1.interruptLeadership(); } else { @@ -153,7 +160,7 @@ public void leaderElectionStressTest1() { final String semaphoreName = SEMAPHORE_PREFIX + 1_000_000; List participants = IntStream.range(0, sessionCount).mapToObj(id -> LeaderElection - .joinElection(client, path, "endpoint-" + id, semaphoreName)) + .joinElection(client, path, "endpoint-" + id, semaphoreName).build()) .collect(Collectors.toList()); final AtomicReference leader = new AtomicReference<>();