Skip to content

Commit

Permalink
Additional improvements to ZooKeeper utility code
Browse files Browse the repository at this point in the history
* Add static ZooUtil.close(ZooKeeper) method to avoid handling
  InterruptedException that's documented as never being thrown from
  ZooKeeper.close()
* Lazy load and memoize ClientInfo and ServerInfo members; these are
  becoming more and more closer to being able to be merged with
  ClientContext/ServerContext for even further simplification in the
  future (if we want to pursue it)
* Remove zk connection stuff in the ZooUtil tools recently moved to that
  class; instead, a zk is explicitly passed to them
* Fix a bug in MiniCluster's verifyUp method that would dump ZK multiple
  times
  • Loading branch information
ctubbsii committed Dec 18, 2024
1 parent 3848bf1 commit f3b8454
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -848,12 +848,7 @@ public AuthenticationToken token() {
public synchronized void close() {
closed = true;
if (zooKeeperOpened.get()) {
try {
zooKeeper.get().close();
} catch (InterruptedException e) {
// reset the interrupt flag and continue closing
Thread.currentThread().interrupt();
}
ZooUtil.close(zooKeeper.get());
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.clientImpl;

import static com.google.common.base.Suppliers.memoize;
import static java.util.Objects.requireNonNull;

import java.io.FileInputStream;
Expand All @@ -44,19 +45,30 @@

public class ClientInfoImpl implements ClientInfo {

private final Supplier<InstanceId> instanceIdSupplier;
private final Properties properties;

// suppliers for lazily loading
private final Supplier<AuthenticationToken> tokenSupplier;
private final Configuration hadoopConf;
private final Supplier<Configuration> hadoopConf;
private final Supplier<InstanceId> instanceId;
private final Supplier<ZooKeeper> zkSupplier;

public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> tokenOpt) {
this.properties = requireNonNull(properties);
// convert the optional to a supplier to delay retrieval from the properties unless needed
this.tokenSupplier = requireNonNull(tokenOpt).map(Suppliers::ofInstance)
.orElse(Suppliers.memoize(() -> ClientProperty.getAuthenticationToken(properties)));
this.hadoopConf = new Configuration();
this.instanceIdSupplier = Suppliers.memoize(() -> ZooUtil.getInstanceID(getZooKeepers(),
getZooKeepersSessionTimeOut(), getInstanceName()));
.orElse(memoize(() -> ClientProperty.getAuthenticationToken(properties)));
this.hadoopConf = memoize(Configuration::new);
this.zkSupplier = () -> ZooUtil.connect(getClass().getSimpleName(), getZooKeepers(),
getZooKeepersSessionTimeOut(), null);
this.instanceId = memoize(() -> {
var zk = getZooKeeperSupplier().get();
try {
return ZooUtil.getInstanceId(zk, getInstanceName());
} finally {
ZooUtil.close(zk);
}
});
}

@Override
Expand All @@ -66,13 +78,12 @@ public String getInstanceName() {

@Override
public InstanceId getInstanceId() {
return instanceIdSupplier.get();
return instanceId.get();
}

@Override
public Supplier<ZooKeeper> getZooKeeperSupplier() {
return () -> ZooUtil.connect(ClientInfo.class.getSimpleName(), getZooKeepers(),
getZooKeepersSessionTimeOut(), null);
return zkSupplier;
}

@Override
Expand Down Expand Up @@ -144,6 +155,6 @@ public static Properties toProperties(URL propertiesURL) {

@Override
public Configuration getHadoopConf() {
return this.hadoopConf;
return hadoopConf.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -217,8 +216,8 @@ public static void digestAuth(ZooKeeper zoo, String secret) {
}

/**
* Construct a new ZooKeeper client, retrying if it doesn't work right away. The caller is
* responsible for closing instances returned from this method.
* Construct a new ZooKeeper client, retrying indefinitely if it doesn't work right away. The
* caller is responsible for closing instances returned from this method.
*
* @param clientName a convenient name for logging its connection state changes
* @param conf a convenient carrier of ZK connection information using Accumulo properties
Expand All @@ -230,8 +229,8 @@ public static ZooKeeper connect(String clientName, AccumuloConfiguration conf) {
}

/**
* Construct a new ZooKeeper client, retrying if it doesn't work right away. The caller is
* responsible for closing instances returned from this method.
* Construct a new ZooKeeper client, retrying indefinitely if it doesn't work right away. The
* caller is responsible for closing instances returned from this method.
*
* @param clientName a convenient name for logging its connection state changes
* @param connectString in the form of host1:port1,host2:port2/chroot/path
Expand Down Expand Up @@ -280,11 +279,12 @@ public static ZooKeeper connect(String clientName, String connectString, int tim
if (tryAgain && zk != null) {
try {
zk.close();
zk = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("interrupted", e);
throw new AssertionError(
"ZooKeeper.close() shouldn't throw this; it exists only for backwards compatibility",
e);
}
zk = null;
}
}

Expand Down Expand Up @@ -314,94 +314,88 @@ public static ZooKeeper connect(String clientName, String connectString, int tim
return zk;
}

public static void close(ZooKeeper zk) {
try {
if (zk != null) {
zk.close();
}
} catch (InterruptedException e) {
throw new AssertionError(
"ZooKeeper.close() shouldn't throw this; it exists only for backwards compatibility", e);
}
}

/**
* Given a zooCache and instanceId, look up the instance name.
*/
public static String getInstanceName(String zooKeepers, int zkSessionTimeout,
InstanceId instanceId) {
requireNonNull(zooKeepers);
public static String getInstanceName(ZooKeeper zk, InstanceId instanceId) {
requireNonNull(zk);
var instanceIdBytes = requireNonNull(instanceId).canonical().getBytes(UTF_8);
try (var zk = connect("ZooUtil.getInstanceName", zooKeepers, zkSessionTimeout, null)) {
for (String name : zk.getChildren(Constants.ZROOT + Constants.ZINSTANCES, false)) {
var bytes = zk.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name, false, null);
if (Arrays.equals(bytes, instanceIdBytes)) {
return name;
}
for (String name : getInstanceNames(zk)) {
var bytes = getInstanceIdBytesFromName(zk, name);
if (Arrays.equals(bytes, instanceIdBytes)) {
return name;
}
}
return null;
}

private static List<String> getInstanceNames(ZooKeeper zk) {
try {
return new ZooReader(zk).getChildren(Constants.ZROOT + Constants.ZINSTANCES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(
"Interrupted reading instance name for the instanceId " + instanceId, e);
throw new IllegalStateException("Interrupted reading instance names from ZooKeeper", e);
} catch (KeeperException e) {
throw new IllegalStateException(
"Unable to get instance name for the instanceId " + instanceId, e);
throw new IllegalStateException("Failed to read instance names from ZooKeeper", e);
}
return null;
}

/**
* Read the instance names and instance ids from ZooKeeper. The storage structure in ZooKeeper is:
*
* <pre>
* /accumulo/instances/instance_name - with the instance id stored as data.
* </pre>
*
* @return a map of (instance name, instance id) entries
*/
public static Map<String,InstanceId> readInstancesFromZk(final ZooReader zooReader) {
// TODO clean up
Map<String,InstanceId> idMap = new TreeMap<>();
private static byte[] getInstanceIdBytesFromName(ZooKeeper zk, String name) {
try {
List<String> names = zooReader.getChildren(Constants.ZROOT + Constants.ZINSTANCES);
names.forEach(name -> {
try {
byte[] uuid = zooReader.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name);
idMap.put(name, InstanceId.of(UUID.fromString(new String(uuid, UTF_8))));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted reading instance id from ZooKeeper", ex);
} catch (KeeperException ex) {
log.warn("Failed to read instance id for " + ex.getPath());
}
});
} catch (InterruptedException ex) {
return new ZooReader(zk)
.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + requireNonNull(name));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted reading instance name info from ZooKeeper", ex);
} catch (KeeperException ex) {
throw new IllegalStateException("Failed to read instance name info from ZooKeeper", ex);
throw new IllegalStateException(
"Interrupted reading InstanceId from ZooKeeper for instance named " + name, e);
} catch (KeeperException e) {
log.warn("Failed to read InstanceId from ZooKeeper for instance named {}", name, e);
return null;
}
return idMap;
}

/**
* Returns a unique string that identifies this instance of accumulo.
*/
public static InstanceId getInstanceID(String zooKeepers, int zkSessionTimeout,
String instanceName) {
requireNonNull(zooKeepers);
requireNonNull(instanceName);
// lookup by name
String instanceIdString = null;
try (var zk = connect("ZooUtil.getInstanceID", zooKeepers, zkSessionTimeout, null)) {
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
byte[] data = zk.getData(instanceNamePath, false, null);
if (data == null) {
throw new IllegalStateException(
"Instance name " + instanceName + " does not exist in zookeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
public static Map<String,InstanceId> getInstanceMap(ZooKeeper zk) {
Map<String,InstanceId> idMap = new TreeMap<>();
getInstanceNames(zk).forEach(name -> {
byte[] instanceId = getInstanceIdBytesFromName(zk, name);
if (instanceId != null) {
idMap.put(name, InstanceId.of(new String(instanceId, UTF_8)));
}
instanceIdString = new String(data, UTF_8);
// verify that the instanceId found via the instanceName actually exists as an instance
if (zk.getData(Constants.ZROOT + "/" + instanceIdString, false, null) == null) {
throw new IllegalStateException("Instance id " + instanceIdString
+ " pointed to by the name " + instanceName + " does not exist in zookeeper");
});
return idMap;
}

public static InstanceId getInstanceId(ZooKeeper zk, String name) {
byte[] data = getInstanceIdBytesFromName(zk, name);
if (data == null) {
throw new IllegalStateException("Instance name " + name + " does not exist in ZooKeeper. "
+ "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
}
String instanceIdString = new String(data, UTF_8);
try {
// verify that the instanceId found via the name actually exists
if (new ZooReader(zk).getData(Constants.ZROOT + "/" + instanceIdString) == null) {
throw new IllegalStateException("InstanceId " + instanceIdString
+ " pointed to by the name " + name + " does not exist in ZooKeeper");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted reading instance id from ZooKeeper", e);
throw new IllegalStateException("Interrupted verifying InstanceId " + instanceIdString
+ " pointed to by instance named " + name + " actually exists in ZooKeeper", e);
} catch (KeeperException e) {
throw new IllegalStateException(
"Unable to get instanceId for the instance name " + instanceName, e);
throw new IllegalStateException("Failed to verify InstanceId " + instanceIdString
+ " pointed to by instance named " + name + " actually exists in ZooKeeper", e);
}
return InstanceId.of(instanceIdString);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.isNull;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -108,20 +109,20 @@ public void fetchInstancesFromZk() throws Exception {
String instBName = "INST_B";
InstanceId instB = InstanceId.of(UUID.randomUUID());

ZooReader zooReader = createMock(ZooReader.class);
ZooKeeper zk = createMock(ZooKeeper.class);
String namePath = Constants.ZROOT + Constants.ZINSTANCES;
expect(zooReader.getChildren(eq(namePath))).andReturn(List.of(instAName, instBName)).once();
expect(zooReader.getData(eq(namePath + "/" + instAName)))
expect(zk.getChildren(eq(namePath), isNull())).andReturn(List.of(instAName, instBName)).once();
expect(zk.getData(eq(namePath + "/" + instAName), isNull(), isNull()))
.andReturn(instA.canonical().getBytes(UTF_8)).once();
expect(zooReader.getData(eq(namePath + "/" + instBName)))
expect(zk.getData(eq(namePath + "/" + instBName), isNull(), isNull()))
.andReturn(instB.canonical().getBytes(UTF_8)).once();
replay(zooReader);
replay(zk);

Map<String,InstanceId> instanceMap = ZooUtil.readInstancesFromZk(zooReader);
Map<String,InstanceId> instanceMap = ZooUtil.getInstanceMap(zk);

log.trace("id map returned: {}", instanceMap);
assertEquals(Map.of(instAName, instA, instBName, instB), instanceMap);
verify(zooReader);
verify(zk);
}

}
Loading

0 comments on commit f3b8454

Please sign in to comment.