From b3424a86b9cc62cfba3ca5eb5f3eef249c82fa30 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 6 Jan 2025 14:08:55 -0500 Subject: [PATCH] Made lock acquisition consistent (#5224) Modified SimpleGarbageCollector to acquire the service lock in the same manner that other HA services acquire the service lock. Created a new class called ServiceLockSupport that is currently used to hold LockWatcher implementations for HA and non-HA servers. Closes #4839 Co-authored-by: Keith Turner --- .../fate/zookeeper/ServiceLockSupport.java | 152 ++++++++++++++++++ .../coordinator/CompactionCoordinator.java | 5 +- .../coordinator/CoordinatorLockWatcher.java | 94 ----------- .../apache/accumulo/compactor/Compactor.java | 18 +-- .../accumulo/gc/SimpleGarbageCollector.java | 41 +++-- .../org/apache/accumulo/manager/Manager.java | 69 +------- .../org/apache/accumulo/monitor/Monitor.java | 60 +------ .../apache/accumulo/tserver/ScanServer.java | 23 +-- .../apache/accumulo/tserver/TabletServer.java | 23 +-- 9 files changed, 192 insertions(+), 293 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java delete mode 100644 server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java new file mode 100644 index 00000000000..b4d95a703aa --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.zookeeper; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.AccumuloLockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.util.Halt; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceLockSupport { + + /** + * Lock Watcher used by Highly Available services. These are services where only one instance is + * running at a time, but another backup service can be started that will be used if the active + * service instance fails and loses its lock in ZK. 
+ */ + public static class HAServiceLockWatcher implements AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); + + private final String serviceName; + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + + public HAServiceLockWatcher(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, + () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e)); + + } + + @Override + public synchronized void acquiredLock() { + LOG.debug("Acquired {} lock", serviceName); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + LOG.warn("Failed to get {} lock", serviceName, e); + + if (e instanceof NoAuthException) { + String msg = + "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, + -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("{} lock held by someone else, waiting for a change in state", serviceName); + wait(); + } catch (InterruptedException e) { + // empty + } + } + } + + public boolean 
isLockAcquired() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; + } + + } + + /** + * Lock Watcher used by non-HA services + */ + public static class ServiceLockWatcher implements LockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); + + private final String serviceName; + private final Supplier<Boolean> shuttingDown; + private final Consumer<String> lostLockAction; + + public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown, + Consumer<String> lostLockAction) { + this.serviceName = serviceName; + this.shuttingDown = shuttingDown; + this.lostLockAction = lostLockAction; + } + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + if (!shuttingDown.get()) { + LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); + } + lostLockAction.accept(serviceName); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e)); + } + + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b735d8544dc..7d853b48089 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -58,6 +58,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.TServerInstance; import 
org.apache.accumulo.core.metadata.schema.Ample; @@ -223,11 +224,11 @@ protected void getCoordinatorLock(HostAndPort clientAddress) ServiceLock.path(lockPath), zooLockUUID); while (true) { - CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + HAServiceLockWatcher coordinatorLockWatcher = new HAServiceLockWatcher("coordinator"); coordinatorLock.lock(coordinatorLockWatcher, coordinatorClientAddress.getBytes(UTF_8)); coordinatorLockWatcher.waitForChange(); - if (coordinatorLockWatcher.isAcquiredLock()) { + if (coordinatorLockWatcher.isLockAcquired()) { break; } if (!coordinatorLockWatcher.isFailedToAcquireLock()) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java deleted file mode 100644 index 663ecaf4a9f..00000000000 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.coordinator; - -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.util.Halt; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CoordinatorLockWatcher implements ServiceLock.AccumuloLockWatcher { - - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class); - - private volatile boolean acquiredLock = false; - private volatile boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - LOG.debug("Acquired Coordinator lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - LOG.warn("Failed to get Coordinator lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire Coordinator lock due to incorrect ZooKeeper authentication."; - LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) 
{ - try { - LOG.info("Coordinator lock held by someone else, waiting for a change in state"); - wait(); - } catch (InterruptedException e) {} - } - } - - public boolean isAcquiredLock() { - return acquiredLock; - } - - public boolean isFailedToAcquireLock() { - return failedToAcquireLock; - } - -} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 83ff5de7ee0..f95fad41ab2 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -66,8 +66,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.FileOperations; @@ -91,7 +91,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; @@ -288,20 +287,9 @@ protected void announceExistence(HostAndPort clientAddress) compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), ServiceLock.path(zPath), compactorId); - LockWatcher lw = new LockWatcher() { - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> 
{ - LOG.error("Compactor lost lock (reason = {}), exiting.", reason); - gcLogger.logGCInfo(getConfiguration()); - }); - } - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("compactor", () -> false, + (name) -> gcLogger.logGCInfo(getConfiguration())); try { byte[] lockContent = diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index bd78388836e..9658a21e7a2 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -32,8 +32,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; @@ -44,7 +43,6 @@ import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; @@ -365,31 +363,32 @@ boolean moveToTrash(Path path) throws IOException { private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { var path = 
ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK); - LockWatcher lockWatcher = new LockWatcher() { - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); - } + UUID zooLockUUID = UUID.randomUUID(); + gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); + while (true) { + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); + gcLock.lock(gcLockWatcher, + new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8)); + gcLockWatcher.waitForChange(); + + if (gcLockWatcher.isLockAcquired()) { + break; } - }; - UUID zooLockUUID = UUID.randomUUID(); - gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - while (true) { - if (gcLock.tryLock(lockWatcher, - new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8))) { - log.debug("Got GC ZooKeeper lock"); - return; + if (!gcLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("gc lock in unknown state"); } + + gcLock.tryToCancelAsyncLockOrUnlock(); + log.debug("Failed to get GC ZooKeeper lock, will retry"); - sleepUninterruptibly(1, TimeUnit.SECONDS); + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); } + + log.info("Got GC lock."); + } private HostAndPort startStatsService() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a3540fa062a..7ca35732cca 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -68,8 +68,8 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.TStore; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; @@ -105,7 +105,6 @@ import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; @@ -148,7 +147,6 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -1637,65 +1635,6 @@ public ServiceLock getManagerLock() { return managerLock; } - private static class ManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: 
No longer able to monitor manager lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; - log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) { - // empty - } - } - } - } - private void getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); @@ -1708,17 +1647,17 @@ private void getManagerLock(final ServiceLockPath zManagerLoc) managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); while (true) { - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); + HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager"); managerLock.lock(managerLockWatcher, managerClientAddress.getBytes(UTF_8)); managerLockWatcher.waitForChange(); - if (managerLockWatcher.acquiredLock) { + if (managerLockWatcher.isLockAcquired()) { startServiceLockVerificationThread(); break; } - if (!managerLockWatcher.failedToAcquireLock) { + if (!managerLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("manager lock in unknown state"); } diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 4108282bcb3..17ed1737149 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -52,7 +52,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -71,7 +71,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; @@ -858,16 +857,16 @@ private void getMonitorLock() throws KeeperException, InterruptedException { monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); while (true) { - MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor"); monitorLock.lock(monitorLockWatcher, new byte[0]); monitorLockWatcher.waitForChange(); - if (monitorLockWatcher.acquiredLock) { + if (monitorLockWatcher.isLockAcquired()) { break; } - if (!monitorLockWatcher.failedToAcquireLock) { + if (!monitorLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("monitor lock in unknown state"); } @@ 
-881,57 +880,6 @@ private void getMonitorLock() throws KeeperException, InterruptedException { log.info("Got Monitor lock."); } - /** - * Async Watcher for monitor lock - */ - private static class MoniterLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get monitor lock " + e); - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) {} - } - } - } - public ManagerMonitorInfo getMmi() { return mmi; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index b3301f426c2..bfbcdeed9b0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -64,8 +64,8 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import 
org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -86,7 +86,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -349,24 +348,8 @@ private ServiceLock announceExistence() { serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID); - - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - gcLogger.logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested, + (name) -> gcLogger.logGCInfo(getConfiguration())); // Don't use the normal ServerServices lock content, instead put the server UUID here. 
byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 54764b86725..3a3ad3f3b14 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -73,8 +73,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -95,7 +95,6 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; @@ -679,24 +678,8 @@ private void announceExistence() { tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID()); - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 
0 : 1, () -> { - if (!serverStopRequested) { - log.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - gcLogger.logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); - - } - }; + LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested, + (name) -> gcLogger.logGCInfo(getConfiguration())); byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT) .toString().getBytes(UTF_8);