Skip to content

Commit

Permalink
Fix MetricsIT and compaction queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Dec 13, 2023
1 parent c9d6b95 commit e864dee
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public KeySetView<CompactionExecutorId,CompactionJobPriorityQueue> getQueueIds()
return priorityQueues.keySet();
}

public CompactionJobPriorityQueue getQueue(CompactionExecutorId executorId) {
return priorityQueues.get(executorId);
}

public long getQueueMaxSize(CompactionExecutorId executorId) {
var prioQ = priorityQueues.get(executorId);
return prioQ == null ? 0 : prioQ.getMaxSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,89 @@
*/
package org.apache.accumulo.manager.metrics;

import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

public class QueueMetrics implements MetricsProducer {

private static class QueueMeters {
private final Gauge length;
private final Gauge jobsQueued;
private final Gauge jobsDequeued;
private final Gauge jobsRejected;
private final Gauge jobsLowestPriority;

public QueueMeters(MeterRegistry meterRegistry, CompactionExecutorId queueId,
CompactionJobPriorityQueue queue) {
length =
Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, queue, q -> q.getMaxSize())
.description("Length of priority queues")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
.register(meterRegistry);

jobsQueued = Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, queue, q -> q.getQueuedJobs())
.description("Count of queued jobs")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
.register(meterRegistry);

jobsDequeued = Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, queue,
q -> q.getDequeuedJobs())
.description("Count of jobs dequeued")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
.register(meterRegistry);

jobsRejected = Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, queue,
q -> q.getRejectedJobs())
.description("Count of rejected jobs")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
.register(meterRegistry);

jobsLowestPriority = Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, queue,
q -> q.getLowestPriority())
.description("Lowest priority queued job")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId.canonical()))
.register(meterRegistry);
}

private void removeMeters(MeterRegistry registry) {
registry.remove(length);
registry.remove(jobsQueued);
registry.remove(jobsDequeued);
registry.remove(jobsRejected);
registry.remove(jobsLowestPriority);
}
}

private static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(5);
private MeterRegistry meterRegistry = null;
private final CompactionJobQueues compactionJobQueues;
private AtomicLong queueCount;
private final Map<CompactionExecutorId,QueueMeters> perQueueMetrics = new HashMap<>();
private Gauge queueCountMeter = null;

public QueueMetrics(CompactionJobQueues compactionJobQueues) {
this.compactionJobQueues = compactionJobQueues;
Expand All @@ -51,46 +113,35 @@ public QueueMetrics(CompactionJobQueues compactionJobQueues) {

public void update() {

Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
CompactionJobQueues::getQueueCount)
.description("Number of current Queues").tags(getCommonTags()).register(meterRegistry);

for (CompactionExecutorId ceid : compactionJobQueues.getQueueIds()) {
// Normalize the queueId to match metrics tag naming convention.
String queueId = formatString(ceid.toString());

// Register queues by ID rather than by object as queues can be deleted.
Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, ceid,
compactionJobQueues::getQueueMaxSize)
.description("Length of priority queues")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);

Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, ceid,
compactionJobQueues::getQueuedJobs)
.description("Count of queued jobs")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
if (queueCountMeter == null) {
queueCountMeter = Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
CompactionJobQueues::getQueueCount)
.description("Number of current Queues").tags(getCommonTags()).register(meterRegistry);
}
LOG.debug("update - cjq queues: {}", compactionJobQueues.getQueueIds());

Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, ceid,
compactionJobQueues::getDequeuedJobs)
.description("Count of jobs dequeued")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
Set<CompactionExecutorId> definedQueues = compactionJobQueues.getQueueIds();
LOG.debug("update - defined queues: {}", definedQueues);

Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, ceid,
compactionJobQueues::getRejectedJobs)
.description("Count of rejected jobs")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
Set<CompactionExecutorId> queuesWithMetrics = perQueueMetrics.keySet();
LOG.debug("update - queues with metrics: {}", queuesWithMetrics);

SetView<CompactionExecutorId> queuesWithoutMetrics =
Sets.difference(definedQueues, queuesWithMetrics);
queuesWithoutMetrics.forEach(q -> {
LOG.debug("update - creating meters for queue: {}", q);
perQueueMetrics.put(q, new QueueMeters(meterRegistry, q, compactionJobQueues.getQueue(q)));
});

SetView<CompactionExecutorId> metricsWithoutQueues =
Sets.difference(queuesWithMetrics, definedQueues);
metricsWithoutQueues.forEach(q -> {
LOG.debug("update - removing meters for queue: {}", q);
perQueueMetrics.get(q).removeMeters(meterRegistry);
perQueueMetrics.remove(q);
});

