diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java index 372246d4f73..eb70d8e84ca 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@ -18,11 +18,19 @@ */ package org.apache.accumulo.core.lock; +import java.util.function.Consumer; +import java.util.function.Supplier; + import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; +import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.util.Halt; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoAuthException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,4 +59,122 @@ public static void createNonHaServiceLockPath(Type server, ZooReaderWriter zrw, } } + + /** + * Lock Watcher used by Highly Available services. These are services where only instance is + * running at a time, but another backup service can be started that will be used if the active + * service instance fails and loses its lock in ZK. + */ + public static class HAServiceLockWatcher implements AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); + + private final Type server; + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + + public HAServiceLockWatcher(Type server) { + this.server = server; + } + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor {} lock node", server, e)); + + } + + @Override + public synchronized void acquiredLock() { + LOG.debug("Acquired {} lock", server); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + LOG.warn("Failed to get {} lock", server, e); + + if (e instanceof NoAuthException) { + String msg = + "Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, + -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("{} lock held by someone else, waiting for a change in state", server); + wait(); + } catch (InterruptedException e) { + // empty + } + } + } + + public boolean isLockAcquired() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; + } + + } + + /** + * Lock Watcher used by non-HA services + */ + public static class ServiceLockWatcher implements LockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); + + private final Type server; + private final Supplier shuttingDown; + private final Consumer lostLockAction; + + public ServiceLockWatcher(Type server, Supplier shuttingDown, + Consumer lostLockAction) { + this.server = server; + this.shuttingDown = shuttingDown; + this.lostLockAction = lostLockAction; + } + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + if (!shuttingDown.get()) { + LOG.error("{} lost lock (reason = {}), exiting.", server, reason); + } + lostLockAction.accept(server); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", server, e)); + } + + } + } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 7135b5a978f..367ee6fe64a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.admin.TabletAvailability; @@ -623,6 +624,16 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t * let the rejected status carry forward in this case. */ void submit(RejectionHandler rejectionHandler); + + /** + * Overloaded version of {@link #submit(RejectionHandler)} that takes a short description of the + * operation to assist with debugging. + * + * @param rejectionHandler The rejection handler + * @param description A short description of the operation (e.g., "bulk import", "compaction") + */ + void submit(RejectionHandler rejectionHandler, Supplier description); + } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 4ae46676024..ac0dc4b1126 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -36,6 +36,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.TabletAvailability; @@ -86,17 +87,20 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase> descriptionConsumer; private boolean sawOperationRequirement = false; private boolean checkPrevEndRow = true; protected ConditionalTabletMutatorImpl(ServerContext context, KeyExtent extent, Consumer mutationConsumer, - BiConsumer rejectionHandlerConsumer) { + BiConsumer rejectionHandlerConsumer, + BiConsumer> descriptionConsumer) { super(new ConditionalMutation(extent.toMetaRow())); this.mutation = (ConditionalMutation) super.mutation; this.mutationConsumer = mutationConsumer; this.rejectionHandlerConsumer = rejectionHandlerConsumer; + this.descriptionConsumer = descriptionConsumer; this.extent = extent; this.context = context; this.lock = this.context.getServiceLock(); @@ -390,4 +394,10 @@ public void submit(Ample.RejectionHandler rejectionCheck) { mutationConsumer.accept(mutation); rejectionHandlerConsumer.accept(extent, rejectionCheck); } + + @Override + public void submit(Ample.RejectionHandler rejectionHandler, Supplier description) { + descriptionConsumer.accept(extent, description); + this.submit(rejectionHandler); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java index 660453f2998..777364e0c48 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; @@ -66,6 +67,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu private boolean active = true; final Map rejectedHandlers = new HashMap<>(); + private final Map> operationDescriptions = new HashMap<>(); private final Function tableMapper; public ConditionalTabletsMutatorImpl(ServerContext context) { @@ -93,7 +95,8 @@ public Ample.OperationRequirements mutateTablet(KeyExtent extent) { Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) == null, "Duplicate extents not handled %s", extent); - return new ConditionalTabletMutatorImpl(context, extent, mutations::add, rejectedHandlers::put); + return new ConditionalTabletMutatorImpl(context, extent, mutations::add, rejectedHandlers::put, + operationDescriptions::put); } protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel) @@ -262,16 +265,20 @@ public Status getStatus() { status = Status.ACCEPTED; } + Supplier descSupplier = operationDescriptions.get(extent); + String desc = (descSupplier == null) ? null : descSupplier.get(); + if (log.isTraceEnabled()) { // log detailed info about tablet metadata and mutation - log.trace("Mutation was rejected, status:{} {} {}", status, tabletMetadata, - result.getMutation().prettyPrint()); + log.trace("Mutation was rejected, status:{}. Operation description: {} {} {}", + status, desc, tabletMetadata, result.getMutation().prettyPrint()); } else if (log.isDebugEnabled()) { // log a single line of info that makes it apparent this happened and gives enough // information to investigate - log.debug("Mutation was rejected, status:{} extent:{} row:{}", status, - tabletMetadata == null ? null : tabletMetadata.getExtent(), - new String(result.getMutation().getRow(), UTF_8)); + log.debug( + "Mutation was rejected, status:{} extent:{} row:{} operation description: {}", + status, tabletMetadata == null ? null : tabletMetadata.getExtent(), + new String(result.getMutation().getRow(), UTF_8), desc); } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 09c1c43857e..9c13647f663 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -75,7 +75,6 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; @@ -83,6 +82,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockSupport; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -101,7 +101,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; @@ -281,20 +280,8 @@ protected void announceExistence(HostAndPort clientAddress) ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path); compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, compactorId); - LockWatcher lw = new LockWatcher() { - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> { - LOG.error("Compactor lost lock (reason = {}), exiting.", reason); - getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> false, + (type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); try { for (int i = 0; i < 25; i++) { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 65f80b1864b..3fce46273a2 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -43,17 +44,15 @@ import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; -import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -370,31 +369,32 @@ boolean moveToTrash(Path path) throws IOException { private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { var path = getContext().getServerPaths().createGarbageCollectorPath(); - LockWatcher lockWatcher = new LockWatcher() { - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); - } + UUID zooLockUUID = UUID.randomUUID(); + gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR); - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); + while (true) { + gcLock.lock(gcLockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC, + this.getResourceGroup())); + gcLockWatcher.waitForChange(); + + if (gcLockWatcher.isLockAcquired()) { + break; } - }; - UUID zooLockUUID = UUID.randomUUID(); - gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - while (true) { - if (gcLock.tryLock(lockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), - ThriftService.GC, this.getResourceGroup()))) { - log.debug("Got GC ZooKeeper lock"); - return; + if (!gcLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("gc lock in unknown state"); } + + gcLock.tryToCancelAsyncLockOrUnlock(); + log.debug("Failed to get GC ZooKeeper lock, will retry"); - sleepUninterruptibly(1, TimeUnit.SECONDS); + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); } + + log.info("Got GC lock."); + } private HostAndPort startStatsService() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 062e3062026..3781d8024d4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; @@ -82,13 +83,13 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; @@ -114,7 +115,6 @@ import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -158,7 +158,6 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -1488,65 +1487,6 @@ public ServiceLock getManagerLock() { return managerLock; } - private static class ManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; - log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) { - // empty - } - } - } - } - private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); @@ -1567,17 +1507,17 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) while (true) { - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); + HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher(Type.MANAGER); managerLock.lock(managerLockWatcher, sld); managerLockWatcher.waitForChange(); - if (managerLockWatcher.acquiredLock) { + if (managerLockWatcher.isLockAcquired()) { startServiceLockVerificationThread(); break; } - if (!managerLockWatcher.failedToAcquireLock) { + if (!managerLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("manager lock in unknown state"); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 6496f500e9f..e29413e82aa 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -350,12 +350,10 @@ public void hostOndemand(Collection extents) { // Do not add any code here, it may interfere with the finally block removing extents from // hostingRequestInProgress try (var mutator = manager.getContext().getAmple().conditionallyMutateTablets()) { - inProgress.forEach(ke -> { - mutator.mutateTablet(ke).requireAbsentOperation() - .requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation() - .setHostingRequested().submit(TabletMetadata::getHostingRequested); - - }); + inProgress.forEach(ke -> mutator.mutateTablet(ke).requireAbsentOperation() + .requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation() + .setHostingRequested() + .submit(TabletMetadata::getHostingRequested, () -> "host ondemand")); List ranges = new ArrayList<>(); @@ -1094,7 +1092,7 @@ private void replaceVolumes(List volumeReplacemen "replaceVolume conditional mutation rejection check {} logsRemoved:{} filesRemoved:{}", tm.getExtent(), logsRemoved, filesRemoved); return logsRemoved && filesRemoved; - }); + }, () -> "replace volume"); } tabletsMutator.process().forEach((extent, result) -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 6f2eca37565..fce920e4cba 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -635,8 +635,9 @@ public CompactionMetadata get() { } tabletMutator.putExternalCompaction(externalCompactionId, ecm); - tabletMutator - .submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); + tabletMutator.submit( + tm -> tm.getExternalCompactions().containsKey(externalCompactionId), + () -> "compaction reservation"); var result = tabletsMutator.process().get(extent); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index cb9492d7e30..29e626d9540 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -128,8 +128,9 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId // make the needed updates to the tablet updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator); - tabletMutator - .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + tabletMutator.submit( + tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid), + () -> "commit compaction " + ecid); if (LOG.isDebugEnabled()) { LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java index bd94afd15b4..db4fcdbeaf2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java @@ -136,7 +136,8 @@ public long isReady(FateId fateId, Manager manager) throws Exception { tabletExtent); mutator.mutateTablet(tabletExtent).requireAbsentOperation() .putTabletAvailability(tabletAvailability) - .submit(tabletMeta -> tabletMeta.getTabletAvailability() == tabletAvailability); + .submit(tabletMeta -> tabletMeta.getTabletAvailability() == tabletAvailability, + () -> "set tablet availability"); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java index 32003695251..e5cdfdb085a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java @@ -111,7 +111,7 @@ private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId f tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue().equals(fateId)) .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile); - tabletMutator.submit(tm -> false); + tabletMutator.submit(tm -> false, () -> "remove bulk load entries " + fateId); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 5ff2ca3ec39..c38fb54ce69 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -251,7 +251,7 @@ void load(List tablets, Files files) { Preconditions.checkState( loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null); - tabletMutator.submit(tm -> false); + tabletMutator.submit(tm -> false, () -> "bulk load files " + fateId); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 464f3651210..51c4186bf9c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -211,7 +211,8 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) // this tablet has no files try to mark it as done tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, COMPACTED).putCompacted(fateId) - .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId)); + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId), + () -> "no files, attempting to mark as compacted. " + fateId); noFiles++; } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files @@ -242,7 +243,8 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) // no files were selected so mark the tablet as compacted tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(fateId) - .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId)); + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId), + () -> "no files, attempting to mark as compacted. " + fateId); noneSelected++; } else { @@ -260,9 +262,11 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) selectionsSubmitted.put(tablet.getExtent(), filesToCompact); - mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles() != null - && tabletMetadata.getSelectedFiles().getFateId().equals(fateId) - || tabletMetadata.getCompacted().contains(fateId)); + mutator.submit( + tabletMetadata -> tabletMetadata.getSelectedFiles() != null + && tabletMetadata.getSelectedFiles().getFateId().equals(fateId) + || tabletMetadata.getCompacted().contains(fateId), + () -> "selecting files for compaction. " + fateId); if (minSelected == null || tablet.getExtent().compareTo(minSelected) < 0) { minSelected = tablet.getExtent(); @@ -298,7 +302,8 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, ECOMP, USER_COMPACTION_REQUESTED) .putUserCompactionRequested(fateId); - mutator.submit(tm -> tm.getUserCompactionsRequested().contains(fateId)); + mutator.submit(tm -> tm.getUserCompactionsRequested().contains(fateId), + () -> "marking as needing a user requested compaction. " + fateId); userCompactionRequested++; } else { // Marker was already added and we are waiting @@ -400,7 +405,8 @@ private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Except mutator.deleteUserCompactionRequested(fateId); } - mutator.submit(needsNoUpdate::test); + mutator.submit(needsNoUpdate::test, + () -> "cleanup metadata for failed compaction. " + fateId); } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index f065206b2ff..6f9d65d0b87 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -96,7 +96,8 @@ public long isReady(FateId fateId, Manager manager) throws Exception { // must wait for the tablet to have no location before proceeding to actually delete. See // the documentation about the opid column in the MetadataSchema class for more details. conditionalMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation() - .putOperation(opid).submit(tm -> opid.equals(tm.getOperationId())); + .putOperation(opid) + .submit(tm -> opid.equals(tm.getOperationId()), () -> "put opid " + opid); submitted++; } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java index 2c5221b7ce8..3bf6bce841b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java @@ -178,16 +178,20 @@ private Optional deleteTabletFiles(Manager manager, FateId fateId) { } } - filesToDelete.forEach(file -> log.debug("{} deleting file {} for {}", fateId, file, - tabletMetadata.getExtent())); - filesToAddMap.forEach((file, dfv) -> log.debug("{} adding file {} {} for {}", fateId, file, - dfv, tabletMetadata.getExtent())); - - filesToDelete.forEach(tabletMutator::deleteFile); - filesToAddMap.forEach(tabletMutator::putFile); - - tabletMutator.submit(tm -> tm.getFiles().containsAll(filesToAddMap.keySet()) - && Collections.disjoint(tm.getFiles(), filesToDelete)); + filesToDelete.forEach(file -> { + log.debug("{} deleting file {} for {}", fateId, file, tabletMetadata.getExtent()); + tabletMutator.deleteFile(file); + }); + + filesToAddMap.forEach((file, dfv) -> { + log.debug("{} adding file {} {} for {}", fateId, file, dfv, tabletMetadata.getExtent()); + tabletMutator.putFile(file, dfv); + }); + + tabletMutator.submit( + tm -> tm.getFiles().containsAll(filesToAddMap.keySet()) + && Collections.disjoint(tm.getFiles(), filesToDelete), + () -> "delete tablet files (as part of merge or deleterow operation) " + fateId); } var results = tabletsMutator.process(); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 1ac2914df88..d364a2fdfce 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -56,9 +57,9 @@ import org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; @@ -72,7 +73,6 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; @@ -742,10 +742,9 @@ private void getMonitorLock(HostAndPort monitorLocation) // Get a ZooLock for the monitor UUID zooLockUUID = UUID.randomUUID(); monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher(Type.MONITOR); while (true) { - MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); - monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE, @@ -753,11 +752,11 @@ private void getMonitorLock(HostAndPort monitorLocation) monitorLockWatcher.waitForChange(); - if (monitorLockWatcher.acquiredLock) { + if (monitorLockWatcher.isLockAcquired()) { break; } - if (!monitorLockWatcher.failedToAcquireLock) { + if (!monitorLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("monitor lock in unknown state"); } @@ -771,57 +770,6 @@ private void getMonitorLock(HostAndPort monitorLocation) log.info("Got Monitor lock."); } - /** - * Async Watcher for monitor lock - */ - private static class MoniterLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get monitor lock " + e); - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) {} - } - } - } - public ManagerMonitorInfo getMmi() { return mmi; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 241f7bfded5..1666515c719 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -70,7 +70,6 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; @@ -78,6 +77,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockSupport; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; @@ -93,7 +93,6 @@ import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.cache.Caches.CacheName; @@ -336,24 +335,8 @@ private ServiceLock announceExistence() { ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo, zLockPath); serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID); - - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> serverStopRequested, + (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 948197cb237..2c961fac9c3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -80,7 +80,6 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; @@ -88,6 +87,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockSupport; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.Compacting; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.TableInfo; @@ -105,7 +105,6 @@ import org.apache.accumulo.core.tabletserver.UnloaderParamsImpl; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.ComparablePair; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; @@ -495,24 +494,8 @@ private void announceExistence() { UUID tabletServerUUID = UUID.randomUUID(); tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, tabletServerUUID); - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - log.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); - - } - }; + LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> serverStopRequested, + (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index d8e0e9d2e17..f594f7b9ec7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -173,7 +173,8 @@ public void testLocations() throws Exception { // test require absent with a future location set try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() - .putLocation(Location.future(ts2)).submit(tm -> false); + .putLocation(Location.future(ts2)).submit(tm -> false, + () -> "Testing that requireAbsentLocation() fails when a future location is set"); assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); } assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); @@ -196,7 +197,8 @@ public void testLocations() throws Exception { try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() .putLocation(Location.future(ts2)).submit(tm -> false); - assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus(), + () -> "Testing that requireAbsentLocation() fails when a current location is set"); } assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation());