Skip to content

Commit

Permalink
Enable reporting of TabletManagementIterator errors
Browse files Browse the repository at this point in the history
Enable the TabletManagementIterator to report error's back
to the TabletGroupWatcher when trying to determine actions
that should be taken for an extent. Increment metric gauges
when an error occurs.

Fixes apache#3469
  • Loading branch information
dlmarion committed Oct 19, 2023
1 parent 06c8ca5 commit 5a4e19b
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TabletManagement {
ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, ColumnType.FILES, ColumnType.LAST,
ColumnType.OPID, ColumnType.ECOMP, ColumnType.DIR, ColumnType.SELECTED);

private static final Text ERROR_COLUMN_NAME = new Text("ERROR");
private static final Text REASONS_COLUMN_NAME = new Text("REASONS");

private static final Text EMPTY = new Text("");
Expand All @@ -62,12 +63,20 @@ public static void addActions(final SortedMap<Key,Value> decodedRow,
decodedRow.put(reasonsKey, reasonsValue);
}

public final Set<ManagementAction> actions;
public final TabletMetadata tabletMetadata;
public static void addError(final SortedMap<Key,Value> decodedRow, final Exception error) {
final Key errorKey = new Key(decodedRow.firstKey().getRow(), ERROR_COLUMN_NAME, EMPTY);
final Value errorValue = new Value(error.getMessage());
decodedRow.put(errorKey, errorValue);
}

private final Set<ManagementAction> actions;
private final TabletMetadata tabletMetadata;
private final String errorMessage;

