Skip to content

Commit

Permalink
Modified lock watchers to halt if shutting down, but not shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 17, 2025
1 parent eae2f22 commit 3b7dd6a
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ public static class HAServiceLockWatcher implements AccumuloLockWatcher {
private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class);

private final String serviceName;
private final Supplier<Boolean> shutdownRequested;
private final Supplier<Boolean> shutdownComplete;
private volatile boolean acquiredLock = false;
private volatile boolean failedToAcquireLock = false;

public HAServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownRequested) {
public HAServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete) {
this.serviceName = serviceName;
this.shutdownRequested = shutdownRequested;
this.shutdownComplete = shutdownComplete;
}

@Override
public void lostLock(LockLossReason reason) {
if (shutdownRequested.get()) {
LOG.warn("{} lost lock (reason = {}), not exiting because shutdown requested.", serviceName,
reason);
if (shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
serviceName, reason);
} else {
Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
}
Expand Down Expand Up @@ -129,28 +129,24 @@ public static class ServiceLockWatcher implements LockWatcher {
private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class);

private final String serviceName;
private final Supplier<Boolean> shuttingDown;
private final Supplier<Boolean> shutdownComplete;
private final Consumer<String> lostLockAction;

public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown,
Supplier<Boolean> shutdownComplete, Consumer<String> lostLockAction) {
public ServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete,
Consumer<String> lostLockAction) {
this.serviceName = serviceName;
this.shuttingDown = shuttingDown;
this.shutdownComplete = shutdownComplete;
this.lostLockAction = lostLockAction;
}

@Override
public void lostLock(final LockLossReason reason) {
if (shuttingDown.get() && shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not exiting because shutdown requested.", serviceName,
reason);
if (shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
serviceName, reason);
} else {
Halt.halt(1, () -> {
if (!shuttingDown.get()) {
LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason);
}
LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason);
lostLockAction.accept(serviceName);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public abstract class AbstractServer
private volatile Thread serverThread;
private volatile Thread verificationThread;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);

