Skip to content

Commit

Permalink
Validate compaction planner json property, increase backoff on config…
Browse files Browse the repository at this point in the history
… error

This commit changes PropUtil.vlaidateProperties such that it attempts
to parse the JSON for the CompactionService planner.opts.executors
property to ensure that it's valid JSON. This commit also changes
the CompactionManager such that when a configuration error happens
that it increases the Retry backoff for the compactions to reduce
the number of log entries.

Fixes apache#3909
  • Loading branch information
dlmarion committed Oct 31, 2023
1 parent ed82108 commit f97e4cf
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 11 deletions.
2 changes: 0 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/util/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ private Retry(long maxRetries, long startWait, long waitIncrement, long maxWait,

}

// Visible for testing
@VisibleForTesting
public void setBackOffFactor(double baskOffFactor) {
this.backOffFactor = baskOffFactor;
this.currentBackOffFactor = this.backOffFactor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.PropStoreKey;

import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;

public final class PropUtil {

private PropUtil() {}
Expand Down Expand Up @@ -70,6 +73,14 @@ protected static void validateProperties(final PropStoreKey<?> propStoreKey,
&& !ClassLoaderUtil.isValidContext(prop.getValue())) {
throw new IllegalArgumentException(
"Unable to resolve classloader for context: " + prop.getValue());
} else if (prop.getKey().startsWith(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey())
&& prop.getKey().endsWith("planner.opts.executors")) {
try {
JsonParser.parseString(prop.getValue());
} catch (JsonSyntaxException e) {
throw new IllegalArgumentException(
Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + " contains invalid JSON.");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -98,6 +99,38 @@ private void warnAboutDeprecation(String warning) {
}
}

@FunctionalInterface
private interface ConfigurationFailure {
boolean execute();
}

private static class RetryBackoffModifier {
private final AtomicBoolean failure;
private final Retry retry;

public RetryBackoffModifier(AtomicBoolean failure, Retry retry) {
super();
this.failure = failure;
this.retry = retry;
}

public void execute(ConfigurationFailure method) {
if (!method.execute()) {
// False is returned when checkForConfigChange runs into
// an error. Change the backOffFactor so that we don't
// spam the logs. It's likely a human needs to intervene
// to fix the configuration.
log.warn("Setting backoff factor to 10 due to configuration error");
retry.setBackOffFactor(10);
failure.set(true);
} else if (failure.get()) {
log.info("Re-setting backoff factor to original value due to configuration error recovery");
retry.setBackOffFactor(1.07);
failure.set(false);
}
}
}

private void mainLoop() {
long lastCheckAllTime = System.nanoTime();

Expand All @@ -108,6 +141,8 @@ private void mainLoop() {
.backOffFactor(1.07).logInterval(1, MINUTES).createFactory();
var retry = retryFactory.createRetry();
Compactable last = null;
AtomicBoolean configurationFailure = new AtomicBoolean(false);
RetryBackoffModifier retryModifier = new RetryBackoffModifier(configurationFailure, retry);

while (true) {
try {
Expand All @@ -118,7 +153,7 @@ private void mainLoop() {
new HashSet<>(runningExternalCompactions.keySet());
for (Compactable compactable : compactables) {
last = compactable;
submitCompaction(compactable);
retryModifier.execute(() -> submitCompaction(compactable));
// remove anything from snapshot that tablets know are running
compactable.getExternalCompactionIds(runningEcids::remove);
}
Expand All @@ -130,7 +165,7 @@ private void mainLoop() {
var compactable = compactablesToCheck.poll(maxTimeBetweenChecks - passed, MILLISECONDS);
if (compactable != null) {
last = compactable;
submitCompaction(compactable);
retryModifier.execute(() -> submitCompaction(compactable));
}
}

Expand All @@ -139,8 +174,7 @@ private void mainLoop() {
retry = retryFactory.createRetry();
}

checkForConfigChanges(false);

retryModifier.execute(() -> checkForConfigChanges(false));
} catch (Exception e) {
var extent = last == null ? null : last.getExtent();
log.warn("Failed to compact {} ", extent, e);
Expand All @@ -155,14 +189,17 @@ private void mainLoop() {
}

/**
* Get each configured service for the compactable tablet and submit for compaction
* Get each configured service for the compactable tablet and submit for compaction return submit
* success / failure
*/
private void submitCompaction(Compactable compactable) {
private boolean submitCompaction(Compactable compactable) {
for (CompactionKind ctype : CompactionKind.values()) {
var csid = compactable.getConfiguredService(ctype);
var service = services.get(csid);
if (service == null) {
checkForConfigChanges(true);
if (!checkForConfigChanges(true)) {
return false;
}
service = services.get(csid);
if (service == null) {
log.error(
Expand All @@ -178,6 +215,7 @@ private void submitCompaction(Compactable compactable) {
service.submitCompaction(ctype, compactable, compactablesToCheck::add);
}
}
return true;
}

public CompactionManager(Iterable<Compactable> compactables, ServerContext context,
Expand Down Expand Up @@ -222,12 +260,15 @@ public void compactableChanged(Compactable compactable) {
compactablesToCheck.add(compactable);
}

private synchronized void checkForConfigChanges(boolean force) {
/*
* return success / failure
*/
private synchronized boolean checkForConfigChanges(boolean force) {
try {
final long secondsSinceLastCheck =
NANOSECONDS.toSeconds(System.nanoTime() - lastConfigCheckTime);
if (!force && (secondsSinceLastCheck < 1)) {
return;
return true;
}

lastConfigCheckTime = System.nanoTime();
Expand Down Expand Up @@ -276,8 +317,10 @@ private synchronized void checkForConfigChanges(boolean force) {
externalExecutors.keySet().retainAll(activeExternalExecs);

}
return true;
} catch (RuntimeException e) {
log.error("Failed to reconfigure compaction services ", e);
return false;
}
}

Expand Down

0 comments on commit f97e4cf

Please sign in to comment.