Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up ZooKeeper tooling #5192

Merged
merged 12 commits into from
Jan 10, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
Expand All @@ -44,6 +43,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -79,8 +79,6 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand All @@ -106,12 +104,11 @@
import org.apache.accumulo.core.util.tables.TableZooHelper;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Suppliers;

/**
* This class represents any essential configuration and credentials needed to initiate RPC
* operations throughout the code. It is intended to represent a shared object that contains these
Expand All @@ -126,14 +123,12 @@ public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);

private final ClientInfo info;
private InstanceId instanceId;
private final ZooReader zooReader;
ctubbsii marked this conversation as resolved.
Show resolved Hide resolved
private final ZooCache zooCache;
private final Supplier<ZooCache> zooCache;

private Credentials creds;
private BatchWriterConfig batchWriterConfig;
private ConditionalWriterConfig conditionalWriterConfig;
private final AccumuloConfiguration serverConf;
private final AccumuloConfiguration accumuloConf;
private final Configuration hadoopConf;

// These fields are very frequently accessed (each time a connection is created) and expensive to
Expand All @@ -158,6 +153,9 @@ public class ClientContext implements AccumuloClient {
private ThreadPoolExecutor cleanupThreadPool;
private ThreadPoolExecutor scannerReadaheadPool;

private final AtomicBoolean zooKeeperOpened = new AtomicBoolean(false);
private final Supplier<ZooSession> zooSession;

private void ensureOpen() {
if (closed) {
throw new IllegalStateException("This client was closed.");
Expand Down Expand Up @@ -224,13 +222,19 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
AccumuloConfiguration serverConf, UncaughtExceptionHandler ueh) {
this.info = info;
this.hadoopConf = info.getHadoopConf();
zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
zooCache =
new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
this.serverConf = serverConf;

this.zooSession = memoize(() -> {
var zk = info
.getZooKeeperSupplier(getClass().getSimpleName() + "(" + info.getPrincipal() + ")").get();
zooKeeperOpened.set(true);
return zk;
});

this.zooCache = memoize(() -> new ZooCache(getZooSession()));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
sslSupplier = Suppliers.memoize(() -> SslConnectionParams.forClient(getConfiguration()));
sslSupplier = memoize(() -> SslConnectionParams.forClient(getConfiguration()));
saslSupplier = memoizeWithExpiration(
() -> SaslConnectionParams.from(getConfiguration(), getCredentials().getToken()), 100,
MILLISECONDS);
Expand Down Expand Up @@ -328,7 +332,7 @@ public synchronized void setCredentials(Credentials newCredentials) {
*/
public AccumuloConfiguration getConfiguration() {
ensureOpen();
return serverConf;
return accumuloConf;
}

