Skip to content

Commit

Permalink
Merge branch '2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 6, 2025
2 parents c48adea + b3424a8 commit 81c7f3d
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 294 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.lock;

import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.util.Halt;
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holder for the reusable {@link LockWatcher} implementations shared by the server processes: one
 * for highly available services that block until they obtain the lock, and one for services that
 * only need to react to losing it.
 */
public class ServiceLockSupport {

  /**
   * Lock Watcher used by Highly Available services. These are services where only one instance is
   * running at a time, but another backup service can be started that will be used if the active
   * service instance fails and loses its lock in ZK.
   *
   * <p>
   * The two state flags are only written while holding this object's monitor, and
   * {@link #waitForChange()} waits on that same monitor, so a caller can hand this watcher to the
   * lock, call {@link #waitForChange()}, and then inspect {@link #isLockAcquired()} and
   * {@link #isFailedToAcquireLock()}. A watcher instance never resets its flags; the callers
   * visible alongside this class create a new instance for every acquisition attempt.
   */
  public static class HAServiceLockWatcher implements AccumuloLockWatcher {

    private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class);

    // Human readable service name, used only in log and halt messages.
    private final String serviceName;
    // Set (under the monitor) once acquiredLock() has been called.
    private volatile boolean acquiredLock = false;
    // Set (under the monitor) once failedToAcquireLock(Exception) has been called.
    private volatile boolean failedToAcquireLock = false;

    /**
     * @param serviceName name of the service, used in log and halt messages
     */
    public HAServiceLockWatcher(String serviceName) {
      this.serviceName = serviceName;
    }

    /**
     * Callback for a lost lock: requests a halt with status -1 and a message naming the service
     * and the reason.
     *
     * @param reason why the lock was lost
     */
    @Override
    public void lostLock(LockLossReason reason) {
      Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
    }