Gauge
.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, ceid,
compactionJobQueues::getLowestPriority)
.description("Lowest priority queued job")
.tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry);
}
}

@Override
Expand Down
65 changes: 60 additions & 5 deletions test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,26 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.conf.Property;
Expand All @@ -45,6 +49,7 @@
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
Expand All @@ -60,7 +65,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {

@Override
protected Duration defaultTimeout() {
return Duration.ofMinutes(1);
return Duration.ofMinutes(3);
}

@BeforeAll
Expand Down Expand Up @@ -92,9 +97,6 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit
@Test
public void confirmMetricsPublished() throws Exception {

doWorkToGenerateMetrics();
cluster.stop();

Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS,
METRICS_SCAN_BUSY_TIMEOUT, METRICS_SCAN_PAUSED_FOR_MEM, METRICS_SCAN_RETURN_FOR_MEM,
METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED);
Expand All @@ -114,8 +116,30 @@ public void confirmMetricsPublished() throws Exception {

List<String> statsDMetrics;

final int compactionPriorityQueueLengthBit = 0;
final int compactionPriorityQueueQueuedBit = 1;
final int compactionPriorityQueueDequeuedBit = 2;
final int compactionPriorityQueueRejectedBit = 3;
final int compactionPriorityQueuePriorityBit = 4;

final BitSet trueSet = new BitSet(5);
trueSet.set(0, 4, true);

final BitSet queueMetricsSeen = new BitSet(5);

AtomicReference<Exception> error = new AtomicReference<>();
Thread workerThread = new Thread(() -> {
try {
doWorkToGenerateMetrics();
} catch (Exception e) {
error.set(e);
}
});
workerThread.start();

// loop until we run out of lines or until we see all expected metrics
while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetricNames.isEmpty()) {
while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetricNames.isEmpty()
&& !queueMetricsSeen.intersects(trueSet)) {
// for each metric name not yet seen, check if it is expected, flaky, or unknown
statsDMetrics.stream().filter(line -> line.startsWith("accumulo"))
.map(TestStatsDSink::parseStatsDMetric).map(Metric::getName)
Expand All @@ -126,14 +150,37 @@ public void confirmMetricsPublished() throws Exception {
} else if (flakyMetrics.contains(name)) {
// ignore any flaky metric names seen
// these aren't always expected, but we shouldn't be surprised if we see them
} else if (name.startsWith("accumulo.compactor.queue")) {
// Compactor queue metrics are not guaranteed to be emitted
// during the call to doWorkToGenerateMetrics above. This will
// flip a bit in the BitSet when each metric is seen. The top-level
// loop will continue to iterate until all the metrics are seen.
seenMetricNames.put(name, expectedMetricNames.remove(name));
if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.equals(name)) {
queueMetricsSeen.set(compactionPriorityQueueLengthBit, true);
} else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.equals(name)) {
queueMetricsSeen.set(compactionPriorityQueueQueuedBit, true);
} else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED.equals(name)) {
queueMetricsSeen.set(compactionPriorityQueueDequeuedBit, true);
} else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.equals(name)) {
queueMetricsSeen.set(compactionPriorityQueueRejectedBit, true);
} else if (METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.equals(name)) {
queueMetricsSeen.set(compactionPriorityQueuePriorityBit, true);
}
} else {
// completely unexpected metric
fail("Found accumulo metric not in expectedMetricNames or flakyMetricNames: " + name);
}
});
Thread.sleep(4_000);
}
assertTrue(expectedMetricNames.isEmpty(),
"Did not see all expected metric names, missing: " + expectedMetricNames.values());

workerThread.join();
assertNull(error.get());
cluster.stop();

}

private void doWorkToGenerateMetrics() throws Exception {
Expand Down Expand Up @@ -167,6 +214,14 @@ private void doWorkToGenerateMetrics() throws Exception {
try (Scanner scanner = client.createScanner(tableName)) {
scanner.forEach((k, v) -> {});
}
// Start a compaction with the slow iterator to ensure that the compaction queues
// are not removed quickly
CompactionConfig cc = new CompactionConfig();
IteratorSetting is = new IteratorSetting(100, "slow", SlowIterator.class);
SlowIterator.setSleepTime(is, 3000);
cc.setIterators(List.of(is));
cc.setWait(false);
client.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
client.tableOperations().delete(tableName);
while (client.tableOperations().exists(tableName)) {
Thread.sleep(1000);
Expand Down

0 comments on commit e864dee

Please sign in to comment.