protected AbstractServer(String appName, ServerOpts opts, String[] args) {
this.log = LoggerFactory.getLogger(getClass().getName());
Expand Down Expand Up @@ -134,6 +135,10 @@ public boolean isShutdownRequested() {
return shutdownRequested.get();
}

public AtomicBoolean getShutdownComplete() {
return shutdownComplete;
}

/**
* Run this server in a main thread. The server's run method should set up the server, then wait
* on isShutdownRequested() to return false, like so:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ protected void getCoordinatorLock(HostAndPort clientAddress)
coordinatorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
ServiceLock.path(lockPath), zooLockUUID);
HAServiceLockWatcher coordinatorLockWatcher =
new HAServiceLockWatcher("coordinator", () -> isShutdownRequested());
new HAServiceLockWatcher("coordinator", () -> getShutdownComplete().get());
while (true) {

coordinatorLock.lock(coordinatorLockWatcher, coordinatorClientAddress.getBytes(UTF_8));
Expand Down Expand Up @@ -341,7 +341,14 @@ public void run() {
if (coordinatorAddress.server != null) {
coordinatorAddress.server.stop();
}
getShutdownComplete().set(true);
LOG.info("stop requested. exiting ... ");
try {
coordinatorLock.unlock();
} catch (Exception e) {
LOG.warn("Failed to release Coordinator lock", e);
}

}

private Map<String,List<HostAndPort>> getIdleCompactors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class TestCoordinator extends CompactionCoordinator implements ServerProc
private final ServerAddress client;
private final TabletClientService.Client tabletServerClient;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);

private Set<ExternalCompactionId> metadataCompactionIds = null;

Expand Down Expand Up @@ -133,6 +134,10 @@ public boolean isShutdownRequested() {
return shutdown.get();
}

public AtomicBoolean getShutdownComplete() {
return shutdownComplete;
}

@Override
protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ public String getQueueName() {
private ServerAddress compactorAddress = null;

private final AtomicBoolean compactionRunning = new AtomicBoolean(false);
private volatile boolean shutdownComplete = false;

protected Compactor(CompactorServerOpts opts, String[] args) {
super("compactor", opts, args);
Expand Down Expand Up @@ -288,8 +287,8 @@ protected void announceExistence(HostAndPort clientAddress)
compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
ServiceLock.path(zPath), compactorId);

LockWatcher lw = new ServiceLockWatcher("compactor", () -> isShutdownRequested(),
() -> shutdownComplete, (name) -> gcLogger.logGCInfo(getConfiguration()));
LockWatcher lw = new ServiceLockWatcher("compactor", () -> getShutdownComplete().get(),
(name) -> gcLogger.logGCInfo(getConfiguration()));

try {
byte[] lockContent =
Expand Down Expand Up @@ -891,8 +890,8 @@ public void run() {
}

gcLogger.logGCInfo(getConfiguration());
getShutdownComplete().set(true);
LOG.info("stop requested. exiting ... ");
shutdownComplete = true;
try {
if (null != compactorLock) {
compactorLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public class SuccessfulCompactor extends Compactor implements ServerProcessServi
private volatile boolean failedCalled = false;
private TCompactionStatusUpdate latestState = null;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);

SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, TExternalCompactionJob job,
ServerContext context, ExternalCompactionId eci, CompactorServerOpts compactorServerOpts) {
Expand Down Expand Up @@ -245,6 +246,10 @@ public boolean isShutdownRequested() {
return shutdown.get();
}

public AtomicBoolean getShutdownComplete() {
return shutdownComplete;
}

@Override
protected synchronized void checkIfCanceled() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,14 @@ public void run() {
gracefulShutdown(getContext().rpcCreds());
}
}
getShutdownComplete().set(true);
log.info("stop requested. exiting ... ");
try {
gcLock.unlock();
} catch (Exception e) {
log.warn("Failed to release GarbageCollector lock", e);
}

}

private void incrementStatsForRun(GCRun gcRun) {
Expand Down Expand Up @@ -375,7 +382,7 @@ private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedExc
UUID zooLockUUID = UUID.randomUUID();
gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID);
HAServiceLockWatcher gcLockWatcher =
new HAServiceLockWatcher("gc", () -> isShutdownRequested());
new HAServiceLockWatcher("gc", () -> getShutdownComplete().get());

while (true) {
gcLock.lock(gcLockWatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,13 @@ boolean canSuspendTablets() {
throw new IllegalStateException("Exception waiting on watcher", e);
}
}
getShutdownComplete().set(true);
log.info("stop requested. exiting ... ");
try {
managerLock.unlock();
} catch (Exception e) {
log.warn("Failed to release Manager lock", e);
}
}

@Deprecated
Expand Down Expand Up @@ -1656,7 +1662,7 @@ private void getManagerLock(final ServiceLockPath zManagerLoc)
UUID zooLockUUID = UUID.randomUUID();
managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
HAServiceLockWatcher managerLockWatcher =
new HAServiceLockWatcher("manager", () -> isShutdownRequested());
new HAServiceLockWatcher("manager", () -> getShutdownComplete().get());

while (true) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ private TabletMetadataLoader(Ample ample) {
protected TabletServerScanMetrics scanMetrics;
private ScanServerMetrics scanServerMetrics;
private BlockCacheMetrics blockCacheMetrics;
private volatile boolean shutdownComplete = false;

private final ZooCache managerLockCache;

Expand Down Expand Up @@ -349,8 +348,8 @@ private ServiceLock announceExistence() {

serverLockUUID = UUID.randomUUID();
scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID);
LockWatcher lw = new ServiceLockWatcher("scan server", () -> isShutdownRequested(),
() -> shutdownComplete, (name) -> gcLogger.logGCInfo(getConfiguration()));
LockWatcher lw = new ServiceLockWatcher("scan server", () -> getShutdownComplete().get(),
(name) -> gcLogger.logGCInfo(getConfiguration()));

// Don't use the normal ServerServices lock content, instead put the server UUID here.
byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8);
Expand Down Expand Up @@ -448,8 +447,8 @@ public void run() {
}

gcLogger.logGCInfo(getConfiguration());
getShutdownComplete().set(true);
LOG.info("stop requested. exiting ... ");
shutdownComplete = true;
try {
if (null != lock) {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ public TabletServerMinCMetrics getMinCMetrics() {
private final ZooAuthenticationKeyWatcher authKeyWatcher;
private final WalStateManager walMarker;
private final ServerContext context;
private volatile boolean shutdownComplete = false;

public static void main(String[] args) throws Exception {
try (TabletServer tserver = new TabletServer(new ServerOpts(), args)) {
Expand Down Expand Up @@ -688,8 +687,8 @@ private void announceExistence() {

tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID());

LockWatcher lw = new ServiceLockWatcher("tablet server", () -> isShutdownRequested(),
() -> shutdownComplete, (name) -> gcLogger.logGCInfo(getConfiguration()));
LockWatcher lw = new ServiceLockWatcher("tablet server", () -> getShutdownComplete().get(),
(name) -> gcLogger.logGCInfo(getConfiguration()));

byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT)
.toString().getBytes(UTF_8);
Expand Down Expand Up @@ -1022,9 +1021,8 @@ public void run() {

gcLogger.logGCInfo(getConfiguration());

getShutdownComplete().set(true);
log.info("TServerInfo: stop requested. exiting ... ");

shutdownComplete = true;
try {
tabletServerLock.unlock();
} catch (Exception e) {
Expand Down

0 comments on commit 3b7dd6a

Please sign in to comment.