public TabletManagement(Set<ManagementAction> actions, TabletMetadata tm) {
public TabletManagement(Set<ManagementAction> actions, TabletMetadata tm, String errorMessage) {
this.actions = actions;
this.tabletMetadata = tm;
this.errorMessage = errorMessage;
}

public TabletManagement(Key wholeRowKey, Value wholeRowValue) throws IOException {
Expand All @@ -77,9 +86,21 @@ public TabletManagement(Key wholeRowKey, Value wholeRowValue) throws IOException
public TabletManagement(Key wholeRowKey, Value wholeRowValue, boolean saveKV) throws IOException {
final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(wholeRowKey, wholeRowValue);
Text row = decodedRow.firstKey().getRow();
Value val = decodedRow.remove(new Key(row, REASONS_COLUMN_NAME, EMPTY));
// Decode any errors that happened on the TabletServer
Value errorValue = decodedRow.remove(new Key(row, ERROR_COLUMN_NAME, EMPTY));
if (errorValue != null) {
this.errorMessage = errorValue.toString();
} else {
this.errorMessage = null;
}
// Decode the ManagementActions if it exists
Value actionValue = decodedRow.remove(new Key(row, REASONS_COLUMN_NAME, EMPTY));
Set<ManagementAction> actions = new HashSet<>();
Splitter.on(',').split(val.toString()).forEach(a -> actions.add(ManagementAction.valueOf(a)));
if (actionValue != null) {
Splitter.on(',').split(actionValue.toString())
.forEach(a -> actions.add(ManagementAction.valueOf(a)));
}

TabletMetadata tm = TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
CONFIGURED_COLUMNS, saveKV, true);
this.actions = actions;
Expand All @@ -94,6 +115,10 @@ public TabletMetadata getTabletMetadata() {
return tabletMetadata;
}

public String getErrorMessage() {
return errorMessage;
}

@Override
public String toString() {
return actions.toString() + "," + tabletMetadata.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,27 @@
* <td>Counter</td>
* <td></td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@link #METRICS_MANAGER_ROOT_TGW_ERRORS}</td>
* <td>Gauge</td>
* <td></td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@link #METRICS_MANAGER_META_TGW_ERRORS}</td>
* <td>Gauge</td>
* <td></td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@link #METRICS_MANAGER_USER_TGW_ERRORS}</td>
* <td>Gauge</td>
* <td></td>
* </tr>
* </table>
*
* @since 2.1.0
Expand Down Expand Up @@ -674,6 +695,11 @@ public interface MetricsProducer {
String METRICS_GC_POST_OP_DURATION = METRICS_GC_PREFIX + "post.op.duration";
String METRICS_GC_RUN_CYCLE = METRICS_GC_PREFIX + "run.cycle";

String METRICS_MANAGER_PREFIX = "accumulo.manager.";
String METRICS_MANAGER_ROOT_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tgw.root.errors";
String METRICS_MANAGER_META_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tgw.meta.errors";
String METRICS_MANAGER_USER_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tgw.user.errors";

String METRICS_MAJC_PREFIX = "accumulo.tserver.compactions.majc.";
String METRICS_MAJC_QUEUED = METRICS_MAJC_PREFIX + "queued";
String METRICS_MAJC_RUNNING = METRICS_MAJC_PREFIX + "running";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,20 +429,33 @@ protected void consume() throws IOException {
TabletManagement.CONFIGURED_COLUMNS, false, true);

actions.clear();
if (managerState != ManagerState.NORMAL || current.isEmpty() || onlineTables.isEmpty()) {
// when manager is in the process of starting up or shutting down return everything.
actions.add(ManagementAction.NEEDS_LOCATION_UPDATE);
} else {
LOG.trace("Evaluating extent: {}", tm);
computeTabletManagementActions(tm, actions);
Exception error = null;
try {
if (managerState != ManagerState.NORMAL || current.isEmpty() || onlineTables.isEmpty()) {
// when manager is in the process of starting up or shutting down return everything.
actions.add(ManagementAction.NEEDS_LOCATION_UPDATE);
} else {
LOG.trace("Evaluating extent: {}", tm);
computeTabletManagementActions(tm, actions);
}
} catch (Exception e) {
LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e);
error = e;
}

if (!actions.isEmpty()) {
// If we simply returned here, then the client would get the encoded K,V
// from the WholeRowIterator. However, it would not know the reason(s) why
// it was returned. Insert a K,V pair to represent the reasons. The client
// can pull this K,V pair from the results by looking at the colf.
TabletManagement.addActions(decodedRow, actions);
if (!actions.isEmpty() || error != null) {
if (error != null) {
// Insert the error into K,V pair representing
// the tablet metadata.
TabletManagement.addError(decodedRow, error);
}
if (!actions.isEmpty()) {
// If we simply returned here, then the client would get the encoded K,V
// from the WholeRowIterator. However, it would not know the reason(s) why
// it was returned. Insert a K,V pair to represent the reasons. The client
// can pull this K,V pair from the results by looking at the colf.
TabletManagement.addActions(decodedRow, actions);
}
topKey = decodedRow.firstKey();
topValue = WholeRowIterator.encodeRow(new ArrayList<>(decodedRow.keySet()),
new ArrayList<>(decodedRow.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ public TabletManagement next() {
Entry<Key,Value> e = iter.next();
try {
TabletManagement tm = TabletManagementIterator.decode(e);
log.trace("Returning metadata tablet, extent: {}, hostingGoal: {}, actions: {}",
log.trace("Returning metadata tablet, extent: {}, hostingGoal: {}, actions: {}, error: {}",
tm.getTabletMetadata().getExtent(), tm.getTabletMetadata().getHostingGoal(),
tm.getActions());
tm.getActions(), tm.getErrorMessage());
return tm;
} catch (IOException e1) {
throw new RuntimeException("Error creating TabletMetadata object", e1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,24 @@ public boolean hasNext() {
@Override
public TabletManagement next() {
finished = true;
TabletMetadata tm = ample.readTablet(RootTable.EXTENT, ReadConsistency.EVENTUAL);

var actions = EnumSet.of(ManagementAction.NEEDS_LOCATION_UPDATE);

CompactionJobGenerator cjg =
new CompactionJobGenerator(new ServiceEnvironmentImpl(ctx), Map.of());
var jobs = cjg.generateJobs(tm,
EnumSet.of(CompactionKind.SYSTEM, CompactionKind.USER, CompactionKind.SELECTOR));
if (!jobs.isEmpty()) {
actions.add(ManagementAction.NEEDS_COMPACTING);
final var actions = EnumSet.of(ManagementAction.NEEDS_LOCATION_UPDATE);
final TabletMetadata tm = ample.readTablet(RootTable.EXTENT, ReadConsistency.EVENTUAL);
String error = null;
try {
CompactionJobGenerator cjg =
new CompactionJobGenerator(new ServiceEnvironmentImpl(ctx), Map.of());
var jobs = cjg.generateJobs(tm,
EnumSet.of(CompactionKind.SYSTEM, CompactionKind.USER, CompactionKind.SELECTOR));
if (!jobs.isEmpty()) {
actions.add(ManagementAction.NEEDS_COMPACTING);
}
} catch (Exception e) {
log.error("Error computing tablet management actions for Root extent", e);
error = e.getMessage();
}
return new TabletManagement(actions, tm, error);

return new TabletManagement(actions, tm);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ public void testEncodeDecodeWithReasons() throws Exception {
assertEquals(actions, tmi.getActions());
}

@Test
public void testEncodeDecodeWithErrors() throws Exception {
KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));

final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);

TabletManagement.addError(entries, new UnsupportedOperationException("Not supported."));
Key key = entries.firstKey();
Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
new ArrayList<>(entries.values()));

// Remove the ERROR column from the entries map for the comparison check
// below
entries.remove(new Key(key.getRow().toString(), "ERROR", ""));

TabletManagement tmi = new TabletManagement(key, val, true);
assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
assertEquals("Not supported.", tmi.getErrorMessage());
}

@Test
public void testBinary() throws Exception {
// test end row with non ascii data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ public class GarbageCollectWriteAheadLogsTest {
try {
tabletAssignedToServer1 = new TabletManagement(Set.of(),
TabletMetadata.builder(extent).putLocation(Location.current(server1))
.putHostingGoal(TabletHostingGoal.ALWAYS).build(LAST, SUSPEND, LOGS));
.putHostingGoal(TabletHostingGoal.ALWAYS).build(LAST, SUSPEND, LOGS),
"");
tabletAssignedToServer2 = new TabletManagement(Set.of(),
TabletMetadata.builder(extent).putLocation(Location.current(server2))
.putHostingGoal(TabletHostingGoal.NEVER).build(LAST, SUSPEND, LOGS));
.putHostingGoal(TabletHostingGoal.NEVER).build(LAST, SUSPEND, LOGS),
"");
} catch (Exception ex) {
throw new RuntimeException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,10 +1163,10 @@ public void run() {
managerUpgrading.set(true);
}

ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this);
try {
MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
sa.getAddress());
ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this);
MetricsUtil.initializeProducers(this, mm);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
Expand Down Expand Up @@ -1221,15 +1221,15 @@ public void process(WatchedEvent event) {
this.splitter = new Splitter(context);
this.splitter.start();

watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null) {
watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm) {
@Override
boolean canSuspendTablets() {
// Always allow user data tablets to enter suspended state.
return true;
}
});

watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0)) {
watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm) {
@Override
boolean canSuspendTablets() {
// Allow metadata tablets to enter suspended state only if so configured. Generally
Expand All @@ -1240,7 +1240,7 @@ boolean canSuspendTablets() {
}
});

watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1)) {
watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm) {
@Override
boolean canSuspendTablets() {
// Never allow root tablet to enter suspended state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
import org.apache.accumulo.manager.Manager.TabletGoalState;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.split.SplitTask;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.state.TableStats;
Expand Down Expand Up @@ -122,14 +123,17 @@ public Text getEncodedEndRow() {
final TableStats stats = new TableStats();
private SortedSet<TServerInstance> lastScanServers = Collections.emptySortedSet();
private final EventHandler eventHandler;
private final ManagerMetrics metrics;

private WalStateManager walStateManager;

TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher,
ManagerMetrics metrics) {
super("Watching " + store.name());
this.manager = manager;
this.store = store;
this.dependentWatcher = dependentWatcher;
this.metrics = metrics;
this.walStateManager = new WalStateManager(manager.getContext());
this.eventHandler = new EventHandler();
manager.getEventCoordinator().addListener(store.getLevel(), eventHandler);
Expand Down Expand Up @@ -330,6 +334,17 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
final Set<ManagementAction> actions = mti.getActions();
final TabletMetadata tm = mti.getTabletMetadata();

final String mtiError = mti.getErrorMessage();
if (mtiError != null) {
// An error happened on the TabletServer in the TabletManagementIterator
// when trying to process this extent.
LOG.warn(
"Error on TabletServer trying to get Tablet management information for extent: {}. Error message: {}",
tm.getExtent(), mtiError);
this.metrics.incrementTabletGroupWatcherError(this.store.getLevel());
continue;
}

if (tm.isFutureAndCurrentLocationSet()) {
throw new BadLocationStateException(
tm.getExtent() + " is both assigned and hosted, which should never happen: " + this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import static java.util.Objects.requireNonNull;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.metrics.fate.FateMetrics;

Expand All @@ -33,6 +37,10 @@ public class ManagerMetrics implements MetricsProducer {
private final FateMetrics fateMetrics;
private final QueueMetrics queueMetrics;

private AtomicLong rootTGWErrorsGauge;
private AtomicLong metadataTGWErrorsGauge;
private AtomicLong userTGWErrorsGauge;

public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) {
requireNonNull(conf, "AccumuloConfiguration must not be null");
requireNonNull(conf, "Manager must not be null");
Expand All @@ -41,9 +49,31 @@ public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) {
queueMetrics = new QueueMetrics(manager.getCompactionQueues());
}

public void incrementTabletGroupWatcherError(DataLevel level) {
switch (level) {
case METADATA:
metadataTGWErrorsGauge.incrementAndGet();
break;
case ROOT:
rootTGWErrorsGauge.incrementAndGet();
break;
case USER:
userTGWErrorsGauge.incrementAndGet();
break;
default:
throw new IllegalStateException("Unhandled DataLevel: " + level);
}
}

@Override
public void registerMetrics(MeterRegistry registry) {
fateMetrics.registerMetrics(registry);
queueMetrics.registerMetrics(registry);
rootTGWErrorsGauge = registry.gauge(METRICS_MANAGER_ROOT_TGW_ERRORS,
MetricsUtil.getCommonTags(), new AtomicLong(0));
metadataTGWErrorsGauge = registry.gauge(METRICS_MANAGER_META_TGW_ERRORS,
MetricsUtil.getCommonTags(), new AtomicLong(0));
userTGWErrorsGauge = registry.gauge(METRICS_MANAGER_USER_TGW_ERRORS,
MetricsUtil.getCommonTags(), new AtomicLong(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public void confirmMetricsPublished() throws Exception {

Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS,
METRICS_SCAN_BUSY_TIMEOUT, METRICS_SCAN_PAUSED_FOR_MEM, METRICS_SCAN_RETURN_FOR_MEM,
METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED, METRICS_MAJC_QUEUED, METRICS_MAJC_RUNNING);
METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED, METRICS_MAJC_QUEUED, METRICS_MAJC_RUNNING,
METRICS_MANAGER_ROOT_TGW_ERRORS, METRICS_MANAGER_META_TGW_ERRORS,
METRICS_MANAGER_USER_TGW_ERRORS);
Set<String> flakyMetrics = Set.of(METRICS_GC_WAL_ERRORS, METRICS_FATE_TYPE_IN_PROGRESS,
METRICS_PROPSTORE_EVICTION_COUNT, METRICS_PROPSTORE_REFRESH_COUNT,
METRICS_PROPSTORE_REFRESH_LOAD_COUNT, METRICS_PROPSTORE_ZK_ERROR_COUNT,
Expand Down

0 comments on commit 5a4e19b

Please sign in to comment.