diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java index dc84a6325b8..d87c07e2690 100644 --- a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java @@ -69,7 +69,7 @@ static ContextClassLoaderFactory getContextFactory() { } // for testing - static synchronized void resetContextFactoryForTests() { + public static synchronized void resetContextFactoryForTests() { FACTORY = null; } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6b70bd059d7..42a57a1962a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -283,6 +283,9 @@ public enum Property { + " user-implementations of pluggable Accumulo features, such as the balancer" + " or volume chooser.", "2.0.0"), + GENERAL_CACHE_MANAGER_IMPL("general.block.cache.manager.class", + TinyLfuBlockCacheManager.class.getName(), PropertyType.STRING, + "Specifies the class name of the block cache factory implementation.", "2.1.4"), GENERAL_DELEGATION_TOKEN_LIFETIME("general.delegation.token.lifetime", "7d", PropertyType.TIMEDURATION, "The length of time that delegation tokens and secret keys are valid.", "1.7.0"), @@ -514,6 +517,8 @@ public enum Property { "Time to wait for clients to continue scans before closing a session.", "1.3.5"), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches.", "1.3.5"), + @Deprecated(since = "2.1.4") + @ReplacedBy(property = Property.GENERAL_CACHE_MANAGER_IMPL) TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", TinyLfuBlockCacheManager.class.getName(), PropertyType.STRING, "Specifies the class name of the block cache factory implementation.", "2.0.0"), @@ -1535,8 +1540,9 @@ public static boolean isValidTablePropertyKey(String key) { RPC_MAX_MESSAGE_SIZE, // block cache options - TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, - TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, + GENERAL_CACHE_MANAGER_IMPL, TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, + TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, + SSERV_SUMMARYCACHE_SIZE, // blocksize options TSERV_DEFAULT_BLOCKSIZE, SSERV_DEFAULT_BLOCKSIZE, @@ -1551,8 +1557,7 @@ public static boolean isValidTablePropertyKey(String key) { COMPACTOR_MINTHREADS_TIMEOUT, // others - TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, - TSERV_SESSION_MAXIDLE, TSERV_UPDATE_SESSION_MAXIDLE); + TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME); /** * Checks if the given property may be changed via Zookeeper, but not recognized until the restart diff --git a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java index ed1775cbdb0..1196bd2152e 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java @@ -39,6 +39,7 @@ import org.apache.commons.configuration2.MapConfiguration; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.configuration2.io.FileHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -211,8 +212,9 @@ private SiteConfiguration(Map config) { private static AbstractConfiguration getPropsFileConfig(URL accumuloPropsLocation) { var config = new PropertiesConfiguration(); if (accumuloPropsLocation != null) { + var fileHandler = new FileHandler(config); try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) { - config.read(reader); + fileHandler.load(reader); } catch (ConfigurationException | IOException e) { throw new IllegalArgumentException(e); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java index 655b7b3ad60..d0b34164e59 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java @@ -39,7 +39,10 @@ public class BlockCacheManagerFactory { */ public static synchronized BlockCacheManager getInstance(AccumuloConfiguration conf) throws ReflectiveOperationException { - String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + @SuppressWarnings("deprecation") + var cacheManagerProp = + conf.resolve(Property.GENERAL_CACHE_MANAGER_IMPL, Property.TSERV_CACHE_MANAGER_IMPL); + String impl = conf.get(cacheManagerProp); Class clazz = ClassLoaderUtil.loadClass(impl, BlockCacheManager.class); LOG.info("Created new block cache manager of type: {}", clazz.getSimpleName()); @@ -55,7 +58,10 @@ public static synchronized BlockCacheManager getInstance(AccumuloConfiguration c */ public static synchronized BlockCacheManager getClientInstance(AccumuloConfiguration conf) throws ReflectiveOperationException { - String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + @SuppressWarnings("deprecation") + var cacheManagerProp = + conf.resolve(Property.GENERAL_CACHE_MANAGER_IMPL, Property.TSERV_CACHE_MANAGER_IMPL); + String impl = conf.get(cacheManagerProp); Class clazz = Class.forName(impl).asSubclass(BlockCacheManager.class); LOG.info("Created new block cache factory of type: {}", clazz.getSimpleName()); diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java new file mode 100644 index 00000000000..3129511b5e3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.lock; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; +import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.util.Halt; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceLockSupport { + + /** + * Lock Watcher used by Highly Available services. These are services where only instance is + * running at a time, but another backup service can be started that will be used if the active + * service instance fails and loses its lock in ZK. + */ + public static class HAServiceLockWatcher implements AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); + + private final String serviceName; + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + + public HAServiceLockWatcher(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, + () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e)); + + } + + @Override + public synchronized void acquiredLock() { + LOG.debug("Acquired {} lock", serviceName); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + LOG.warn("Failed to get {} lock", serviceName, e); + + if (e instanceof NoAuthException) { + String msg = + "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, + -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("{} lock held by someone else, waiting for a change in state", serviceName); + wait(); + } catch (InterruptedException e) { + // empty + } + } + } + + public boolean isLockAcquired() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; + } + + } + + /** + * Lock Watcher used by non-HA services + */ + public static class ServiceLockWatcher implements LockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); + + private final String serviceName; + private final Supplier shuttingDown; + private final Consumer lostLockAction; + + public ServiceLockWatcher(String serviceName, Supplier shuttingDown, + Consumer lostLockAction) { + this.serviceName = serviceName; + this.shuttingDown = shuttingDown; + this.lostLockAction = lostLockAction; + } + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + if (!shuttingDown.get()) { + LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); + } + lostLockAction.accept(serviceName); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e)); + } + + } + +} diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java index 8338d2735a7..c697ca17a26 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -38,7 +38,7 @@ public class BlockCacheFactoryTest { public void testCreateLruBlockCacheFactory() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManagerFactory.getInstance(cc); } @@ -46,7 +46,7 @@ public void testCreateLruBlockCacheFactory() throws Exception { public void testCreateTinyLfuBlockCacheFactory() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); BlockCacheManagerFactory.getInstance(cc); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index e12dacfe71f..d5a23c1bd84 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -55,7 +55,7 @@ public class TestLruBlockCache { @Test public void testConfiguration() { ConfigurationCopy cc = new ConfigurationCopy(); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(1019)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(1000023)); cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(1000027)); @@ -97,7 +97,7 @@ public void testBackgroundEvictionThread() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -131,7 +131,7 @@ public void testCacheSimple() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -189,7 +189,7 @@ public void testCacheEvictionSimple() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -239,7 +239,7 @@ public void testCacheEvictionTwoPriorities() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -307,7 +307,7 @@ public void testCacheEvictionThreePriorities() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); @@ -432,7 +432,7 @@ public void testScanResistance() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java index d65991590ad..1c9e8a6a04a 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java @@ -164,7 +164,7 @@ public void openReader(boolean cfsi, Range fence) throws IOException { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); + cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); try { manager = BlockCacheManagerFactory.getInstance(cc); } catch (ReflectiveOperationException e) { diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 81c76d58ff6..babd6215167 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -1584,7 +1584,7 @@ private void runVersionTest(int version, ConfigurationCopy aconf) throws Excepti byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); - aconf.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); + aconf.set(Property.GENERAL_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); aconf.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); aconf.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java index ba362353336..bd8bd9e0d5f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java @@ -81,8 +81,7 @@ public class ZooInfoViewer implements KeywordExecutable { DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC)); private static final Logger log = LoggerFactory.getLogger(ZooInfoViewer.class); - private final NullWatcher nullWatcher = - new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); + private NullWatcher nullWatcher; private static final String INDENT = " "; @@ -103,6 +102,7 @@ public String description() { @Override public void execute(String[] args) throws Exception { + nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts(); opts.parseArgs(ZooInfoViewer.class.getName(), args); @@ -111,13 +111,15 @@ public void execute(String[] args) throws Exception { log.info("print properties: {}", opts.printProps); log.info("print instances: {}", opts.printInstanceIds); - var conf = opts.getSiteConfiguration(); - - try (ServerContext context = new ServerContext(conf)) { + try (ServerContext context = getContext(opts)) { generateReport(context, opts); } } + ServerContext getContext(ZooInfoViewer.Opts opts) { + return new ServerContext(opts.getSiteConfiguration()); + } + void generateReport(final ServerContext context, final ZooInfoViewer.Opts opts) throws Exception { OutputStream outStream; diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java index 1a4f426034b..e96a90370b2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java @@ -58,8 +58,7 @@ public class ZooPropEditor implements KeywordExecutable { private static final Logger LOG = LoggerFactory.getLogger(ZooPropEditor.class); - private final NullWatcher nullWatcher = - new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); + private NullWatcher nullWatcher; /** * No-op constructor - provided so ServiceLoader autoload does not consume resources. @@ -79,6 +78,8 @@ public String description() { @Override public void execute(String[] args) throws Exception { + nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L)); + ZooPropEditor.Opts opts = new ZooPropEditor.Opts(); opts.parseArgs(ZooPropEditor.class.getName(), args); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java index 85e03ea1c29..b49acae2481 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/PropUtil.java @@ -39,7 +39,7 @@ private PropUtil() {} */ public static void setProperties(final ServerContext context, final PropStoreKey propStoreKey, final Map properties) throws IllegalArgumentException { - PropUtil.validateProperties(propStoreKey, properties); + PropUtil.validateProperties(context, propStoreKey, properties); context.getPropStore().putAll(propStoreKey, properties); } @@ -51,12 +51,13 @@ public static void removeProperties(final ServerContext context, public static void replaceProperties(final ServerContext context, final PropStoreKey propStoreKey, final long version, final Map properties) throws IllegalArgumentException { - PropUtil.validateProperties(propStoreKey, properties); + PropUtil.validateProperties(context, propStoreKey, properties); context.getPropStore().replaceAll(propStoreKey, version, properties); } - protected static void validateProperties(final PropStoreKey propStoreKey, - final Map properties) throws IllegalArgumentException { + protected static void validateProperties(final ServerContext context, + final PropStoreKey propStoreKey, final Map properties) + throws IllegalArgumentException { for (Map.Entry prop : properties.entrySet()) { if (!Property.isValidProperty(prop.getKey(), prop.getValue())) { String exceptionMessage = "Invalid property for : "; @@ -66,10 +67,12 @@ protected static void validateProperties(final PropStoreKey propStoreKey, throw new IllegalArgumentException(exceptionMessage + propStoreKey + " name: " + prop.getKey() + ", value: " + prop.getValue()); } else if (prop.getKey().equals(Property.TABLE_CLASSLOADER_CONTEXT.getKey()) - && !Property.TABLE_CLASSLOADER_CONTEXT.getDefaultValue().equals(prop.getValue()) - && !ClassLoaderUtil.isValidContext(prop.getValue())) { - throw new IllegalArgumentException( - "Unable to resolve classloader for context: " + prop.getValue()); + && !Property.TABLE_CLASSLOADER_CONTEXT.getDefaultValue().equals(prop.getValue())) { + ClassLoaderUtil.initContextFactory(context.getConfiguration()); + if (!ClassLoaderUtil.isValidContext(prop.getValue())) { + throw new IllegalArgumentException( + "Unable to resolve classloader for context: " + prop.getValue()); + } } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java index 0bb2e8a8e3b..b8589ab21ab 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java @@ -37,7 +37,8 @@ public class SystemPropUtil { public static void setSystemProperty(ServerContext context, String property, String value) throws IllegalArgumentException { final SystemPropKey key = SystemPropKey.of(context); - context.getPropStore().putAll(key, Map.of(validateSystemProperty(key, property, value), value)); + context.getPropStore().putAll(key, + Map.of(validateSystemProperty(context, key, property, value), value)); } public static void modifyProperties(ServerContext context, long version, @@ -46,7 +47,7 @@ public static void modifyProperties(ServerContext context, long version, final Map checkedProperties = properties.entrySet().stream() .collect(Collectors.toMap( - entry -> validateSystemProperty(key, entry.getKey(), entry.getValue()), + entry -> validateSystemProperty(context, key, entry.getKey(), entry.getValue()), Map.Entry::getValue)); context.getPropStore().replaceAll(key, version, checkedProperties); } @@ -66,8 +67,8 @@ public static void removePropWithoutDeprecationWarning(ServerContext context, St context.getPropStore().removeProperties(SystemPropKey.of(context), List.of(property)); } - private static String validateSystemProperty(SystemPropKey key, String property, - final String value) throws IllegalArgumentException { + private static String validateSystemProperty(ServerContext context, SystemPropKey key, + String property, final String value) throws IllegalArgumentException { // Retrieve the replacement name for this property, if there is one. // Do this before we check if the name is a valid zookeeper name. final var original = property; @@ -88,7 +89,7 @@ private static String validateSystemProperty(SystemPropKey key, String property, throw iae; } if (Property.isValidTablePropertyKey(property)) { - PropUtil.validateProperties(key, Map.of(property, value)); + PropUtil.validateProperties(context, key, Map.of(property, value)); } // Find the property taking prefix into account diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java index 20e1138c362..bbaed9f4bf0 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/util/ZooInfoViewerTest.java @@ -27,6 +27,7 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.isA; import static org.easymock.EasyMock.isNull; import static org.easymock.EasyMock.newCapture; @@ -50,6 +51,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.accumulo.server.MockServerContext; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.codec.VersionedPropCodec; import org.apache.accumulo.server.conf.codec.VersionedProperties; import org.apache.accumulo.server.conf.store.NamespacePropKey; @@ -145,16 +147,22 @@ public void instanceIdOutputTest() throws Exception { .once(); expect(zk.getData(eq(ZROOT + ZINSTANCES + "/" + instanceName), isNull(), isNull())) .andReturn(uuid.getBytes(UTF_8)).once(); + context.close(); + expectLastCall().once(); + replay(context, zk); String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt"; - ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts(); - opts.parseArgs(ZooInfoViewer.class.getName(), - new String[] {"--print-instances", "--outfile", testFileName}); + class ZooInfoViewerTestClazz extends ZooInfoViewer { + @Override + ServerContext getContext(ZooInfoViewer.Opts ots) { + return context; + } + } - ZooInfoViewer viewer = new ZooInfoViewer(); - viewer.generateReport(context, opts); + ZooInfoViewer viewer = new ZooInfoViewerTestClazz(); + viewer.execute(new String[] {"--print-instances", "--outfile", testFileName}); verify(context, zk); @@ -281,6 +289,9 @@ public void propTest() throws Exception { expect(zk.getData(tBasePath + "/t" + ZTABLE_NAMESPACE, null, null)) .andReturn("+default".getBytes(UTF_8)).anyTimes(); + context.close(); + expectLastCall().once(); + replay(context, zk); NamespacePropKey nsKey = NamespacePropKey.of(iid, nsId); @@ -288,12 +299,15 @@ public void propTest() throws Exception { String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt"; - ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts(); - opts.parseArgs(ZooInfoViewer.class.getName(), - new String[] {"--print-props", "--outfile", testFileName}); + class ZooInfoViewerTestClazz extends ZooInfoViewer { + @Override + ServerContext getContext(ZooInfoViewer.Opts ots) { + return context; + } + } - ZooInfoViewer viewer = new ZooInfoViewer(); - viewer.generateReport(context, opts); + ZooInfoViewer viewer = new ZooInfoViewerTestClazz(); + viewer.execute(new String[] {"--print-props", "--outfile", testFileName}); verify(context, zk); diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java new file mode 100644 index 00000000000..e16fc75e450 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/PropUtilTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.util; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Map; + +import org.apache.accumulo.core.classloader.ClassLoaderUtil; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.store.TablePropKey; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PropUtilTest { + + private static final String TEST_CONTEXT_NAME = "TestContext"; + private static final String INVALID_TEST_CONTEXT_NAME = "InvalidTestContext"; + + public static class TestContextClassLoaderFactory implements ContextClassLoaderFactory { + + @Override + public ClassLoader getClassLoader(String contextName) { + if (contextName.equals(TEST_CONTEXT_NAME)) { + return this.getClass().getClassLoader(); + } + return null; + } + + } + + @BeforeEach + public void before() { + ClassLoaderUtil.resetContextFactoryForTests(); + } + + @Test + public void testSetClasspathContextFails() { + ServerContext ctx = createMock(ServerContext.class); + AccumuloConfiguration conf = createMock(AccumuloConfiguration.class); + InstanceId iid = createMock(InstanceId.class); + TableId tid = createMock(TableId.class); + TablePropKey tkey = TablePropKey.of(iid, tid); + + expect(ctx.getConfiguration()).andReturn(conf).once(); + expect(conf.get(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY)) + .andReturn(TestContextClassLoaderFactory.class.getName()); + + replay(ctx, conf, iid, tid); + IllegalArgumentException e = + assertThrows(IllegalArgumentException.class, () -> PropUtil.validateProperties(ctx, tkey, + Map.of(Property.TABLE_CLASSLOADER_CONTEXT.getKey(), INVALID_TEST_CONTEXT_NAME))); + assertEquals("Unable to resolve classloader for context: " + INVALID_TEST_CONTEXT_NAME, + e.getMessage()); + verify(ctx, conf, iid, tid); + } + + @Test + public void testSetClasspathContext() { + ServerContext ctx = createMock(ServerContext.class); + AccumuloConfiguration conf = createMock(AccumuloConfiguration.class); + InstanceId iid = createMock(InstanceId.class); + TableId tid = createMock(TableId.class); + TablePropKey tkey = TablePropKey.of(iid, tid); + + expect(ctx.getConfiguration()).andReturn(conf).once(); + expect(conf.get(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY)) + .andReturn(TestContextClassLoaderFactory.class.getName()); + + replay(ctx, conf, iid, tid); + PropUtil.validateProperties(ctx, tkey, + Map.of(Property.TABLE_CLASSLOADER_CONTEXT.getKey(), TEST_CONTEXT_NAME)); + verify(ctx, conf, iid, tid); + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index c20f89e4c81..70e024022bd 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -64,6 +64,7 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -219,12 +220,12 @@ protected void getCoordinatorLock(HostAndPort clientAddress) new ServiceLock(getContext().getZooSession(), ServiceLock.path(lockPath), zooLockUUID); while (true) { - CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + HAServiceLockWatcher coordinatorLockWatcher = new HAServiceLockWatcher("coordinator"); coordinatorLock.lock(coordinatorLockWatcher, new ServiceLockData(zooLockUUID, coordinatorClientAddress, ThriftService.COORDINATOR)); coordinatorLockWatcher.waitForChange(); - if (coordinatorLockWatcher.isAcquiredLock()) { + if (coordinatorLockWatcher.isLockAcquired()) { break; } if (!coordinatorLockWatcher.isFailedToAcquireLock()) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java deleted file mode 100644 index bbbe73147d7..00000000000 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.coordinator; - -import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; -import org.apache.accumulo.core.util.Halt; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CoordinatorLockWatcher implements ServiceLock.AccumuloLockWatcher { - - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class); - - private volatile boolean acquiredLock = false; - private volatile boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - LOG.debug("Acquired Coordinator lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - LOG.warn("Failed to get Coordinator lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire Coordinator lock due to incorrect ZooKeeper authentication."; - LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - LOG.info("Coordinator lock held by someone else, waiting for a change in state"); - wait(); - } catch (InterruptedException e) {} - } - } - - public boolean isAcquiredLock() { - return acquiredLock; - } - - public boolean isFailedToAcquireLock() { - return failedToAcquireLock; - } - -} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 80de4db6ed3..f93440ef521 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -75,12 +75,12 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -98,7 +98,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; @@ -270,20 +269,8 @@ protected void announceExistence(HostAndPort clientAddress) compactorLock = new ServiceLock(getContext().getZooSession(), ServiceLock.path(zPath), compactorId); - LockWatcher lw = new LockWatcher() { - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> { - LOG.error("Compactor lost lock (reason = {}), exiting.", reason); - getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("compactor", () -> false, + (name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); try { for (int i = 0; i < 25; i++) { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 800872ae7a2..be415df136b 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -36,16 +36,14 @@ import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; -import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.gc.metrics.GcCycleMetrics; import org.apache.accumulo.gc.metrics.GcMetrics; @@ -336,31 +334,32 @@ boolean moveToTrash(Path path) throws IOException { private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { var path = ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK); - LockWatcher lockWatcher = new LockWatcher() { - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); - } + UUID zooLockUUID = UUID.randomUUID(); + gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID); + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); + while (true) { + gcLock.lock(gcLockWatcher, + new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC)); + gcLockWatcher.waitForChange(); + + if (gcLockWatcher.isLockAcquired()) { + break; } - }; - UUID zooLockUUID = UUID.randomUUID(); - gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID); - while (true) { - if (gcLock.tryLock(lockWatcher, - new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC))) { - log.debug("Got GC ZooKeeper lock"); - return; + if (!gcLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("gc lock in unknown state"); } + + gcLock.tryToCancelAsyncLockOrUnlock(); + log.debug("Failed to get GC ZooKeeper lock, will retry"); - sleepUninterruptibly(1, TimeUnit.SECONDS); + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); } + + log.info("Got GC lock."); + } private HostAndPort startStatsService() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index ee9f2ed3d6e..012c509a4f5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -74,10 +74,10 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; @@ -105,7 +105,6 @@ import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -150,7 +149,6 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -1564,65 +1562,6 @@ public ServiceLock getManagerLock() { return managerLock; } - private static class ManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; - log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) { - // empty - } - } - } - } - private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooSession(); @@ -1638,17 +1577,17 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); while (true) { - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); + HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager"); managerLock.lock(managerLockWatcher, sld); managerLockWatcher.waitForChange(); - if (managerLockWatcher.acquiredLock) { + if (managerLockWatcher.isLockAcquired()) { startServiceLockVerificationThread(); break; } - if (!managerLockWatcher.failedToAcquireLock) { + if (!managerLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("manager lock in unknown state"); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index f8fdbe959c8..cb09d740956 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -54,9 +54,9 @@ import org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; @@ -70,7 +70,6 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; @@ -754,17 +753,17 @@ private void getMonitorLock(HostAndPort monitorLocation) monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath, zooLockUUID); while (true) { - MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor"); monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE)); monitorLockWatcher.waitForChange(); - if (monitorLockWatcher.acquiredLock) { + if (monitorLockWatcher.isLockAcquired()) { break; } - if (!monitorLockWatcher.failedToAcquireLock) { + if (!monitorLockWatcher.isFailedToAcquireLock()) { throw new IllegalStateException("monitor lock in unknown state"); } @@ -778,57 +777,6 @@ private void getMonitorLock(HostAndPort monitorLocation) log.info("Got Monitor lock."); } - /** - * Async Watcher for monitor lock - */ - private static class MoniterLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get monitor lock " + e); - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) {} - } - } - } - public ManagerMonitorInfo getMmi() { return mmi; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 2cac2841a21..1d76c2596b9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -69,12 +69,12 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; @@ -90,7 +90,6 @@ import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -346,24 +345,8 @@ private ServiceLock announceExistence() { serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID); - - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested, + (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index ca993182b8d..7357fde2214 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -214,7 +214,21 @@ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdura UpdateSession us = new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials), - credentials, durability); + credentials, durability) { + @Override + public boolean cleanup() { + // This is called when a client abandons a session. When this happens need to decrement + // any queued mutations. + if (queuedMutationSize > 0) { + log.trace( + "cleaning up abandoned update session, decrementing totalQueuedMutationSize by {}", + queuedMutationSize); + server.updateTotalQueuedMutationSize(-queuedMutationSize); + queuedMutationSize = 0; + } + return true; + } + }; return server.sessionManager.createSession(us, false); } @@ -223,8 +237,8 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) { return; } - if (us.currentTablet == null - && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) { + if (us.currentTablet == null && (us.failures.containsKey(keyExtent) + || us.authFailures.containsKey(keyExtent) || us.unhandledException != null)) { // if there were previous failures, then do not accept additional writes return; } @@ -293,6 +307,11 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List mutations = us.queuedMutations.get(us.currentTablet); for (TMutation tmutation : tmutations) { Mutation mutation = new ServerMutation(tmutation); + // Deserialize the mutation in an attempt to check for data corruption that happened on + // the network. This will avoid writing a corrupt mutation to the write ahead log and + // failing after its written to the write ahead log when it is deserialized to update the + // in memory map. + mutation.getUpdates(); mutations.add(mutation); additionalMutationSize += mutation.numBytes(); } @@ -312,6 +331,15 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, } } } + } catch (RuntimeException e) { + // This method is a thrift oneway method so an exception from it will not make it back to the + // client. Need to record the exception and set the session such that any future updates to + // the session are ignored. + us.unhandledException = e; + us.currentTablet = null; + + // Rethrowing it will cause logging from thrift, so not adding logging here. + throw e; } finally { if (reserved) { server.sessionManager.unreserveSession(us); @@ -423,6 +451,7 @@ private void flush(UpdateSession us) { } } catch (Exception e) { TraceUtil.setException(span2, e, true); + log.error("Error logging mutations sent from {}", TServerUtils.clientAddress.get(), e); throw e; } finally { span2.end(); @@ -450,6 +479,10 @@ private void flush(UpdateSession us) { us.commitTimes.addStat(t2 - t1); updateAvgCommitTime(t2 - t1, sendables.size()); + } catch (Exception e) { + TraceUtil.setException(span3, e, true); + log.error("Error committing mutations sent from {}", TServerUtils.clientAddress.get(), e); + throw e; } finally { span3.end(); } @@ -490,6 +523,20 @@ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDE } try { + if (us.unhandledException != null) { + // Since flush() is not being called, any memory added to the global queued mutations + // counter will not be decremented. So do that here before throwing an exception. + server.updateTotalQueuedMutationSize(-us.queuedMutationSize); + us.queuedMutationSize = 0; + // make this memory available for GC + us.queuedMutations.clear(); + + // Something unexpected happened during this write session, so throw an exception here to + // cause a TApplicationException on the client side. + throw new IllegalStateException( + "Write session " + updateID + " saw an unexpected exception", us.unhandledException); + } + // clients may or may not see data from an update session while // it is in progress, however when the update session is closed // want to ensure that reads wait for the write to finish @@ -696,6 +743,7 @@ private void writeConditionalMutations(Map { - if (!serverStopRequested) { - log.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); - - } - }; + LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested, + (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 564f25c019d..f34e331d7bf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -67,8 +67,6 @@ public class SessionManager { private static final Logger log = LoggerFactory.getLogger(SessionManager.class); private final ConcurrentMap sessions = new ConcurrentHashMap<>(); - private final long maxIdle; - private final long maxUpdateIdle; private final BlockingQueue deferredCleanupQueue = new ArrayBlockingQueue<>(5000); private final Long expiredSessionMarker = (long) -1; private final ServerContext ctx; @@ -76,12 +74,8 @@ public class SessionManager { public SessionManager(ServerContext context) { this.ctx = context; - final AccumuloConfiguration aconf = context.getConfiguration(); - maxUpdateIdle = aconf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); - maxIdle = aconf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); - - Runnable r = () -> sweep(maxIdle, maxUpdateIdle); - + long maxIdle = ctx.getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + Runnable r = () -> sweep(ctx); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000), MILLISECONDS)); } @@ -103,7 +97,7 @@ public long createSession(Session session, boolean reserve) { } public long getMaxIdleTime() { - return maxIdle; + return ctx.getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); } /** @@ -299,7 +293,10 @@ private void cleanup(Session session) { cleanup(deferredCleanupQueue, session); } - private void sweep(final long maxIdle, final long maxUpdateIdle) { + private void sweep(final ServerContext context) { + final AccumuloConfiguration conf = context.getConfiguration(); + final long maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); + final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); List sessionsToCleanup = new LinkedList<>(); Iterator iter = sessions.values().iterator(); while (iter.hasNext()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java index ca487294d7f..dc32765c28b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java @@ -50,6 +50,7 @@ public class UpdateSession extends Session { public long flushTime = 0; public long queuedMutationSize = 0; public final Durability durability; + public Exception unhandledException = null; public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) { super(credentials); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 827c89c989a..b797c6c472c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -860,22 +860,21 @@ public void commit(CommitSession commitSession, List mutations) { totalBytes += mutation.numBytes(); } - getTabletMemory().mutate(commitSession, mutations, totalCount); - - synchronized (this) { - if (isCloseComplete()) { - throw new IllegalStateException( - "Tablet " + extent + " closed with outstanding messages to the logger"); + try { + getTabletMemory().mutate(commitSession, mutations, totalCount); + synchronized (this) { + getTabletMemory().updateMemoryUsageStats(); + if (isCloseComplete()) { + throw new IllegalStateException( + "Tablet " + extent + " closed with outstanding messages to the logger"); + } + numEntries += totalCount; + numEntriesInMemory += totalCount; + ingestCount += totalCount; + ingestBytes += totalBytes; } - // decrement here in case an exception is thrown below + } finally { decrementWritesInProgress(commitSession); - - getTabletMemory().updateMemoryUsageStats(); - - numEntries += totalCount; - numEntriesInMemory += totalCount; - ingestCount += totalCount; - ingestBytes += totalBytes; } } diff --git a/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java new file mode 100644 index 00000000000..fdab6e21425 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TMutation; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletingest.thrift.TDurability; +import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TServiceClient; +import org.junit.jupiter.api.Test; + +public class CorruptMutationIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "10"); + } + + @Test + public void testCorruptMutation() throws Exception { + + String table = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + c.tableOperations().create(table); + try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("1"); + m.put("f1", "q1", new Value("v1")); + writer.addMutation(m); + } + + var ctx = (ClientContext) c; + var tableId = ctx.getTableId(table); + var extent = new KeyExtent(tableId, null, null); + var tabletMetadata = ctx.getAmple().readTablet(extent, TabletMetadata.ColumnType.LOCATION); + var location = tabletMetadata.getLocation(); + assertNotNull(location); + assertEquals(TabletMetadata.LocationType.CURRENT, location.getType()); + + TabletIngestClientService.Iface client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, location.getHostAndPort(), ctx); + // Make the same RPC calls made by the BatchWriter, but pass a corrupt serialized mutation in + // this try block. + try { + TInfo tinfo = TraceUtil.traceInfo(); + + long sessionId = client.startUpdate(tinfo, ctx.rpcCreds(), TDurability.DEFAULT); + + // Write two valid mutations to the session. The tserver buffers data it receives via + // applyUpdates and may not write them until closeUpdate RPC is called. Because + // TSERV_TOTAL_MUTATION_QUEUE_MAX was set so small, these values should be written. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("abc", "z1"), createTMutation("def", "z2"))); + + // Simulate data corruption in the serialized mutation + TMutation badMutation = createTMutation("ghi", "z3"); + badMutation.entries = -42; + + // Write some good and bad mutations to the session. The server side will see an error here, + // however since this is a thrift oneway method no exception is expected here. This should + // leave the session in a broken state where it no longer accepts any new data. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("jkl", "z4"), badMutation, createTMutation("mno", "z5"))); + + // Write two more valid mutations to the session, these should be dropped on the server side + // because of the previous error. So should never see these updates. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("pqr", "z6"), createTMutation("stu", "z7"))); + + // Since client.applyUpdates experienced an error, should see an error when closing the + // session. + assertThrows(TApplicationException.class, () -> client.closeUpdate(tinfo, sessionId)); + } finally { + ThriftUtil.returnClient((TServiceClient) client, ctx); + } + + // The values that a scan must see + Set expectedValues = Set.of("v1", "v2", "z1", "z2"); + + // The failed mutation should not have left the tablet in a bad state. Do some follow-on + // actions to ensure the tablet is still functional. + try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("2"); + m.put("f1", "q1", new Value("v2")); + writer.addMutation(m); + } + + try (Scanner scanner = c.createScanner(table)) { + var valuesSeen = + scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet()); + assertEquals(expectedValues, valuesSeen); + } + + c.tableOperations().flush(table, null, null, true); + + try (Scanner scanner = c.createScanner(table)) { + var valuesSeen = + scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet()); + assertEquals(expectedValues, valuesSeen); + } + } + } + + private static TMutation createTMutation(String row, String value) { + Mutation m = new Mutation(row); + m.put("x", "y", value); + return m.toThrift(); + } +}