diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java new file mode 100644 index 00000000000..3129511b5e3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.lock; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; +import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.util.Halt; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceLockSupport { + + /** + * Lock Watcher used by Highly Available services. These are services where only one instance is + * running at a time, but another backup service can be started that will be used if the active + * service instance fails and loses its lock in ZK. 
+ */ + public static class HAServiceLockWatcher implements AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); + + private final String serviceName; + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + + public HAServiceLockWatcher(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, + () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e)); + + } + + @Override + public synchronized void acquiredLock() { + LOG.debug("Acquired {} lock", serviceName); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + LOG.warn("Failed to get {} lock", serviceName, e); + + if (e instanceof NoAuthException) { + String msg = + "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, + -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("{} lock held by someone else, waiting for a change in state", serviceName); + wait(); + } catch (InterruptedException e) { + // empty + } + } + } + + public boolean 
isLockAcquired() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; + } + + } + + /** + * Lock Watcher used by non-HA services + */ + public static class ServiceLockWatcher implements LockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); + + private final String serviceName; + private final Supplier shuttingDown; + private final Consumer lostLockAction; + + public ServiceLockWatcher(String serviceName, Supplier shuttingDown, + Consumer lostLockAction) { + this.serviceName = serviceName; + this.shuttingDown = shuttingDown; + this.lostLockAction = lostLockAction; + } + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + if (!shuttingDown.get()) { + LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); + } + lostLockAction.accept(serviceName); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e)); + } + + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index fd4e27fc65c..09d1852f466 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -64,6 +64,7 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -219,12 +220,12 @@ protected void getCoordinatorLock(HostAndPort clientAddress) ServiceLock.path(lockPath), zooLockUUID); while (true) { - CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + HAServiceLockWatcher coordinatorLockWatcher = new HAServiceLockWatcher("coordinator"); coordinatorLock.lock(coordinatorLockWatcher, new ServiceLockData(zooLockUUID, coordinatorClientAddress, ThriftService.COORDINATOR)); coordinatorLockWatcher.waitForChange(); - if (coordinatorLockWatcher.isAcquiredLock()) { + if (coordinatorLockWatcher.isLockAcquired()) { break; } if (!coordinatorLockWatcher.isFailedToAcquireLock()) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java deleted file mode 100644 index bbbe73147d7..00000000000 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.coordinator; - -import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; -import org.apache.accumulo.core.util.Halt; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CoordinatorLockWatcher implements ServiceLock.AccumuloLockWatcher { - - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class); - - private volatile boolean acquiredLock = false; - private volatile boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - LOG.debug("Acquired Coordinator lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - LOG.warn("Failed to get Coordinator lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire Coordinator lock due to incorrect ZooKeeper authentication."; - LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - 
LOG.info("Coordinator lock held by someone else, waiting for a change in state"); - wait(); - } catch (InterruptedException e) {} - } - } - - public boolean isAcquiredLock() { - return acquiredLock; - } - - public boolean isFailedToAcquireLock() { - return failedToAcquireLock; - } - -} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 3fe294ad624..3d60d89fb19 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -75,12 +75,12 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -98,7 +98,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; @@ -273,20 +272,8 @@ protected void 
announceExistence(HostAndPort clientAddress) compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), ServiceLock.path(zPath), compactorId); - LockWatcher lw = new LockWatcher() { - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> { - LOG.error("Compactor lost lock (reason = {}), exiting.", reason); - getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("compactor", () -> false, + (name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); try { for (int i = 0; i < 25; i++) { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 62c2280b844..adc15bea239 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -36,16 +36,14 @@ import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; -import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; -import 
org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.gc.metrics.GcCycleMetrics; import org.apache.accumulo.gc.metrics.GcMetrics; @@ -336,31 +334,32 @@ boolean moveToTrash(Path path) throws IOException { private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { var path = ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK); - LockWatcher lockWatcher = new LockWatcher() { - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); - } + UUID zooLockUUID = UUID.randomUUID(); + gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); + while (true) { + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); + gcLock.lock(gcLockWatcher, + new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC)); + gcLockWatcher.waitForChange(); + + if (gcLockWatcher.isLockAcquired()) { + break; } - }; - UUID zooLockUUID = UUID.randomUUID(); - gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - while (true) { - if (gcLock.tryLock(lockWatcher, - new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC))) { - log.debug("Got GC ZooKeeper lock"); - return; + if (!gcLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("gc lock in unknown state"); } + + gcLock.tryToCancelAsyncLockOrUnlock(); + log.debug("Failed to get GC ZooKeeper lock, will retry"); - sleepUninterruptibly(1, TimeUnit.SECONDS); + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); } + + log.info("Got GC lock."); + } private HostAndPort 
startStatsService() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index dfb672013fc..c52f6efcfb2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -74,10 +74,10 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; @@ -105,7 +105,6 @@ import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -150,7 +149,6 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -1564,65 +1562,6 @@ public ServiceLock getManagerLock() { return managerLock; } - private static class ManagerLockWatcher implements 
ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; - log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) { - // empty - } - } - } - } - private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); @@ -1638,17 +1577,17 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); while (true) { - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); + 
HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager"); managerLock.lock(managerLockWatcher, sld); managerLockWatcher.waitForChange(); - if (managerLockWatcher.acquiredLock) { + if (managerLockWatcher.isLockAcquired()) { startServiceLockVerificationThread(); break; } - if (!managerLockWatcher.failedToAcquireLock) { + if (!managerLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("manager lock in unknown state"); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 239394f6ab7..839dfd0eaef 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -54,9 +54,9 @@ import org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; @@ -70,7 +70,6 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; @@ -754,17 +753,17 @@ private void getMonitorLock(HostAndPort monitorLocation) monitorLock = new 
ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); while (true) { - MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor"); monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE)); monitorLockWatcher.waitForChange(); - if (monitorLockWatcher.acquiredLock) { + if (monitorLockWatcher.isLockAcquired()) { break; } - if (!monitorLockWatcher.failedToAcquireLock) { + if (!monitorLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("monitor lock in unknown state"); } @@ -778,57 +777,6 @@ private void getMonitorLock(HostAndPort monitorLocation) log.info("Got Monitor lock."); } - /** - * Async Watcher for monitor lock - */ - private static class MoniterLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get monitor lock " + e); - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch 
(InterruptedException e) {} - } - } - } - public ManagerMonitorInfo getMmi() { return mmi; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 7fcd59dcc9b..8db73cb001a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -69,12 +69,12 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; @@ -90,7 +90,6 @@ import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -346,24 +345,8 @@ private ServiceLock announceExistence() { serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID); - - LockWatcher lw = new LockWatcher() { - - 
@Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested, + (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 520e1403522..9d87e7fea63 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -78,12 +78,12 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.BulkImportState; import org.apache.accumulo.core.manager.thrift.Compacting; import org.apache.accumulo.core.manager.thrift.ManagerClientService; @@ 
-100,7 +100,6 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; @@ -630,24 +629,8 @@ private void announceExistence() { UUID tabletServerUUID = UUID.randomUUID(); tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, tabletServerUUID); - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - log.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); - - } - }; + LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested, + (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);