/**
Expand Down Expand Up @@ -520,7 +524,7 @@ public List<String> getManagerLocations() {
timer = Timer.startNew();
}

Optional<ServiceLockData> sld = zooCache.getLockData(zLockManagerPath);
Optional<ServiceLockData> sld = getZooCache().getLockData(zLockManagerPath);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
Expand All @@ -546,26 +550,7 @@ public List<String> getManagerLocations() {
*/
public InstanceId getInstanceID() {
ensureOpen();
if (instanceId == null) {
// lookup by name
final String instanceName = info.getInstanceName();
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
byte[] data = zooCache.get(instanceNamePath);
if (data == null) {
throw new RuntimeException(
"Instance name " + instanceName + " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new RuntimeException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
instanceId = InstanceId.of(instanceIdString);
}
return instanceId;
return info.getInstanceId();
}

public String getZooKeeperRoot() {
Expand Down Expand Up @@ -605,7 +590,7 @@ public int getZooKeepersSessionTimeOut() {

public ZooCache getZooCache() {
ensureOpen();
return zooCache;
return zooCache.get();
}

private TableZooHelper tableZooHelper;
Expand Down Expand Up @@ -860,6 +845,9 @@ public AuthenticationToken token() {
@Override
public synchronized void close() {
closed = true;
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
Expand All @@ -880,7 +868,7 @@ public static class ClientBuilderImpl<T>
SslOptions<T>, SaslOptions<T>, ClientFactory<T>, FromOptions<T> {

private Properties properties = new Properties();
private AuthenticationToken token = null;
private Optional<AuthenticationToken> tokenOpt = Optional.empty();
private final Function<ClientBuilderImpl<T>,T> builderFunction;
private UncaughtExceptionHandler ueh = null;

Expand All @@ -889,12 +877,9 @@ public ClientBuilderImpl(Function<ClientBuilderImpl<T>,T> builderFunction) {
}

private ClientInfo getClientInfo() {
if (token != null) {
ClientProperty.validate(properties, false);
return new ClientInfoImpl(properties, token);
}
ClientProperty.validate(properties);
return new ClientInfoImpl(properties);
// validate the token in the properties if not provided here
ClientProperty.validate(properties, tokenOpt.isEmpty());
return new ClientInfoImpl(properties, tokenOpt);
}

private UncaughtExceptionHandler getUncaughtExceptionHandler() {
Expand Down Expand Up @@ -1065,7 +1050,7 @@ public ConnectionOptions<T> as(CharSequence principal, AuthenticationToken token
}
setProperty(ClientProperty.AUTH_PRINCIPAL, principal.toString());
ClientProperty.setAuthenticationToken(properties, token);
this.token = token;
this.tokenOpt = Optional.of(token);
return this;
}

Expand All @@ -1089,9 +1074,9 @@ public ClientFactory<T> withUncaughtExceptionHandler(UncaughtExceptionHandler ue

}

public ZooReader getZooReader() {
public ZooSession getZooSession() {
ensureOpen();
return zooReader;
return zooSession.get();
}

protected long getTransportPoolMaxAgeMillis() {
Expand All @@ -1110,7 +1095,14 @@ public synchronized ThriftTransportPool getTransportPool() {
public synchronized ZookeeperLockChecker getTServerLockChecker() {
ensureOpen();
if (this.zkLockChecker == null) {
this.zkLockChecker = new ZookeeperLockChecker(this);
// make this use its own ZooSession and ZooCache, because this is used by the
// tablet location cache, which is a static singleton reused by multiple clients
// so, it can't rely on being able to continue to use the same client's ZooCache,
// because that client could be closed, and its ZooSession also closed
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot() + Constants.ZTSERVERS);
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import java.net.URL;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

/**
Expand All @@ -38,6 +42,16 @@ public interface ClientInfo {
*/
String getInstanceName();

/**
* @return Accumulo instanceId
*/
InstanceId getInstanceId();

/**
* @return a Supplier for creating new ZooKeeper client instances based on the configuration
*/
Supplier<ZooSession> getZooKeeperSupplier(String clientName);

/**
* @return Zookeeper connection information for Accumulo instance
*/
Expand Down Expand Up @@ -77,27 +91,27 @@ public interface ClientInfo {
* @return ClientInfo given properties
*/
static ClientInfo from(Properties properties) {
return new ClientInfoImpl(properties);
return new ClientInfoImpl(properties, Optional.empty());
}

/**
* @return ClientInfo given URL path to client config file
*/
static ClientInfo from(URL propertiesURL) {
return new ClientInfoImpl(propertiesURL);
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesURL), Optional.empty());
}

/**
* @return ClientInfo given properties and token
*/
static ClientInfo from(Properties properties, AuthenticationToken token) {
return new ClientInfoImpl(properties, token);
return new ClientInfoImpl(properties, Optional.of(token));
}

/**
* @return ClientInfo given path to client config file
*/
static ClientInfo from(Path propertiesFile) {
return new ClientInfoImpl(propertiesFile);
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesFile), Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,70 @@
*/
package org.apache.accumulo.core.clientImpl;

import static com.google.common.base.Suppliers.memoize;
import static java.util.Objects.requireNonNull;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

import com.google.common.base.Suppliers;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class ClientInfoImpl implements ClientInfo {

private final Properties properties;
private AuthenticationToken token;
private final Configuration hadoopConf;

public ClientInfoImpl(Path propertiesFile) {
this(ClientInfoImpl.toProperties(propertiesFile));
}

public ClientInfoImpl(URL propertiesURL) {
this(ClientInfoImpl.toProperties(propertiesURL));
// suppliers for lazily loading
private final Supplier<AuthenticationToken> tokenSupplier;
private final Supplier<Configuration> hadoopConf;
private final Supplier<InstanceId> instanceId;
private final Function<String,ZooSession> zooSessionForName;

public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> tokenOpt) {
this.properties = requireNonNull(properties);
// convert the optional to a supplier to delay retrieval from the properties unless needed
this.tokenSupplier = requireNonNull(tokenOpt).map(Suppliers::ofInstance)
.orElse(memoize(() -> ClientProperty.getAuthenticationToken(properties)));
this.hadoopConf = memoize(Configuration::new);
this.zooSessionForName =
name -> new ZooSession(name, getZooKeepers(), getZooKeepersSessionTimeOut(), null);
this.instanceId = memoize(() -> {
try (var zk = getZooKeeperSupplier(getClass().getSimpleName() + ".getInstanceName()").get()) {
return ZooUtil.getInstanceId(zk, getInstanceName());
}
});
}

public ClientInfoImpl(Properties properties) {
this(properties, null);
@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
}

public ClientInfoImpl(Properties properties, AuthenticationToken token) {
this.properties = properties;
this.token = token;
this.hadoopConf = new Configuration();
@Override
public InstanceId getInstanceId() {
return instanceId.get();
}

@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
public Supplier<ZooSession> getZooKeeperSupplier(String clientName) {
return () -> zooSessionForName.apply(clientName);
}

@Override
Expand Down Expand Up @@ -87,10 +109,7 @@ public Properties getProperties() {

@Override
public AuthenticationToken getAuthenticationToken() {
if (token == null) {
token = ClientProperty.getAuthenticationToken(properties);
}
return token;
return tokenSupplier.get();
}

@Override
Expand Down Expand Up @@ -134,6 +153,6 @@ public static Properties toProperties(URL propertiesURL) {

@Override
public Configuration getHadoopConf() {
return this.hadoopConf;
return hadoopConf.get();
}
}
Loading
Loading