diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 2d44d14d5f1..7af245fde3c 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -437,85 +437,43 @@ public void testCompactionQueueClearedWhenNotNeeded() throws Exception { writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); } c.tableOperations().importDirectory(dir).to(tableName).load(); - - try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { - // Get each tablet's file sizes - for (TabletMetadata tablet : tm) { - long fileSize = tablet.getFiles().size(); - log.debug("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); - } - } verifyData(c, tableName, 0, 100 * 100 - 1, false); } final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis(); - Wait.waitFor(() -> { - while (!queueMetrics.isEmpty()) { - var qm = queueMetrics.take(); - if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) - && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - if (Integer.parseInt(qm.getValue()) > 0) { - return true; - } - } - } - return false; - }, 60_000, sleepMillis, "did not see Q1 metrics"); - - Wait.waitFor(() -> { - int queueSize = 0; - boolean sawQueues = false; - while (!queueMetrics.isEmpty()) { - var metric = queueMetrics.take(); - if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - queueSize = Integer.parseInt(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { - sawQueues = true; - } else { - log.debug("Other metric not used in the test: {}", metric); - } - } - return queueSize == QUEUE1_SIZE && sawQueues; - }, 60_000, sleepMillis, "did not see the expected number of queued compactions"); + // wait for compaction jobs to be queued + Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis, + "Expected to see compaction jobs queued"); // change compactor settings so that compactions no longer need to run context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000"); context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000"); // wait for queue to clear - Wait.waitFor(() -> { - int jobsQueued = QUEUE1_SIZE; - int queueLength = 0; - long dequeued = 0; - long rejected = 0; - while (!queueMetrics.isEmpty()) { - var metric = queueMetrics.take(); - if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - jobsQueued = Integer.parseInt(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - queueLength = Integer.parseInt(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - rejected = Long.parseLong(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - dequeued = Long.parseLong(metric.getValue()); - } + Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis, + "Expected job queue to be cleared once compactions no longer need to happen"); + } + + /** + * @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found. + */ + private int getJobsQueued() throws InterruptedException { + Integer jobsQueued = null; + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + jobsQueued = Integer.parseInt(metric.getValue()); } - log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength, - jobsQueued, dequeued, rejected); - return jobsQueued == 0; - }, 60_000, sleepMillis, - "expected job queue to be cleared once compactions no longer need to happen"); + } + if (jobsQueued == null) { + log.warn("No compaction job queue metrics found."); + return -1; + } + log.info("Jobs Queued: {}", jobsQueued); + return jobsQueued; } }