Skip to content

Commit

Permalink
Merge branch '3.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
ctubbsii committed Jan 12, 2025
2 parents 7701e59 + be22dee commit e745a5d
Show file tree
Hide file tree
Showing 138 changed files with 2,059 additions and 2,409 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_CLEANUP_POOL;
Expand All @@ -44,11 +43,11 @@
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
Expand Down Expand Up @@ -79,8 +78,6 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand All @@ -105,6 +102,7 @@
import org.apache.accumulo.core.util.tables.TableZooHelper;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -127,14 +125,12 @@ public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);

private final ClientInfo info;
private InstanceId instanceId;
private final ZooReader zooReader;
private final ZooCache zooCache;
private final Supplier<ZooCache> zooCache;

private Credentials creds;
private BatchWriterConfig batchWriterConfig;
private ConditionalWriterConfig conditionalWriterConfig;
private final AccumuloConfiguration serverConf;
private final AccumuloConfiguration accumuloConf;
private final Configuration hadoopConf;

// These fields are very frequently accessed (each time a connection is created) and expensive to
Expand Down Expand Up @@ -162,6 +158,9 @@ public class ClientContext implements AccumuloClient {
private MeterRegistry micrometer;
private Caches caches;

private final AtomicBoolean zooKeeperOpened = new AtomicBoolean(false);
private final Supplier<ZooSession> zooSession;

private void ensureOpen() {
if (closed) {
throw new IllegalStateException("This client was closed.");
Expand Down Expand Up @@ -228,13 +227,19 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
AccumuloConfiguration serverConf, UncaughtExceptionHandler ueh) {
this.info = info;
this.hadoopConf = info.getHadoopConf();
zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
zooCache =
new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut());
this.serverConf = serverConf;

this.zooSession = memoize(() -> {
var zk = info
.getZooKeeperSupplier(getClass().getSimpleName() + "(" + info.getPrincipal() + ")").get();
zooKeeperOpened.set(true);
return zk;
});

this.zooCache = memoize(() -> new ZooCache(getZooSession()));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
sslSupplier = Suppliers.memoize(() -> SslConnectionParams.forClient(getConfiguration()));
sslSupplier = memoize(() -> SslConnectionParams.forClient(getConfiguration()));
saslSupplier = memoizeWithExpiration(
() -> SaslConnectionParams.from(getConfiguration(), getCredentials().getToken()), 100,
MILLISECONDS);
Expand Down Expand Up @@ -333,7 +338,7 @@ public synchronized void setCredentials(Credentials newCredentials) {
*/
public AccumuloConfiguration getConfiguration() {
ensureOpen();
return serverConf;
return accumuloConf;
}

/**
Expand Down Expand Up @@ -484,26 +489,7 @@ public synchronized TCredentials rpcCreds() {
* @return a UUID
*/
public InstanceId getInstanceID() {
if (instanceId == null) {
// lookup by name
final String instanceName = info.getInstanceName();
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
byte[] data = zooCache.get(instanceNamePath);
if (data == null) {
throw new RuntimeException(
"Instance name " + instanceName + " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
String instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new RuntimeException("Instance id " + instanceIdString
+ (instanceName == null ? "" : " pointed to by the name " + instanceName)
+ " does not exist in zookeeper");
}
instanceId = InstanceId.of(instanceIdString);
}
return instanceId;
return info.getInstanceId();
}

public String getZooKeeperRoot() {
Expand Down Expand Up @@ -541,7 +527,7 @@ public int getZooKeepersSessionTimeOut() {
}

public ZooCache getZooCache() {
return zooCache;
return zooCache.get();
}

private TableZooHelper tableZooHelper;
Expand Down Expand Up @@ -796,6 +782,9 @@ public AuthenticationToken token() {
@Override
public synchronized void close() {
closed = true;
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
Expand All @@ -816,7 +805,7 @@ public static class ClientBuilderImpl<T>
SslOptions<T>, SaslOptions<T>, ClientFactory<T>, FromOptions<T> {

private Properties properties = new Properties();
private AuthenticationToken token = null;
private Optional<AuthenticationToken> tokenOpt = Optional.empty();
private final Function<ClientBuilderImpl<T>,T> builderFunction;
private UncaughtExceptionHandler ueh = null;

Expand All @@ -825,12 +814,9 @@ public ClientBuilderImpl(Function<ClientBuilderImpl<T>,T> builderFunction) {
}

private ClientInfo getClientInfo() {
if (token != null) {
ClientProperty.validate(properties, false);
return new ClientInfoImpl(properties, token);
}
ClientProperty.validate(properties);
return new ClientInfoImpl(properties);
// validate the token in the properties if not provided here
ClientProperty.validate(properties, tokenOpt.isEmpty());
return new ClientInfoImpl(properties, tokenOpt);
}

private UncaughtExceptionHandler getUncaughtExceptionHandler() {
Expand Down Expand Up @@ -1001,7 +987,7 @@ public ConnectionOptions<T> as(CharSequence principal, AuthenticationToken token
}
setProperty(ClientProperty.AUTH_PRINCIPAL, principal.toString());
ClientProperty.setAuthenticationToken(properties, token);
this.token = token;
this.tokenOpt = Optional.of(token);
return this;
}

Expand All @@ -1025,9 +1011,9 @@ public ClientFactory<T> withUncaughtExceptionHandler(UncaughtExceptionHandler ue

}

public ZooReader getZooReader() {
public ZooSession getZooSession() {
ensureOpen();
return zooReader;
return zooSession.get();
}

protected long getTransportPoolMaxAgeMillis() {
Expand Down Expand Up @@ -1069,7 +1055,14 @@ && getConfiguration().getBoolean(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABL
public synchronized ZookeeperLockChecker getTServerLockChecker() {
ensureOpen();
if (this.zkLockChecker == null) {
this.zkLockChecker = new ZookeeperLockChecker(this);
// make this use its own ZooSession and ZooCache, because this is used by the
// tablet location cache, which is a static singleton reused by multiple clients
// so, it can't rely on being able to continue to use the same client's ZooCache,
// because that client could be closed, and its ZooSession also closed
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot(), getServerPaths());
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import java.net.URL;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

/**
Expand All @@ -38,6 +42,16 @@ public interface ClientInfo {
*/
String getInstanceName();

/**
* @return Accumulo instanceId
*/
InstanceId getInstanceId();

/**
* @return a Supplier for creating new ZooKeeper client instances based on the configuration
*/
Supplier<ZooSession> getZooKeeperSupplier(String clientName);

/**
* @return Zookeeper connection information for Accumulo instance
*/
Expand Down Expand Up @@ -77,27 +91,27 @@ public interface ClientInfo {
* @return ClientInfo given properties
*/
static ClientInfo from(Properties properties) {
return new ClientInfoImpl(properties);
return new ClientInfoImpl(properties, Optional.empty());
}

/**
* @return ClientInfo given URL path to client config file
*/
static ClientInfo from(URL propertiesURL) {
return new ClientInfoImpl(propertiesURL);
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesURL), Optional.empty());
}

/**
* @return ClientInfo given properties and token
*/
static ClientInfo from(Properties properties, AuthenticationToken token) {
return new ClientInfoImpl(properties, token);
return new ClientInfoImpl(properties, Optional.of(token));
}

/**
* @return ClientInfo given path to client config file
*/
static ClientInfo from(Path propertiesFile) {
return new ClientInfoImpl(propertiesFile);
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesFile), Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,70 @@
*/
package org.apache.accumulo.core.clientImpl;

import static com.google.common.base.Suppliers.memoize;
import static java.util.Objects.requireNonNull;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

import com.google.common.base.Suppliers;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class ClientInfoImpl implements ClientInfo {

private final Properties properties;
private AuthenticationToken token;
private final Configuration hadoopConf;

public ClientInfoImpl(Path propertiesFile) {
this(ClientInfoImpl.toProperties(propertiesFile));
}

public ClientInfoImpl(URL propertiesURL) {
this(ClientInfoImpl.toProperties(propertiesURL));
// suppliers for lazily loading
private final Supplier<AuthenticationToken> tokenSupplier;
private final Supplier<Configuration> hadoopConf;
private final Supplier<InstanceId> instanceId;
private final Function<String,ZooSession> zooSessionForName;

public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> tokenOpt) {
this.properties = requireNonNull(properties);
// convert the optional to a supplier to delay retrieval from the properties unless needed
this.tokenSupplier = requireNonNull(tokenOpt).map(Suppliers::ofInstance)
.orElse(memoize(() -> ClientProperty.getAuthenticationToken(properties)));
this.hadoopConf = memoize(Configuration::new);
this.zooSessionForName =
name -> new ZooSession(name, getZooKeepers(), getZooKeepersSessionTimeOut(), null);
this.instanceId = memoize(() -> {
try (var zk = getZooKeeperSupplier(getClass().getSimpleName() + ".getInstanceName()").get()) {
return ZooUtil.getInstanceId(zk, getInstanceName());
}
});
}

public ClientInfoImpl(Properties properties) {
this(properties, null);
@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
}

public ClientInfoImpl(Properties properties, AuthenticationToken token) {
this.properties = properties;
this.token = token;
this.hadoopConf = new Configuration();
@Override
public InstanceId getInstanceId() {
return instanceId.get();
}

@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
public Supplier<ZooSession> getZooKeeperSupplier(String clientName) {
return () -> zooSessionForName.apply(clientName);
}

@Override
Expand Down Expand Up @@ -87,10 +109,7 @@ public Properties getProperties() {

@Override
public AuthenticationToken getAuthenticationToken() {
if (token == null) {
token = ClientProperty.getAuthenticationToken(properties);
}
return token;
return tokenSupplier.get();
}

@Override
Expand Down Expand Up @@ -134,6 +153,6 @@ public static Properties toProperties(URL propertiesURL) {

@Override
public Configuration getHadoopConf() {
return this.hadoopConf;
return hadoopConf.get();
}
}
Loading

0 comments on commit e745a5d

Please sign in to comment.