diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 0adccd24bde..821574dea30 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -848,12 +848,7 @@ public AuthenticationToken token() { public synchronized void close() { closed = true; if (zooKeeperOpened.get()) { - try { - zooKeeper.get().close(); - } catch (InterruptedException e) { - // reset the interrupt flag and continue closing - Thread.currentThread().interrupt(); - } + ZooUtil.close(zooKeeper.get()); } if (thriftTransportPool != null) { thriftTransportPool.shutdown(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java index 9ab9dadc7e4..ce10bb23efb 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.clientImpl; +import static com.google.common.base.Suppliers.memoize; import static java.util.Objects.requireNonNull; import java.io.FileInputStream; @@ -44,19 +45,30 @@ public class ClientInfoImpl implements ClientInfo { - private final Supplier instanceIdSupplier; private final Properties properties; + + // suppliers for lazily loading private final Supplier tokenSupplier; - private final Configuration hadoopConf; + private final Supplier hadoopConf; + private final Supplier instanceId; + private final Supplier zkSupplier; public ClientInfoImpl(Properties properties, Optional tokenOpt) { this.properties = requireNonNull(properties); // convert the optional to a supplier to delay retrieval from the properties unless needed this.tokenSupplier = requireNonNull(tokenOpt).map(Suppliers::ofInstance) - .orElse(Suppliers.memoize(() -> ClientProperty.getAuthenticationToken(properties))); - this.hadoopConf = new Configuration(); - this.instanceIdSupplier = Suppliers.memoize(() -> ZooUtil.getInstanceID(getZooKeepers(), - getZooKeepersSessionTimeOut(), getInstanceName())); + .orElse(memoize(() -> ClientProperty.getAuthenticationToken(properties))); + this.hadoopConf = memoize(Configuration::new); + this.zkSupplier = () -> ZooUtil.connect(getClass().getSimpleName(), getZooKeepers(), + getZooKeepersSessionTimeOut(), null); + this.instanceId = memoize(() -> { + var zk = getZooKeeperSupplier().get(); + try { + return ZooUtil.getInstanceId(zk, getInstanceName()); + } finally { + ZooUtil.close(zk); + } + }); } @Override @@ -66,13 +78,12 @@ public String getInstanceName() { @Override public InstanceId getInstanceId() { - return instanceIdSupplier.get(); + return instanceId.get(); } @Override public Supplier getZooKeeperSupplier() { - return () -> ZooUtil.connect(ClientInfo.class.getSimpleName(), getZooKeepers(), - getZooKeepersSessionTimeOut(), null); + return zkSupplier; } @Override @@ -144,6 +155,6 @@ public static Properties toProperties(URL propertiesURL) { @Override public Configuration getHadoopConf() { - return this.hadoopConf; + return hadoopConf.get(); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 0f3272e22e1..f472dc168cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; @@ -217,8 +216,8 @@ public static void digestAuth(ZooKeeper zoo, String secret) { } /** - * Construct a new ZooKeeper client, retrying if it doesn't work right away. The caller is - * responsible for closing instances returned from this method. + * Construct a new ZooKeeper client, retrying indefinitely if it doesn't work right away. The + * caller is responsible for closing instances returned from this method. * * @param clientName a convenient name for logging its connection state changes * @param conf a convenient carrier of ZK connection information using Accumulo properties @@ -230,8 +229,8 @@ public static ZooKeeper connect(String clientName, AccumuloConfiguration conf) { } /** - * Construct a new ZooKeeper client, retrying if it doesn't work right away. The caller is - * responsible for closing instances returned from this method. + * Construct a new ZooKeeper client, retrying indefinitely if it doesn't work right away. The + * caller is responsible for closing instances returned from this method. * * @param clientName a convenient name for logging its connection state changes * @param connectString in the form of host1:port1,host2:port2/chroot/path @@ -280,11 +279,12 @@ public static ZooKeeper connect(String clientName, String connectString, int tim if (tryAgain && zk != null) { try { zk.close(); - zk = null; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("interrupted", e); + throw new AssertionError( + "ZooKeeper.close() shouldn't throw this; it exists only for backwards compatibility", + e); } + zk = null; } } @@ -314,94 +314,88 @@ public static ZooKeeper connect(String clientName, String connectString, int tim return zk; } + public static void close(ZooKeeper zk) { + try { + if (zk != null) { + zk.close(); + } + } catch (InterruptedException e) { + throw new AssertionError( + "ZooKeeper.close() shouldn't throw this; it exists only for backwards compatibility", e); + } + } + /** * Given a zooCache and instanceId, look up the instance name. */ - public static String getInstanceName(String zooKeepers, int zkSessionTimeout, - InstanceId instanceId) { - requireNonNull(zooKeepers); + public static String getInstanceName(ZooKeeper zk, InstanceId instanceId) { + requireNonNull(zk); var instanceIdBytes = requireNonNull(instanceId).canonical().getBytes(UTF_8); - try (var zk = connect("ZooUtil.getInstanceName", zooKeepers, zkSessionTimeout, null)) { - for (String name : zk.getChildren(Constants.ZROOT + Constants.ZINSTANCES, false)) { - var bytes = zk.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name, false, null); - if (Arrays.equals(bytes, instanceIdBytes)) { - return name; - } + for (String name : getInstanceNames(zk)) { + var bytes = getInstanceIdBytesFromName(zk, name); + if (Arrays.equals(bytes, instanceIdBytes)) { + return name; } + } + return null; + } + + private static List getInstanceNames(ZooKeeper zk) { + try { + return new ZooReader(zk).getChildren(Constants.ZROOT + Constants.ZINSTANCES); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IllegalStateException( - "Interrupted reading instance name for the instanceId " + instanceId, e); + throw new IllegalStateException("Interrupted reading instance names from ZooKeeper", e); } catch (KeeperException e) { - throw new IllegalStateException( - "Unable to get instance name for the instanceId " + instanceId, e); + throw new IllegalStateException("Failed to read instance names from ZooKeeper", e); } - return null; } - /** - * Read the instance names and instance ids from ZooKeeper. The storage structure in ZooKeeper is: - * - *
-   *   /accumulo/instances/instance_name  - with the instance id stored as data.
-   * 
- * - * @return a map of (instance name, instance id) entries - */ - public static Map readInstancesFromZk(final ZooReader zooReader) { - // TODO clean up - Map idMap = new TreeMap<>(); + private static byte[] getInstanceIdBytesFromName(ZooKeeper zk, String name) { try { - List names = zooReader.getChildren(Constants.ZROOT + Constants.ZINSTANCES); - names.forEach(name -> { - try { - byte[] uuid = zooReader.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name); - idMap.put(name, InstanceId.of(UUID.fromString(new String(uuid, UTF_8)))); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted reading instance id from ZooKeeper", ex); - } catch (KeeperException ex) { - log.warn("Failed to read instance id for " + ex.getPath()); - } - }); - } catch (InterruptedException ex) { + return new ZooReader(zk) + .getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + requireNonNull(name)); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted reading instance name info from ZooKeeper", ex); - } catch (KeeperException ex) { - throw new IllegalStateException("Failed to read instance name info from ZooKeeper", ex); + throw new IllegalStateException( + "Interrupted reading InstanceId from ZooKeeper for instance named " + name, e); + } catch (KeeperException e) { + log.warn("Failed to read InstanceId from ZooKeeper for instance named {}", name, e); + return null; } - return idMap; } - /** - * Returns a unique string that identifies this instance of accumulo. - */ - public static InstanceId getInstanceID(String zooKeepers, int zkSessionTimeout, - String instanceName) { - requireNonNull(zooKeepers); - requireNonNull(instanceName); - // lookup by name - String instanceIdString = null; - try (var zk = connect("ZooUtil.getInstanceID", zooKeepers, zkSessionTimeout, null)) { - String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; - byte[] data = zk.getData(instanceNamePath, false, null); - if (data == null) { - throw new IllegalStateException( - "Instance name " + instanceName + " does not exist in zookeeper. " - + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + public static Map getInstanceMap(ZooKeeper zk) { + Map idMap = new TreeMap<>(); + getInstanceNames(zk).forEach(name -> { + byte[] instanceId = getInstanceIdBytesFromName(zk, name); + if (instanceId != null) { + idMap.put(name, InstanceId.of(new String(instanceId, UTF_8))); } - instanceIdString = new String(data, UTF_8); - // verify that the instanceId found via the instanceName actually exists as an instance - if (zk.getData(Constants.ZROOT + "/" + instanceIdString, false, null) == null) { - throw new IllegalStateException("Instance id " + instanceIdString - + " pointed to by the name " + instanceName + " does not exist in zookeeper"); + }); + return idMap; + } + + public static InstanceId getInstanceId(ZooKeeper zk, String name) { + byte[] data = getInstanceIdBytesFromName(zk, name); + if (data == null) { + throw new IllegalStateException("Instance name " + name + " does not exist in ZooKeeper. " + + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + } + String instanceIdString = new String(data, UTF_8); + try { + // verify that the instanceId found via the name actually exists + if (new ZooReader(zk).getData(Constants.ZROOT + "/" + instanceIdString) == null) { + throw new IllegalStateException("InstanceId " + instanceIdString + + " pointed to by the name " + name + " does not exist in ZooKeeper"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted reading instance id from ZooKeeper", e); + throw new IllegalStateException("Interrupted verifying InstanceId " + instanceIdString + + " pointed to by instance named " + name + " actually exists in ZooKeeper", e); } catch (KeeperException e) { - throw new IllegalStateException( - "Unable to get instanceId for the instance name " + instanceName, e); + throw new IllegalStateException("Failed to verify InstanceId " + instanceIdString + + " pointed to by instance named " + name + " actually exists in ZooKeeper", e); } return InstanceId.of(instanceIdString); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooUtilTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooUtilTest.java index b5717e46c56..27a8bcb25a2 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooUtilTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooUtilTest.java @@ -23,6 +23,7 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.isNull; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -108,20 +109,20 @@ public void fetchInstancesFromZk() throws Exception { String instBName = "INST_B"; InstanceId instB = InstanceId.of(UUID.randomUUID()); - ZooReader zooReader = createMock(ZooReader.class); + ZooKeeper zk = createMock(ZooKeeper.class); String namePath = Constants.ZROOT + Constants.ZINSTANCES; - expect(zooReader.getChildren(eq(namePath))).andReturn(List.of(instAName, instBName)).once(); - expect(zooReader.getData(eq(namePath + "/" + instAName))) + expect(zk.getChildren(eq(namePath), isNull())).andReturn(List.of(instAName, instBName)).once(); + expect(zk.getData(eq(namePath + "/" + instAName), isNull(), isNull())) .andReturn(instA.canonical().getBytes(UTF_8)).once(); - expect(zooReader.getData(eq(namePath + "/" + instBName))) + expect(zk.getData(eq(namePath + "/" + instBName), isNull(), isNull())) .andReturn(instB.canonical().getBytes(UTF_8)).once(); - replay(zooReader); + replay(zk); - Map instanceMap = ZooUtil.readInstancesFromZk(zooReader); + Map instanceMap = ZooUtil.getInstanceMap(zk); log.trace("id map returned: {}", instanceMap); assertEquals(Map.of(instAName, instA, instBName, instB), instanceMap); - verify(zooReader); + verify(zk); } } diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 94c84efd791..dabbf632d2c 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -46,14 +46,12 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -74,6 +72,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; @@ -101,9 +100,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -643,74 +640,48 @@ private void verifyUp() throws InterruptedException, IOException { waitForProcessStart(tsp, "TabletServer" + tsExpectedCount); } - try (var zk = new ZooKeeper(getZooKeepers(), 60000, event -> log.warn("{}", event))) { - - String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET); - - while (!zk.getState().isConnected()) { - log.info("Waiting for ZK client to connect, state: {} - will retry", zk.getState()); - Thread.sleep(1000); - } - - String instanceId = null; + String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET); + String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + getInstanceName(); + try (var zk = ZooUtil.connect(MiniAccumuloClusterImpl.class.getSimpleName() + ".verifyUp()", + getZooKeepers(), 60000, secret)) { + var rdr = new ZooReader(zk); + InstanceId instanceId = null; for (int i = 0; i < numTries; i++) { - if (zk.getState().isConnected()) { - ZooUtil.digestAuth(zk, secret); - try { - final AtomicInteger rc = new AtomicInteger(); - final CountDownLatch waiter = new CountDownLatch(1); - zk.sync("/", (code, arg1, arg2) -> { - rc.set(code); - waiter.countDown(); - }, null); - waiter.await(); - Code code = Code.get(rc.get()); - if (code != Code.OK) { - throw KeeperException.create(code); - } - String instanceNamePath = - Constants.ZROOT + Constants.ZINSTANCES + "/" + config.getInstanceName(); - byte[] bytes = zk.getData(instanceNamePath, null, null); - instanceId = new String(bytes, UTF_8); - break; - } catch (KeeperException e) { - log.warn("Error trying to read instance id from zookeeper: " + e.getMessage()); - log.debug("Unable to read instance id from zookeeper.", e); - } - } else { - log.warn("ZK client not connected, state: {}", zk.getState()); + try { + // make sure it's up enough we can perform operations successfully + rdr.sync("/"); + // wait for the instance to be created + instanceId = InstanceId.of(new String(rdr.getData(instanceNamePath), UTF_8)); + break; + } catch (KeeperException e) { + log.warn("Error trying to read instance id from zookeeper: {}", e.getMessage()); + log.debug("Unable to read instance id from zookeeper.", e); } Thread.sleep(1000); } if (instanceId == null) { - for (int i = 0; i < numTries; i++) { - if (zk.getState().isConnected()) { - ZooUtil.digestAuth(zk, secret); - try { - log.warn("******* COULD NOT FIND INSTANCE ID - DUMPING ZK ************"); - log.warn("Connected to ZooKeeper: {}", getZooKeepers()); - log.warn("Looking for instanceId at {}", - Constants.ZROOT + Constants.ZINSTANCES + "/" + config.getInstanceName()); - ZKUtil.visitSubTreeDFS(zk, Constants.ZROOT, false, - (rc, path, ctx, name) -> log.warn("{}", path)); - log.warn("******* END ZK DUMP ************"); - } catch (KeeperException | InterruptedException e) { - log.error("Error dumping zk", e); - } - } - Thread.sleep(1000); + try { + log.warn("******* COULD NOT FIND INSTANCE ID - DUMPING ZK ************"); + log.warn("Connected to ZooKeeper: {}", getZooKeepers()); + log.warn("Looking for instanceId at {}", instanceNamePath); + ZKUtil.visitSubTreeDFS(zk, Constants.ZROOT, false, + (rc, path, ctx, name) -> log.warn("{}", path)); + log.warn("******* END ZK DUMP ************"); + } catch (KeeperException e) { + log.error("Error dumping zk", e); } throw new IllegalStateException("Unable to find instance id from zookeeper."); } - String rootPath = Constants.ZROOT + "/" + instanceId; + String rootPath = ZooUtil.getRoot(instanceId); int tsActualCount = 0; try { while (tsActualCount < tsExpectedCount) { tsActualCount = 0; - for (String child : zk.getChildren(rootPath + Constants.ZTSERVERS, null)) { - if (zk.getChildren(rootPath + Constants.ZTSERVERS + "/" + child, null).isEmpty()) { + String tserverPath = rootPath + Constants.ZTSERVERS; + for (String child : rdr.getChildren(tserverPath)) { + if (rdr.getChildren(tserverPath + "/" + child).isEmpty()) { log.info("TServer " + tsActualCount + " not yet present in ZooKeeper"); } else { tsActualCount++; @@ -724,7 +695,7 @@ private void verifyUp() throws InterruptedException, IOException { } try { - while (zk.getChildren(rootPath + Constants.ZMANAGER_LOCK, null).isEmpty()) { + while (rdr.getChildren(rootPath + Constants.ZMANAGER_LOCK).isEmpty()) { log.info("Manager not yet present in ZooKeeper"); Thread.sleep(500); } @@ -733,7 +704,7 @@ private void verifyUp() throws InterruptedException, IOException { } try { - while (zk.getChildren(rootPath + Constants.ZGC_LOCK, null).isEmpty()) { + while (rdr.getChildren(rootPath + Constants.ZGC_LOCK).isEmpty()) { log.info("GC not yet present in ZooKeeper"); Thread.sleep(500); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index 1e8c5a429cb..f461a964be8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@ -109,7 +109,7 @@ public class ServerContext extends ClientContext { private final Supplier metricsInfoSupplier; public ServerContext(SiteConfiguration siteConfig) { - this(ServerInfo.fromHdfsAndZooKeeper(siteConfig)); + this(ServerInfo.fromServerConfig(siteConfig)); } private ServerContext(ServerInfo info) { @@ -148,13 +148,13 @@ public static ServerContext initialize(SiteConfiguration siteConfig, String inst * from the client configuration, and the instanceId is looked up in ZooKeeper from the name. */ public static ServerContext withClientInfo(SiteConfiguration siteConfig, ClientInfo info) { - return new ServerContext(ServerInfo.forUtilities(siteConfig, info)); + return new ServerContext(ServerInfo.fromServerAndClientConfig(siteConfig, info)); } /** * Override properties for testing */ - public static ServerContext overrideForTesting(SiteConfiguration siteConfig, String instanceName, + public static ServerContext forTesting(SiteConfiguration siteConfig, String instanceName, String zooKeepers, int zkSessionTimeOut) { return new ServerContext( ServerInfo.forTesting(siteConfig, instanceName, zooKeepers, zkSessionTimeOut)); diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java index f5de7ad42f4..301b6aef1a7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java @@ -18,14 +18,15 @@ */ package org.apache.accumulo.server; +import static com.google.common.base.Suppliers.memoize; import static java.util.Objects.requireNonNull; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Optional; -import java.util.OptionalInt; import java.util.Properties; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.ToIntFunction; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.clientImpl.ClientConfConverter; @@ -46,77 +47,108 @@ public class ServerInfo implements ClientInfo { - private final SiteConfiguration siteConfig; - private final Configuration hadoopConf; - private final InstanceId instanceID; - private final String instanceName; - private final String zooKeepers; - private final int zooKeepersSessionTimeOut; - private final VolumeManager volumeManager; - private final ServerDirs serverDirs; - private final Credentials credentials; - - static ServerInfo fromHdfsAndZooKeeper(SiteConfiguration siteConfig) { - // pass in enough information in the site configuration to connect to HDFS and ZooKeeper - // then get the instanceId from HDFS, and use it to look up the instanceName from ZooKeeper - return new ServerInfo(siteConfig, Optional.empty(), Optional.empty(), Optional.empty(), - OptionalInt.empty()); - } - - static ServerInfo forUtilities(SiteConfiguration siteConfig, ClientInfo info) { - // get everything from the ClientInfo, which itself would look up the instanceId from the - // configured name - return new ServerInfo(siteConfig, Optional.of(info.getInstanceName()), - Optional.of(info.getInstanceId()), Optional.of(info.getZooKeepers()), - OptionalInt.of(info.getZooKeepersSessionTimeOut())); - } - + private static final Function GET_ZK_HOSTS_FROM_CONFIG = + si -> si.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST); + + private static final ToIntFunction GET_ZK_TIMEOUT_FROM_CONFIG = + si -> (int) si.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + + // set things up using the config file, the instanceId from HDFS, and ZK for the instanceName + static ServerInfo fromServerConfig(SiteConfiguration siteConfig) { + final Function instanceNameFromZk = si -> { + var zk = si.getZooKeeperSupplier().get(); + try { + return ZooUtil.getInstanceName(zk, si.getInstanceId()); + } finally { + ZooUtil.close(zk); + } + }; + final Function instanceIdFromHdfs = si -> VolumeManager.getInstanceIDFromHdfs( + si.getServerDirs().getInstanceIdLocation(si.getVolumeManager().getFirst()), + si.getHadoopConf()); + return new ServerInfo(siteConfig, GET_ZK_HOSTS_FROM_CONFIG, GET_ZK_TIMEOUT_FROM_CONFIG, + instanceNameFromZk, instanceIdFromHdfs); + } + + // set things up using a provided instanceName and InstanceId to initialize the system, but still + // have a ServerContext that is functional without bootstrapping issues, so long as you don't call + // functions from it that require an instance to have already been initialized static ServerInfo initialize(SiteConfiguration siteConfig, String instanceName, InstanceId instanceId) { - // get the ZK hosts and timeout from the site config, but pass the name and ID directly - return new ServerInfo(siteConfig, Optional.of(instanceName), Optional.of(instanceId), - Optional.empty(), OptionalInt.empty()); + requireNonNull(instanceName); + requireNonNull(instanceId); + return new ServerInfo(siteConfig, GET_ZK_HOSTS_FROM_CONFIG, GET_ZK_TIMEOUT_FROM_CONFIG, + si -> instanceName, si -> instanceId); + } + + // set things up using the config file, and the client config for a server-side CLI utility + static ServerInfo fromServerAndClientConfig(SiteConfiguration siteConfig, ClientInfo info) { + // ClientInfo.getInstanceId looks up the ID in ZK using the provided instance name + return new ServerInfo(siteConfig, si -> info.getZooKeepers(), + si -> info.getZooKeepersSessionTimeOut(), si -> info.getInstanceName(), + si -> info.getInstanceId()); } static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName, String zooKeepers, int zooKeepersSessionTimeOut) { - return new ServerInfo(siteConfig, Optional.ofNullable(instanceName), Optional.empty(), - Optional.ofNullable(zooKeepers), OptionalInt.of(zooKeepersSessionTimeOut)); + var props = new Properties(); + props.put(ClientProperty.INSTANCE_NAME, requireNonNull(instanceName)); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS, requireNonNull(zooKeepers)); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, zooKeepersSessionTimeOut); + return fromServerAndClientConfig(siteConfig, ClientInfo.from(props)); } - private ServerInfo(SiteConfiguration siteConfig, Optional instanceNameOpt, - Optional instanceIdOpt, Optional zkHostsOpt, - OptionalInt zkSessionTimeoutOpt) { + // required parameter + private final SiteConfiguration siteConfig; + + // suppliers for lazily loading + private final Supplier hadoopConf; + private final Supplier volumeManager; + private final Supplier serverDirs; + private final Supplier zooKeepers; + private final Supplier zooKeepersSessionTimeOut; // can't memoize IntSupplier + private final Supplier instanceId; + private final Supplier instanceName; + private final Supplier credentials; + private final Supplier zkSupplier; + + // set up everything to be lazily loaded with memoized suppliers, so if nothing is used, the cost + // is low; to support different scenarios, plug in the functionality to retrieve certain items + // from ZooKeeper, HDFS, or from the input, using functions that take "this" and emit the desired + // object to be memoized on demand; these functions should not have cyclic dependencies on one + // another, but because things are lazily loaded, it is okay if one depends on another in one + // direction only + private ServerInfo(SiteConfiguration siteConfig, Function zkHostsFunction, + ToIntFunction zkTimeoutFunction, Function instanceNameFunction, + Function instanceIdFunction) { SingletonManager.setMode(Mode.SERVER); this.siteConfig = requireNonNull(siteConfig); - requireNonNull(instanceNameOpt); - requireNonNull(instanceIdOpt); - requireNonNull(zkHostsOpt); - requireNonNull(zkSessionTimeoutOpt); - - this.hadoopConf = new Configuration(); - try { - this.volumeManager = VolumeManagerImpl.get(siteConfig, hadoopConf); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - this.serverDirs = new ServerDirs(siteConfig, hadoopConf); - - this.zooKeepers = zkHostsOpt.orElseGet(() -> siteConfig.get(Property.INSTANCE_ZK_HOST)); - this.zooKeepersSessionTimeOut = zkSessionTimeoutOpt - .orElseGet(() -> (int) siteConfig.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); - - // if not provided, look up the instanceId from ZK if the name was provided, HDFS otherwise - this.instanceID = instanceIdOpt.orElseGet(() -> instanceNameOpt.isPresent() - ? ZooUtil.getInstanceID(this.zooKeepers, this.zooKeepersSessionTimeOut, - instanceNameOpt.orElseThrow()) - : VolumeManager.getInstanceIDFromHdfs( - serverDirs.getInstanceIdLocation(volumeManager.getFirst()), hadoopConf)); - // if not provided, look up the instanceName from ZooKeeper, using the instanceId - this.instanceName = instanceNameOpt.orElseGet(() -> ZooUtil.getInstanceName(this.zooKeepers, - this.zooKeepersSessionTimeOut, this.instanceID)); - - this.credentials = SystemCredentials.get(instanceID, siteConfig); + requireNonNull(zkHostsFunction); + requireNonNull(zkTimeoutFunction); + requireNonNull(instanceNameFunction); + requireNonNull(instanceIdFunction); + + this.hadoopConf = memoize(Configuration::new); + this.volumeManager = memoize(() -> { + try { + return VolumeManagerImpl.get(getSiteConfiguration(), getHadoopConf()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + this.serverDirs = memoize(() -> new ServerDirs(getSiteConfiguration(), getHadoopConf())); + this.credentials = + memoize(() -> SystemCredentials.get(getInstanceId(), getSiteConfiguration())); + + this.zkSupplier = () -> ZooUtil.connect(getClass().getSimpleName(), getZooKeepers(), + getZooKeepersSessionTimeOut(), getSiteConfiguration().get(Property.INSTANCE_SECRET)); + + // from here on, set up the suppliers based on what was passed in, to support different cases + this.zooKeepers = memoize(() -> zkHostsFunction.apply(this)); + this.zooKeepersSessionTimeOut = memoize(() -> zkTimeoutFunction.applyAsInt(this)); + this.instanceId = memoize(() -> instanceIdFunction.apply(this)); + this.instanceName = memoize(() -> instanceNameFunction.apply(this)); } public SiteConfiguration getSiteConfiguration() { @@ -124,28 +156,27 @@ public SiteConfiguration getSiteConfiguration() { } public VolumeManager getVolumeManager() { - return volumeManager; + return volumeManager.get(); } @Override public InstanceId getInstanceId() { - return instanceID; + return instanceId.get(); } @Override public Supplier getZooKeeperSupplier() { - return () -> ZooUtil.connect(getClass().getSimpleName(), getZooKeepers(), - getZooKeepersSessionTimeOut(), getSiteConfiguration().get(Property.INSTANCE_SECRET)); + return zkSupplier; } @Override public String getZooKeepers() { - return zooKeepers; + return zooKeepers.get(); } @Override public int getZooKeepersSessionTimeOut() { - return zooKeepersSessionTimeOut; + return zooKeepersSessionTimeOut.get(); } @Override @@ -177,20 +208,20 @@ public Properties getProperties() { @Override public String getInstanceName() { - return instanceName; + return instanceName.get(); } public Credentials getCredentials() { - return credentials; + return credentials.get(); } @Override public Configuration getHadoopConf() { - return this.hadoopConf; + return hadoopConf.get(); } public ServerDirs getServerDirs() { - return serverDirs; + return serverDirs.get(); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java index aae151d8526..a6752c6c4b9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooInfoViewer.java @@ -137,7 +137,7 @@ void generateReport(final ServerContext context, final ZooInfoViewer.Opts opts) writer.println("Report Time: " + tsFormat.format(Instant.now())); writer.println("-----------------------------------------------"); if (opts.printInstanceIds) { - Map instanceMap = ZooUtil.readInstancesFromZk(context.getZooReader()); + Map instanceMap = ZooUtil.getInstanceMap(context.getZooKeeper()); printInstanceIds(instanceMap, writer); } @@ -228,7 +228,7 @@ private void printAcls(final ServerContext context, final Opts opts, final Print writer.printf("ZooKeeper acls for instance ID: %s\n\n", iid.canonical()); var conf = opts.getSiteConfiguration(); - try (var zk = ZooUtil.connect(ZooInfoViewer.class.getSimpleName(), conf)) { + try (var zk = ZooUtil.connect(getClass().getSimpleName(), conf)) { String instanceRoot = ZooUtil.getRoot(iid); diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java index 5fd2bd65f8e..394ca94a927 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java @@ -84,7 +84,7 @@ public void execute(String[] args) throws Exception { opts.parseArgs(ZooPropEditor.class.getName(), args); var siteConfig = opts.getSiteConfiguration(); - try (var zk = ZooUtil.connect(ZooPropEditor.class.getSimpleName(), siteConfig)) { + try (var zk = ZooUtil.connect(getClass().getSimpleName(), siteConfig)) { var zrw = new ZooReaderWriter(zk); try (ServerContext context = new ServerContext(siteConfig)) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 8921c0d6f11..6a3dd8c59fa 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -526,7 +526,7 @@ public void execute(final String[] args) throws InterruptedException { Opts opts = new Opts(); opts.parseArgs("accumulo init", args); var siteConfig = SiteConfiguration.auto(); - try (var zk = ZooUtil.connect(Initialize.class.getSimpleName(), siteConfig)) { + try (var zk = ZooUtil.connect(getClass().getSimpleName(), siteConfig)) { var zrw = new ZooReaderWriter(zk); SecurityUtil.serverLogin(siteConfig); Configuration hadoopConfig = new Configuration(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java b/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java index 2ab12dfedb1..c5d0f571f40 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java @@ -74,7 +74,7 @@ public void execute(String[] args) throws KeeperException, InterruptedException PrintStream out = System.out; var conf = opts.getSiteConfiguration(); - try (var zk = ZooUtil.connect(DumpZookeeper.class.getSimpleName(), conf)) { + try (var zk = ZooUtil.connect(getClass().getSimpleName(), conf)) { zrw = new ZooReaderWriter(zk); if (opts.xml) { writeXml(out, opts.root); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 73c6edc8c28..011be0d290f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -96,7 +96,7 @@ public void execute(String[] args) throws Exception { } var siteConf = SiteConfiguration.auto(); - try (var zk = ZooUtil.connect(ZooZap.class.getSimpleName(), siteConf)) { + try (var zk = ZooUtil.connect(getClass().getSimpleName(), siteConf)) { // Login as the server on secure HDFS if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { SecurityUtil.serverLogin(siteConf); diff --git a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java index 119988dac0a..7553e77ce23 100644 --- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java +++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java @@ -521,17 +521,15 @@ public boolean holdsLock() { @Override public void run() { - try { - try (ZooKeeperWrapper zk = testZk.newClient(ZooKeeperWrapper::new)) { - ServiceLock zl = getZooLock(zk, parent, uuid); - getLockLatch.countDown(); // signal we are done - getLockLatch.await(); // wait for others to finish - zl.lock(lockWatcher, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, - ServiceDescriptor.DEFAULT_GROUP_NAME)); // race to the lock - lockCompletedLatch.countDown(); - unlockLatch.await(); - zl.unlock(); - } + try (ZooKeeperWrapper zk = testZk.newClient(ZooKeeperWrapper::new)) { + ServiceLock zl = getZooLock(zk, parent, uuid); + getLockLatch.countDown(); // signal we are done + getLockLatch.await(); // wait for others to finish + zl.lock(lockWatcher, new ServiceLockData(UUID.randomUUID(), "test1", ThriftService.TSERV, + ServiceDescriptor.DEFAULT_GROUP_NAME)); // race to the lock + lockCompletedLatch.countDown(); + unlockLatch.await(); + zl.unlock(); } catch (Exception e) { LOG.error("Error in LockWorker.run() for {}", uuid, e); ex = e; diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index a09a49a4826..d36c9919fce 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -314,7 +314,7 @@ public static void main(String[] args) throws Exception { int zkTimeOut = (int) DefaultConfiguration.getInstance().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); var siteConfig = SiteConfiguration.auto(); - var context = ServerContext.overrideForTesting(siteConfig, opts.iname, opts.keepers, zkTimeOut); + var context = ServerContext.forTesting(siteConfig, opts.iname, opts.keepers, zkTimeOut); ClientServiceHandler csh = new ClientServiceHandler(context, new TransactionWatcher(context)); NullTServerTabletClientHandler tch = new NullTServerTabletClientHandler();