Skip to content

Commit

Permalink
Added RPC to make Manager aware that TServer is shutting down.
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 17, 2025
1 parent 554efe8 commit eae2f22
Show file tree
Hide file tree
Showing 12 changed files with 1,370 additions and 31 deletions.

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions core/src/main/thrift/manager.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,15 @@ service ManagerClientService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

// RPC used to make the Manager aware that a tablet server is shutting down.
// Arguments: trace info, the caller's credentials, and a string identifying the
// tablet server. Declares ThriftSecurityException and
// ThriftNotActiveServiceException as possible failures.
void tabletServerStopping(
1:trace.TInfo tinfo
2:security.TCredentials credentials
3:string tabletServer
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void setSystemProperty(
1:trace.TInfo tinfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void gracefulShutdown(TCredentials credentials) {
}
}

/**
 * Reports whether a shutdown has been requested for this server.
 *
 * @return the current value of the {@code shutdownRequested} flag
 */
protected boolean isShutdownRequested() {
public boolean isShutdownRequested() {
return shutdownRequested.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -242,6 +243,9 @@ static class TServerInfo {
// as above, indexed by TServerInstance
private final Map<TServerInstance,TServerInfo> currentInstances = new HashMap<>();

private final ConcurrentHashMap<String,TServerInfo> serversShuttingDown =
new ConcurrentHashMap<>();

// The set of entries in zookeeper without locks, and the first time each was noticed
private final Map<String,Long> locklessServers = new HashMap<>();

Expand All @@ -264,6 +268,19 @@ public synchronized void startListeningForTabletServerChanges() {
.scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS));
}

/**
 * Records that a tablet server has announced it is shutting down. Servers recorded here are
 * filtered out of the set returned by {@code getCurrentServers()}.
 *
 * <p>
 * The lookup in {@code current} and the insert into {@code serversShuttingDown} happen under
 * the same monitor. {@code checkServer} is synchronized on this object and removes a server
 * from both maps, so doing the two steps atomically prevents a stale entry from being added
 * for a server that was removed in between.
 *
 * @param server key of the tablet server in the map of current servers
 */
public void tabletServerShuttingDown(String server) {
  final boolean known;
  synchronized (this) {
    final TServerInfo info = current.get(server);
    known = info != null;
    if (known) {
      serversShuttingDown.put(server, info);
    }
  }
  // Log outside the monitor; name the server so the message is actionable.
  if (!known) {
    log.info("Tablet Server {} reported it's shutting down, but not in list of current servers",
        server);
  }
}

public synchronized void scanServers() {
try {
final Set<TServerInstance> updates = new HashSet<>();
Expand Down Expand Up @@ -312,6 +329,7 @@ private synchronized void checkServer(final Set<TServerInstance> updates,
doomed.add(info.instance);
current.remove(zPath);
currentInstances.remove(info.instance);
serversShuttingDown.remove(zPath);
}

Long firstSeen = locklessServers.get(zPath);
Expand Down Expand Up @@ -389,7 +407,9 @@ public synchronized TServerConnection getConnection(TServerInstance server) {
}

/**
 * Returns a snapshot of the currently known tablet server instances, excluding any server
 * that has reported it is shutting down.
 *
 * <p>
 * The key set of {@code currentInstances} is copied before filtering. {@code Map.keySet()} is
 * a live view, so removing from it directly would also delete the entries from
 * {@code currentInstances}.
 *
 * @return a new, independent set that the caller may modify freely
 */
public synchronized Set<TServerInstance> getCurrentServers() {
  final Set<TServerInstance> active = new HashSet<>(currentInstances.keySet());
  serversShuttingDown.values().forEach(tsi -> active.remove(tsi.instance));
  return active;
}

public synchronized int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
Expand All @@ -61,7 +62,7 @@ public class DistributedWorkQueue {
private ThreadPoolExecutor threadPool;
private final ZooReaderWriter zoo;
private final String path;
private final ServerContext context;
private final AbstractServer server;
private final long timerInitialDelay;
private final long timerPeriod;

Expand All @@ -80,6 +81,11 @@ private void lookForWork(final Processor processor, List<String> children) {
try {
for (final String child : children) {

// Don't accept work if the server is shutting down
if (server.isShutdownRequested()) {
return;
}

if (child.equals(LOCKS_NODE)) {
continue;
}
Expand Down Expand Up @@ -169,23 +175,27 @@ public interface Processor {
void process(String workID, byte[] data);
}

/**
 * Creates a work queue for the given path, delegating to the five-argument constructor with a
 * random initial delay of less than one minute (in milliseconds) and a period of one minute.
 */
public DistributedWorkQueue(String path, AccumuloConfiguration config, ServerContext context) {
public DistributedWorkQueue(String path, AccumuloConfiguration config, AbstractServer server) {
// Preserve the old delay and period
this(path, config, context, random.nextInt(toIntExact(MINUTES.toMillis(1))),
this(path, config, server, random.nextInt(toIntExact(MINUTES.toMillis(1))),
MINUTES.toMillis(1));
}

/**
 * Creates a work queue for the given path with an explicit timer initial delay and period.
 * Stores the path and timer settings and obtains the ZooReaderWriter used by this queue.
 * NOTE(review): the {@code config} argument is not read in the lines visible here - confirm
 * whether it is still needed.
 */
public DistributedWorkQueue(String path, AccumuloConfiguration config, ServerContext context,
public DistributedWorkQueue(String path, AccumuloConfiguration config, AbstractServer server,
long timerInitialDelay, long timerPeriod) {
this.path = path;
this.context = context;
this.server = server;
this.timerInitialDelay = timerInitialDelay;
this.timerPeriod = timerPeriod;
zoo = context.getZooReaderWriter();
zoo = server.getContext().getZooReaderWriter();
}

/**
 * Returns the ServerContext associated with this work queue.
 */
public ServerContext getContext() {
return context;
return server.getContext();
}

/**
 * Returns the server this work queue was constructed with.
 *
 * @return the {@code AbstractServer} held in the {@code server} field
 */
public AbstractServer getServer() {
return server;
}

public void startProcessing(final Processor processor, ThreadPoolExecutor executorService)
Expand Down Expand Up @@ -225,7 +235,7 @@ public void process(WatchedEvent event) {

// Add a little jitter to avoid all the tservers slamming zookeeper at once
ThreadPools.watchCriticalScheduledTask(
context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
server.getContext().getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
log.debug("Looking for work in {}", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftConcurrentModificationException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
Expand Down Expand Up @@ -342,6 +343,17 @@ public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer
log.debug("FATE op shutting down " + tabletServer + " finished");
}

/**
 * Handles a tablet server's notification that it is shutting down. Callers must be permitted
 * to perform system actions; permitted calls are logged and forwarded to the manager's live
 * tablet server set.
 *
 * @param tinfo trace information supplied with the RPC
 * @param credentials credentials of the caller, checked for system-action permission
 * @param tabletServer identifier of the tablet server that is stopping
 * @throws ThriftSecurityException with PERMISSION_DENIED when the caller is not permitted
 */
@Override
public void tabletServerStopping(TInfo tinfo, TCredentials credentials, String tabletServer)
    throws ThriftSecurityException, ThriftNotActiveServiceException, TException {
  final boolean permitted = manager.security.canPerformSystemActions(credentials);
  if (permitted) {
    log.info("Tablet Server {} has reported it's shutting down", tabletServer);
    manager.tserverSet.tabletServerShuttingDown(tabletServer);
  } else {
    throw new ThriftSecurityException(credentials.getPrincipal(),
        SecurityErrorCode.PERMISSION_DENIED);
  }
}

@Override
public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName,
TabletSplit split) throws ThriftSecurityException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) {
try {
List<String> workIDs =
new DistributedWorkQueue(manager.getZooKeeperRoot() + Constants.ZRECOVERY,
manager.getConfiguration(), manager.getContext()).getWorkQueued();
manager.getConfiguration(), manager).getWorkQueued();
sortsQueued.addAll(workIDs);
} catch (Exception e) {
log.warn("{}", e.getMessage(), e);
Expand Down Expand Up @@ -132,7 +132,7 @@ private void initiateSort(String sortId, String source, final String destination
throws KeeperException, InterruptedException {
String work = source + "|" + destination;
new DistributedWorkQueue(manager.getZooKeeperRoot() + Constants.ZRECOVERY,
manager.getConfiguration(), manager.getContext()).addWork(sortId, work.getBytes(UTF_8));
manager.getConfiguration(), manager).addWork(sortId, work.getBytes(UTF_8));

synchronized (this) {
sortsQueued.add(sortId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void setZooCache(ZooCache zooCache) {
/**
 * Replaces {@code workQueue} with a new DistributedWorkQueue rooted at the replication work
 * queue path under this instance's ZooKeeper root, using the supplied configuration.
 * NOTE(review): the new queue is built from a value read off the existing {@code workQueue},
 * so this assumes {@code workQueue} is already non-null when called - confirm.
 */
protected void initializeWorkQueue(AccumuloConfiguration conf) {
workQueue =
new DistributedWorkQueue(ZooUtil.getRoot(client.instanceOperations().getInstanceId())
+ ReplicationConstants.ZOO_WORK_QUEUE, conf, this.workQueue.getContext());
+ ReplicationConstants.ZOO_WORK_QUEUE, conf, this.workQueue.getServer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
if (!loadedFailures.isEmpty()) {
DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(
Constants.ZROOT + "/" + manager.getInstanceID() + Constants.ZBULK_FAILED_COPYQ,
manager.getConfiguration(), manager.getContext());
manager.getConfiguration(), manager);

HashSet<String> workIds = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ComparablePair;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
Expand Down Expand Up @@ -257,7 +258,7 @@ protected TabletServer(ServerOpts opts, String[] args) {
this.sessionManager = new SessionManager(context);
this.logSorter = new LogSorter(context, aconf);
@SuppressWarnings("deprecation")
var replWorker = new org.apache.accumulo.tserver.replication.ReplicationWorker(context);
var replWorker = new org.apache.accumulo.tserver.replication.ReplicationWorker(this);
this.replWorker = replWorker;
this.statsKeeper = new TabletStatsKeeper();
final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT);
Expand Down Expand Up @@ -789,9 +790,8 @@ public void run() {
.createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true);

// TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
DistributedWorkQueue bulkFailedCopyQ =
new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ,
getConfiguration(), getContext());
DistributedWorkQueue bulkFailedCopyQ = new DistributedWorkQueue(
getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration(), this);
try {
bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()),
distWorkQThreadPool);
Expand All @@ -800,7 +800,7 @@ public void run() {
}

try {
logSorter.startWatchingForRecoveryLogs();
logSorter.startWatchingForRecoveryLogs(this);
} catch (Exception ex) {
log.error("Error setting watches for recoveries");
throw new RuntimeException(ex);
Expand Down Expand Up @@ -930,6 +930,19 @@ public void run() {
}
}

// Tell the Manager we are shutting down so that it doesn't try
// to assign tablets.
ManagerClientService.Client iface = managerConnection(getManagerAddress());
try {
iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(),
getClientAddressString());
} catch (TException e) {
LOG.error("Error informing Manager that we are shutting down, halting server", e);
Halt.halt("Error informing Manager that we are shutting down, exiting!", -1);
} finally {
returnManagerConnection(iface);
}

log.debug("Stopping Replication Server");
if (this.replServer != null) {
this.replServer.stop();
Expand All @@ -942,7 +955,7 @@ public void run() {
.getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8)
.numMaxThreads(16).build();

ManagerClientService.Client iface = managerConnection(getManagerAddress());
iface = managerConnection(getManagerAddress());
boolean managerDown = false;

try {
Expand Down Expand Up @@ -974,6 +987,7 @@ public void run() {
log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
UtilWaitThread.sleep(1000);
}
log.debug("All {} tablets unloaded", level);
}
} finally {
if (!managerDown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
Expand Down Expand Up @@ -293,15 +294,16 @@ void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, in
}
}

/**
 * Starts processing recovery work from the distributed work queue at the ZRECOVERY path,
 * handing each item to a new LogProcessor on a dedicated thread pool.
 *
 * @throws KeeperException declared by this method; presumably raised by the ZooKeeper-backed
 *         work queue - confirm
 * @throws InterruptedException declared by this method
 */
public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException {
public void startWatchingForRecoveryLogs(AbstractServer server)
throws KeeperException, InterruptedException {
@SuppressWarnings("deprecation")
// Pool size comes from TSERV_WAL_SORT_MAX_CONCURRENT resolved against
// TSERV_RECOVERY_MAX_CONCURRENT.
int threadPoolSize = this.conf.getCount(this.conf
.resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT));
ThreadPoolExecutor threadPool =
ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL)
.numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
context).startProcessing(new LogProcessor(), threadPool);
server).startProcessing(new LogProcessor(), threadPool);
}

public List<RecoveryStatus> getLogSorts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand All @@ -37,11 +37,11 @@
public class ReplicationWorker implements Runnable {
private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);

private final ServerContext context;
private final AbstractServer server;
private ThreadPoolExecutor executor;

public ReplicationWorker(ServerContext context) {
this.context = context;
public ReplicationWorker(AbstractServer server) {
this.server = server;
}

public void setExecutor(ThreadPoolExecutor executor) {
Expand All @@ -53,7 +53,7 @@ public void run() {
DefaultConfiguration defaultConf = DefaultConfiguration.getInstance();
long defaultDelay = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
long defaultPeriod = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
AccumuloConfiguration conf = context.getConfiguration();
AccumuloConfiguration conf = server.getConfiguration();
long delay = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
long period = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
try {
Expand All @@ -62,15 +62,16 @@ public void run() {
log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}", delay,
period);
workQueue = new DistributedWorkQueue(
context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, context, delay,
period);
server.getContext().getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf,
server, delay, period);
} else {
log.debug("Configuring DistributedWorkQueue with default delay and period");
workQueue = new DistributedWorkQueue(
context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, context);
server.getContext().getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf,
server);
}

workQueue.startProcessing(new ReplicationProcessor(context), executor);
workQueue.startProcessing(new ReplicationProcessor(server.getContext()), executor);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit eae2f22

Please sign in to comment.