From 34d27c9f0e89a5dac4af368dfc9d98c159fb0878 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 13 Jan 2025 08:43:26 -0500 Subject: [PATCH] Created common method for creating non-HA server zk path (#5225) For HA servers (Manager, Monitor, etc) the ZK node is created at instance initialization time. For non-HA servers (compactor, sserver, tserver) the paths are created when the server is started. Each server impl had code to do this and I just moved it into a common location. --- .../core/lock/ServiceLockSupport.java | 64 +++++++++++++------ .../apache/accumulo/compactor/Compactor.java | 24 ++----- .../accumulo/gc/SimpleGarbageCollector.java | 3 +- .../org/apache/accumulo/manager/Manager.java | 5 +- .../org/apache/accumulo/monitor/Monitor.java | 3 +- .../apache/accumulo/tserver/ScanServer.java | 26 ++------ .../apache/accumulo/tserver/TabletServer.java | 24 ++----- 7 files changed, 70 insertions(+), 79 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java index 3129511b5e3..eb70d8e84ca 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@ -21,16 +21,45 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.util.Halt; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoAuthException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServiceLockSupport { + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockSupport.class); + + /** + * Ensures that the resource group node in ZooKeeper is created for this server + */ + public static void createNonHaServiceLockPath(Type server, ZooReaderWriter zrw, + ServiceLockPath slp) throws KeeperException, InterruptedException { + // The ServiceLockPath contains a resource group in the path which is not created + // at initialization time. If it does not exist, then create it. + String rgPath = slp.toString().substring(0, slp.toString().lastIndexOf("/" + slp.getServer())); + LOG.debug("Creating {} resource group path in zookeeper: {}", server, rgPath); + try { + zrw.mkdirs(rgPath); + zrw.putPersistentData(slp.toString(), new byte[] {}, NodeExistsPolicy.SKIP); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.NOAUTH) { + LOG.error("Failed to write to ZooKeeper. Ensure that" + + " accumulo.properties, specifically instance.secret, is consistent."); + } + throw e; + } + + } + /** * Lock Watcher used by Highly Available services. These are services where only instance is * running at a time, but another backup service can be started that will be used if the active @@ -40,30 +69,29 @@ public static class HAServiceLockWatcher implements AccumuloLockWatcher { private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); - private final String serviceName; + private final Type server; private volatile boolean acquiredLock = false; private volatile boolean failedToAcquireLock = false; - public HAServiceLockWatcher(String serviceName) { - this.serviceName = serviceName; + public HAServiceLockWatcher(Type server) { + this.server = server; } @Override public void lostLock(LockLossReason reason) { - Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); } @Override public void unableToMonitorLockNode(final Exception e) { // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, - () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e)); + Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor {} lock node", server, e)); } @Override public synchronized void acquiredLock() { - LOG.debug("Acquired {} lock", serviceName); + LOG.debug("Acquired {} lock", server); if (acquiredLock || failedToAcquireLock) { Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); @@ -75,11 +103,11 @@ public synchronized void acquiredLock() { @Override public synchronized void failedToAcquireLock(Exception e) { - LOG.warn("Failed to get {} lock", serviceName, e); + LOG.warn("Failed to get {} lock", server, e); if (e instanceof NoAuthException) { String msg = - "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + "Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication."; LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); Halt.halt(msg, -1); } @@ -96,7 +124,7 @@ public synchronized void failedToAcquireLock(Exception e) { public synchronized void waitForChange() { while (!acquiredLock && !failedToAcquireLock) { try { - LOG.info("{} lock held by someone else, waiting for a change in state", serviceName); + LOG.info("{} lock held by someone else, waiting for a change in state", server); wait(); } catch (InterruptedException e) { // empty @@ -121,13 +149,13 @@ public static class ServiceLockWatcher implements LockWatcher { private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); - private final String serviceName; + private final Type server; private final Supplier shuttingDown; - private final Consumer lostLockAction; + private final Consumer lostLockAction; - public ServiceLockWatcher(String serviceName, Supplier shuttingDown, - Consumer lostLockAction) { - this.serviceName = serviceName; + public ServiceLockWatcher(Type server, Supplier shuttingDown, + Consumer lostLockAction) { + this.server = server; this.shuttingDown = shuttingDown; this.lostLockAction = lostLockAction; } @@ -136,15 +164,15 @@ public ServiceLockWatcher(String serviceName, Supplier shuttingDown, public void lostLock(final LockLossReason reason) { Halt.halt(1, () -> { if (!shuttingDown.get()) { - LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); + LOG.error("{} lost lock (reason = {}), exiting.", server, reason); } - lostLockAction.accept(serviceName); + lostLockAction.accept(server); }); } @Override public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e)); + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", server, e)); } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 20502cfcaa0..e61ab5a1124 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -80,6 +81,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockSupport; import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.ReferencedTabletFile; @@ -272,27 +274,13 @@ protected void checkIfCanceled() { protected void announceExistence(HostAndPort clientAddress) throws KeeperException, InterruptedException { - ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); - + final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); final ServiceLockPath path = getContext().getServerPaths().createCompactorPath(getResourceGroup(), clientAddress); - // The ServiceLockPath contains a resource group in the path which is not created - // at initialization time. If it does not exist, then create it. - final String compactorGroupPath = - path.toString().substring(0, path.toString().lastIndexOf("/" + path.getServer())); - LOG.debug("Creating compactor resource group path in zookeeper: {}", compactorGroupPath); - try { - zoo.mkdirs(compactorGroupPath); - zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP); - } catch (KeeperException.NoAuthException e) { - LOG.error("Failed to write to ZooKeeper. Ensure that" - + " accumulo.properties, specifically instance.secret, is consistent."); - throw e; - } - + ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path); compactorLock = new ServiceLock(getContext().getZooSession(), path, compactorId); - LockWatcher lw = new ServiceLockWatcher("compactor", () -> false, - (name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); + LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> false, + (type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); try { for (int i = 0; i < 25; i++) { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index efef0dd72ca..62449eabb65 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -370,7 +371,7 @@ private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedExc UUID zooLockUUID = UUID.randomUUID(); gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID); - HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR); while (true) { gcLock.lock(gcLockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 17ae1d10d74..eb22227bb91 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; @@ -1499,14 +1500,12 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) ServiceDescriptors descriptors = new ServiceDescriptors(); descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER, managerClientAddress, this.getResourceGroup())); - ServiceLockData sld = new ServiceLockData(descriptors); - managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); + HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher(Type.MANAGER); while (true) { - HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager"); managerLock.lock(managerLockWatcher, sld); managerLockWatcher.waitForChange(); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index cabd4ef957d..b728b8a2b77 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -741,7 +742,7 @@ private void getMonitorLock(HostAndPort monitorLocation) // Get a ZooLock for the monitor UUID zooLockUUID = UUID.randomUUID(); monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath, zooLockUUID); - HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor"); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher(Type.MONITOR); while (true) { monitorLock.lock(monitorLockWatcher, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 9d414477b4a..8a409d566b6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -75,6 +76,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockSupport; import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -119,7 +121,6 @@ import org.apache.accumulo.tserver.tablet.TabletBase; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; -import org.apache.zookeeper.KeeperException; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -326,31 +327,16 @@ public String getClientAddressString() { * Set up nodes and locks in ZooKeeper for this Compactor */ private ServiceLock announceExistence() { - ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); + final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); try { final ServiceLockPath zLockPath = context.getServerPaths().createScanServerPath(getResourceGroup(), clientAddress); + ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo, zLockPath); serverLockUUID = UUID.randomUUID(); - // The ServiceLockPath contains a resource group in the path which is not created - // at initialization time. If it does not exist, then create it. - String sserverGroupPath = zLockPath.toString().substring(0, - zLockPath.toString().lastIndexOf("/" + zLockPath.getServer())); - LOG.debug("Creating sserver resource group path in zookeeper: {}", sserverGroupPath); - try { - zoo.mkdirs(sserverGroupPath); - // Old zk nodes can be cleaned up by ZooZap - zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.NOAUTH) { - LOG.error("Failed to write to ZooKeeper. Ensure that" - + " accumulo.properties, specifically instance.secret, is consistent."); - } - throw e; - } scanServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID); - LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested, - (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); + LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> serverStopRequested, + (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 08d83f148d7..aea44fcdafc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -66,6 +66,7 @@ import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.ClientTabletCache; import org.apache.accumulo.core.clientImpl.DurabilityImpl; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -85,6 +86,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockSupport; import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.Compacting; import org.apache.accumulo.core.manager.thrift.ManagerClientService; @@ -487,31 +489,17 @@ public ZooCache getManagerLockCache() { } private void announceExistence() { - ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); + final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); try { final ServiceLockPath zLockPath = context.getServerPaths().createTabletServerPath(getResourceGroup(), clientAddress); - // The ServiceLockPath contains a resource group in the path which is not created - // at initialization time. If it does not exist, then create it. - String tserverGroupPath = zLockPath.toString().substring(0, - zLockPath.toString().lastIndexOf("/" + zLockPath.getServer())); - log.debug("Creating tserver resource group path in zookeeper: {}", tserverGroupPath); - try { - zoo.mkdirs(tserverGroupPath); - zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.NOAUTH) { - log.error("Failed to write to ZooKeeper. Ensure that" - + " accumulo.properties, specifically instance.secret, is consistent."); - } - throw e; - } + ServiceLockSupport.createNonHaServiceLockPath(Type.TABLET_SERVER, zoo, zLockPath); UUID tabletServerUUID = UUID.randomUUID(); tabletServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, tabletServerUUID); - LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested, - (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); + LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> serverStopRequested, + (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);