Skip to content

Commit

Permalink
Moved getLock() to AbstractServer, added ServiceLock verification thread
Browse files Browse the repository at this point in the history
Moved getLock() from TabletHostingServer to AbstractServer. Added method
to ServiceLock to verfify the lock is being held in ZooKeeper versus
relying on the Watcher. Added method to start verification thread in
AbstractServer, which is called from the Manager and TabletServer.

Closes apache#5132
  • Loading branch information
dlmarion committed Dec 5, 2024
1 parent 355b0d7 commit 27712b8
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ public enum Property {
+ " was changed and it now can accept multiple class names. The metrics spi was introduced in 2.1.3,"
+ " the deprecated factory is org.apache.accumulo.core.metrics.MeterRegistryFactory.",
"2.1.0"),
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "2m",
PropertyType.TIMEDURATION,
"Interval at which the Manager and TabletServer should verify their server locks. A value of zero"
+ " disables this check.",
"2.1.4"),
// properties that are specific to manager server behavior
MANAGER_PREFIX("manager.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the manager server. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,4 +762,26 @@ public static boolean deleteLock(ZooReaderWriter zk, ServiceLockPath path, Strin

return false;
}

/**
* Checks that the lock still exists in ZooKeeper. The typical mechanism for determining if a lock
* is lost depends on a Watcher set on the lock node. There exists a case where the Watcher may
* not get called if another Watcher is stuck waiting on I/O or otherwise hung. In the case where
* this method returns false, then the typical action is to exit the server process.
*
* @return true if lock path still exists, false otherwise and on error
*/
public boolean verifyLockAtSource() {
if (getLockPath() == null) {
// lock not set yet
return false;
}
try {
return null != this.zooKeeper.exists(getLockPath(), false);
} catch (KeeperException | InterruptedException e) {
LOG.error("Error verfiying lock at {}", getLockPath(), e);
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
package org.apache.accumulo.server;

import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.metrics.ProcessMetrics;
import org.apache.accumulo.server.security.SecurityUtil;
import org.slf4j.Logger;
Expand All @@ -44,6 +48,8 @@ public abstract class AbstractServer implements AutoCloseable, MetricsProducer,
private final ProcessMetrics processMetrics;
protected final long idleReportingPeriodNanos;
private volatile long idlePeriodStartNanos = 0L;
private volatile Thread serverThread;
private volatile Thread verificationThread;

protected AbstractServer(String appName, ServerOpts opts, String[] args) {
this.log = LoggerFactory.getLogger(getClass().getName());
Expand Down Expand Up @@ -99,10 +105,14 @@ protected void updateIdleStatus(boolean isIdle) {
*/
public void runServer() throws Exception {
final AtomicReference<Throwable> err = new AtomicReference<>();
Thread service = new Thread(TraceUtil.wrap(this), applicationName);
service.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
service.start();
service.join();
serverThread = new Thread(TraceUtil.wrap(this), applicationName);
serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
serverThread.start();
serverThread.join();
if (verificationThread != null) {
verificationThread.interrupt();
verificationThread.join();
}
Throwable thrown = err.get();
if (thrown != null) {
if (thrown instanceof Error) {
Expand Down Expand Up @@ -139,6 +149,41 @@ public String getApplicationName() {
return applicationName;
}

/**
* Get the ServiceLock for this server process. May return null if called before the lock is
* acquired.
*
* @return lock ServiceLock or null
*/
public abstract ServiceLock getLock();

public void startServiceLockVerificationThread() {
final long interval =
getConfiguration().getTimeInMillis(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL);
if (interval > 0) {
final Thread lockVerificationThread = Threads.createThread("service-lock-verification-thread",
OptionalInt.of(Thread.NORM_PRIORITY + 1), () -> {
while (true && serverThread.isAlive()) {
ServiceLock lock = getLock();
try {
if (lock != null && !lock.verifyLockAtSource()) {
Halt.halt("Lock verification thread could not find lock", -1);
}
// Need to sleep, not yield when the thread priority is greater than NORM_PRIORITY
// so that this thread does not get immediately rescheduled.
Thread.sleep(interval);
} catch (InterruptedException e) {
if (serverThread.isAlive()) {
// throw an Error, which will cause this process to be terminated
throw new Error("Sleep interrupted in ServiceLock verification thread");
}
}
}
});
lockVerificationThread.start();
}
}

@Override
public void close() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,11 @@ private void cleanUpCompactors() {
}
}

@Override
public ServiceLock getLock() {
return coordinatorLock;
}

public static void main(String[] args) throws Exception {
try (CompactionCoordinator compactor = new CompactionCoordinator(new ServerOpts(), args)) {
compactor.runServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,4 +987,10 @@ public String getRunningCompactionId(TInfo tinfo, TCredentials credentials)
return eci.canonical();
}
}

@Override
public ServiceLock getLock() {
return compactorLock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());

private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();
private ServiceLock gcLock;

SimpleGarbageCollector(ServerOpts opts, String[] args) {
super("gc", opts, args);
Expand Down Expand Up @@ -379,10 +380,9 @@ public void unableToMonitorLockNode(final Exception e) {
};

UUID zooLockUUID = UUID.randomUUID();
ServiceLock lock =
new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID);
gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID);
while (true) {
if (lock.tryLock(lockWatcher,
if (gcLock.tryLock(lockWatcher,
new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8))) {
log.debug("Got GC ZooKeeper lock");
return;
Expand Down Expand Up @@ -439,4 +439,9 @@ public GcCycleMetrics getGcCycleMetrics() {
return gcCycleMetrics;
}

@Override
public ServiceLock getLock() {
return gcLock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1714,6 +1714,7 @@ private void getManagerLock(final ServiceLockPath zManagerLoc)
managerLockWatcher.waitForChange();

if (managerLockWatcher.acquiredLock) {
startServiceLockVerificationThread();
break;
}

Expand Down Expand Up @@ -2001,4 +2002,10 @@ void getAssignments(SortedMap<TServerInstance,TabletServerStatus> currentStatus,
assignedOut);
tabletBalancer.getAssignments(params);
}

@Override
public ServiceLock getLock() {
return managerLock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1048,4 +1048,9 @@ public Optional<HostAndPort> getCoordinatorHost() {
public int getLivePort() {
return livePort;
}

@Override
public ServiceLock getLock() {
return monitorLock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ public void unableToMonitorLockNode(final Exception e) {
lockSessionId = tabletServerLock.getSessionId();
log.debug("Obtained tablet server lock {} {}", tabletServerLock.getLockPath(),
getTabletSession());
startServiceLockVerificationThread();
return;
}
log.info("Waiting for tablet server lock");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public void testDeleteParent() throws Exception {
ServiceLock zl = getZooLock(parent, UUID.randomUUID());

assertFalse(zl.isLocked());
assertFalse(zl.verifyLockAtSource());

ZooReaderWriter zk = szk.getZooReaderWriter();

Expand All @@ -235,10 +236,12 @@ public void testDeleteParent() throws Exception {

assertTrue(lw.locked);
assertTrue(zl.isLocked());
assertTrue(zl.verifyLockAtSource());
assertNull(lw.exception);
assertNull(lw.reason);

zl.unlock();
assertFalse(zl.verifyLockAtSource());
}

@Test
Expand All @@ -250,6 +253,7 @@ public void testNoParent() throws Exception {
ServiceLock zl = getZooLock(parent, UUID.randomUUID());

assertFalse(zl.isLocked());
assertFalse(zl.verifyLockAtSource());

TestALW lw = new TestALW();

Expand All @@ -259,6 +263,7 @@ public void testNoParent() throws Exception {

assertFalse(lw.locked);
assertFalse(zl.isLocked());
assertFalse(zl.verifyLockAtSource());
assertNotNull(lw.exception);
assertNull(lw.reason);
}
Expand All @@ -275,6 +280,7 @@ public void testDeleteLock() throws Exception {
ServiceLock zl = getZooLock(parent, UUID.randomUUID());

assertFalse(zl.isLocked());
assertFalse(zl.verifyLockAtSource());

TestALW lw = new TestALW();

Expand All @@ -284,6 +290,7 @@ public void testDeleteLock() throws Exception {

assertTrue(lw.locked);
assertTrue(zl.isLocked());
assertTrue(zl.verifyLockAtSource());
assertNull(lw.exception);
assertNull(lw.reason);

Expand All @@ -293,7 +300,7 @@ public void testDeleteLock() throws Exception {

assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
assertNull(lw.exception);

assertFalse(zl.verifyLockAtSource());
}

@Test
Expand All @@ -308,6 +315,7 @@ public void testDeleteWaiting() throws Exception {
ServiceLock zl = getZooLock(parent, UUID.randomUUID());

assertFalse(zl.isLocked());
assertFalse(zl.verifyLockAtSource());

TestALW lw = new TestALW();

Expand All @@ -317,6 +325,7 @@ public void testDeleteWaiting() throws Exception {

assertTrue(lw.locked);
assertTrue(zl.isLocked());
assertTrue(zl.verifyLockAtSource());
assertNull(lw.exception);
assertNull(lw.reason);

Expand All @@ -328,6 +337,7 @@ public void testDeleteWaiting() throws Exception {

assertFalse(lw2.locked);
assertFalse(zl2.isLocked());
assertFalse(zl2.verifyLockAtSource());

ServiceLock zl3 = getZooLock(parent, UUID.randomUUID());

Expand Down Expand Up @@ -356,10 +366,12 @@ public void testDeleteWaiting() throws Exception {

assertTrue(lw3.locked);
assertTrue(zl3.isLocked());
assertTrue(zl3.verifyLockAtSource());
assertNull(lw3.exception);
assertNull(lw3.reason);

zl3.unlock();
assertFalse(zl3.verifyLockAtSource());

}

Expand All @@ -380,6 +392,7 @@ public void testUnexpectedEvent() throws Exception {
ServiceLock zl = getZooLock(parent, UUID.randomUUID());

assertFalse(zl.isLocked());
assertFalse(zl.verifyLockAtSource());

// would not expect data to be set on this node, but it should not cause problems.....
zk.setData(parent.toString(), "foo".getBytes(UTF_8), -1);
Expand All @@ -392,6 +405,7 @@ public void testUnexpectedEvent() throws Exception {

assertTrue(lw.locked);
assertTrue(zl.isLocked());
assertTrue(zl.verifyLockAtSource());
assertNull(lw.exception);
assertNull(lw.reason);

Expand All @@ -404,6 +418,7 @@ public void testUnexpectedEvent() throws Exception {

assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
assertNull(lw.exception);
assertFalse(zl.verifyLockAtSource());
}

}
Expand Down Expand Up @@ -476,16 +491,23 @@ public void testLockSerial() throws Exception {
assertEquals("/zlretryLockSerial/zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000",
zl2.getWatching());

assertTrue(zl1.verifyLockAtSource());
assertFalse(zl2.verifyLockAtSource());

zl1.unlock();
assertFalse(zlw1.isLockHeld());
assertFalse(zl1.verifyLockAtSource());
assertFalse(zl2.verifyLockAtSource());
zk1.close();

while (!zlw2.isLockHeld()) {
LockSupport.parkNanos(50);
}

assertTrue(zlw2.isLockHeld());
assertTrue(zl2.verifyLockAtSource());
zl2.unlock();
assertFalse(zl2.verifyLockAtSource());
}

}
Expand Down

0 comments on commit 27712b8

Please sign in to comment.