From b3424a86b9cc62cfba3ca5eb5f3eef249c82fa30 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 6 Jan 2025 14:08:55 -0500 Subject: [PATCH 01/10] Made lock acquisition consistent (#5224) Modified SimpleGarbageCollector to acquire the service lock in the same manner that other HA services acquire the service lock. Created a new class called ServiceLockSupport that is currently used to hold LockWatcher implementations for HA and non-HA servers. Closes #4839 Co-authored-by: Keith Turner --- .../fate/zookeeper/ServiceLockSupport.java | 152 ++++++++++++++++++ .../coordinator/CompactionCoordinator.java | 5 +- .../coordinator/CoordinatorLockWatcher.java | 94 ----------- .../apache/accumulo/compactor/Compactor.java | 18 +-- .../accumulo/gc/SimpleGarbageCollector.java | 41 +++-- .../org/apache/accumulo/manager/Manager.java | 69 +------- .../org/apache/accumulo/monitor/Monitor.java | 60 +------ .../apache/accumulo/tserver/ScanServer.java | 23 +-- .../apache/accumulo/tserver/TabletServer.java | 23 +-- 9 files changed, 192 insertions(+), 293 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java delete mode 100644 server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java new file mode 100644 index 00000000000..b4d95a703aa --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.zookeeper; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.AccumuloLockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.util.Halt; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceLockSupport { + + /** + * Lock Watcher used by Highly Available services. These are services where only one instance is + * running at a time, but another backup service can be started that will be used if the active + * service instance fails and loses its lock in ZK. 
+ */ + public static class HAServiceLockWatcher implements AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); + + private final String serviceName; + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + + public HAServiceLockWatcher(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, + () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e)); + + } + + @Override + public synchronized void acquiredLock() { + LOG.debug("Acquired {} lock", serviceName); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + LOG.warn("Failed to get {} lock", serviceName, e); + + if (e instanceof NoAuthException) { + String msg = + "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, + -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("{} lock held by someone else, waiting for a change in state", serviceName); + wait(); + } catch (InterruptedException e) { + // empty + } + } + } + + public boolean 
isLockAcquired() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; + } + + } + + /** + * Lock Watcher used by non-HA services + */ + public static class ServiceLockWatcher implements LockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); + + private final String serviceName; + private final Supplier<Boolean> shuttingDown; + private final Consumer<String> lostLockAction; + + public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown, + Consumer<String> lostLockAction) { + this.serviceName = serviceName; + this.shuttingDown = shuttingDown; + this.lostLockAction = lostLockAction; + } + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + if (!shuttingDown.get()) { + LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); + } + lostLockAction.accept(serviceName); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e)); + } + + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b735d8544dc..7d853b48089 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -58,6 +58,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.TServerInstance; import 
org.apache.accumulo.core.metadata.schema.Ample; @@ -223,11 +224,11 @@ protected void getCoordinatorLock(HostAndPort clientAddress) ServiceLock.path(lockPath), zooLockUUID); while (true) { - CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + HAServiceLockWatcher coordinatorLockWatcher = new HAServiceLockWatcher("coordinator"); coordinatorLock.lock(coordinatorLockWatcher, coordinatorClientAddress.getBytes(UTF_8)); coordinatorLockWatcher.waitForChange(); - if (coordinatorLockWatcher.isAcquiredLock()) { + if (coordinatorLockWatcher.isLockAcquired()) { break; } if (!coordinatorLockWatcher.isFailedToAcquireLock()) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java deleted file mode 100644 index 663ecaf4a9f..00000000000 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.coordinator; - -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.util.Halt; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CoordinatorLockWatcher implements ServiceLock.AccumuloLockWatcher { - - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class); - - private volatile boolean acquiredLock = false; - private volatile boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - LOG.debug("Acquired Coordinator lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - LOG.warn("Failed to get Coordinator lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire Coordinator lock due to incorrect ZooKeeper authentication."; - LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) 
{ - try { - LOG.info("Coordinator lock held by someone else, waiting for a change in state"); - wait(); - } catch (InterruptedException e) {} - } - } - - public boolean isAcquiredLock() { - return acquiredLock; - } - - public boolean isFailedToAcquireLock() { - return failedToAcquireLock; - } - -} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 83ff5de7ee0..f95fad41ab2 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -66,8 +66,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.FileOperations; @@ -91,7 +91,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; @@ -288,20 +287,9 @@ protected void announceExistence(HostAndPort clientAddress) compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), ServiceLock.path(zPath), compactorId); - LockWatcher lw = new LockWatcher() { - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> 
{ - LOG.error("Compactor lost lock (reason = {}), exiting.", reason); - gcLogger.logGCInfo(getConfiguration()); - }); - } - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("compactor", () -> false, + (name) -> gcLogger.logGCInfo(getConfiguration())); try { byte[] lockContent = diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index bd78388836e..9658a21e7a2 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -32,8 +32,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; @@ -44,7 +43,6 @@ import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; @@ -365,31 +363,32 @@ boolean moveToTrash(Path path) throws IOException { private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { var path = 
ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK); - LockWatcher lockWatcher = new LockWatcher() { - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); - } + UUID zooLockUUID = UUID.randomUUID(); + gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); + while (true) { + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); + gcLock.lock(gcLockWatcher, + new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8)); + gcLockWatcher.waitForChange(); + + if (gcLockWatcher.isLockAcquired()) { + break; } - }; - UUID zooLockUUID = UUID.randomUUID(); - gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - while (true) { - if (gcLock.tryLock(lockWatcher, - new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8))) { - log.debug("Got GC ZooKeeper lock"); - return; + if (!gcLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("gc lock in unknown state"); } + + gcLock.tryToCancelAsyncLockOrUnlock(); + log.debug("Failed to get GC ZooKeeper lock, will retry"); - sleepUninterruptibly(1, TimeUnit.SECONDS); + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); } + + log.info("Got GC lock."); + } private HostAndPort startStatsService() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a3540fa062a..7ca35732cca 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -68,8 +68,8 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.TStore; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; @@ -105,7 +105,6 @@ import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; @@ -148,7 +147,6 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -1637,65 +1635,6 @@ public ServiceLock getManagerLock() { return managerLock; } - private static class ManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: 
No longer able to monitor manager lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; - log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) { - // empty - } - } - } - } - private void getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); @@ -1708,17 +1647,17 @@ private void getManagerLock(final ServiceLockPath zManagerLoc) managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); while (true) { - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); + HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager"); managerLock.lock(managerLockWatcher, managerClientAddress.getBytes(UTF_8)); managerLockWatcher.waitForChange(); - if (managerLockWatcher.acquiredLock) { + if (managerLockWatcher.isLockAcquired()) { startServiceLockVerificationThread(); break; } - if (!managerLockWatcher.failedToAcquireLock) { + if (!managerLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("manager lock in unknown state"); } diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 4108282bcb3..17ed1737149 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -52,7 +52,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -71,7 +71,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; @@ -858,16 +857,16 @@ private void getMonitorLock() throws KeeperException, InterruptedException { monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); while (true) { - MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor"); monitorLock.lock(monitorLockWatcher, new byte[0]); monitorLockWatcher.waitForChange(); - if (monitorLockWatcher.acquiredLock) { + if (monitorLockWatcher.isLockAcquired()) { break; } - if (!monitorLockWatcher.failedToAcquireLock) { + if (!monitorLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("monitor lock in unknown state"); } @@ 
-881,57 +880,6 @@ private void getMonitorLock() throws KeeperException, InterruptedException { log.info("Got Monitor lock."); } - /** - * Async Watcher for monitor lock - */ - private static class MoniterLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get monitor lock " + e); - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) {} - } - } - } - public ManagerMonitorInfo getMmi() { return mmi; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index b3301f426c2..bfbcdeed9b0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -64,8 +64,8 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import 
org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -86,7 +86,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -349,24 +348,8 @@ private ServiceLock announceExistence() { serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID); - - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - gcLogger.logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested, + (name) -> gcLogger.logGCInfo(getConfiguration())); // Don't use the normal ServerServices lock content, instead put the server UUID here. 
byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 54764b86725..3a3ad3f3b14 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -73,8 +73,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -95,7 +95,6 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; @@ -679,24 +678,8 @@ private void announceExistence() { tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID()); - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 
0 : 1, () -> { - if (!serverStopRequested) { - log.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - gcLogger.logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); - - } - }; + LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested, + (name) -> gcLogger.logGCInfo(getConfiguration())); byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT) .toString().getBytes(UTF_8); From ae8500331a5b7e751ea5e17c0db731d0ecfec79d Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 6 Jan 2025 19:40:07 +0000 Subject: [PATCH 02/10] Create LockWatcher outside of loop in SimpleGarbageCollector --- .../java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 9658a21e7a2..8ea304a8fd3 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -365,9 +365,9 @@ private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedExc UUID zooLockUUID = UUID.randomUUID(); gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); while (true) { - HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); gcLock.lock(gcLockWatcher, new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8)); From fe22a33b23fe7a36b7febc8068deeaedcdd793d2 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 7 Jan 2025 15:18:14 -0500 Subject: [PATCH 03/10] Better handling for runtime exception in 
Tablet.commit (#5213) Wrapped call to getTabletMemory().mutate() in try/finally so that a RuntimeException does not short circuit the code in the method that decrements the writes. Added logging in TabletClientHandler to note which client was causing the failure. Closes #5208 Co-authored-by: Keith Turner --- .../accumulo/tserver/TabletClientHandler.java | 7 +++++ .../accumulo/tserver/tablet/Tablet.java | 27 +++++++++---------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 4086a8a963d..087c733e6f9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -469,6 +469,7 @@ private void flush(UpdateSession us) { } } catch (Exception e) { TraceUtil.setException(span2, e, true); + log.error("Error logging mutations sent from {}", TServerUtils.clientAddress.get(), e); throw e; } finally { span2.end(); @@ -496,6 +497,10 @@ private void flush(UpdateSession us) { us.commitTimes.addStat(t2 - t1); updateAvgCommitTime(t2 - t1, sendables.size()); + } catch (Exception e) { + TraceUtil.setException(span3, e, true); + log.error("Error committing mutations sent from {}", TServerUtils.clientAddress.get(), e); + throw e; } finally { span3.end(); } @@ -675,6 +680,7 @@ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, session.commit(mutations); } catch (Exception e) { TraceUtil.setException(span3, e, true); + log.error("Error committing mutations sent from {}", TServerUtils.clientAddress.get(), e); throw e; } finally { span3.end(); @@ -831,6 +837,7 @@ private void writeConditionalMutations(Map mutations) { totalBytes += mutation.numBytes(); } - getTabletMemory().mutate(commitSession, mutations, totalCount); - - synchronized (this) { - if 
(isCloseComplete()) { - throw new IllegalStateException( - "Tablet " + extent + " closed with outstanding messages to the logger"); + try { + getTabletMemory().mutate(commitSession, mutations, totalCount); + synchronized (this) { + getTabletMemory().updateMemoryUsageStats(); + if (isCloseComplete()) { + throw new IllegalStateException( + "Tablet " + extent + " closed with outstanding messages to the logger"); + } + numEntries += totalCount; + numEntriesInMemory += totalCount; + ingestCount += totalCount; + ingestBytes += totalBytes; } - // decrement here in case an exception is thrown below + } finally { decrementWritesInProgress(commitSession); - - getTabletMemory().updateMemoryUsageStats(); - - numEntries += totalCount; - numEntriesInMemory += totalCount; - ingestCount += totalCount; - ingestBytes += totalBytes; } } From 3d0bea9a3b160eb9f826e521e7cc00dc0a2b7649 Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Tue, 7 Jan 2025 20:18:48 -0500 Subject: [PATCH 04/10] Use a FileHandler when loading properties (#5233) Fixes error when using include or includeOptional property directives in accumulo.properties --- .../java/org/apache/accumulo/core/conf/SiteConfiguration.java | 4 +++- .../accumulo/start/classloader/AccumuloClassLoader.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java index 196486f205c..2a54943217b 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java @@ -39,6 +39,7 @@ import org.apache.commons.configuration2.MapConfiguration; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.configuration2.io.FileHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-222,8 +223,9 @@ private SiteConfiguration(Map config) { private static AbstractConfiguration getPropsFileConfig(URL accumuloPropsLocation) { var config = new PropertiesConfiguration(); if (accumuloPropsLocation != null) { + var fileHandler = new FileHandler(config); try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) { - config.read(reader); + fileHandler.load(reader); } catch (ConfigurationException | IOException e) { throw new IllegalArgumentException(e); } diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java index 3a816dcf9a8..72c76eaa106 100644 --- a/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java @@ -34,6 +34,7 @@ import java.util.regex.Pattern; import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.io.FileHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,8 +91,9 @@ public static String getAccumuloProperty(String propertyName, String defaultValu } try { var config = new PropertiesConfiguration(); + var fileHandler = new FileHandler(config); try (var reader = new InputStreamReader(accumuloConfigUrl.openStream(), UTF_8)) { - config.read(reader); + fileHandler.load(reader); } String value = config.getString(propertyName); if (value != null) { From 9e40a1e0687812d9250008a159a6dc000b46bddb Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 8 Jan 2025 09:35:35 -0500 Subject: [PATCH 05/10] Replaced tserv block cache mgr property with a more general one (#5231) The block cache manager is not only used in the tserver, it's also used in the sserver and by a client that might be reading from an RFile. 
Closes #4663 --- .../org/apache/accumulo/core/conf/Property.java | 15 ++++++++++++--- .../cache/impl/BlockCacheManagerFactory.java | 10 ++++++++-- .../blockfile/cache/BlockCacheFactoryTest.java | 4 ++-- .../file/blockfile/cache/TestLruBlockCache.java | 14 +++++++------- .../accumulo/core/file/rfile/RFileTest.java | 4 ++-- 5 files changed, 31 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b1a658ea2f6..981799f6d0d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -294,6 +294,12 @@ public enum Property { + " user-implementations of pluggable Accumulo features, such as the balancer" + " or volume chooser.", "2.0.0"), + GENERAL_CACHE_MANAGER_IMPL("general.block.cache.manager.class", + "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, + "Specifies the class name of the block cache factory implementation." 
+ + " Alternative implementation is" + + " org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager.", + "2.1.4"), GENERAL_DELEGATION_TOKEN_LIFETIME("general.delegation.token.lifetime", "7d", PropertyType.TIMEDURATION, "The length of time that delegation tokens and secret keys are valid.", "1.7.0"), @@ -532,7 +538,9 @@ public enum Property { "Time to wait for clients to continue scans before closing a session.", "1.3.5"), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches.", "1.3.5"), - TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", + @Deprecated(since = "2.1.4") + @ReplacedBy(property = Property.GENERAL_CACHE_MANAGER_IMPL) + TSERV_CACHE_MANAGER_IMPL("general.cache.manager.class", "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, "Specifies the class name of the block cache factory implementation." + " Alternative implementation is" @@ -1873,8 +1881,9 @@ public static boolean isValidTablePropertyKey(String key) { TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE, // block cache options - TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, - TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, + GENERAL_CACHE_MANAGER_IMPL, TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, + TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, + SSERV_SUMMARYCACHE_SIZE, // blocksize options TSERV_DEFAULT_BLOCKSIZE, SSERV_DEFAULT_BLOCKSIZE, diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java index b72979d7297..d62a5e8d468 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java +++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java @@ -39,7 +39,10 @@ public class BlockCacheManagerFactory { */ public static synchronized BlockCacheManager getInstance(AccumuloConfiguration conf) throws Exception { - String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + @SuppressWarnings("deprecation") + var cacheManagerProp = + conf.resolve(Property.GENERAL_CACHE_MANAGER_IMPL, Property.TSERV_CACHE_MANAGER_IMPL); + String impl = conf.get(cacheManagerProp); Class clazz = ClassLoaderUtil.loadClass(impl, BlockCacheManager.class); LOG.info("Created new block cache manager of type: {}", clazz.getSimpleName()); @@ -55,7 +58,10 @@ public static synchronized BlockCacheManager getInstance(AccumuloConfiguration c */ public static synchronized BlockCacheManager getClientInstance(AccumuloConfiguration conf) throws Exception { - String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + @SuppressWarnings("deprecation") + var cacheManagerProp = + conf.resolve(Property.GENERAL_CACHE_MANAGER_IMPL, Property.TSERV_CACHE_MANAGER_IMPL); + String impl = conf.get(cacheManagerProp); Class clazz = Class.forName(impl).asSubclass(BlockCacheManager.class); LOG.info("Created new block cache factory of type: {}", clazz.getSimpleName()); diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java index 1cbfbd289fe..f6c3266603c 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -37,7 +37,7 @@ public class BlockCacheFactoryTest { public void testCreateLruBlockCacheFactory() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, 
LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManagerFactory.getInstance(cc); } @@ -45,7 +45,7 @@ public void testCreateLruBlockCacheFactory() throws Exception { public void testCreateTinyLfuBlockCacheFactory() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); BlockCacheManagerFactory.getInstance(cc); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index b6b0ea39620..e09bad27a84 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -57,7 +57,7 @@ public class TestLruBlockCache { @Test public void testConfiguration() { ConfigurationCopy cc = new ConfigurationCopy(); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(1019)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(1000023)); cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(1000027)); @@ -99,7 +99,7 @@ public void testBackgroundEvictionThread() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); 
cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -133,7 +133,7 @@ public void testCacheSimple() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -191,7 +191,7 @@ public void testCacheEvictionSimple() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -241,7 +241,7 @@ public void testCacheEvictionTwoPriorities() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -309,7 +309,7 @@ public void testCacheEvictionThreePriorities() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new 
ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -434,7 +434,7 @@ public void testScanResistance() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 082e0d6ea7b..33545639efd 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -308,7 +308,7 @@ public void openReader(boolean cfsi) throws IOException { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); try { manager = BlockCacheManagerFactory.getInstance(cc); } catch (Exception e) { @@ -1726,7 +1726,7 @@ private void runVersionTest(int version, ConfigurationCopy aconf) throws Excepti byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); - 
aconf.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); aconf.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); aconf.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); From 36c9740299361874769cb18683c5130153dc488c Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 8 Jan 2025 11:07:16 -0500 Subject: [PATCH 06/10] Checks for corruption earlier and always report errors (#5227) Was looking into an issue where a mutation was corrupted on the network. This caused the mutation to be written to the write ahead log and then a failure occurred in the tablet server which left the tablet in an inconsistent state. Modified the tablet server code to deserialize mutations as early as possible (it used to deserialize after writing to the walog, now it does it before). Wrote an IT to recreate this problem and found another bug. Writing data to Accumulo does the following. 1. Make a startUpdate RPC to create an update session 2. Make one or more applyUpdates RPCs to add data to the session. These RPCs are thrift oneway calls, so nothing is reported back. 3. Call closeUpdate on the session to see what happened with all of the applyUpdates RPCs done in step 2. If an unexpected exception happened in step 2 above then it would not be reported back to the client. These changes fix and test that as part of testing the corrupt mutation. After these changes if there was an error in step 2, then step 3 now throws an exception.
--- .../accumulo/tserver/TabletClientHandler.java | 32 +++- .../tserver/session/UpdateSession.java | 1 + .../accumulo/test/CorruptMutationIT.java | 149 ++++++++++++++++++ 3 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 087c733e6f9..c6b38739029 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -267,8 +267,8 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) { return; } - if (us.currentTablet == null - && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) { + if (us.currentTablet == null && (us.failures.containsKey(keyExtent) + || us.authFailures.containsKey(keyExtent) || us.unhandledException != null)) { // if there were previous failures, then do not accept additional writes return; } @@ -339,6 +339,11 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List mutations = us.queuedMutations.get(us.currentTablet); for (TMutation tmutation : tmutations) { Mutation mutation = new ServerMutation(tmutation); + // Deserialize the mutation in an attempt to check for data corruption that happened on + // the network. This will avoid writing a corrupt mutation to the write ahead log and + // failing after its written to the write ahead log when it is deserialized to update the + // in memory map. 
+ mutation.getUpdates(); mutations.add(mutation); additionalMutationSize += mutation.numBytes(); } @@ -358,6 +363,15 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, } } } + } catch (RuntimeException e) { + // This method is a thrift oneway method so an exception from it will not make it back to the + // client. Need to record the exception and set the session such that any future updates to + // the session are ignored. + us.unhandledException = e; + us.currentTablet = null; + + // Rethrowing it will cause logging from thrift, so not adding logging here. + throw e; } finally { if (reserved) { server.sessionManager.unreserveSession(us); @@ -541,6 +555,20 @@ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDE } try { + if (us.unhandledException != null) { + // Since flush() is not being called, any memory added to the global queued mutations + // counter will not be decremented. So do that here before throwing an exception. + server.updateTotalQueuedMutationSize(-us.queuedMutationSize); + us.queuedMutationSize = 0; + // make this memory available for GC + us.queuedMutations.clear(); + + // Something unexpected happened during this write session, so throw an exception here to + // cause a TApplicationException on the client side. 
+ throw new IllegalStateException( + "Write session " + updateID + " saw an unexpected exception", us.unhandledException); + } + // clients may or may not see data from an update session while // it is in progress, however when the update session is closed // want to ensure that reads wait for the write to finish diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java index ca487294d7f..dc32765c28b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java @@ -50,6 +50,7 @@ public class UpdateSession extends Session { public long flushTime = 0; public long queuedMutationSize = 0; public final Durability durability; + public Exception unhandledException = null; public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) { super(credentials); diff --git a/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java new file mode 100644 index 00000000000..552cd46b339 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TMutation; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletserver.thrift.TDurability; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TServiceClient; +import org.junit.jupiter.api.Test; + +public 
class CorruptMutationIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "10"); + } + + @Test + public void testCorruptMutation() throws Exception { + + String table = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + c.tableOperations().create(table); + try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("1"); + m.put("f1", "q1", new Value("v1")); + writer.addMutation(m); + } + + var ctx = (ClientContext) c; + var tableId = ctx.getTableId(table); + var extent = new KeyExtent(tableId, null, null); + var tabletMetadata = ctx.getAmple().readTablet(extent, TabletMetadata.ColumnType.LOCATION); + var location = tabletMetadata.getLocation(); + assertNotNull(location); + assertEquals(TabletMetadata.LocationType.CURRENT, location.getType()); + + TabletClientService.Iface client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, location.getHostAndPort(), ctx); + // Make the same RPC calls made by the BatchWriter, but pass a corrupt serialized mutation in + // this try block. + try { + TInfo tinfo = TraceUtil.traceInfo(); + + long sessionId = client.startUpdate(tinfo, ctx.rpcCreds(), TDurability.DEFAULT); + + // Write two valid mutations to the session. The tserver buffers data it receives via + // applyUpdates and may not write them until closeUpdate RPC is called. Because + // TSERV_TOTAL_MUTATION_QUEUE_MAX was set so small, these values should be written. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("abc", "z1"), createTMutation("def", "z2"))); + + // Simulate data corruption in the serialized mutation + TMutation badMutation = createTMutation("ghi", "z3"); + badMutation.entries = -42; + + // Write some good and bad mutations to the session. 
The server side will see an error here, + // however since this is a thrift oneway method no exception is expected here. This should + // leave the session in a broken state where it no longer accepts any new data. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("jkl", "z4"), badMutation, createTMutation("mno", "z5"))); + + // Write two more valid mutations to the session, these should be dropped on the server side + // because of the previous error. So should never see these updates. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("pqr", "z6"), createTMutation("stu", "z7"))); + + // Since client.applyUpdates experienced an error, should see an error when closing the + // session. + assertThrows(TApplicationException.class, () -> client.closeUpdate(tinfo, sessionId)); + } finally { + ThriftUtil.returnClient((TServiceClient) client, ctx); + } + + // The values that a scan must see + Set expectedValues = Set.of("v1", "v2", "z1", "z2"); + + // The failed mutation should not have left the tablet in a bad state. Do some follow-on + // actions to ensure the tablet is still functional. 
+ try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("2"); + m.put("f1", "q1", new Value("v2")); + writer.addMutation(m); + } + + try (Scanner scanner = c.createScanner(table)) { + var valuesSeen = + scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet()); + assertEquals(expectedValues, valuesSeen); + } + + c.tableOperations().flush(table, null, null, true); + + try (Scanner scanner = c.createScanner(table)) { + var valuesSeen = + scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet()); + assertEquals(expectedValues, valuesSeen); + } + } + } + + private static TMutation createTMutation(String row, String value) { + Mutation m = new Mutation(row); + m.put("x", "y", value); + return m.toThrift(); + } +} From d5756e8f2d39ed114794d5a695d2e09203ee57ae Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 8 Jan 2025 11:42:58 -0500 Subject: [PATCH 07/10] decrements total queued mutation size when update idles (#5236) fixes #5235 --- .../accumulo/tserver/TabletClientHandler.java | 16 +++++++++++++++- .../apache/accumulo/tserver/TabletServer.java | 7 ++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index c6b38739029..7ef64352d95 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -258,7 +258,21 @@ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdura UpdateSession us = new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials), - credentials, durability); + credentials, durability) { + @Override + public boolean cleanup() { + // This is called when a client abandons a session. 
When this happens need to decrement + // any queued mutations. + if (queuedMutationSize > 0) { + log.trace( + "cleaning up abandoned update session, decrementing totalQueuedMutationSize by {}", + queuedMutationSize); + server.updateTotalQueuedMutationSize(-queuedMutationSize); + queuedMutationSize = 0; + } + return true; + } + }; return server.sessionManager.createSession(us, false); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3a3ad3f3b14..beb2e550a68 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -417,7 +417,12 @@ public void run() { } public long updateTotalQueuedMutationSize(long additionalMutationSize) { - return totalQueuedMutationSize.addAndGet(additionalMutationSize); + var newTotal = totalQueuedMutationSize.addAndGet(additionalMutationSize); + if (log.isTraceEnabled()) { + log.trace("totalQueuedMutationSize is now {} after adding {}", newTotal, + additionalMutationSize); + } + return newTotal; } @Override From 8124a6b33f8ec92ae5d943e727abda4b52aa04ea Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Wed, 8 Jan 2025 13:55:35 -0500 Subject: [PATCH 08/10] Only create Threadpools when calling ZooInfoViewer or ZooPropEditor (#5216) * Threadpools are now created on execute Before this change, the "accumulo.pool.scheduled.future.checker" thread would be created when ZooInfoViewer and ZooPropEditor were loaded. This change ensures the threadpools are only created when the classes are run specifically. * Move NullWatcher creation to generateReport Moves the nullwatcher creation down so the mock zooReader object will work. 
--- .../server/conf/util/ZooInfoViewer.java | 17 +++--- .../server/conf/util/ZooPropEditor.java | 5 +- .../server/conf/util/ZooInfoViewerTest.java | 59 +++++++++++++++---- 3 files changed, 57 insertions(+), 24 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java index c45b0b8832e..c6763e85968 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java @@ -85,8 +85,7 @@ public class ZooInfoViewer implements KeywordExecutable { DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC)); private static final Logger log = LoggerFactory.getLogger(ZooInfoViewer.class); - private final NullWatcher nullWatcher = - new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); + private NullWatcher nullWatcher; private static final String INDENT = " "; @@ -107,6 +106,7 @@ public String description() { @Override public void execute(String[] args) throws Exception { + nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts(); opts.parseArgs(ZooInfoViewer.class.getName(), args); @@ -115,16 +115,15 @@ public void execute(String[] args) throws Exception { log.info("print properties: {}", opts.printProps); log.info("print instances: {}", opts.printInstanceIds); - var conf = opts.getSiteConfiguration(); - - ZooReader zooReader = new ZooReaderWriter(conf); - - try (ServerContext context = new ServerContext(conf)) { - InstanceId iid = context.getInstanceID(); - generateReport(iid, opts, zooReader); + try (ServerContext context = getContext(opts)) { + generateReport(context.getInstanceID(), opts, context.getZooReader()); } } + ServerContext getContext(ZooInfoViewer.Opts opts) { + return new 
ServerContext(opts.getSiteConfiguration()); + } + void generateReport(final InstanceId iid, final ZooInfoViewer.Opts opts, final ZooReader zooReader) throws Exception { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java index ff277202fa3..c8e9da32ea3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java @@ -61,8 +61,7 @@ public class ZooPropEditor implements KeywordExecutable { private static final Logger LOG = LoggerFactory.getLogger(ZooPropEditor.class); - private final NullWatcher nullWatcher = - new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); + private NullWatcher nullWatcher; /** * No-op constructor - provided so ServiceLoader autoload does not consume resources. @@ -82,6 +81,8 @@ public String description() { @Override public void execute(String[] args) throws Exception { + nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); + ZooPropEditor.Opts opts = new ZooPropEditor.Opts(); opts.parseArgs(ZooPropEditor.class.getName(), args); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java index 9ea587a320a..d9113506da6 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java @@ -26,6 +26,7 @@ import static org.apache.accumulo.core.Constants.ZTABLES; import static org.apache.accumulo.core.Constants.ZTABLE_NAME; import static org.apache.accumulo.core.Constants.ZTABLE_NAMESPACE; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import 
static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.eq; @@ -50,13 +51,18 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReader; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.MockServerContext; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.codec.VersionedPropCodec; import org.apache.accumulo.server.conf.codec.VersionedProperties; import org.apache.accumulo.server.conf.store.NamespacePropKey; +import org.apache.accumulo.server.conf.store.PropStore; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.apache.accumulo.server.conf.store.TablePropKey; import org.apache.accumulo.server.conf.store.impl.PropStoreWatcher; +import org.apache.accumulo.server.conf.store.impl.ZooPropStore; import org.apache.zookeeper.data.Stat; import org.easymock.Capture; import org.junit.jupiter.api.Test; @@ -139,7 +145,7 @@ public void allOpts() { public void instanceIdOutputTest() throws Exception { String uuid = UUID.randomUUID().toString(); - ZooReader zooReader = createMock(ZooReader.class); + ZooReaderWriter zooReader = createMock(ZooReaderWriter.class); var instanceName = "test"; expect(zooReader.getChildren(eq(ZROOT + ZINSTANCES))).andReturn(List.of(instanceName)).once(); expect(zooReader.getData(eq(ZROOT + ZINSTANCES + "/" + instanceName))) @@ -148,14 +154,24 @@ public void instanceIdOutputTest() throws Exception { String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt"; - ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts(); - opts.parseArgs(ZooInfoViewer.class.getName(), - new String[] {"--print-instances", "--outfile", testFileName}); + ServerContext context = + MockServerContext.getWithZK(InstanceId.of(instanceName), instanceName, 20_000); + 
expect(context.getZooReader()).andReturn(zooReader).once(); + context.close(); - ZooInfoViewer viewer = new ZooInfoViewer(); - viewer.generateReport(InstanceId.of(uuid), opts, zooReader); + replay(context); - verify(zooReader); + class ZooInfoViewerTestClazz extends ZooInfoViewer { + @Override + ServerContext getContext(ZooInfoViewer.Opts ots) { + return context; + } + } + + ZooInfoViewer viewer = new ZooInfoViewerTestClazz(); + viewer.execute(new String[] {"--print-instances", "--outfile", testFileName}); + + verify(zooReader, context); String line; try (Scanner scanner = new Scanner(new File(testFileName))) { @@ -286,14 +302,31 @@ public void propTest() throws Exception { String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt"; - ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts(); - opts.parseArgs(ZooInfoViewer.class.getName(), - new String[] {"--print-props", "--outfile", testFileName}); + ZooReaderWriter zrw = createMock(ZooReaderWriter.class); + expect(zrw.getSessionTimeout()).andReturn(2_000).anyTimes(); + expect(zrw.exists(eq("/accumulo/" + iid), anyObject())).andReturn(true).anyTimes(); - ZooInfoViewer viewer = new ZooInfoViewer(); - viewer.generateReport(InstanceId.of(uuid), opts, zooReader); + replay(zrw); - verify(zooReader); + PropStore propStore = ZooPropStore.initialize(iid, zrw); + + ServerContext context = MockServerContext.getMockContextWithPropStore(iid, zrw, propStore); + expect(context.getZooReader()).andReturn(zooReader).once(); + context.close(); + + replay(context); + + class ZooInfoViewerTestClazz extends ZooInfoViewer { + @Override + ServerContext getContext(ZooInfoViewer.Opts ots) { + return context; + } + } + + ZooInfoViewer viewer = new ZooInfoViewerTestClazz(); + viewer.execute(new String[] {"--print-props", "--outfile", testFileName}); + + verify(zooReader, context); Map props = new HashMap<>(); try (Scanner scanner = new Scanner(new File(testFileName))) { From 25dcfbfe14d9b71e8c9b2e4b079287bbcd6ae476 
Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 8 Jan 2025 14:30:54 -0500 Subject: [PATCH 09/10] Fix issue with ZooPropEditor setting classloader context property (#5238) The root cause of the issue is that ClassLoaderUtil.initContextFactory was not being called which caused an NPE to be thrown which returned false from ClassLoaderUtil.isValidContext. Closes #5198 --- .../core/classloader/ClassLoaderUtil.java | 2 +- .../apache/accumulo/server/util/PropUtil.java | 19 ++-- .../accumulo/server/util/SystemPropUtil.java | 11 +- .../accumulo/server/util/PropUtilTest.java | 102 ++++++++++++++++++ 4 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java index 49287679d07..7af94627b0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java @@ -69,7 +69,7 @@ static ContextClassLoaderFactory getContextFactory() { } // for testing - static synchronized void resetContextFactoryForTests() { + public static synchronized void resetContextFactoryForTests() { FACTORY = null; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java index 85e03ea1c29..b49acae2481 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java @@ -39,7 +39,7 @@ private PropUtil() {} */ public static void setProperties(final ServerContext context, final PropStoreKey propStoreKey, final Map properties) throws IllegalArgumentException { - PropUtil.validateProperties(propStoreKey, properties); + PropUtil.validateProperties(context, 
propStoreKey, properties); context.getPropStore().putAll(propStoreKey, properties); } @@ -51,12 +51,13 @@ public static void removeProperties(final ServerContext context, public static void replaceProperties(final ServerContext context, final PropStoreKey propStoreKey, final long version, final Map properties) throws IllegalArgumentException { - PropUtil.validateProperties(propStoreKey, properties); + PropUtil.validateProperties(context, propStoreKey, properties); context.getPropStore().replaceAll(propStoreKey, version, properties); } - protected static void validateProperties(final PropStoreKey propStoreKey, - final Map properties) throws IllegalArgumentException { + protected static void validateProperties(final ServerContext context, + final PropStoreKey propStoreKey, final Map properties) + throws IllegalArgumentException { for (Map.Entry prop : properties.entrySet()) { if (!Property.isValidProperty(prop.getKey(), prop.getValue())) { String exceptionMessage = "Invalid property for : "; @@ -66,10 +67,12 @@ protected static void validateProperties(final PropStoreKey propStoreKey, throw new IllegalArgumentException(exceptionMessage + propStoreKey + " name: " + prop.getKey() + ", value: " + prop.getValue()); } else if (prop.getKey().equals(Property.TABLE_CLASSLOADER_CONTEXT.getKey()) - && !Property.TABLE_CLASSLOADER_CONTEXT.getDefaultValue().equals(prop.getValue()) - && !ClassLoaderUtil.isValidContext(prop.getValue())) { - throw new IllegalArgumentException( - "Unable to resolve classloader for context: " + prop.getValue()); + && !Property.TABLE_CLASSLOADER_CONTEXT.getDefaultValue().equals(prop.getValue())) { + ClassLoaderUtil.initContextFactory(context.getConfiguration()); + if (!ClassLoaderUtil.isValidContext(prop.getValue())) { + throw new IllegalArgumentException( + "Unable to resolve classloader for context: " + prop.getValue()); + } } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java 
b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java index 0bb2e8a8e3b..b8589ab21ab 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java @@ -37,7 +37,8 @@ public class SystemPropUtil { public static void setSystemProperty(ServerContext context, String property, String value) throws IllegalArgumentException { final SystemPropKey key = SystemPropKey.of(context); - context.getPropStore().putAll(key, Map.of(validateSystemProperty(key, property, value), value)); + context.getPropStore().putAll(key, + Map.of(validateSystemProperty(context, key, property, value), value)); } public static void modifyProperties(ServerContext context, long version, @@ -46,7 +47,7 @@ public static void modifyProperties(ServerContext context, long version, final Map checkedProperties = properties.entrySet().stream() .collect(Collectors.toMap( - entry -> validateSystemProperty(key, entry.getKey(), entry.getValue()), + entry -> validateSystemProperty(context, key, entry.getKey(), entry.getValue()), Map.Entry::getValue)); context.getPropStore().replaceAll(key, version, checkedProperties); } @@ -66,8 +67,8 @@ public static void removePropWithoutDeprecationWarning(ServerContext context, St context.getPropStore().removeProperties(SystemPropKey.of(context), List.of(property)); } - private static String validateSystemProperty(SystemPropKey key, String property, - final String value) throws IllegalArgumentException { + private static String validateSystemProperty(ServerContext context, SystemPropKey key, + String property, final String value) throws IllegalArgumentException { // Retrieve the replacement name for this property, if there is one. // Do this before we check if the name is a valid zookeeper name. 
final var original = property; @@ -88,7 +89,7 @@ private static String validateSystemProperty(SystemPropKey key, String property, throw iae; } if (Property.isValidTablePropertyKey(property)) { - PropUtil.validateProperties(key, Map.of(property, value)); + PropUtil.validateProperties(context, key, Map.of(property, value)); } // Find the property taking prefix into account diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java new file mode 100644 index 00000000000..e16fc75e450 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.util; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Map; + +import org.apache.accumulo.core.classloader.ClassLoaderUtil; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.store.TablePropKey; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PropUtilTest { + + private static final String TEST_CONTEXT_NAME = "TestContext"; + private static final String INVALID_TEST_CONTEXT_NAME = "InvalidTestContext"; + + public static class TestContextClassLoaderFactory implements ContextClassLoaderFactory { + + @Override + public ClassLoader getClassLoader(String contextName) { + if (contextName.equals(TEST_CONTEXT_NAME)) { + return this.getClass().getClassLoader(); + } + return null; + } + + } + + @BeforeEach + public void before() { + ClassLoaderUtil.resetContextFactoryForTests(); + } + + @Test + public void testSetClasspathContextFails() { + ServerContext ctx = createMock(ServerContext.class); + AccumuloConfiguration conf = createMock(AccumuloConfiguration.class); + InstanceId iid = createMock(InstanceId.class); + TableId tid = createMock(TableId.class); + TablePropKey tkey = TablePropKey.of(iid, tid); + + expect(ctx.getConfiguration()).andReturn(conf).once(); + expect(conf.get(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY)) + .andReturn(TestContextClassLoaderFactory.class.getName()); + + replay(ctx, conf, 
iid, tid); + IllegalArgumentException e = + assertThrows(IllegalArgumentException.class, () -> PropUtil.validateProperties(ctx, tkey, + Map.of(Property.TABLE_CLASSLOADER_CONTEXT.getKey(), INVALID_TEST_CONTEXT_NAME))); + assertEquals("Unable to resolve classloader for context: " + INVALID_TEST_CONTEXT_NAME, + e.getMessage()); + verify(ctx, conf, iid, tid); + } + + @Test + public void testSetClasspathContext() { + ServerContext ctx = createMock(ServerContext.class); + AccumuloConfiguration conf = createMock(AccumuloConfiguration.class); + InstanceId iid = createMock(InstanceId.class); + TableId tid = createMock(TableId.class); + TablePropKey tkey = TablePropKey.of(iid, tid); + + expect(ctx.getConfiguration()).andReturn(conf).once(); + expect(conf.get(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY)) + .andReturn(TestContextClassLoaderFactory.class.getName()); + + replay(ctx, conf, iid, tid); + PropUtil.validateProperties(ctx, tkey, + Map.of(Property.TABLE_CLASSLOADER_CONTEXT.getKey(), TEST_CONTEXT_NAME)); + verify(ctx, conf, iid, tid); + } + +} From 6fe8b0c5f1f0b0821524f6c20a283e1d6d37308f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 9 Jan 2025 14:13:01 -0500 Subject: [PATCH 10/10] Modified SessionManager to recognize session idle property updates (#5239) Reverted changes in #4780, which marked the properties as not mutable. Passed the ServerContext to the sweep method instead of the property values. The property values are resolved at sweep method execution time. 
Closes #4723 --- .../apache/accumulo/core/conf/Property.java | 3 +-- .../tserver/session/SessionManager.java | 18 +++++++----------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 981799f6d0d..942dfeea456 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1898,8 +1898,7 @@ public static boolean isValidTablePropertyKey(String key) { COMPACTOR_MINTHREADS_TIMEOUT, // others - TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, - TSERV_SESSION_MAXIDLE, TSERV_UPDATE_SESSION_MAXIDLE); + TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME); /** * Checks if the given property may be changed via Zookeeper, but not recognized until the restart diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index d1bf4d6f8cf..0726c1997cb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -70,22 +70,15 @@ public class SessionManager { private static final SecureRandom random = new SecureRandom(); private final ConcurrentMap sessions = new ConcurrentHashMap<>(); - private final long maxIdle; - private final long maxUpdateIdle; private final BlockingQueue deferredCleanupQueue = new ArrayBlockingQueue<>(5000); private final Long expiredSessionMarker = (long) -1; - private final AccumuloConfiguration aconf; private final ServerContext ctx; private volatile LongConsumer zombieCountConsumer = null; public SessionManager(ServerContext context) { this.ctx = context; - this.aconf = 
context.getConfiguration(); - maxUpdateIdle = aconf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); - maxIdle = aconf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); - - Runnable r = () -> sweep(maxIdle, maxUpdateIdle); - + long maxIdle = ctx.getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + Runnable r = () -> sweep(ctx); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS)); } @@ -107,7 +100,7 @@ public long createSession(Session session, boolean reserve) { } public long getMaxIdleTime() { - return maxIdle; + return ctx.getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); } /** @@ -309,7 +302,10 @@ private void cleanup(Session session) { cleanup(deferredCleanupQueue, session); } - private void sweep(final long maxIdle, final long maxUpdateIdle) { + private void sweep(final ServerContext context) { + final AccumuloConfiguration conf = context.getConfiguration(); + final long maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); + final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); List sessionsToCleanup = new LinkedList<>(); Iterator iter = sessions.values().iterator(); while (iter.hasNext()) {