Skip to content

Commit

Permalink
update: LeaderElection observers
Browse files Browse the repository at this point in the history
  • Loading branch information
piece-of-tart committed Nov 15, 2023
1 parent 6226aa7 commit c65ca92
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.annotation.Nonnull;

Expand All @@ -29,27 +29,29 @@ public class LeaderElection implements AutoCloseable {
private final AtomicBoolean isElecting = new AtomicBoolean(true);
private CompletableFuture<SemaphoreLease> acquireFuture;
private CompletableFuture<Session> describeFuture;
private final Queue<Runnable> afterAcquireFuture;
private final Consumer<LeaderElection> takeLeadershipObserver;
private final Consumer<LeaderElection> changeLeaderObserver;
private CompletableFuture<SemaphoreChangedEvent> changedEventFuture;
private final CoordinationSession session;
private final String name;
private final byte[] data;

private LeaderElection(CoordinationSession session, String name, byte[] data) {
private LeaderElection(CoordinationSession session, String name, byte[] data,
Consumer<LeaderElection> takeLeadershipObserver,
Consumer<LeaderElection> changeLeaderObserver) {
this.session = session;
this.name = name;
this.data = data;
afterAcquireFuture = new ArrayDeque<>();
this.takeLeadershipObserver = takeLeadershipObserver;
this.changeLeaderObserver = changeLeaderObserver;
initializeAcquireFuture();
recursiveAcquire();
recursiveDescribe();
}

private void initializeAcquireFuture() {
acquireFuture = new CompletableFuture<>();
for (final Runnable r : afterAcquireFuture) {
acquireFuture.thenRun(r);
}
acquireFuture.thenRun(() -> takeLeadershipObserver.accept(this));
}

@Nonnull
Expand All @@ -75,24 +77,14 @@ private static RuntimeException getSemaphoreWatcherException(Result<SemaphoreWat
* @param endpoint - Leader's identifier. All participants see leader's endpoint
* @param semaphoreName - All participants try to acquire the same semaphore. This semaphore will be deleted after
* election, hence you shouldn't create semaphore with this name before election.
* @return Completable future with leader election participant class.
* @return LeaderElectionBuilder where you can add event's observer
*/
public static CompletableFuture<LeaderElection> joinElectionAsync(CoordinationClient client, String fullPath,
public static LeaderElectionBuilder joinElection(CoordinationClient client, String fullPath,
String endpoint, String semaphoreName) {
return client.createSession(fullPath)
.thenApply(session ->
new LeaderElection(session, semaphoreName, endpoint.getBytes(StandardCharsets.UTF_8)));
}

/**
* {@link LeaderElection#joinElectionAsync(CoordinationClient, String, String, String)}
*/
public static LeaderElection joinElection(CoordinationClient client, String fullPath, String endpoint,
String semaphoreName) {
return joinElectionAsync(client, fullPath, endpoint, semaphoreName).join();
return new LeaderElectionBuilder(client, fullPath, endpoint, semaphoreName);
}

private CompletableFuture<SemaphoreChangedEvent> recursiveDescribeDetail(CoordinationSession session, String name) {
private CompletableFuture<SemaphoreChangedEvent> recursiveDescribeDetail() {
return session.describeAndWatchSemaphore(name,
DescribeSemaphoreMode.WITH_OWNERS,
WatchSemaphoreMode.WATCH_OWNERS)
Expand Down Expand Up @@ -124,7 +116,8 @@ private CompletableFuture<SemaphoreChangedEvent> recursiveDescribeDetail(Coordin

private void recursiveDescribe() {
describeFuture = new CompletableFuture<>();
changedEventFuture = recursiveDescribeDetail(session, name);
describeFuture.thenRun(() -> changeLeaderObserver.accept(this));
changedEventFuture = recursiveDescribeDetail();
changedEventFuture.whenComplete((semaphoreChangedEvent, th) -> {
if (semaphoreChangedEvent != null && th == null && isElecting.get()) {
recursiveDescribe();
Expand Down Expand Up @@ -199,18 +192,6 @@ public synchronized void interruptLeadership() {
interruptLeadershipAsync().join();
}

/**
* Attention: using this method multiple times save all previous runnable arguments,
* and all of them will be executed.
* @param runnable - after acquiring leadership, execute this functional interface
*/
public synchronized void whenTakeLead(final Runnable runnable) {
afterAcquireFuture.add(runnable);
if (acquireFuture != null && !acquireFuture.isDone()) {
acquireFuture.thenRun(runnable);
}
}

/**
* @return true, if you are a leader at this moment otherwise false
*/
Expand All @@ -230,4 +211,69 @@ public void close() {
session.close();
}
}

public static class LeaderElectionBuilder {
private final CoordinationClient client;
private final String fullPath;
private final String endpoint;
private final String semaphoreName;
private Consumer<LeaderElection> takeLeadershipObserver;
private Consumer<LeaderElection> leaderChangeObserver;

LeaderElectionBuilder(CoordinationClient client, String fullPath, String endpoint,
String semaphoreName) {
this.client = client;
this.fullPath = fullPath;
this.endpoint = endpoint;
this.semaphoreName = semaphoreName;
}

/**
* Add observer for taking leadership. It will be called when your participant becomes a leader
* @param takeLeadershipObserver - callback for taking leadership observing
* @return LeaderElectionBuilder
*/
public LeaderElectionBuilder withTakeLeadershipObserver(Consumer<LeaderElection> takeLeadershipObserver) {
this.takeLeadershipObserver = takeLeadershipObserver;
return this;
}

/**
* Add observer for changing leader. It will be called when participant joins the election,
* when participant is notified about leader change
* when participant calls forceUpdateLeader
* @param leaderChangeObserver - callback for leader change observing
* @return LeaderElectionBuilder
*/
public LeaderElectionBuilder withChangeLeaderObserver(Consumer<LeaderElection> leaderChangeObserver) {
this.leaderChangeObserver = leaderChangeObserver;
return this;
}

/**
* Build Leader Election participant in asynchronous way
* @return Completable future with Leader Election
*/
public CompletableFuture<LeaderElection> buildAsync() {
return client.createSession(fullPath)
.thenApply(session ->
new LeaderElection(
session,
semaphoreName,
endpoint.getBytes(StandardCharsets.UTF_8),
takeLeadershipObserver == null ? Function.identity()::apply :
takeLeadershipObserver,
leaderChangeObserver == null ? Function.identity()::apply : leaderChangeObserver
)
);
}

/**
* {@link LeaderElectionBuilder#buildAsync()}
*/
public LeaderElection build() {
return buildAsync().join();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,40 +49,47 @@ public void createNode() {
public void leaderElectionBaseTest() {
final String semaphoreName = SEMAPHORE_PREFIX + 1_000_001;

final AtomicReference<String> leader = new AtomicReference<>();

try (LeaderElection participant1 = LeaderElection
.joinElection(client, path, "endpoint-1", semaphoreName);
.joinElection(client, path, "endpoint-1", semaphoreName)
.withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-1")).build();
LeaderElection participant2 = LeaderElection
.joinElection(client, path, "endpoint-2", semaphoreName)) {
final String leader = participant1.getLeader();
Assert.assertEquals(leader, participant2.getLeader());
logger.info("The first leader: " + leader);
.joinElection(client, path, "endpoint-2", semaphoreName)
.withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-2")).build()) {
String leaderFromParticipant1 = participant1.forceUpdateLeader();
String leaderFromParticipant2 = participant2.forceUpdateLeader();
Assert.assertEquals(leader.get(), leaderFromParticipant1);
Assert.assertEquals(leader.get(), leaderFromParticipant2);

logger.info("The first leader: " + leader.get());

/* Leader change observer will be call 3 times:
first - after asking election who is a leader now
second - after leader call interruptLeadership()
third - after call forceUpdateLeader() on participant3
*/
CountDownLatch counter = new CountDownLatch(3);
try (LeaderElection participant3 = LeaderElection
.joinElection(client, path, "endpoint-3", semaphoreName)) {
Assert.assertEquals(leader, participant3.getLeader());

/* The leader is not a leader anymore */
final String newLeader;
if (participant1.isLeader()) {
participant1.close();
newLeader = participant2.forceUpdateLeader();
Assert.assertEquals(newLeader, participant3.forceUpdateLeader());
Assert.assertNotEquals(newLeader, leader);
} else if (participant2.isLeader()) {
participant2.close();
newLeader = participant1.forceUpdateLeader();
Assert.assertEquals(newLeader, participant3.forceUpdateLeader());
Assert.assertNotEquals(newLeader, leader);
} else if (participant3.isLeader()) {
participant3.close();
newLeader = participant1.forceUpdateLeader();
Assert.assertEquals(newLeader, participant2.forceUpdateLeader());
Assert.assertNotEquals(newLeader, leader);
} else {
newLeader = null;
Assert.fail("None of participants is a leader.");
.joinElection(client, path, "endpoint-3", semaphoreName)
.withTakeLeadershipObserver(leaderElection -> leader.set("endpoint-3"))
.withChangeLeaderObserver(leaderElection -> counter.countDown()).build()) {
final String previousLeader = leader.get();
switch (leader.get()) {
case "endpoint-1":
participant1.interruptLeadership();
break;
case "endpoint-2":
participant2.interruptLeadership();
break;
default:
participant3.interruptLeadership();
}
logger.info("The second leader: " + newLeader);
Assert.assertNotEquals(previousLeader, leader.get());
Assert.assertEquals(leader.get(), participant1.forceUpdateLeader());
Assert.assertEquals(leader.get(), participant2.forceUpdateLeader());
Assert.assertEquals(leader.get(), participant3.forceUpdateLeader());
Assert.assertTrue(counter.await(20_000, TimeUnit.MILLISECONDS));
} catch (Exception e) {
Assert.fail("Exception in leader election test.");
}
Expand All @@ -94,8 +101,8 @@ public void leaderElectionBaseTest() {
@Test(timeout = 20_000)
public void leaderElectionBaseTest2() {
final String name = "leader-election-base-test-2";
try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name);
LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name)
try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name).build();
LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name).build()
) {
String firstLeader = participant1.getLeader();
if (firstLeader.equals("endpoint-1")) {
Expand All @@ -113,18 +120,18 @@ public void leaderElectionBaseTest2() {
@Test(timeout = 20_000)
public void leaderElectionBaseTest3() {
final String name = "leader-election-base-test-3";
try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name);
CountDownLatch counter = new CountDownLatch(2);
try (LeaderElection participant1 = LeaderElection.joinElection(client, path, "endpoint-1", name)
.withTakeLeadershipObserver(leaderElection -> {
logger.info("Endpoint-1 is a leader now!");
counter.countDown();
}).build();
LeaderElection participant2 = LeaderElection.joinElection(client, path, "endpoint-2", name)
.withTakeLeadershipObserver(leaderElection -> {
logger.info("Endpoint-2 is a leader now!");
counter.countDown();
}).build()
) {
CountDownLatch counter = new CountDownLatch(2);
participant1.whenTakeLead(() -> {
logger.info("Endpoint-1 is a leader now!");
counter.countDown();
});
participant2.whenTakeLead(() -> {
logger.info("Endpoint-2 is a leader now!");
counter.countDown();
});
if (participant1.isLeader()) {
participant1.interruptLeadership();
} else {
Expand Down Expand Up @@ -153,7 +160,7 @@ public void leaderElectionStressTest1() {
final String semaphoreName = SEMAPHORE_PREFIX + 1_000_000;

List<LeaderElection> participants = IntStream.range(0, sessionCount).mapToObj(id -> LeaderElection
.joinElection(client, path, "endpoint-" + id, semaphoreName))
.joinElection(client, path, "endpoint-" + id, semaphoreName).build())
.collect(Collectors.toList());

final AtomicReference<String> leader = new AtomicReference<>();
Expand Down

0 comments on commit c65ca92

Please sign in to comment.