Skip to content

Commit

Permalink
Merge branch 'main' into zc-recursive-watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 15, 2025
2 parents 89ccf24 + 0add23e commit 167739e
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void addZooCacheWatcher(ZooCacheWatcher watcher) {
protected void setupWatchers(List<String> pathsToWatch) {
for (String left : pathsToWatch) {
for (String right : pathsToWatch) {
if (left != right && left.contains(right)) {
if (left.equals(right) && left.contains(right)) {
throw new IllegalArgumentException(
"Overlapping paths found in paths to watch: " + pathsToWatch);
}
Expand Down Expand Up @@ -582,7 +582,6 @@ public boolean childrenCached(String zPath) {
*/
public void clear(Predicate<String> pathPredicate) {
Preconditions.checkState(!closed);

Predicate<String> pathPredicateWrapper = path -> {
boolean testResult = isWatchedPath(path) && pathPredicate.test(path);
if (testResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,11 @@ public static void createNonHaServiceLockPath(Type server, ZooReaderWriter zrw,
try {
zrw.mkdirs(rgPath);
zrw.putPersistentData(slp.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NOAUTH) {
LOG.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo.properties, specifically instance.secret, is consistent.");
}
} catch (NoAuthException e) {
LOG.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo.properties, specifically instance.secret, is consistent.");
throw e;
}

}

/**
Expand Down
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -214,8 +216,19 @@ public static <T> T wrapService(final T instance) {
private static <T> T wrapRpc(final InvocationHandler handler, final T instance) {
@SuppressWarnings("unchecked")
T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), handler);
getInterfaces(instance.getClass()).toArray(new Class<?>[0]), handler);
return proxiedInstance;
}

private static Set<Class<?>> getInterfaces(Class<?> clazz) {
var set = new HashSet<Class<?>>();
if (clazz != null) {
set.addAll(getInterfaces(clazz.getSuperclass()));
for (Class<?> interfaze : clazz.getInterfaces()) {
set.add(interfaze);
}
}
return set;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

Expand All @@ -32,16 +32,16 @@
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.constraints.Constraint.Environment;
import org.apache.accumulo.core.security.AuthorizationContainer;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class VisibilityConstraintTest {

VisibilityConstraint vc;
Environment env;
Mutation mutation;
private VisibilityConstraint vc;
private Environment env;
private Mutation mutation;

static final ColumnVisibility good = new ColumnVisibility("good|bad");
static final ColumnVisibility bad = new ColumnVisibility("good&bad");
Expand All @@ -57,17 +57,18 @@ public void setUp() {
vc = new VisibilityConstraint();
mutation = new Mutation("r");

ArrayByteSequence bs = new ArrayByteSequence("good".getBytes(UTF_8));

AuthorizationContainer ac = createNiceMock(AuthorizationContainer.class);
expect(ac.contains(bs)).andReturn(true);
replay(ac);

env = createMock(Environment.class);
expect(env.getAuthorizationsContainer()).andReturn(ac);
expect(env.getAuthorizationsContainer())
.andReturn(new ArrayByteSequence("good".getBytes(UTF_8))::equals).anyTimes();

replay(env);
}

@AfterEach
public void tearDown() {
verify(env);
}

@Test
public void testNoVisibility() {
mutation.put(D, D, D);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.server.util.ZooZap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -270,6 +271,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
if (managerProcess != null) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
try {
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
} catch (RuntimeException e) {
log.error("Error zapping Manager zookeeper lock", e);
}
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.util.AccumuloStatus;
import org.apache.accumulo.server.util.PortUtils;
import org.apache.accumulo.server.util.ZooZap;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.accumulo.start.util.MiniDFSUtil;
Expand Down Expand Up @@ -994,10 +996,45 @@ public synchronized void stop() throws IOException, InterruptedException {
control.stop(ServerType.GARBAGE_COLLECTOR, null);
control.stop(ServerType.MANAGER, null);
control.stop(ServerType.TABLET_SERVER, null);
control.stop(ServerType.ZOOKEEPER, null);
control.stop(ServerType.COMPACTOR, null);
control.stop(ServerType.SCAN_SERVER, null);

// The method calls above kill the server
// Clean up the locks in ZooKeeper fo that if the cluster
// is restarted, then the processes will start right away
// and not wait for the old locks to be cleaned up.
try {
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager", "-tservers",
"-compactors", "-sservers");
} catch (RuntimeException e) {
log.error("Error zapping zookeeper locks", e);
}

// Clear the location of the servers in ZooCache.
// When ZooKeeper was stopped in the previous method call,
// the local ZooKeeper watcher did not fire. If MAC is
// restarted, then ZooKeeper will start on the same port with
// the same data, but no Watchers will fire.
boolean startCalled = true;
try {
getServerContext().getZooKeeperRoot();
} catch (IllegalStateException e) {
if (e.getMessage().startsWith("Accumulo not initialized")) {
startCalled = false;
}
}
if (startCalled) {
final ServerContext ctx = getServerContext();
final String zRoot = ctx.getZooKeeperRoot();
Predicate<String> pred = path -> false;
for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK,
Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
pred = pred.or(path -> path.startsWith(zRoot + lockPath));
}
ctx.getZooCache().clear(pred);
}
control.stop(ServerType.ZOOKEEPER, null);

// ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs
if (executor != null) {
List<Runnable> tasksRemaining = executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,13 @@ public void accept(WatchedEvent event) {
case NodeCreated:
case NodeDataChanged:
// state transition
TableState tState = updateTableStateCache(tableId);
log.debug("State transition to {} @ {}", tState, event);
synchronized (observers) {
for (TableObserver to : observers) {
to.stateChanged(tableId, tState);
if (tableId != null) {
TableState tState = updateTableStateCache(tableId);
log.debug("State transition to {} @ {}", tState, event);
synchronized (observers) {
for (TableObserver to : observers) {
to.stateChanged(tableId, tState);
}
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,30 +182,6 @@ private StatusSummary getStatusSummary(ServiceStatusReport.ReportKey displayName
return new StatusSummary(displayNames, byGroup.keySet(), byGroup, result.getErrorCount());
}

/**
* Read the node names from ZooKeeper. Exceptions are counted but ignored.
*
* @return Result with error count, Set of the node names.
*/
@VisibleForTesting
Result<Set<String>> readNodeNames(final ZooReader zooReader, final String path) {
Set<String> nodeNames = new TreeSet<>();
final AtomicInteger errorCount = new AtomicInteger(0);
try {
var children = zooReader.getChildren(path);
if (children != null) {
nodeNames.addAll(children);
}
} catch (KeeperException | InterruptedException ex) {
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
errorCount.incrementAndGet();
}
return new Result<>(errorCount.get(), nodeNames);
}

/**
* Read the data from a ZooKeeper node, tracking if an error occurred. ZooKeeper's exceptions are
* counted but otherwise ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonManager.Mode;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.start.spi.KeywordExecutable;
Expand Down Expand Up @@ -86,6 +87,19 @@ public static void main(String[] args) throws Exception {

@Override
public void execute(String[] args) throws Exception {
try {
var siteConf = SiteConfiguration.auto();
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
}
zap(siteConf, args);
} finally {
SingletonManager.setMode(Mode.CLOSED);
}
}

public void zap(SiteConfiguration siteConf, String... args) {
Opts opts = new Opts();
opts.parseArgs(keyword(), args);

Expand All @@ -94,8 +108,7 @@ public void execute(String[] args) throws Exception {
return;
}

try {
var siteConf = SiteConfiguration.auto();
try (var zk = new ZooSession(getClass().getSimpleName(), siteConf)) {
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
Expand Down Expand Up @@ -175,10 +188,7 @@ public void execute(String[] args) throws Exception {
}
}

} finally {
SingletonManager.setMode(Mode.CLOSED);
}

}

private static void zapDirectory(ZooReaderWriter zoo, ServiceLockPath path, Opts opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.Constants.ZGC_LOCK;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
Expand Down Expand Up @@ -357,21 +358,56 @@ public void testScanServerHosts() throws Exception {
@Test
public void testCompactorStatus() throws Exception {
replay(zooReader);

UUID uuid1 = UUID.randomUUID();
String lock1Name = "zlock#" + uuid1 + "#0000000001";
UUID uuid2 = UUID.randomUUID();
String lock2Name = "zlock#" + uuid2 + "#0000000022";
UUID uuid3 = UUID.randomUUID();
String lock3Name = "zlock#" + uuid3 + "#0000000033";
UUID uuid4 = UUID.randomUUID();
String lock4Name = "zlock#" + uuid4 + "#0000000044";

String lock1data =
"{\"descriptors\":[{\"uuid\":\"6effb690-c29c-4e0b-92ff-f6b308385a42\",\"service\":\"COMPACTOR\",\"address\":\"hostA:8080\",\"group\":\"q1\"}]}";
String lock2data =
"{\"descriptors\":[{\"uuid\":\"6effb690-c29c-4e0b-92ff-f6b308385a42\",\"service\":\"COMPACTOR\",\"address\":\"hostC:8081\",\"group\":\"q1\"}]}";
String lock3data =
"{\"descriptors\":[{\"uuid\":\"6effb690-c29c-4e0b-92ff-f6b308385a42\",\"service\":\"COMPACTOR\",\"address\":\"hostB:9090\",\"group\":\"q2\"}]}";
String lock4data =
"{\"descriptors\":[{\"uuid\":\"6effb690-c29c-4e0b-92ff-f6b308385a42\",\"service\":\"COMPACTOR\",\"address\":\"hostD:9091\",\"group\":\"q2\"}]}";

String lockPath = zRoot + Constants.ZCOMPACTORS;
expect(zooCache.getChildren(lockPath)).andReturn(List.of("q1", "q2"));
expect(zooCache.getChildren(lockPath)).andReturn(List.of("q1", "q2", "q3"));
expect(zooCache.getChildren(lockPath + "/q1")).andReturn(List.of("hostA:8080", "hostC:8081"));
expect(zooCache.getChildren(lockPath + "/q2")).andReturn(List.of("hostB:9090", "hostD:9091"));
expect(zooCache.getChildren(lockPath + "/q1/hostA:8080")).andReturn(List.of());
expect(zooCache.getChildren(lockPath + "/q1/hostC:8081")).andReturn(List.of());
expect(zooCache.getChildren(lockPath + "/q2/hostB:9090")).andReturn(List.of());
expect(zooCache.getChildren(lockPath + "/q2/hostD:9091")).andReturn(List.of());
// Create compactor group with dead compactor
expect(zooCache.getChildren(lockPath + "/q3")).andReturn(List.of("deadHost:8080"));

expect(zooCache.getChildren(lockPath + "/q1/hostA:8080")).andReturn(List.of(lock1Name));
expect(zooCache.get(eq(lockPath + "/q1/hostA:8080/" + lock1Name), anyObject(ZcStat.class)))
.andReturn(lock1data.getBytes(UTF_8));
expect(zooCache.getChildren(lockPath + "/q1/hostC:8081")).andReturn(List.of(lock2Name));
expect(zooCache.get(eq(lockPath + "/q1/hostC:8081/" + lock2Name), anyObject(ZcStat.class)))
.andReturn(lock2data.getBytes(UTF_8));
expect(zooCache.getChildren(lockPath + "/q2/hostB:9090")).andReturn(List.of(lock3Name));
expect(zooCache.get(eq(lockPath + "/q2/hostB:9090/" + lock3Name), anyObject(ZcStat.class)))
.andReturn(lock3data.getBytes(UTF_8));
expect(zooCache.getChildren(lockPath + "/q2/hostD:9091")).andReturn(List.of(lock4Name));
expect(zooCache.get(eq(lockPath + "/q2/hostD:9091/" + lock4Name), anyObject(ZcStat.class)))
.andReturn(lock4data.getBytes(UTF_8));
expect(zooCache.getChildren(lockPath + "/q3/deadHost:8080")).andReturn(List.of());

replay(zooCache);

ServiceStatusCmd cmd = new ServiceStatusCmd();
StatusSummary status = cmd.getCompactorStatus(context);

LOG.info("compactor group counts: {}", status);
assertEquals(0, status.getResourceGroups().size());
assertEquals(2, status.getResourceGroups().size());

LOG.info("Live compactor counts: {}", status.getServiceCount());
assertEquals(4, status.getServiceCount());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ private static void testMerge(List<TabletMetadata> inputTablets, TableId tableId
EasyMock.expectLastCall().once();

// setup processing of conditional mutations
Ample.ConditionalResult cr = EasyMock.niceMock(Ample.ConditionalResult.class);
Ample.ConditionalResult cr = EasyMock.createMock(Ample.ConditionalResult.class);
EasyMock.expect(cr.getStatus()).andReturn(Ample.ConditionalResult.Status.ACCEPTED)
.atLeastOnce();
EasyMock.expect(tabletsMutator.process()).andReturn(Map.of(lastExtent, cr)).atLeastOnce();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ public void testManyColumns() throws Exception {
EasyMock.expect(tabletsMutator.mutateTablet(origExtent)).andReturn(tablet3Mutator);

// setup processing of conditional mutations
Ample.ConditionalResult cr = EasyMock.niceMock(Ample.ConditionalResult.class);
Ample.ConditionalResult cr = EasyMock.createMock(Ample.ConditionalResult.class);
EasyMock.expect(cr.getExtent()).andReturn(origExtent).atLeastOnce();
EasyMock.expect(cr.getStatus()).andReturn(Ample.ConditionalResult.Status.ACCEPTED)
.atLeastOnce();
EasyMock.expect(tabletsMutator.process())
Expand Down
Loading

0 comments on commit 167739e

Please sign in to comment.