Skip to content

Commit

Permalink
Clean up ZooKeeper tooling (apache#5192)
Browse files Browse the repository at this point in the history
* Replace existing ZooSession with a new one that is a facade for
  ZooKeeper. The methods from ZooKeeper we need are placed on it and
  internally it maintains a delegate ZooKeeper instance that handles
  automatically creating a new ZooKeeper session (client object)
  when the old session has died.
  This enables us to maintain fewer ZooKeeper objects by keeping only
  one at a time per ClientContext/ServerContext, and to reduce
  complexity substantially, where it was hard to reason about
  which ZooKeeper session we were using at any given moment. This
  no longer requires passing around ZooKeeper objects via ZooReader
  that called to the old ZooSession to construct a ZooKeeper session
  on demand. The new ZooSession is now a singleton field whose lifecycle
  is part of the context, and ZooReader/ZooReaderWriter are
  substantially simplified.
* Lazily construct objects in ServerInfo/ClientInfoImpl to simplify the
  implementation for the various use cases, and to ensure that we don't
  create more objects than needed.
* Improve debugging information to tie the ZooSession instances with
  their purpose (for example, for use with ServerContext, or for use
  with ClientContext, with the user's name)
* Get rid of ZooCacheFactory in favor of a lazily constructed ZooCache
  instance in the ClientContext, to make it more clear when a ZooCache
  is being shared or reused, and to remove a static singleton
* Move instanceId and instanceName lookup logic into ZooUtil
* Make ZookeeperLockChecker use its own ZooSession, because it is still
  a static singleton and must continue to operate after the context that
  constructed it is closed (should be fixed when apache#2301 is done)
* Perform some minor improvements to ZooCache to simplify its
  constructors now that it uses ZooSession, and to change the type of
  external watchers to Consumer, so they aren't as easily confused with
  actual ZooKeeper watchers
* Improve a lot of ZooKeeper related test code

Potential future work after this could include:

* Roll ZooReader and ZooReaderWriter functions directly into ZooSession
* Add support to ZooSession for more ZooKeeper APIs
* Handle KeeperException thrown from delegate that signals the session
  is disconnected, instead of relying only on the verifyConnected()
  method in ZooSession to update the delegate
* Handle InterruptedExceptions directly in ZooSession, so they don't
  propagate everywhere in the code in ways that are inconvenient to
  handle

This fixes apache#5124, apache#2299, and apache#2298
  • Loading branch information
ctubbsii authored Jan 10, 2025
1 parent e347e8b commit be22dee
Show file tree
Hide file tree
Showing 142 changed files with 2,040 additions and 2,425 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
Expand All @@ -44,6 +43,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -79,8 +79,6 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand All @@ -106,12 +104,11 @@
import org.apache.accumulo.core.util.tables.TableZooHelper;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Suppliers;

/**
* This class represents any essential configuration and credentials needed to initiate RPC
* operations throughout the code. It is intended to represent a shared object that contains these
Expand All @@ -126,14 +123,12 @@ public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);

private final ClientInfo info;
private InstanceId instanceId;
private final ZooReader zooReader;
private final ZooCache zooCache;
private final Supplier<ZooCache> zooCache;

private Credentials creds;
private BatchWriterConfig batchWriterConfig;
private ConditionalWriterConfig conditionalWriterConfig;
private final AccumuloConfiguration serverConf;
private final AccumuloConfiguration accumuloConf;
private final Configuration hadoopConf;

// These fields are very frequently accessed (each time a connection is created) and expensive to
Expand All @@ -158,6 +153,9 @@ public class ClientContext implements AccumuloClient {
private ThreadPoolExecutor cleanupThreadPool;
private ThreadPoolExecutor scannerReadaheadPool;

private final AtomicBoolean zooKeeperOpened = new AtomicBoolean(false);
private final Supplier<ZooSession> zooSession;

private void ensureOpen() {
if (closed) {
throw new IllegalStateException("This client was closed.");
Expand Down Expand Up @@ -224,13 +222,19 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
AccumuloConfiguration serverConf, UncaughtExceptionHandler ueh) {
this.info = info;
this.hadoopConf = info.getHadoopConf();
zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
zooCache =
new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
this.serverConf = serverConf;

this.zooSession = memoize(() -> {
var zk = info
.getZooKeeperSupplier(getClass().getSimpleName() + "(" + info.getPrincipal() + ")").get();
zooKeeperOpened.set(true);
return zk;
});

this.zooCache = memoize(() -> new ZooCache(getZooSession()));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
sslSupplier = Suppliers.memoize(() -> SslConnectionParams.forClient(getConfiguration()));
sslSupplier = memoize(() -> SslConnectionParams.forClient(getConfiguration()));
saslSupplier = memoizeWithExpiration(
() -> SaslConnectionParams.from(getConfiguration(), getCredentials().getToken()), 100,
MILLISECONDS);
Expand Down Expand Up @@ -328,7 +332,7 @@ public synchronized void setCredentials(Credentials newCredentials) {
*/
public AccumuloConfiguration getConfiguration() {
ensureOpen();
return serverConf;
return accumuloConf;
}

/**
Expand Down Expand Up @@ -520,7 +524,7 @@ public List<String> getManagerLocations() {
timer = Timer.startNew();
}

Optional<ServiceLockData> sld = zooCache.getLockData(zLockManagerPath);
Optional<ServiceLockData> sld = getZooCache().getLockData(zLockManagerPath);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
Expand All @@ -546,26 +550,7 @@ public List<String> getManagerLocations() {
*/
public InstanceId getInstanceID() {
ensureOpen();
if (instanceId == null) {
// lookup by name
final String instanceName = info.getInstanceName();
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
byte[] data = zooCache.get(instanceNamePath);
if (data == null) {
throw new RuntimeException(
"Instance name " + instanceName + " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new RuntimeException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
instanceId = InstanceId.of(instanceIdString);
}
return instanceId;
return info.getInstanceId();
}

public String getZooKeeperRoot() {
Expand Down Expand Up @@ -605,7 +590,7 @@ public int getZooKeepersSessionTimeOut() {

public ZooCache getZooCache() {
ensureOpen();
return zooCache;
return zooCache.get();
}

private TableZooHelper tableZooHelper;
Expand Down Expand Up @@ -860,6 +845,9 @@ public AuthenticationToken token() {
@Override
public synchronized void close() {
closed = true;
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
Expand All @@ -880,7 +868,7 @@ public static class ClientBuilderImpl<T>
SslOptions<T>, SaslOptions<T>, ClientFactory<T>, FromOptions<T> {

private Properties properties = new Properties();
private AuthenticationToken token = null;
private Optional<AuthenticationToken> tokenOpt = Optional.empty();
private final Function<ClientBuilderImpl<T>,T> builderFunction;
private UncaughtExceptionHandler ueh = null;

Expand All @@ -889,12 +877,9 @@ public ClientBuilderImpl(Function<ClientBuilderImpl<T>,T> builderFunction) {
}

private ClientInfo getClientInfo() {
if (token != null) {
ClientProperty.validate(properties, false);
return new ClientInfoImpl(properties, token);
}
ClientProperty.validate(properties);
return new ClientInfoImpl(properties);
// validate the token in the properties if not provided here
ClientProperty.validate(properties, tokenOpt.isEmpty());
return new ClientInfoImpl(properties, tokenOpt);
}

private UncaughtExceptionHandler getUncaughtExceptionHandler() {
Expand Down Expand Up @@ -1065,7 +1050,7 @@ public ConnectionOptions<T> as(CharSequence principal, AuthenticationToken token
}
setProperty(ClientProperty.AUTH_PRINCIPAL, principal.toString());
ClientProperty.setAuthenticationToken(properties, token);
this.token = token;
this.tokenOpt = Optional.of(token);
return this;
}

Expand All @@ -1089,9 +1074,9 @@ public ClientFactory<T> withUncaughtExceptionHandler(UncaughtExceptionHandler ue

}

public ZooReader getZooReader() {
public ZooSession getZooSession() {
ensureOpen();
return zooReader;
return zooSession.get();
}

protected long getTransportPoolMaxAgeMillis() {
Expand All @@ -1110,7 +1095,14 @@ public synchronized ThriftTransportPool getTransportPool() {
public synchronized ZookeeperLockChecker getTServerLockChecker() {
ensureOpen();
if (this.zkLockChecker == null) {
this.zkLockChecker = new ZookeeperLockChecker(this);
// make this use its own ZooSession and ZooCache, because this is used by the
// tablet location cache, which is a static singleton reused by multiple clients
// so, it can't rely on being able to continue to use the same client's ZooCache,
// because that client could be closed, and its ZooSession also closed
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot() + Constants.ZTSERVERS);
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import java.net.URL;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

/**
Expand All @@ -38,6 +42,16 @@ public interface ClientInfo {
*/
String getInstanceName();

/**
* @return Accumulo instanceId
*/
InstanceId getInstanceId();

/**
* @return a Supplier for creating new ZooKeeper client instances based on the configuration
*/
Supplier<ZooSession> getZooKeeperSupplier(String clientName);

/**
* @return Zookeeper connection information for Accumulo instance
*/
Expand Down Expand Up @@ -77,27 +91,27 @@ public interface ClientInfo {
* @return ClientInfo given properties
*/
static ClientInfo from(Properties properties) {
return new ClientInfoImpl(properties);
return new ClientInfoImpl(properties, Optional.empty());
}

/**
* @return ClientInfo given URL path to client config file
*/
static ClientInfo from(URL propertiesURL) {
return new ClientInfoImpl(propertiesURL);
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesURL), Optional.empty());
}

/**
* @return ClientInfo given properties and token
*/
static ClientInfo from(Properties properties, AuthenticationToken token) {
return new ClientInfoImpl(properties, token);
return new ClientInfoImpl(properties, Optional.of(token));
}

/**
* @return ClientInfo given path to client config file
*/
static ClientInfo from(Path propertiesFile) {
return new ClientInfoImpl(propertiesFile);
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesFile), Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,70 @@
*/
package org.apache.accumulo.core.clientImpl;

import static com.google.common.base.Suppliers.memoize;
import static java.util.Objects.requireNonNull;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

import com.google.common.base.Suppliers;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class ClientInfoImpl implements ClientInfo {

private final Properties properties;
private AuthenticationToken token;
private final Configuration hadoopConf;

public ClientInfoImpl(Path propertiesFile) {
this(ClientInfoImpl.toProperties(propertiesFile));
}

public ClientInfoImpl(URL propertiesURL) {
this(ClientInfoImpl.toProperties(propertiesURL));
// suppliers for lazily loading
private final Supplier<AuthenticationToken> tokenSupplier;
private final Supplier<Configuration> hadoopConf;
private final Supplier<InstanceId> instanceId;
private final Function<String,ZooSession> zooSessionForName;

public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> tokenOpt) {
this.properties = requireNonNull(properties);
// convert the optional to a supplier to delay retrieval from the properties unless needed
this.tokenSupplier = requireNonNull(tokenOpt).map(Suppliers::ofInstance)
.orElse(memoize(() -> ClientProperty.getAuthenticationToken(properties)));
this.hadoopConf = memoize(Configuration::new);
this.zooSessionForName =
name -> new ZooSession(name, getZooKeepers(), getZooKeepersSessionTimeOut(), null);
this.instanceId = memoize(() -> {
try (var zk = getZooKeeperSupplier(getClass().getSimpleName() + ".getInstanceName()").get()) {
return ZooUtil.getInstanceId(zk, getInstanceName());
}
});
}

public ClientInfoImpl(Properties properties) {
this(properties, null);
@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
}

public ClientInfoImpl(Properties properties, AuthenticationToken token) {
this.properties = properties;
this.token = token;
this.hadoopConf = new Configuration();
@Override
public InstanceId getInstanceId() {
return instanceId.get();
}

@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
public Supplier<ZooSession> getZooKeeperSupplier(String clientName) {
return () -> zooSessionForName.apply(clientName);
}

@Override
Expand Down Expand Up @@ -87,10 +109,7 @@ public Properties getProperties() {

@Override
public AuthenticationToken getAuthenticationToken() {
if (token == null) {
token = ClientProperty.getAuthenticationToken(properties);
}
return token;
return tokenSupplier.get();
}

@Override
Expand Down Expand Up @@ -134,6 +153,6 @@ public static Properties toProperties(URL propertiesURL) {

@Override
public Configuration getHadoopConf() {
return this.hadoopConf;
return hadoopConf.get();
}
}
Loading

0 comments on commit be22dee

Please sign in to comment.