From 2a12b1342957d8af5ee7211b1bafbc25af820e9c Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 16 Jan 2025 17:59:10 -0500 Subject: [PATCH] Used data size instead of entry size of compaction queue (#5252) Modified the compaction queue limit to use the data size of the compaction jobs instead of the number of compaction jobs for limiting the number of compaction jobs buffered. fixes #5186 --- .../apache/accumulo/core/conf/Property.java | 16 +-- .../MiniAccumuloConfigImpl.java | 4 +- .../coordinator/CompactionCoordinator.java | 17 +-- .../queue/CompactionJobPriorityQueue.java | 21 ++- .../compaction/queue/CompactionJobQueues.java | 12 +- .../compaction/queue/SizeTrackingTreeMap.java | 129 ++++++++++++++++++ .../queue/CompactionJobPriorityQueueTest.java | 17 ++- .../queue/CompactionJobQueuesTest.java | 8 +- .../queue/SizeTrackingTreeMapTest.java | 113 +++++++++++++++ .../CompactionPriorityQueueMetricsIT.java | 4 +- 10 files changed, 285 insertions(+), 56 deletions(-) create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java create mode 100644 server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index c5bcdf4d452..555785f5806 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -465,14 +465,12 @@ public enum Property { "The number of threads used to seed fate split task, the actual split work is done by fate" + " threads.", "4.0.0"), - - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE( - "manager.compaction.major.service.queue.initial.size", "10000", PropertyType.COUNT, - "The initial size of each resource groups compaction job priority queue.", "4.0.0"), - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR( - "manager.compaction.major.service.queue.size.factor", "3.0", PropertyType.FRACTION, - "The dynamic resizing of the compaction job priority queue is based on" - + " the number of compactors for the group multiplied by this factor.", + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", + "1M", PropertyType.MEMORY, + "The data size of each resource groups compaction job priority queue. The memory size of " + + "each compaction job is estimated and the sum of these sizes per resource group will not " + + "exceed this setting. When the size is exceeded the lowest priority jobs are dropped as " + + "needed.", "4.0.0"), SPLIT_PREFIX("split.", null, PropertyType.PREFIX, "System wide properties related to splitting tablets.", "3.1.0"), @@ -1460,7 +1458,7 @@ public static boolean isValidTablePropertyKey(String key) { RPC_MAX_MESSAGE_SIZE, // compaction coordiantor properties - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, // block cache options GENERAL_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index 5b81fac4684..b5c6667519f 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -188,8 +188,8 @@ MiniAccumuloConfigImpl initialize() { mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true"); - mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(), - Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue()); + mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(), + Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue()); mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(), Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 4dc42c11379..f1e6f54a5cd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -194,7 +194,7 @@ public class CompactionCoordinator private final Manager manager; private final LoadingCache compactorCounts; - private final int jobQueueInitialSize; + private final long jobQueueInitialSize; private volatile long coordinatorStartTime; @@ -208,8 +208,8 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, this.security = security; this.manager = Objects.requireNonNull(manager); - this.jobQueueInitialSize = ctx.getConfiguration() - .getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE); + this.jobQueueInitialSize = + ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE); this.jobQueues = new CompactionJobQueues(jobQueueInitialSize); @@ -1121,8 +1121,6 @@ private void cleanUpEmptyCompactorPathInZK() { final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS; final var zoorw = this.ctx.getZooSession().asReaderWriter(); - final double queueSizeFactor = ctx.getConfiguration() - .getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR); try { var groups = zoorw.getChildren(compactorQueuesPath); @@ -1139,7 +1137,6 @@ private void cleanUpEmptyCompactorPathInZK() { CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); if (queue != null) { queue.clearIfInactive(Duration.ofMinutes(10)); - queue.setMaxSize(this.jobQueueInitialSize); } } else { int aliveCompactorsForGroup = 0; @@ -1152,16 +1149,8 @@ private void cleanUpEmptyCompactorPathInZK() { aliveCompactorsForGroup++; } } - CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); - if (queue != null) { - queue.setMaxSize(Math.min( - Math.max(1, (int) (aliveCompactorsForGroup * queueSizeFactor)), Integer.MAX_VALUE)); - } - } - } - } catch (KeeperException | RuntimeException e) { LOG.warn("Failed to clean up compactors", e); } catch (InterruptedException e) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 1f9738dac78..f183b50b86f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -31,11 +31,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -116,8 +114,8 @@ public boolean equals(Object o) { // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the // case where tablets decided to issues different compaction jobs than what is currently queued. - private final TreeMap jobQueue; - private final AtomicInteger maxSize; + private final SizeTrackingTreeMap jobQueue; + private final AtomicLong maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; private final ArrayDeque> futures; @@ -142,9 +140,10 @@ private TabletJobs(long generation, HashSet jobs) { private final AtomicLong nextSeq = new AtomicLong(0); - public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { - this.jobQueue = new TreeMap<>(); - this.maxSize = new AtomicInteger(maxSize); + public CompactionJobPriorityQueue(CompactorGroupId groupId, long maxSize, + SizeTrackingTreeMap.Weigher weigher) { + this.jobQueue = new SizeTrackingTreeMap<>(weigher); + this.maxSize = new AtomicLong(maxSize); this.tabletJobs = new HashMap<>(); this.groupId = groupId; this.rejectedJobs = new AtomicLong(0); @@ -230,11 +229,11 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection 0, "Maximum size of the Compaction job priority queue must be greater than 0"); this.maxSize.set(maxSize); @@ -249,7 +248,7 @@ public long getDequeuedJobs() { } public synchronized long getQueuedJobs() { - return jobQueue.size(); + return jobQueue.entrySize(); } public synchronized long getLowestPriority() { @@ -332,7 +331,7 @@ private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges) } private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { - if (jobQueue.size() >= maxSize.get()) { + if (jobQueue.dataSize() >= maxSize.get()) { var lastEntry = jobQueue.lastKey(); if (job.getPriority() <= lastEntry.job.getPriority()) { // the queue is full and this job has a lower or same priority than the lowest job in the diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index b9fe1ed424c..2e2dc3cef95 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -48,18 +48,20 @@ public class CompactionJobQueues { private final ConcurrentHashMap priorityQueues = new ConcurrentHashMap<>(); - private final int queueSize; + private final long queueSize; private final Map currentGenerations; - public CompactionJobQueues(int queueSize) { + private SizeTrackingTreeMap.Weigher weigher = + val -> val.getTabletMetadata().toString().length() + val.getJob().toString().length(); + + public CompactionJobQueues(long queueSize) { this.queueSize = queueSize; Map cg = new EnumMap<>(DataLevel.class); for (var level : DataLevel.values()) { cg.put(level, new AtomicLong()); } currentGenerations = Collections.unmodifiableMap(cg); - } public void beginFullScan(DataLevel level) { @@ -164,7 +166,7 @@ public TabletMetadata getTabletMetadata() { */ public CompletableFuture getAsync(CompactorGroupId groupId) { var pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); + gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher)); return pq.getAsync(); } @@ -187,7 +189,7 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, } var pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); + gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher)); pq.add(tabletMetadata, jobs, currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java new file mode 100644 index 00000000000..306a56fb647 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.TreeMap; + +import com.google.common.base.Preconditions; + +/** + * This class wraps a treemap and tracks the data size of everything added and removed from the + * treemap. + */ +class SizeTrackingTreeMap { + + private static class ValueWrapper { + final V2 val; + final long computedSize; + + private ValueWrapper(V2 val, long computedSize) { + this.val = val; + this.computedSize = computedSize; + } + } + + private final TreeMap> map = new TreeMap<>(); + private long dataSize = 0; + private Weigher weigher; + + private Map.Entry unwrap(Map.Entry> wrapperEntry) { + if (wrapperEntry == null) { + return null; + } + return new AbstractMap.SimpleImmutableEntry<>(wrapperEntry.getKey(), + wrapperEntry.getValue().val); + } + + private void incrementDataSize(ValueWrapper val) { + Preconditions.checkState(dataSize >= 0); + dataSize += val.computedSize; + } + + private void decrementDataSize(Map.Entry> entry) { + if (entry != null) { + decrementDataSize(entry.getValue()); + } + } + + private void decrementDataSize(ValueWrapper val) { + if (val != null) { + Preconditions.checkState(dataSize >= val.computedSize); + dataSize -= val.computedSize; + } + } + + interface Weigher { + long weigh(V2 val); + } + + public SizeTrackingTreeMap(Weigher weigher) { + this.weigher = weigher; + } + + public boolean isEmpty() { + return map.isEmpty(); + } + + public long dataSize() { + return dataSize; + } + + public int entrySize() { + return map.size(); + } + + public K lastKey() { + return map.lastKey(); + } + + public Map.Entry firstEntry() { + return unwrap(map.firstEntry()); + } + + public void remove(K key) { + var prev = map.remove(key); + decrementDataSize(prev); + } + + public Map.Entry pollFirstEntry() { + var first = map.pollFirstEntry(); + decrementDataSize(first); + return unwrap(first); + } + + public Map.Entry pollLastEntry() { + var last = map.pollLastEntry(); + decrementDataSize(last); + return unwrap(last); + } + + public void put(K key, V val) { + var wrapped = new ValueWrapper<>(val, weigher.weigh(val)); + var prev = map.put(key, wrapped); + decrementDataSize(prev); + incrementDataSize(wrapped); + } + + public void clear() { + map.clear(); + dataSize = 0; + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 37213cdc488..01c18798754 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -74,7 +74,7 @@ public void testTabletFileReplacement() { EasyMock.replay(tm, cj1, cj2); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); assertEquals(1, queue.add(tm, List.of(cj1), 1L)); MetaJob job = queue.peek(); @@ -129,7 +129,7 @@ public void testAddEqualToMaxSize() { EasyMock.replay(tm, cj1, cj2); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); EasyMock.verify(tm, cj1, cj2); @@ -186,7 +186,7 @@ public void testAddMoreThanMax() { EasyMock.replay(tm, cj1, cj2, cj3); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L)); EasyMock.verify(tm, cj1, cj2, cj3); @@ -247,7 +247,7 @@ public void test() { TreeSet expected = new TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 1000, mj -> 10); // create and add 1000 jobs for (int x = 0; x < 1000; x++) { @@ -256,7 +256,7 @@ public void test() { expected.add(pair.getSecond()); } - assertEquals(100, queue.getMaxSize()); + assertEquals(1000, queue.getMaxSize()); assertEquals(100, queue.getQueuedJobs()); assertEquals(900, queue.getRejectedJobs()); // There should be 1000 total job ages even though 900 were rejected @@ -268,7 +268,7 @@ public void test() { assertTrue(stats.getMaxAge().toMillis() > 0); assertTrue(stats.getAvgAge().toMillis() > 0); - // iterate over the expected set and make sure that they next job in the queue + // iterate over the expected set and make sure that the next job in the queue // matches int matchesSeen = 0; for (CompactionJob expectedJob : expected) { @@ -312,7 +312,7 @@ public void test() { */ @Test public void testAsyncCancelCleanup() { - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1); List> futures = new ArrayList<>(); @@ -342,7 +342,7 @@ public void testAsyncCancelCleanup() { @Test public void testChangeMaxSize() { - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1); assertEquals(100, queue.getMaxSize()); queue.setMaxSize(50); assertEquals(50, queue.getMaxSize()); @@ -351,5 +351,4 @@ public void testChangeMaxSize() { // Make sure previous value was not changed after invalid setting assertEquals(50, queue.getMaxSize()); } - } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 3d8933fa386..f63e56cc490 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -81,7 +81,7 @@ public void testFullScanHandling() throws Exception { var cg2 = CompactorGroupId.of("CG2"); var cg3 = CompactorGroupId.of("CG3"); - CompactionJobQueues jobQueues = new CompactionJobQueues(100); + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); jobQueues.beginFullScan(DataLevel.USER); @@ -247,7 +247,7 @@ public void testFullScanLevels() throws Exception { var cg1 = CompactorGroupId.of("CG1"); - CompactionJobQueues jobQueues = new CompactionJobQueues(100); + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); @@ -283,7 +283,7 @@ public void testAddPollRaceCondition() throws Exception { final int numToAdd = 100_000; - CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1); + CompactionJobQueues jobQueues = new CompactionJobQueues(10000000); CompactorGroupId[] groups = Stream.of("G1", "G2", "G3").map(CompactorGroupId::of).toArray(CompactorGroupId[]::new); @@ -342,7 +342,7 @@ public void testAddPollRaceCondition() throws Exception { @Test public void testGetAsync() throws Exception { - CompactionJobQueues jobQueues = new CompactionJobQueues(100); + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); var tid = TableId.of("1"); var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java new file mode 100644 index 00000000000..384363228f0 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +import org.junit.jupiter.api.Test; + +public class SizeTrackingTreeMapTest { + @Test + public void testSizeTracking() { + List computeSizeCalls = new ArrayList<>(); + var stmap = new SizeTrackingTreeMap(val -> { + computeSizeCalls.add(val); + return val.length(); + }); + + TreeMap expected = new TreeMap<>(); + + check(expected, stmap); + assertEquals(List.of(), computeSizeCalls); + + stmap.put(3, "1234567890"); + expected.put(3, "1234567890"); + check(expected, stmap); + assertEquals(List.of("1234567890"), computeSizeCalls); + + stmap.put(4, "12345"); + expected.put(4, "12345"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345"), computeSizeCalls); + + // remove a key that does not exist + stmap.remove(2); + expected.remove(2); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345"), computeSizeCalls); + + // remove a key that does exist + stmap.remove(3); + expected.remove(3); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345"), computeSizeCalls); + + // update an existing key, should decrement the old size and increment the new size + stmap.put(4, "123"); + expected.put(4, "123"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123"), computeSizeCalls); + + stmap.put(7, "123456789012345"); + expected.put(7, "123456789012345"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345"), computeSizeCalls); + + stmap.put(11, "1234567"); + expected.put(11, "1234567"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + + assertEquals(expected.pollFirstEntry(), stmap.pollFirstEntry()); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + + assertEquals(expected.pollLastEntry(), stmap.pollLastEntry()); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + + expected.clear(); + stmap.clear(); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + } + + private void check(TreeMap expected, SizeTrackingTreeMap stmap) { + long expectedDataSize = expected.values().stream().mapToLong(String::length).sum(); + assertEquals(expectedDataSize, stmap.dataSize()); + assertEquals(expected.size(), stmap.entrySize()); + assertEquals(expected.isEmpty(), stmap.isEmpty()); + assertEquals(expected.firstEntry(), stmap.firstEntry()); + if (expected.isEmpty()) { + assertThrows(NoSuchElementException.class, stmap::lastKey); + } else { + assertEquals(expected.lastKey(), stmap.lastKey()); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index bdee45f9353..2a032565953 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -109,7 +109,7 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { public static final String QUEUE1 = "METRICSQ1"; public static final String QUEUE1_METRIC_LABEL = MetricsUtil.formatString(QUEUE1); public static final String QUEUE1_SERVICE = "Q1"; - public static final int QUEUE1_SIZE = 6; + public static final int QUEUE1_SIZE = 10 * 1024; // Metrics collector Thread final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); @@ -202,7 +202,7 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE + ".planner.opts.groups", "[{'group':'" + QUEUE1 + "'}]"); - cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, "6"); + cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, "10K"); cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0); // This test waits for dead compactors to be absent in zookeeper. The following setting will