    /**
     * Callback for losing the ability to monitor the lock node: logs the problem at error level
     * and requests a halt with status -1.
     *
     * @param e the exception describing why the lock node can no longer be monitored
     */
    @Override
    public void unableToMonitorLockNode(final Exception e) {
      // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
      Halt.halt(-1,
          () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e));

    }

    /**
     * Callback for a successful acquisition. Records the success and wakes any thread blocked in
     * {@link #waitForChange()}. If either state flag is already set the watcher is in an
     * inconsistent state and a halt with status -1 is requested.
     */
    @Override
    public synchronized void acquiredLock() {
      LOG.debug("Acquired {} lock", serviceName);

      if (acquiredLock || failedToAcquireLock) {
        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
      }

      acquiredLock = true;
      notifyAll();
    }

    /**
     * Callback for a failed acquisition. Records the failure and wakes any thread blocked in
     * {@link #waitForChange()}.
     *
     * <p>
     * A {@link NoAuthException} is treated as a configuration error (it is logged with a hint
     * about {@code instance.secret}) and a halt with status -1 is requested. A halt is also
     * requested if the lock was already marked as acquired.
     *
     * @param e the reason the lock could not be acquired
     */
    @Override
    public synchronized void failedToAcquireLock(Exception e) {
      LOG.warn("Failed to get {} lock", serviceName, e);

      if (e instanceof NoAuthException) {
        String msg =
            "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication.";
        LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e);
        Halt.halt(msg, -1);
      }

      if (acquiredLock) {
        Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock,
            -1);
      }

      failedToAcquireLock = true;
      notifyAll();
    }

    /**
     * Blocks until either {@link #acquiredLock()} or {@link #failedToAcquireLock(Exception)} has
     * been called on this watcher. Returns immediately if one of them already has been.
     *
     * <p>
     * An interrupt does not end the wait: this method still returns only after a state change.
     * The interrupt is not discarded, though. If one arrived while waiting, the calling thread's
     * interrupt status is set again before this method returns so that code further up the stack
     * can observe it.
     */
    public synchronized void waitForChange() {
      boolean interrupted = false;
      try {
        while (!acquiredLock && !failedToAcquireLock) {
          try {
            LOG.info("{} lock held by someone else, waiting for a change in state", serviceName);
            wait();
          } catch (InterruptedException e) {
            // Remember the interrupt instead of re-asserting it here: setting the flag inside the
            // loop would make the next wait() throw immediately and turn this into a busy loop.
            interrupted = true;
          }
        }
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    /**
     * @return true once {@link #acquiredLock()} has been called on this watcher
     */
    public boolean isLockAcquired() {
      return acquiredLock;
    }

    /**
     * @return true once {@link #failedToAcquireLock(Exception)} has recorded a failure on this
     *         watcher
     */
    public boolean isFailedToAcquireLock() {
      return failedToAcquireLock;
    }

  }

  /**
   * Lock Watcher used by non-HA services. Both callbacks request a halt with status 1; the only
   * service specific behavior is the name used in messages, whether the lost-lock error is logged,
   * and the extra action run when the lock is lost.
   */
  public static class ServiceLockWatcher implements LockWatcher {

    private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class);

    // Human readable service name, used in log messages and passed to lostLockAction.
    private final String serviceName;
    // Consulted when the lock is lost; a true result suppresses the error log message.
    private final Supplier<Boolean> shuttingDown;
    // Extra work to run, with the service name, as part of handling a lost lock.
    private final Consumer<String> lostLockAction;

    /**
     * @param serviceName name of the service, used in log messages and handed to
     *        {@code lostLockAction}
     * @param shuttingDown supplies whether the service is currently shutting down; when it returns
     *        true the lost-lock error message is not logged
     * @param lostLockAction action invoked with the service name when the lock is lost
     */
    public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown,
        Consumer<String> lostLockAction) {
      this.serviceName = serviceName;
      this.shuttingDown = shuttingDown;
      this.lostLockAction = lostLockAction;
    }

    /**
     * Callback for a lost lock: requests a halt with status 1. The task handed to the halt logs an
     * error unless the service reports that it is shutting down, and then always runs the
     * configured lost-lock action. The halt request itself does not depend on the shutdown state.
     *
     * @param reason why the lock was lost
     */
    @Override
    public void lostLock(final LockLossReason reason) {
      Halt.halt(1, () -> {
        if (!shuttingDown.get()) {
          LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason);
        }
        lostLockAction.accept(serviceName);
      });
    }

    /**
     * Callback for losing the ability to monitor the lock node: logs the problem at error level
     * and requests a halt with status 1.
     *
     * @param e the exception describing why the lock node can no longer be monitored
     */
    @Override
    public void unableToMonitorLockNode(final Exception e) {
      Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e));
    }

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
Expand Down Expand Up @@ -219,12 +220,12 @@ protected void getCoordinatorLock(HostAndPort clientAddress)
ServiceLock.path(lockPath), zooLockUUID);
while (true) {

CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher();
HAServiceLockWatcher coordinatorLockWatcher = new HAServiceLockWatcher("coordinator");
coordinatorLock.lock(coordinatorLockWatcher,
new ServiceLockData(zooLockUUID, coordinatorClientAddress, ThriftService.COORDINATOR));

coordinatorLockWatcher.waitForChange();
if (coordinatorLockWatcher.isAcquiredLock()) {
if (coordinatorLockWatcher.isLockAcquired()) {
break;
}
if (!coordinatorLockWatcher.isFailedToAcquireLock()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
Expand All @@ -98,7 +98,6 @@
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
Expand Down Expand Up @@ -273,20 +272,8 @@ protected void announceExistence(HostAndPort clientAddress)

compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
ServiceLock.path(zPath), compactorId);
LockWatcher lw = new LockWatcher() {
@Override
public void lostLock(final LockLossReason reason) {
Halt.halt(1, () -> {
LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
});
}

@Override
public void unableToMonitorLockNode(final Exception e) {
Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e));
}
};
LockWatcher lw = new ServiceLockWatcher("compactor", () -> false,
(name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));

try {
for (int i = 0; i < 25; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,14 @@
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.gc.metrics.GcCycleMetrics;
import org.apache.accumulo.gc.metrics.GcMetrics;
Expand Down Expand Up @@ -336,31 +334,32 @@ boolean moveToTrash(Path path) throws IOException {
private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
var path = ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK);

LockWatcher lockWatcher = new LockWatcher() {
@Override
public void lostLock(LockLossReason reason) {
Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
}
UUID zooLockUUID = UUID.randomUUID();
gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID);

@Override
public void unableToMonitorLockNode(final Exception e) {
// ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility
Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e));
while (true) {
HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc");
gcLock.lock(gcLockWatcher,
new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC));

gcLockWatcher.waitForChange();

if (gcLockWatcher.isLockAcquired()) {
break;
}
};

UUID zooLockUUID = UUID.randomUUID();
gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID);
while (true) {
if (gcLock.tryLock(lockWatcher,
new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC))) {
log.debug("Got GC ZooKeeper lock");
return;
if (!gcLockWatcher.isFailedToAcquireLock()) {
throw new IllegalStateException("gc lock in unknown state");
}

gcLock.tryToCancelAsyncLockOrUnlock();

log.debug("Failed to get GC ZooKeeper lock, will retry");
sleepUninterruptibly(1, TimeUnit.SECONDS);
sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
}

log.info("Got GC lock.");

}

private HostAndPort startStatsService() {
Expand Down
Loading

0 comments on commit 81c7f3d

Please sign in to comment.