Skip to content

Commit

Permalink
Created common method for creating non-HA server zk path (apache#5225)
Browse files Browse the repository at this point in the history
For HA servers (Manager, Monitor, etc) the ZK node is created at
instance initialization time. For non-HA servers (compactor, sserver,
tserver) the paths are created when the server is started. Each
server impl had code to do this and I just moved it into a common
location.
  • Loading branch information
dlmarion authored Jan 13, 2025
1 parent e745a5d commit 34d27c9
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,45 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.util.Halt;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceLockSupport {

private static final Logger LOG = LoggerFactory.getLogger(ServiceLockSupport.class);

/**
* Ensures that the resource group node in ZooKeeper is created for this server
*/
public static void createNonHaServiceLockPath(Type server, ZooReaderWriter zrw,
ServiceLockPath slp) throws KeeperException, InterruptedException {
// The ServiceLockPath contains a resource group in the path which is not created
// at initialization time. If it does not exist, then create it.
String rgPath = slp.toString().substring(0, slp.toString().lastIndexOf("/" + slp.getServer()));
LOG.debug("Creating {} resource group path in zookeeper: {}", server, rgPath);
try {
zrw.mkdirs(rgPath);
zrw.putPersistentData(slp.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NOAUTH) {
LOG.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo.properties, specifically instance.secret, is consistent.");
}
throw e;
}

}

/**
* Lock Watcher used by Highly Available services. These are services where only instance is
* running at a time, but another backup service can be started that will be used if the active
Expand All @@ -40,30 +69,29 @@ public static class HAServiceLockWatcher implements AccumuloLockWatcher {

private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class);

private final String serviceName;
private final Type server;
private volatile boolean acquiredLock = false;
private volatile boolean failedToAcquireLock = false;

public HAServiceLockWatcher(String serviceName) {
this.serviceName = serviceName;
public HAServiceLockWatcher(Type server) {
this.server = server;
}

@Override
public void lostLock(LockLossReason reason) {
Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
}

@Override
public void unableToMonitorLockNode(final Exception e) {
// ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
Halt.halt(-1,
() -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e));
Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor {} lock node", server, e));

}

@Override
public synchronized void acquiredLock() {
LOG.debug("Acquired {} lock", serviceName);
LOG.debug("Acquired {} lock", server);

if (acquiredLock || failedToAcquireLock) {
Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
Expand All @@ -75,11 +103,11 @@ public synchronized void acquiredLock() {

@Override
public synchronized void failedToAcquireLock(Exception e) {
LOG.warn("Failed to get {} lock", serviceName, e);
LOG.warn("Failed to get {} lock", server, e);

if (e instanceof NoAuthException) {
String msg =
"Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication.";
"Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication.";
LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e);
Halt.halt(msg, -1);
}
Expand All @@ -96,7 +124,7 @@ public synchronized void failedToAcquireLock(Exception e) {
public synchronized void waitForChange() {
while (!acquiredLock && !failedToAcquireLock) {
try {
LOG.info("{} lock held by someone else, waiting for a change in state", serviceName);
LOG.info("{} lock held by someone else, waiting for a change in state", server);
wait();
} catch (InterruptedException e) {
// empty
Expand All @@ -121,13 +149,13 @@ public static class ServiceLockWatcher implements LockWatcher {

private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class);

private final String serviceName;
private final Type server;
private final Supplier<Boolean> shuttingDown;
private final Consumer<String> lostLockAction;
private final Consumer<Type> lostLockAction;

public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown,
Consumer<String> lostLockAction) {
this.serviceName = serviceName;
public ServiceLockWatcher(Type server, Supplier<Boolean> shuttingDown,
Consumer<Type> lostLockAction) {
this.server = server;
this.shuttingDown = shuttingDown;
this.lostLockAction = lostLockAction;
}
Expand All @@ -136,15 +164,15 @@ public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown,
public void lostLock(final LockLossReason reason) {
Halt.halt(1, () -> {
if (!shuttingDown.get()) {
LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason);
LOG.error("{} lost lock (reason = {}), exiting.", server, reason);
}
lostLockAction.accept(serviceName);
lostLockAction.accept(server);
});
}

@Override
public void unableToMonitorLockNode(final Exception e) {
Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e));
Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", server, e));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
Expand Down Expand Up @@ -80,6 +81,7 @@
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.lock.ServiceLockSupport;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
Expand Down Expand Up @@ -272,27 +274,13 @@ protected void checkIfCanceled() {
protected void announceExistence(HostAndPort clientAddress)
throws KeeperException, InterruptedException {

ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();

final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
final ServiceLockPath path =
getContext().getServerPaths().createCompactorPath(getResourceGroup(), clientAddress);
// The ServiceLockPath contains a resource group in the path which is not created
// at initialization time. If it does not exist, then create it.
final String compactorGroupPath =
path.toString().substring(0, path.toString().lastIndexOf("/" + path.getServer()));
LOG.debug("Creating compactor resource group path in zookeeper: {}", compactorGroupPath);
try {
zoo.mkdirs(compactorGroupPath);
zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
} catch (KeeperException.NoAuthException e) {
LOG.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo.properties, specifically instance.secret, is consistent.");
throw e;
}

ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path);
compactorLock = new ServiceLock(getContext().getZooSession(), path, compactorId);
LockWatcher lw = new ServiceLockWatcher("compactor", () -> false,
(name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> false,
(type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));

try {
for (int i = 0; i < 25; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -370,7 +371,7 @@ private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedExc

UUID zooLockUUID = UUID.randomUUID();
gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID);
HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc");
HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR);

while (true) {
gcLock.lock(gcLockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
Expand Down Expand Up @@ -1499,14 +1500,12 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
ServiceDescriptors descriptors = new ServiceDescriptors();
descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER,
managerClientAddress, this.getResourceGroup()));

ServiceLockData sld = new ServiceLockData(descriptors);

managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher(Type.MANAGER);

while (true) {

HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager");
managerLock.lock(managerLockWatcher, sld);

managerLockWatcher.waitForChange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
Expand Down Expand Up @@ -741,7 +742,7 @@ private void getMonitorLock(HostAndPort monitorLocation)
// Get a ZooLock for the monitor
UUID zooLockUUID = UUID.randomUUID();
monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath, zooLockUUID);
HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor");
HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher(Type.MONITOR);

while (true) {
monitorLock.lock(monitorLockWatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
Expand All @@ -75,6 +76,7 @@
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.lock.ServiceLockSupport;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
Expand Down Expand Up @@ -119,7 +121,6 @@
import org.apache.accumulo.tserver.tablet.TabletBase;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.zookeeper.KeeperException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -326,31 +327,16 @@ public String getClientAddressString() {
* Set up nodes and locks in ZooKeeper for this Compactor
*/
private ServiceLock announceExistence() {
ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
try {

final ServiceLockPath zLockPath =
context.getServerPaths().createScanServerPath(getResourceGroup(), clientAddress);
ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo, zLockPath);
serverLockUUID = UUID.randomUUID();
// The ServiceLockPath contains a resource group in the path which is not created
// at initialization time. If it does not exist, then create it.
String sserverGroupPath = zLockPath.toString().substring(0,
zLockPath.toString().lastIndexOf("/" + zLockPath.getServer()));
LOG.debug("Creating sserver resource group path in zookeeper: {}", sserverGroupPath);
try {
zoo.mkdirs(sserverGroupPath);
// Old zk nodes can be cleaned up by ZooZap
zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NOAUTH) {
LOG.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo.properties, specifically instance.secret, is consistent.");
}
throw e;
}
scanServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID);
LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested,
(name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> serverStopRequested,
(type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));

for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
import org.apache.accumulo.core.clientImpl.DurabilityImpl;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
Expand All @@ -85,6 +86,7 @@
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.lock.ServiceLockSupport;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.manager.thrift.Compacting;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
Expand Down Expand Up @@ -487,31 +489,17 @@ public ZooCache getManagerLockCache() {
}

private void announceExistence() {
ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
try {

final ServiceLockPath zLockPath =
context.getServerPaths().createTabletServerPath(getResourceGroup(), clientAddress);
// The ServiceLockPath contains a resource group in the path which is not created
// at initialization time. If it does not exist, then create it.
String tserverGroupPath = zLockPath.toString().substring(0,
zLockPath.toString().lastIndexOf("/" + zLockPath.getServer()));
log.debug("Creating tserver resource group path in zookeeper: {}", tserverGroupPath);
try {
zoo.mkdirs(tserverGroupPath);
zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NOAUTH) {
log.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo.properties, specifically instance.secret, is consistent.");
}
throw e;
}
ServiceLockSupport.createNonHaServiceLockPath(Type.TABLET_SERVER, zoo, zLockPath);
UUID tabletServerUUID = UUID.randomUUID();
tabletServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, tabletServerUUID);

LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested,
(name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> serverStopRequested,
(type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));

for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
Expand Down

0 comments on commit 34d27c9

Please sign in to comment.