diff --git a/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java b/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java index a11b7a24714..11b0182bb3f 100644 --- a/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java +++ b/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java @@ -28,9 +28,11 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.Comparator; @@ -55,6 +57,8 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; /** @@ -560,4 +564,26 @@ public String obscured() { return Base64.getEncoder().encodeToString(digester.digest()); } + public String toBase64() { + DataOutputBuffer buffer = new DataOutputBuffer(); + try { + writeTo(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength())); + } + + public static KeyExtent fromBase64(String encoded) { + byte[] data = Base64.getDecoder().decode(encoded); + DataInputBuffer buffer = new DataInputBuffer(); + buffer.reset(data, data.length); + try { + return KeyExtent.readFrom(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } diff --git a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java index c9716b561ea..99b1e294626 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java @@ -36,7 +36,6 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -147,20 +146,18 @@ public static void selected(FateId fateId, KeyExtent extent, Collections2.transform(inputs, StoredTabletFile::toMinimalString)); } - public static void compacting(TabletMetadata tabletMetadata, ExternalCompactionId cid, + public static void compacting(KeyExtent extent, FateId selectedFateId, ExternalCompactionId cid, String compactorAddress, CompactionJob job) { if (fileLog.isDebugEnabled()) { if (job.getKind() == CompactionKind.USER) { - var fateId = tabletMetadata.getSelectedFiles().getFateId(); fileLog.debug( "Compacting {} driver:{} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}", - tabletMetadata.getExtent(), fateId, cid, job.getGroup(), compactorAddress, - job.getPriority(), getSize(job.getFiles()), job.getKind(), - asMinimalString(job.getFiles())); + extent, selectedFateId, cid, job.getGroup(), compactorAddress, job.getPriority(), + getSize(job.getFiles()), job.getKind(), asMinimalString(job.getFiles())); } else { fileLog.debug( "Compacting {} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}", - tabletMetadata.getExtent(), cid, job.getGroup(), compactorAddress, job.getPriority(), + extent, cid, job.getGroup(), compactorAddress, job.getPriority(), getSize(job.getFiles()), job.getKind(), asMinimalString(job.getFiles())); } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index c78e5661bb8..cf2ea3bb82b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -534,9 +534,10 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t ConditionalTabletMutator requireAbsentLoaded(Set files); /** - * Requires the given set of files are not currently involved in any running compactions. + * This check will run atomically on the server side and must pass in order for the tablet to be + * updated. */ - ConditionalTabletMutator requireNotCompacting(Set files); + ConditionalTabletMutator requireCheckSuccess(TabletMetadataCheck check); /** *

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataCheck.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataCheck.java new file mode 100644 index 00000000000..14c97f625a3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataCheck.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + +/** + * This interface facilitates atomic checks of tablet metadata prior to updating tablet metadata. + * The way it is intended to be used is the following. + *

    + *
  1. On the client side a TabletMetadataCheck object is created and passed to Ample
  2. + *
  3. Ample uses Gson to serialize the object, so it must not reference anything that can not + * serialize. Also it should not reference big things like server context or the Manager, it should + * only reference a small amount of data needed for the check. + *
  4. On the tablet server side, as part of conditional mutation processing, this class is + * recreated and the {@link #canUpdate(TabletMetadata)} method is called and if it returns true the + * conditional mutation will go through.
  5. + *
+ * + *

+ * Implementations are expected to have a no arg constructor. + *

+ * + */ +public interface TabletMetadataCheck { + + Set ALL_COLUMNS = + Collections.unmodifiableSet(EnumSet.noneOf(TabletMetadata.ColumnType.class)); + + boolean canUpdate(TabletMetadata tabletMetadata); + + /** + * Determines what tablet metadata columns are read on the server side. Return the empty set to + * read all tablet metadata columns. + */ + default Set columnsToRead() { + return ALL_COLUMNS; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java index 95d1491700b..115ba96f089 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Objects; -import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; @@ -40,22 +39,13 @@ public class CompactionJobImpl implements CompactionJob { private final CompactorGroupId group; private final Set files; private final CompactionKind kind; - // Tracks if a job selected all the tablet's files that existed at the time the job was created. - private final Optional jobSelectedAll; - /** - * - * @param jobSelectedAll This parameters only needs to be non-empty for job objects that are used - * to start compaction. After a job is running, its not used. So when a job object is - * recreated for a running external compaction this parameter can be empty. - */ public CompactionJobImpl(short priority, CompactorGroupId group, - Collection files, CompactionKind kind, Optional jobSelectedAll) { + Collection files, CompactionKind kind) { this.priority = priority; this.group = Objects.requireNonNull(group); this.files = Set.copyOf(files); this.kind = Objects.requireNonNull(kind); - this.jobSelectedAll = Objects.requireNonNull(jobSelectedAll); } @Override @@ -92,10 +82,6 @@ public int hashCode() { return Objects.hash(priority, group, files, kind); } - public boolean selectedAll() { - return jobSelectedAll.orElseThrow(); - } - @Override public boolean equals(Object o) { if (o instanceof CompactionJobImpl) { diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java index ab1c924fc55..49b8eaa53c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; @@ -80,8 +79,7 @@ public Builder addJob(short priority, CompactorGroupId group, seenFiles.addAll(filesSet); - jobs.add(new CompactionJobImpl(priority, group, filesSet, kind, - Optional.of(filesSet.equals(allFiles)))); + jobs.add(new CompactionJobImpl(priority, group, filesSet, kind)); return this; } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java index d2c25a206b1..90fc5a4c580 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java @@ -34,7 +34,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.IntStream; @@ -632,7 +631,7 @@ public void testMaxTabletFilesNoCompaction() { // that a compaction is not planned all = createCFs(1_000, 2, 2, 2, 2, 2, 2, 2); var job = new CompactionJobImpl((short) 1, CompactorGroupId.of("ee1"), createCFs("F1", "1000"), - CompactionKind.SYSTEM, Optional.of(false)); + CompactionKind.SYSTEM); params = createPlanningParams(all, all, Set.of(job), 3, CompactionKind.SYSTEM, conf); plan = planner.makePlan(params); diff --git a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java index 001dff3dff0..e181e07722d 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java @@ -37,7 +37,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Optional; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.clientImpl.Namespace; @@ -60,10 +59,9 @@ public CompactionJob createJob(CompactionKind kind, String tablet, int numFiles, files.add(CompactableFile .create(URI.create("hdfs://foonn/accumulo/tables/5/" + tablet + "/" + i + ".rf"), 4, 4)); } - return new CompactionJobImpl( - CompactionJobPrioritizer.createPriority(Namespace.DEFAULT.id(), TableId.of("5"), kind, - totalFiles, numFiles, totalFiles * 2), - CompactorGroupId.of("test"), files, kind, Optional.of(false)); + return new CompactionJobImpl(CompactionJobPrioritizer.createPriority(Namespace.DEFAULT.id(), + TableId.of("5"), kind, totalFiles, numFiles, totalFiles * 2), CompactorGroupId.of("test"), + files, kind); } @Test diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index 987b29279ff..349dedcc971 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -285,7 +284,7 @@ public Collection getRunningCompactions() { Collection files = ecMeta.getJobFiles().stream() .map(f -> new CompactableFileImpl(f, allFiles2.get(f))).collect(Collectors.toList()); CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), - ecMeta.getCompactionGroupId(), files, ecMeta.getKind(), Optional.empty()); + ecMeta.getCompactionGroupId(), files, ecMeta.getKind()); return job; }).collect(Collectors.toUnmodifiableList()); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java index fdcb93732fe..6b36a1cc36e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.classloader.ClassLoaderUtil; @@ -211,7 +212,7 @@ public Optional> getSample(CompactableFile cf, public static Map computeOverrides(Optional compactionConfig, ServerContext context, KeyExtent extent, Set inputFiles, - Set selectedFiles) { + Supplier> selectedFiles) { if (compactionConfig.isPresent() && !UserCompactionUtils.isDefault(compactionConfig.orElseThrow().getConfigurer())) { @@ -234,7 +235,8 @@ public static Map computeOverrides(Optional com } public static Map computeOverrides(ServerContext context, KeyExtent extent, - Set inputFiles, Set selectedFiles, PluginConfig cfg) { + Set inputFiles, Supplier> selectedFiles, + PluginConfig cfg) { CompactionConfigurer configurer = newInstance(context.getTableConfiguration(extent.tableId()), cfg.getClassName(), CompactionConfigurer.class); @@ -266,7 +268,7 @@ public Collection getInputFiles() { @Override public Set getSelectedFiles() { - return selectedFiles; + return selectedFiles.get(); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java index 41a59986db0..19ad33d6af6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@ -24,11 +24,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap; import static java.util.stream.Collectors.toUnmodifiableSet; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.URI; -import java.util.Arrays; -import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -49,8 +45,6 @@ import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; import com.google.common.base.Suppliers; import com.google.gson.Gson; @@ -120,7 +114,7 @@ private TabletManagementParameters(JsonData jdata) { this.serversToShutdown = jdata.serversToShutdown.stream().map(TServerInstance::new).collect(toUnmodifiableSet()); this.migrations = jdata.migrations.entrySet().stream() - .collect(toUnmodifiableMap(entry -> JsonData.strToExtent(entry.getKey()), + .collect(toUnmodifiableMap(entry -> KeyExtent.fromBase64(entry.getKey()), entry -> new TServerInstance(entry.getValue()))); this.level = jdata.level; this.compactionHints = makeImmutable(jdata.compactionHints.entrySet().stream() @@ -226,30 +220,6 @@ private static class JsonData { Map volumeReplacements; long steadyTime; - private static String toString(KeyExtent extent) { - DataOutputBuffer buffer = new DataOutputBuffer(); - try { - extent.writeTo(buffer); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - return Base64.getEncoder() - .encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength())); - - } - - private static KeyExtent strToExtent(String kes) { - byte[] data = Base64.getDecoder().decode(kes); - DataInputBuffer buffer = new DataInputBuffer(); - buffer.reset(data, data.length); - try { - return KeyExtent.readFrom(buffer); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - // Gson requires private constructor @SuppressWarnings("unused") private JsonData() {} @@ -262,8 +232,9 @@ private JsonData() {} .collect(toList()); serversToShutdown = params.serversToShutdown.stream().map(TServerInstance::getHostPortSession) .collect(toList()); - migrations = params.migrations.entrySet().stream().collect( - toMap(entry -> toString(entry.getKey()), entry -> entry.getValue().getHostPortSession())); + migrations = + params.migrations.entrySet().stream().collect(toMap(entry -> entry.getKey().toBase64(), + entry -> entry.getValue().getHostPortSession())); level = params.level; tserverGroups = params.getGroupedTServers().entrySet().stream() .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().stream() diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index ac0dc4b1126..81194358715 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -61,16 +61,17 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletMetadataCheck; import org.apache.accumulo.core.metadata.schema.TabletMutatorBase; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.iterators.ColumnFamilySizeLimitIterator; -import org.apache.accumulo.server.metadata.iterators.DisjointCompactionIterator; import org.apache.accumulo.server.metadata.iterators.PresentIterator; import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator; import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator; +import org.apache.accumulo.server.metadata.iterators.TabletMetadataCheckIterator; import com.google.common.base.Preconditions; @@ -370,9 +371,9 @@ public ConditionalTabletMutator requireAbsentLoaded(Set fi } @Override - public ConditionalTabletMutator requireNotCompacting(Set files) { + public ConditionalTabletMutator requireCheckSuccess(TabletMetadataCheck check) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); - Condition condition = DisjointCompactionIterator.createCondition(files); + Condition condition = TabletMetadataCheckIterator.createCondition(check, extent); mutation.addCondition(condition); return this; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/DisjointCompactionIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/DisjointCompactionIterator.java deleted file mode 100644 index fb8ba8c82ac..00000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/DisjointCompactionIterator.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.metadata.iterators; - -import static org.apache.accumulo.core.util.LazySingletons.GSON; - -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.Condition; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.schema.CompactionMetadata; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; -import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; - -import com.google.common.base.Preconditions; -import com.google.gson.reflect.TypeToken; - -/** - * An iterator used with conditional updates to tablets to check that for a given set of files none - * of them are currently compacting. - */ -public class DisjointCompactionIterator extends ColumnFamilyTransformationIterator { - - private Set filesToCompact; - - private static final String DISJOINT = "disjoint"; - private static final String OVERLAPS = "overlaps"; - private static final String FILES_KEY = "files"; - - @Override - protected Value transform(SortedKeyValueIterator source) throws IOException { - while (source.hasTop()) { - Preconditions.checkState( - source.getTopKey().getColumnFamily().equals(ExternalCompactionColumnFamily.NAME)); - var compactionMetadata = CompactionMetadata.fromJson(source.getTopValue().toString()); - if (!Collections.disjoint(filesToCompact, compactionMetadata.getJobFiles())) { - return new Value(OVERLAPS); - } - source.next(); - } - return new Value(DISJOINT); - } - - @Override - public void init(SortedKeyValueIterator source, Map options, - IteratorEnvironment env) throws IOException { - super.init(source, options, env); - filesToCompact = decode(options.get(FILES_KEY)); - } - - /** - * Creates a condition that will only pass if the given files are disjoint with all files - * currently compacting for a tablet. - */ - public static Condition createCondition(Set filesToCompact) { - Preconditions.checkArgument(!filesToCompact.isEmpty()); - IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO, - DisjointCompactionIterator.class); - is.addOption(FILES_KEY, encode(filesToCompact)); - return new Condition(ExternalCompactionColumnFamily.NAME, EMPTY).setValue(DISJOINT) - .setIterators(is); - } - - private static String encode(Set filesToCompact) { - var files = - filesToCompact.stream().map(StoredTabletFile::getMetadata).collect(Collectors.toSet()); - return GSON.get().toJson(files); - } - - private Set decode(String s) { - Type fileSetType = new TypeToken>() {}.getType(); - Set files = GSON.get().fromJson(s, fileSetType); - return files.stream().map(StoredTabletFile::of).collect(Collectors.toSet()); - } -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/TabletMetadataCheckIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/TabletMetadataCheckIterator.java new file mode 100644 index 00000000000..83c06cd8c96 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/TabletMetadataCheckIterator.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata.iterators; + +import static org.apache.accumulo.core.util.LazySingletons.GSON; +import static org.apache.accumulo.server.metadata.iterators.ColumnFamilyTransformationIterator.getTabletRow; + +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; + +import org.apache.accumulo.core.classloader.ClassLoaderUtil; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.IteratorAdapter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadataCheck; +import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class TabletMetadataCheckIterator implements SortedKeyValueIterator { + + private static final Logger log = LoggerFactory.getLogger(TabletMetadataCheckIterator.class); + + private TabletMetadataCheck check; + private KeyExtent expectedExtent; + + private Key startKey; + private Value topValue; + private SortedKeyValueIterator source; + + private static final String CHECK_CLASS_KEY = "checkClass"; + private static final String CHECK_DATA_KEY = "checkData"; + private static final String EXTENT_KEY = "checkExtent"; + private static final String SUCCESS = "success"; + + @Override + public void init(SortedKeyValueIterator source, Map options, + IteratorEnvironment env) throws IOException { + Preconditions.checkState(check == null && expectedExtent == null && startKey == null); + try { + String className = Objects.requireNonNull(options.get(CHECK_CLASS_KEY)); + String checkData = Objects.requireNonNull(options.get(CHECK_DATA_KEY)); + log.trace("Instantiating class {} using {}", className, checkData); + Class clazz = + ClassLoaderUtil.loadClass(null, className, TabletMetadataCheck.class); + check = GSON.get().fromJson(checkData, clazz); + expectedExtent = KeyExtent.fromBase64(options.get(EXTENT_KEY)); + this.source = source; + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + Preconditions.checkState(check != null && expectedExtent != null); + + this.startKey = null; + + Text tabletRow = getTabletRow(range); + + var expectedMetaRow = expectedExtent.toMetaRow(); + Preconditions.checkState(tabletRow.equals(expectedMetaRow), "Tablet row mismatch %s %s", + tabletRow, expectedMetaRow); + + var colsToRead = check.columnsToRead(); + + source.seek(new Range(tabletRow), Set.of(), false); + + if (source.hasTop()) { + var tabletMetadata = TabletMetadata.convertRow(new IteratorAdapter(source), + EnumSet.allOf(TabletMetadata.ColumnType.class), false, false); + + // TODO checking the prev end row here is redundant w/ other checks that ample currently + // does.. however we could try to make all checks eventually use this class + if (tabletMetadata.getExtent().equals(expectedExtent)) { + if (check.canUpdate(tabletMetadata)) { + topValue = new Value(SUCCESS); + } else { + topValue = null; + } + log.trace("Checked tablet {} using {} {} and it was a {}", expectedExtent, check, + tabletMetadata, topValue != null ? SUCCESS : "FAILURE"); + } else { + topValue = null; + log.trace( + "Attempted to check tablet {} using {} but found another extent {}, so failing check", + expectedExtent, check, tabletMetadata.getExtent()); + } + } else { + topValue = null; + log.trace("Attempted to check tablet {} using {} but it does not exists, so failing check", + expectedExtent, check); + } + + this.startKey = range.getStartKey(); + } + + @Override + public Key getTopKey() { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + if (topValue == null) { + throw new NoSuchElementException(); + } + + return startKey; + } + + @Override + public Value getTopValue() { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + if (topValue == null) { + throw new NoSuchElementException(); + } + return topValue; + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isRunningLowOnMemory() { + return source.isRunningLowOnMemory(); + } + + @Override + public boolean hasTop() { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + return topValue != null; + } + + @Override + public void next() throws IOException { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + topValue = null; + } + + public static Condition createCondition(TabletMetadataCheck tmCheck, KeyExtent expectedExtent) { + Objects.requireNonNull(tmCheck); + IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO, + TabletMetadataCheckIterator.class); + is.addOption(CHECK_CLASS_KEY, tmCheck.getClass().getName()); + is.addOption(CHECK_DATA_KEY, GSON.get().toJson(tmCheck)); + is.addOption(EXTENT_KEY, expectedExtent.toBase64()); + return new Condition("", "").setValue(SUCCESS).setIterators(is); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java index 7ab3e048f73..25a7da3056b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java @@ -27,7 +27,6 @@ import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; @@ -68,12 +67,12 @@ public static ReferencedTabletFile getNextDataFilename(FilePrefix prefix, Server } public static ReferencedTabletFile getNextDataFilenameForMajc(boolean propagateDeletes, - ServerContext context, TabletMetadata tabletMetadata, Consumer dirCreator, + ServerContext context, KeyExtent extent, String tabletDir, Consumer dirCreator, ExternalCompactionId ecid) { String tmpFileName = getNextDataFilename( !propagateDeletes ? FilePrefix.MAJOR_COMPACTION_ALL_FILES : FilePrefix.MAJOR_COMPACTION, - context, tabletMetadata.getExtent(), tabletMetadata.getDirName(), dirCreator).insert() - .getMetadataPath() + "_tmp_" + ecid.canonical(); + context, extent, tabletDir, dirCreator).insert().getMetadataPath() + "_tmp_" + + ecid.canonical(); return new ReferencedTabletFile(new Path(tmpFileName)); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java index 644939d68b9..a95a71060d6 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java @@ -31,7 +31,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; @@ -74,20 +73,16 @@ public void testGetNextDataFilenameForMajc() { EasyMock.expect(context.getBaseUris()).andReturn(Set.of(baseUri)); EasyMock.expect(context.getUniqueNameAllocator()).andReturn(allocator); - TabletMetadata tm1 = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm1.getExtent()).andReturn(ke1); - EasyMock.expect(tm1.getDirName()).andReturn(dirName); - Consumer dirCreator = (dir) -> {}; ExternalCompactionId ecid = ExternalCompactionId.generate(UUID.randomUUID()); - EasyMock.replay(tableConf, vm, allocator, context, tm1); + EasyMock.replay(tableConf, vm, allocator, context); - ReferencedTabletFile rtf = - TabletNameGenerator.getNextDataFilenameForMajc(false, context, tm1, dirCreator, ecid); + ReferencedTabletFile rtf = TabletNameGenerator.getNextDataFilenameForMajc(false, context, ke1, + dirName, dirCreator, ecid); assertEquals("ANextFileName.rf_tmp_" + ecid.canonical(), rtf.getFileName()); - EasyMock.verify(tableConf, vm, allocator, context, tm1); + EasyMock.verify(tableConf, vm, allocator, context); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index ab8e1bbf447..ebe27cb2be1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -25,13 +25,12 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.ref.SoftReference; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -96,6 +95,7 @@ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -113,7 +113,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; @@ -122,7 +121,6 @@ import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; @@ -130,6 +128,7 @@ import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile; import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionConfigStorage; @@ -148,8 +147,8 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Weigher; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; @@ -404,44 +403,44 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials TExternalCompactionJob result = null; - CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); + ResolvedCompactionJob rcJob = (ResolvedCompactionJob) jobQueues.poll(groupId); - while (metaJob != null) { + while (rcJob != null) { - Optional compactionConfig = getCompactionConfig(metaJob); + Optional compactionConfig = getCompactionConfig(rcJob); - // this method may reread the metadata, do not use the metadata in metaJob for anything after + // this method may reread the metadata, do not use the metadata in rcJob for anything after // this method CompactionMetadata ecm = null; - var kind = metaJob.getJob().getKind(); + var kind = rcJob.getKind(); // Only reserve user compactions when the config is present. When compactions are canceled the // config is deleted. var cid = ExternalCompactionId.from(externalCompactionId); if (kind == CompactionKind.SYSTEM || (kind == CompactionKind.USER && compactionConfig.isPresent())) { - ecm = reserveCompaction(metaJob, compactorAddress, cid); + ecm = reserveCompaction(rcJob, compactorAddress, cid); } if (ecm != null) { - result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); + result = createThriftJob(externalCompactionId, ecm, rcJob, compactionConfig); // It is possible that by the time this added that the the compactor that made this request // is dead. In this cases the compaction is not actually running. RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), new RunningCompaction(result, compactorAddress, groupName)); - TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, - metaJob.getJob()); + TabletLogger.compacting(rcJob.getExtent(), rcJob.getSelectedFateId(), cid, compactorAddress, + rcJob); break; } else { LOG.debug( "Unable to reserve compaction job for {}, pulling another off the queue for group {}", - metaJob.getTabletMetadata().getExtent(), groupName); - metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); + rcJob.getExtent(), groupName); + rcJob = (ResolvedCompactionJob) jobQueues.poll(CompactorGroupId.of(groupName)); } } - if (metaJob == null) { + if (rcJob == null) { LOG.trace("No jobs found in group {} ", groupName); } @@ -454,60 +453,6 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials return new TNextCompactionJob(result, compactorCounts.get(groupName)); } - @VisibleForTesting - public static boolean canReserveCompaction(TabletMetadata tablet, CompactionKind kind, - Set jobFiles, ServerContext ctx, SteadyTime steadyTime) { - - if (tablet == null) { - // the tablet no longer exist - return false; - } - - if (tablet.getOperationId() != null) { - return false; - } - - if (ctx.getTableState(tablet.getTableId()) != TableState.ONLINE) { - return false; - } - - if (!tablet.getFiles().containsAll(jobFiles)) { - return false; - } - - var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream() - .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet()); - - if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) { - return false; - } - - switch (kind) { - case SYSTEM: - var userRequestedCompactions = tablet.getUserCompactionsRequested().size(); - if (userRequestedCompactions > 0) { - LOG.debug( - "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions", - tablet.getExtent(), userRequestedCompactions); - return false; - } else if (!Collections.disjoint(jobFiles, - getFilesReservedBySelection(tablet, steadyTime, ctx))) { - return false; - } - break; - case USER: - if (tablet.getSelectedFiles() == null - || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) { - return false; - } - break; - default: - throw new UnsupportedOperationException("Not currently handling " + kind); - } - - return true; - } - private void checkTabletDir(KeyExtent extent, Path path) { try { if (tabletDirCache.getIfPresent(path) == null) { @@ -530,54 +475,30 @@ private void checkTabletDir(KeyExtent extent, Path path) { } } - protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, - Set jobFiles, TabletMetadata tablet, String compactorAddress, - ExternalCompactionId externalCompactionId) { - boolean propDels; - - FateId fateId = null; - - switch (job.getKind()) { - case SYSTEM: { - boolean compactingAll = tablet.getFiles().equals(jobFiles); - propDels = !compactingAll; - } - break; - case USER: { - boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll() - && tablet.getSelectedFiles().getFiles().equals(jobFiles); - propDels = !compactingAll; - fateId = tablet.getSelectedFiles().getFateId(); - } - break; - default: - throw new IllegalArgumentException(); - } + protected CompactionMetadata createExternalCompactionMetadata(ResolvedCompactionJob job, + String compactorAddress, ExternalCompactionId externalCompactionId) { + boolean propDels = !job.isCompactingAll(); + FateId fateId = job.getSelectedFateId(); - Consumer directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir)); + Consumer directoryCreator = dir -> checkTabletDir(job.getExtent(), new Path(dir)); ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, - tablet, directoryCreator, externalCompactionId); + job.getExtent(), job.getTabletDir(), directoryCreator, externalCompactionId); - return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), + return new CompactionMetadata(job.getJobFiles(), newFile, compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), propDels, fateId); } private class ReserveCompactionTask implements Supplier { - - // Use a soft reference for this in case free memory gets low while this is sitting in the queue - // waiting to process. This object can contain the tablets list of files and if there are lots - // of tablet with lots of files then that could start to cause memory problems. This hack could - // be removed if #5188 were implemented. - private final SoftReference metaJobRef; + private final ResolvedCompactionJob rcJob; private final String compactorAddress; private final ExternalCompactionId externalCompactionId; - private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String compactorAddress, + private ReserveCompactionTask(ResolvedCompactionJob rcJob, String compactorAddress, ExternalCompactionId externalCompactionId) { - Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM - || metaJob.getJob().getKind() == CompactionKind.USER); - this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob)); + Preconditions.checkArgument( + rcJob.getKind() == CompactionKind.SYSTEM || rcJob.getKind() == CompactionKind.USER); + this.rcJob = Objects.requireNonNull(rcJob); this.compactorAddress = Objects.requireNonNull(compactorAddress); this.externalCompactionId = Objects.requireNonNull(externalCompactionId); Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress), @@ -587,89 +508,42 @@ private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String compac @Override public CompactionMetadata get() { - try { - var metaJob = metaJobRef.get(); - if (metaJob == null) { - LOG.warn("Compaction reservation request for {} {} was garbage collected.", - compactorAddress, externalCompactionId); - return null; - } - - var tabletMetadata = metaJob.getTabletMetadata(); - - var jobFiles = metaJob.getJob().getFiles().stream() - .map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet()); - - Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100)) - .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) - .logInterval(Duration.ofMinutes(3)).createRetry(); - - while (retry.canRetry()) { - try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { - var extent = metaJob.getTabletMetadata().getExtent(); - - if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx, - manager.getSteadyTime())) { - return null; - } - - var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata, - compactorAddress, externalCompactionId); - - // any data that is read from the tablet to make a decision about if it can compact or - // not - // must be checked for changes in the conditional mutation. - var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() - .requireFiles(jobFiles).requireNotCompacting(jobFiles); - if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { - // For system compactions the user compaction requested column is examined when - // deciding - // if a compaction can start so need to check for changes to this column. - tabletMutator.requireSame(tabletMetadata, SELECTED, USER_COMPACTION_REQUESTED); - } else { - tabletMutator.requireSame(tabletMetadata, SELECTED); - } - - if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { - var selectedFiles = tabletMetadata.getSelectedFiles(); - var reserved = - getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx); - - // If there is a selectedFiles column, and the reserved set is empty this means that - // either no user jobs were completed yet or the selection expiration time has passed - // so the column is eligible to be deleted so a system job can run instead - if (selectedFiles != null && reserved.isEmpty() - && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) { - LOG.debug("Deleting user compaction selected files for {} {}", extent, - externalCompactionId); - tabletMutator.deleteSelectedFiles(); - } - } + if (ctx.getTableState(rcJob.getExtent().tableId()) != TableState.ONLINE) { + return null; + } - tabletMutator.putExternalCompaction(externalCompactionId, ecm); - tabletMutator.submit( - tm -> tm.getExternalCompactions().containsKey(externalCompactionId), - () -> "compaction reservation"); + try { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = rcJob.getExtent(); + var jobFiles = rcJob.getJobFiles(); + long selectedExpirationDuration = ctx.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION); + var reservationCheck = new CompactionReservationCheck(rcJob.getKind(), jobFiles, + rcJob.getSelectedFateId(), rcJob.isOverlapsSelectedFiles(), manager.getSteadyTime(), + selectedExpirationDuration); + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireCheckSuccess(reservationCheck); + + var ecm = createExternalCompactionMetadata(rcJob, compactorAddress, externalCompactionId); + + if (rcJob.isOverlapsSelectedFiles()) { + // There is corresponding code in CompactionReservationCheck that ensures this delete is + // safe to do. + tabletMutator.deleteSelectedFiles(); + } + tabletMutator.putExternalCompaction(externalCompactionId, ecm); - var result = tabletsMutator.process().get(extent); + tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId), + () -> "compaction reservation"); - if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { - return ecm; - } else { - tabletMetadata = result.readMetadata(); - } - } + var result = tabletsMutator.process().get(extent); - retry.useRetry(); - try { - retry.waitForNextAttempt(LOG, - "Reserved compaction for " + metaJob.getTabletMetadata().getExtent()); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + return null; } } - - return null; } finally { Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress), "compactorAddress:%s", compactorAddress); @@ -677,7 +551,7 @@ public CompactionMetadata get() { } } - protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, + protected CompactionMetadata reserveCompaction(ResolvedCompactionJob rcJob, String compactorAddress, ExternalCompactionId externalCompactionId) { if (activeCompactorReservationRequest.contains(compactorAddress)) { @@ -695,37 +569,43 @@ protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJ return null; } - var dataLevel = DataLevel.of(metaJob.getTabletMetadata().getTableId()); + var dataLevel = DataLevel.of(rcJob.getExtent().tableId()); var future = CompletableFuture.supplyAsync( - new ReserveCompactionTask(metaJob, compactorAddress, externalCompactionId), + new ReserveCompactionTask(rcJob, compactorAddress, externalCompactionId), reservationPools.get(dataLevel)); return future.join(); } protected TExternalCompactionJob createThriftJob(String externalCompactionId, - CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob, + CompactionMetadata ecm, ResolvedCompactionJob rcJob, Optional compactionConfig) { - Set selectedFiles; - if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { - selectedFiles = Set.of(); - } else { - selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream() - .map(file -> new CompactableFileImpl(file, - metaJob.getTabletMetadata().getFilesMap().get(file))) - .collect(Collectors.toUnmodifiableSet()); - } + // Only reach out to metadata table and get these if requested, usually not needed unless + // plugiun requests it. + Supplier> selectedFiles = Suppliers.memoize(() -> { + if (rcJob.getKind() == CompactionKind.SYSTEM) { + return Set.of(); + } else { + var tabletMetadata = + ctx.getAmple().readTablet(rcJob.getExtent(), SELECTED, FILES, PREV_ROW); + Preconditions.checkState( + tabletMetadata.getSelectedFiles().getFateId().equals(rcJob.getSelectedFateId())); + return tabletMetadata.getSelectedFiles().getFiles().stream() + .map(file -> new CompactableFileImpl(file, tabletMetadata.getFilesMap().get(file))) + .collect(Collectors.toUnmodifiableSet()); + } + }); Map overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx, - metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles); + rcJob.getExtent(), rcJob.getFiles(), selectedFiles); IteratorConfig iteratorSettings = SystemIteratorUtil .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of())); - var files = ecm.getJobFiles().stream().map(storedTabletFile -> { - var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); - return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(), - dfv.getTime()); + var files = rcJob.getJobFilesMap().entrySet().stream().map(e -> { + StoredTabletFile file = e.getKey(); + DataFileValue dfv = e.getValue(); + return new InputFile(file.getMetadata(), dfv.getSize(), dfv.getNumEntries(), dfv.getTime()); }).collect(toList()); // The fateId here corresponds to the Fate transaction that is driving a user initiated @@ -733,13 +613,12 @@ protected TExternalCompactionJob createThriftJob(String externalCompactionId, // it to null. If anything tries to use the id for a system compaction and triggers a NPE it's // probably a bug that needs to be fixed. FateId fateId = null; - if (metaJob.getJob().getKind() == CompactionKind.USER) { - fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId(); + if (rcJob.getKind() == CompactionKind.USER) { + fateId = rcJob.getSelectedFateId(); } - return new TExternalCompactionJob(externalCompactionId, - metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, - ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), + return new TExternalCompactionJob(externalCompactionId, rcJob.getExtent().toThrift(), files, + iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), TCompactionKind.valueOf(ecm.getKind().name()), fateId == null ? null : fateId.toThrift(), overrides); } @@ -750,18 +629,21 @@ public void registerMetrics(MeterRegistry registry) { } public void addJobs(TabletMetadata tabletMetadata, Collection jobs) { - jobQueues.add(tabletMetadata, jobs); + ArrayList resolvedJobs = new ArrayList<>(jobs.size()); + for (var job : jobs) { + resolvedJobs.add(new ResolvedCompactionJob(job, tabletMetadata)); + } + + jobQueues.add(tabletMetadata.getExtent(), resolvedJobs); } public CompactionCoordinatorService.Iface getThriftService() { return this; } - private Optional getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { - if (metaJob.getJob().getKind() == CompactionKind.USER - && metaJob.getTabletMetadata().getSelectedFiles() != null) { - var cconf = - compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId()); + private Optional getCompactionConfig(ResolvedCompactionJob rcJob) { + if (rcJob.getKind() == CompactionKind.USER) { + var cconf = compactionConfigCache.get(rcJob.getSelectedFateId()); return Optional.ofNullable(cconf); } return Optional.empty(); @@ -1306,25 +1188,4 @@ public void cleanUpInternalState() { } } } - - private static Set getFilesReservedBySelection(TabletMetadata tabletMetadata, - SteadyTime steadyTime, ServerContext ctx) { - if (tabletMetadata.getSelectedFiles() == null) { - return Set.of(); - } - - if (tabletMetadata.getSelectedFiles().getCompletedJobs() > 0) { - return tabletMetadata.getSelectedFiles().getFiles(); - } - - long selectedExpirationDuration = ctx.getTableConfiguration(tabletMetadata.getTableId()) - .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION); - - if (steadyTime.minus(tabletMetadata.getSelectedFiles().getSelectedTime()).toMillis() - < selectedExpirationDuration) { - return tabletMetadata.getSelectedFiles().getFiles(); - } - - return Set.of(); - } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionReservationCheck.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionReservationCheck.java new file mode 100644 index 00000000000..214aa24f799 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionReservationCheck.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.coordinator; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadataCheck; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Checks if a compaction job can be reserved + */ +public class CompactionReservationCheck implements TabletMetadataCheck { + + private static final Logger log = LoggerFactory.getLogger(CompactionReservationCheck.class); + + private CompactionKind kind; + private List jobFilesStr; + private Long steadyTimeNanos; + private Long selectedExpirationDurationMillis; + private String selectedFateIdStr; + private boolean checkIfCanDeleteSelectedFiles; + + public CompactionReservationCheck() {} + + public CompactionReservationCheck(CompactionKind jobKind, Set jobFiles, + FateId selectedFateId, boolean checkIfCanDeleteSelectedFiles, SteadyTime steadyTime, + long selectedExpirationDurationMillis) { + this.kind = jobKind; + // since this class will be serialized as json, make the types simpler to avoid many levels of + // nesting + this.jobFilesStr = + jobFiles.stream().map(StoredTabletFile::getMetadata).collect(Collectors.toList()); + this.steadyTimeNanos = steadyTime.getNanos(); + this.selectedExpirationDurationMillis = selectedExpirationDurationMillis; + this.selectedFateIdStr = selectedFateId == null ? null : selectedFateId.canonical(); + this.checkIfCanDeleteSelectedFiles = checkIfCanDeleteSelectedFiles; + } + + @Override + public boolean canUpdate(TabletMetadata tablet) { + Objects.requireNonNull(tablet); + Preconditions.checkState(kind != null && jobFilesStr != null && steadyTimeNanos != null + && selectedExpirationDurationMillis != null); + // expect selectedFateIdStr to be set if this is user compaction + Preconditions.checkState((kind == CompactionKind.USER && selectedFateIdStr != null) + || (kind == CompactionKind.SYSTEM && selectedFateIdStr == null)); + if (checkIfCanDeleteSelectedFiles) { + Preconditions.checkState(kind == CompactionKind.SYSTEM); + } + + var jobFiles = jobFilesStr.stream().map(StoredTabletFile::of).collect(Collectors.toSet()); + var steadyTime = SteadyTime.from(steadyTimeNanos, TimeUnit.NANOSECONDS); + + if (tablet.getOperationId() != null) { + return false; + } + + if (!tablet.getFiles().containsAll(jobFiles)) { + return false; + } + + var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream() + .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet()); + + if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) { + return false; + } + + switch (kind) { + case SYSTEM: { + var userRequestedCompactions = tablet.getUserCompactionsRequested().size(); + if (userRequestedCompactions > 0) { + log.debug( + "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions", + tablet.getExtent(), userRequestedCompactions); + return false; + } + + var selected = tablet.getSelectedFiles(); + if (selected != null) { + if (checkIfCanDeleteSelectedFiles) { + // The mutation is deleting the selected files column. This can only proceed if no jobs + // have run against the selected files, the selected files are expired, and the files + // being compacted overlaps with the selected files. + if (Collections.disjoint(jobFiles, selected.getFiles())) { + // This job does not overlap with the selected files, so something probably changed + // since the job was generated. Do not want to risk deleted selected files that + // changed after the job was generated. + return false; + } + + if (selected.getCompletedJobs() > 0) { + return false; + } + + if (steadyTime.minus(tablet.getSelectedFiles().getSelectedTime()).toMillis() + < selectedExpirationDurationMillis) { + return false; + } + } else { + // To start a system compaction that overlaps with files selected for user compaction + // the mutation must delete the selected set of files. The mutation is not deleting the + // selected files column, so it can not overlap w/ them. + if (!Collections.disjoint(jobFiles, selected.getFiles())) { + return false; + } + } + } + break; + } + case USER: { + var selectedFateId = FateId.from(selectedFateIdStr); + if (tablet.getSelectedFiles() == null + || !tablet.getSelectedFiles().getFateId().equals(selectedFateId) + || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) { + return false; + } + break; + } + default: + throw new UnsupportedOperationException("Not currently handling " + kind); + } + + return true; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 51d33a80572..17c50773bb8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -39,7 +39,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.util.Stat; @@ -72,10 +71,8 @@ public class CompactionJobPriorityQueue { static final int FUTURE_CHECK_THRESHOLD = 10_000; private class CjpqKey implements Comparable { - private final CompactionJob job; - // this exists to make every entry unique even if the job is the same, this is done because a - // treeset is used as a queue + private final CompactionJob job; private final long seq; CjpqKey(CompactionJob job) { @@ -85,16 +82,20 @@ private class CjpqKey implements Comparable { @Override public int compareTo(CjpqKey oe) { - int cmp = CompactionJobPrioritizer.JOB_COMPARATOR.compare(this.job, oe.job); + int cmp = CompactionJobPrioritizer.JOB_COMPARATOR.compare(job, oe.job); if (cmp == 0) { cmp = Long.compare(seq, oe.seq); } return cmp; } + public short getPriority() { + return job.getPriority(); + } + @Override public int hashCode() { - return Objects.hash(job, seq); + return Objects.hash(job.getPriority(), seq); } @Override @@ -106,7 +107,17 @@ public boolean equals(Object o) { return false; } CjpqKey cjqpKey = (CjpqKey) o; - return seq == cjqpKey.seq && job.equals(cjqpKey.job); + return seq == cjqpKey.seq && job.getPriority() == cjqpKey.job.getPriority(); + } + } + + private static class MetaJob { + private final CompactionJob job; + private final KeyExtent extent; + + public MetaJob(CompactionJob job, KeyExtent extent) { + this.job = job; + this.extent = extent; } } @@ -114,11 +125,11 @@ public boolean equals(Object o) { // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the // case where tablets decided to issues different compaction jobs than what is currently queued. - private final SizeTrackingTreeMap jobQueue; + private final SizeTrackingTreeMap jobQueue; private final AtomicLong maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; - private final ArrayDeque> futures; + private final ArrayDeque> futures; private long futuresAdded = 0; private final Map jobAges; private final Supplier jobQueueStats; @@ -141,8 +152,8 @@ private TabletJobs(long generation, HashSet jobs) { private final AtomicLong nextSeq = new AtomicLong(0); public CompactionJobPriorityQueue(CompactorGroupId groupId, long maxSize, - SizeTrackingTreeMap.Weigher weigher) { - this.jobQueue = new SizeTrackingTreeMap<>(weigher); + SizeTrackingTreeMap.Weigher weigher) { + this.jobQueue = new SizeTrackingTreeMap<>(mj -> weigher.weigh(mj.job)); this.maxSize = new AtomicLong(maxSize); this.tabletJobs = new HashMap<>(); this.groupId = groupId; @@ -176,14 +187,13 @@ public synchronized void removeOlderGenerations(Ample.DataLevel level, long curr /** * @return the number of jobs added. If the queue is closed returns -1 */ - public synchronized int add(TabletMetadata tabletMetadata, Collection jobs, - long generation) { + public synchronized int add(KeyExtent extent, Collection jobs, long generation) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); // Do not clear jobAge timers, they are cleared later at the end of this method // if there are no jobs for the extent so we do not reset the timer for an extent // that had previous jobs and still has jobs - removePreviousSubmissions(tabletMetadata.getExtent(), false); + removePreviousSubmissions(extent, false); HashSet newEntries = new HashSet<>(jobs.size()); @@ -196,7 +206,7 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection Timer.startNew()); + checkState(tabletJobs.put(extent, new TabletJobs(generation, newEntries)) == null); + jobAges.computeIfAbsent(extent, e -> Timer.startNew()); } else { - jobAges.remove(tabletMetadata.getExtent()); + jobAges.remove(extent); } return jobsAdded; @@ -253,15 +262,15 @@ public synchronized long getLowestPriority() { if (jobQueue.isEmpty()) { return 0; } - return jobQueue.lastKey().job.getPriority(); + return jobQueue.lastKey().getPriority(); } - public synchronized CompactionJobQueues.MetaJob poll() { + public synchronized CompactionJob poll() { var first = jobQueue.pollFirstEntry(); if (first != null) { dequeuedJobs.getAndIncrement(); - var extent = first.getValue().getTabletMetadata().getExtent(); + var extent = first.getValue().extent; var timer = jobAges.get(extent); checkState(timer != null); jobQueueTimer.get().ifPresent(jqt -> jqt.record(timer.elapsed())); @@ -277,10 +286,10 @@ public synchronized CompactionJobQueues.MetaJob poll() { timer.restart(); } } - return first == null ? null : first.getValue(); + return first == null ? null : first.getValue().job; } - public synchronized CompletableFuture getAsync() { + public synchronized CompletableFuture getAsync() { var job = poll(); if (job != null) { return CompletableFuture.completedFuture(job); @@ -288,7 +297,7 @@ public synchronized CompletableFuture getAsync() { // There is currently nothing in the queue, so create an uncompleted future and queue it up to // be completed when something does arrive. - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); futures.add(future); futuresAdded++; // Handle the case where nothing is ever being added to this queue and futures are constantly @@ -311,9 +320,9 @@ synchronized int futuresSize() { } // exists for tests - synchronized CompactionJobQueues.MetaJob peek() { + synchronized CompactionJob peek() { var firstEntry = jobQueue.firstEntry(); - return firstEntry == null ? null : firstEntry.getValue(); + return firstEntry == null ? null : firstEntry.getValue().job; } private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges) { @@ -328,10 +337,10 @@ private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges) } } - private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { + private CjpqKey addJobToQueue(KeyExtent extent, CompactionJob job) { if (jobQueue.dataSize() >= maxSize.get()) { var lastEntry = jobQueue.lastKey(); - if (job.getPriority() <= lastEntry.job.getPriority()) { + if (job.getPriority() <= lastEntry.getPriority()) { // the queue is full and this job has a lower or same priority than the lowest job in the // queue, so do not add it rejectedJobs.getAndIncrement(); @@ -345,7 +354,7 @@ private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) } var key = new CjpqKey(job); - jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata)); + jobQueue.put(key, new MetaJob(job, extent)); return key; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index e342b5b9655..a39e8856691 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -28,8 +28,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.slf4j.Logger; @@ -52,9 +52,6 @@ public class CompactionJobQueues { private final Map currentGenerations; - private SizeTrackingTreeMap.Weigher weigher = - val -> val.getTabletMetadata().toString().length() + val.getJob().toString().length(); - public CompactionJobQueues(long queueSize) { this.queueSize = queueSize; Map cg = new EnumMap<>(DataLevel.class); @@ -83,13 +80,13 @@ public void endFullScan(DataLevel level) { .forEach(pq -> pq.removeOlderGenerations(level, currentGenerations.get(level).get())); } - public void add(TabletMetadata tabletMetadata, Collection jobs) { + public void add(KeyExtent extent, Collection jobs) { if (jobs.size() == 1) { var executorId = jobs.iterator().next().getGroup(); - add(tabletMetadata, executorId, jobs); + add(extent, executorId, jobs); } else { jobs.stream().collect(Collectors.groupingBy(CompactionJob::getGroup)) - .forEach(((groupId, compactionJobs) -> add(tabletMetadata, groupId, compactionJobs))); + .forEach(((groupId, compactionJobs) -> add(extent, groupId, compactionJobs))); } } @@ -138,39 +135,19 @@ public long getQueuedJobCount() { return count; } - public static class MetaJob { - private final CompactionJob job; - - // the metadata from which the compaction job was derived - private final TabletMetadata tabletMetadata; - - public MetaJob(CompactionJob job, TabletMetadata tabletMetadata) { - this.job = job; - this.tabletMetadata = tabletMetadata; - } - - public CompactionJob getJob() { - return job; - } - - public TabletMetadata getTabletMetadata() { - return tabletMetadata; - } - } - /** * Asynchronously get a compaction job from the queue. If the queue currently has jobs then a * completed future will be returned containing the highest priority job in the queue. If the * queue is currently empty, then an uncompleted future will be returned and later when something * is added to the queue the future will be completed. */ - public CompletableFuture getAsync(CompactorGroupId groupId) { + public CompletableFuture getAsync(CompactorGroupId groupId) { var pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher)); + gid -> new CompactionJobPriorityQueue(gid, queueSize, ResolvedCompactionJob.WEIGHER)); return pq.getAsync(); } - public MetaJob poll(CompactorGroupId groupId) { + public CompactionJob poll(CompactorGroupId groupId) { var prioQ = priorityQueues.get(groupId); if (prioQ == null) { return null; @@ -179,19 +156,17 @@ public MetaJob poll(CompactorGroupId groupId) { return prioQ.poll(); } - private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, - Collection jobs) { + private void add(KeyExtent extent, CompactorGroupId groupId, Collection jobs) { if (log.isTraceEnabled()) { - log.trace("Adding jobs to queue {} {} {}", groupId, tabletMetadata.getExtent(), + log.trace("Adding jobs to queue {} {} {}", groupId, extent, jobs.stream().map(job -> "#files:" + job.getFiles().size() + ",prio:" + job.getPriority() + ",kind:" + job.getKind()).collect(Collectors.toList())); } var pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher)); - pq.add(tabletMetadata, jobs, - currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); + gid -> new CompactionJobPriorityQueue(gid, queueSize, ResolvedCompactionJob.WEIGHER)); + pq.add(extent, jobs, currentGenerations.get(DataLevel.of(extent.tableId())).get()); } public void resetMaxSize(long size) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/ResolvedCompactionJob.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/ResolvedCompactionJob.java new file mode 100644 index 00000000000..e2cb48d0c9a --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/ResolvedCompactionJob.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; + +/** + * This class takes a compaction job and the tablet metadata from which is was generated and + * computes all information that is needed to actually run the compaction. A goal of this class is + * to avoid referencing anything that is not needed. For example only a subset of the information + * from TabletMetadata is needed, therefore a reference to the TabletMetadata object should not be + * kept. This object will placed on a queue for compaction and that is why it should avoid using + * more memory than needed. Before this class was created queued compaction jobs would reference a + * tablet metadata object which included all of the tablets files. When tablets had lots of file and + * lots compactions against them this could use lots of memory and make the manager unstable at time + * when things were already bad. + */ +public class ResolvedCompactionJob implements CompactionJob { + + // The fate id of the table compaction that selected files + private final FateId selectedFateId; + private final Map jobFiles; + private final CompactionKind kind; + private final boolean compactingAll; + private final KeyExtent extent; + private final short priority; + private final CompactorGroupId group; + private final String tabletDir; + private final boolean overlapsSelectedFiles; + + private static long weigh(Range range) { + long estDataSize = 0; + if (range != null) { + var staryKey = range.getStartKey(); + estDataSize += staryKey == null ? 0 : staryKey.getSize(); + var endKey = range.getEndKey(); + estDataSize += endKey == null ? 0 : endKey.getSize(); + } + return estDataSize; + } + + public static final SizeTrackingTreeMap.Weigher WEIGHER = job -> { + if (job instanceof ResolvedCompactionJob) { + var rcj = (ResolvedCompactionJob) job; + long estDataSize = 0; + if (rcj.selectedFateId != null) { + estDataSize += rcj.selectedFateId.canonical().length(); + } + for (var file : rcj.jobFiles.keySet()) { + estDataSize += file.getMetadataPath().length(); + estDataSize += 24; // There are three longs in DataFileValue + estDataSize += weigh(file.getRange()); + } + + estDataSize += rcj.group.canonical().length(); + estDataSize += rcj.extent.tableId().canonical().length(); + estDataSize += rcj.extent.prevEndRow() == null ? 0 : rcj.extent.prevEndRow().getLength(); + estDataSize += rcj.extent.endRow() == null ? 0 : rcj.extent.endRow().getLength(); + estDataSize += rcj.tabletDir.length(); + // Guess at how many bytes the overhead at things like pointers and primitives are taking in + // this object. + estDataSize += 64; + return estDataSize; + } else { + // Do not know the concrete type so weigh based on the interface methods of CompaactionJob + long estDataSize = 0; + for (var compactableFile : job.getFiles()) { + estDataSize += compactableFile.getUri().toString().length(); + estDataSize += 16; // There are two longs on compactableFile, this accounts for those + estDataSize += weigh(compactableFile.getRange()); + } + + estDataSize += job.getGroup().canonical().length(); + + return estDataSize; + } + }; + + public ResolvedCompactionJob(CompactionJob job, TabletMetadata tabletMetadata) { + this.jobFiles = new HashMap<>(); + for (CompactableFile cfile : job.getFiles()) { + var stFile = CompactableFileImpl.toStoredTabletFile(cfile); + var dfv = tabletMetadata.getFilesMap().get(stFile); + jobFiles.put(stFile, dfv); + } + + this.kind = job.getKind(); + + if (job.getKind() == CompactionKind.USER) { + this.selectedFateId = tabletMetadata.getSelectedFiles().getFateId(); + this.compactingAll = tabletMetadata.getSelectedFiles().initiallySelectedAll() + && tabletMetadata.getSelectedFiles().getFiles().equals(jobFiles.keySet()); + this.overlapsSelectedFiles = false; + } else { + this.selectedFateId = null; + this.compactingAll = tabletMetadata.getFiles().equals(jobFiles.keySet()); + var selected = tabletMetadata.getSelectedFiles(); + if (selected == null) { + this.overlapsSelectedFiles = false; + } else { + this.overlapsSelectedFiles = + !Collections.disjoint(this.jobFiles.keySet(), selected.getFiles()); + } + } + + this.priority = job.getPriority(); + this.group = job.getGroup(); + + this.extent = tabletMetadata.getExtent(); + this.tabletDir = tabletMetadata.getDirName(); + } + + public FateId getSelectedFateId() { + return selectedFateId; + } + + public Set getJobFiles() { + return jobFiles.keySet(); + } + + public Map getJobFilesMap() { + return jobFiles; + } + + public Set getCompactionFiles() { + return jobFiles.entrySet().stream().map(e -> new CompactableFileImpl(e.getKey(), e.getValue())) + .collect(Collectors.toSet()); + } + + @Override + public CompactionKind getKind() { + return kind; + } + + public boolean isCompactingAll() { + return compactingAll; + } + + public KeyExtent getExtent() { + return extent; + } + + public String getTabletDir() { + return tabletDir; + } + + @Override + public short getPriority() { + return priority; + } + + @Override + public CompactorGroupId getGroup() { + return group; + } + + @Override + public Set getFiles() { + return getCompactionFiles(); + } + + public boolean isOverlapsSelectedFiles() { + return overlapsSelectedFiles; + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java index 306a56fb647..ab1e8f5f521 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java @@ -28,7 +28,7 @@ * This class wraps a treemap and tracks the data size of everything added and removed from the * treemap. */ -class SizeTrackingTreeMap { +public class SizeTrackingTreeMap { private static class ValueWrapper { final V2 val; diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 95ebdb8b47e..dd3d48eddc0 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -19,22 +19,14 @@ package org.apache.accumulo.manager.compaction; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; -import static org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator.canReserveCompaction; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -46,7 +38,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.servers.ServerId; @@ -63,17 +54,10 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; -import org.apache.accumulo.core.manager.state.tables.TableState; -import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.ReferencedTabletFile; -import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; -import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletOperationId; -import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -90,7 +74,7 @@ import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; -import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob; +import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.security.AuditedSecurityOperation; @@ -197,36 +181,30 @@ protected Set getRunningCompactors() { } @Override - protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress, - ExternalCompactionId externalCompactionId) { - return createExternalCompactionMetadata(metaJob.getJob(), - metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile) - .collect(Collectors.toSet()), - metaJob.getTabletMetadata(), compactorAddress, externalCompactionId); + protected CompactionMetadata reserveCompaction(ResolvedCompactionJob rcJob, + String compactorAddress, ExternalCompactionId externalCompactionId) { + return createExternalCompactionMetadata(rcJob, compactorAddress, externalCompactionId); } @Override - protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, - Set jobFiles, TabletMetadata tablet, String compactorAddress, - ExternalCompactionId externalCompactionId) { - FateInstanceType type = FateInstanceType.fromTableId(tablet.getExtent().tableId()); + protected CompactionMetadata createExternalCompactionMetadata(ResolvedCompactionJob job, + String compactorAddress, ExternalCompactionId externalCompactionId) { + FateInstanceType type = FateInstanceType.fromTableId(job.getExtent().tableId()); FateId fateId = FateId.from(type, UUID.randomUUID()); - return new CompactionMetadata(jobFiles, + return new CompactionMetadata(job.getJobFiles(), new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")), compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, fateId); } @Override protected TExternalCompactionJob createThriftJob(String externalCompactionId, - CompactionMetadata ecm, MetaJob metaJob, Optional compactionConfig) { - return new TExternalCompactionJob(externalCompactionId, - metaJob.getTabletMetadata().getExtent().toThrift(), List.of(), - SystemIteratorUtil.toIteratorConfig(List.of()), + CompactionMetadata ecm, ResolvedCompactionJob rcJob, + Optional compactionConfig) { + return new TExternalCompactionJob(externalCompactionId, rcJob.getExtent().toThrift(), + List.of(), SystemIteratorUtil.toIteratorConfig(List.of()), ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), TCompactionKind.valueOf(ecm.getKind().name()), - FateId - .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getExtent().tableId()), - UUID.randomUUID()) + FateId.from(FateInstanceType.fromTableId(rcJob.getExtent().tableId()), UUID.randomUUID()) .toThrift(), Map.of()); } @@ -336,6 +314,7 @@ public void testGetCompactionJob() throws Exception { expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes(); + expect(tm.getDirName()).andReturn("t-00001").anyTimes(); Manager manager = EasyMock.createNiceMock(Manager.class); expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) .anyTimes(); @@ -354,8 +333,8 @@ public void testGetCompactionJob() throws Exception { assertEquals(0, coordinator.getRunning().size()); // Add a job to the job queue - CompactionJob job = new CompactionJobImpl((short) 1, GROUP_ID, Collections.emptyList(), - CompactionKind.SYSTEM, Optional.of(true)); + CompactionJob job = + new CompactionJobImpl((short) 1, GROUP_ID, Collections.emptyList(), CompactionKind.SYSTEM); coordinator.addJobs(tm, Collections.singleton(job)); CompactionJobPriorityQueue queue = coordinator.getJobQueues().getQueue(GROUP_ID); assertEquals(1, queue.getQueuedJobs()); @@ -447,169 +426,4 @@ public void testCleanUpRunning() throws Exception { EasyMock.verify(context, creds, security); } - - @Test - public void testCanReserve() throws Exception { - TableId tableId1 = TableId.of("5"); - TableId tableId2 = TableId.of("6"); - - var file1 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00001.rf")); - var file2 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00002.rf")); - var file3 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00003.rf")); - var file4 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00004.rf")); - - ServerContext context = EasyMock.mock(ServerContext.class); - EasyMock.expect(context.getTableState(tableId1)).andReturn(TableState.ONLINE).atLeastOnce(); - EasyMock.expect(context.getTableState(tableId2)).andReturn(TableState.OFFLINE).atLeastOnce(); - - TableConfiguration tableConf = EasyMock.createMock(TableConfiguration.class); - EasyMock.expect(tableConf.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION)) - .andReturn(100L).atLeastOnce(); - - EasyMock.expect(context.getTableConfiguration(anyObject())).andReturn(tableConf).atLeastOnce(); - - FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); - - CompactorGroupId cgid = CompactorGroupId.of("G1"); - ReferencedTabletFile tmp1 = - ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00005.rf_tmp")); - CompactionMetadata cm1 = new CompactionMetadata(Set.of(file1, file2), tmp1, "localhost:4444", - CompactionKind.SYSTEM, (short) 5, cgid, false, null); - - ReferencedTabletFile tmp2 = - ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00006.rf_tmp")); - CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, "localhost:5555", - CompactionKind.USER, (short) 5, cgid, false, fateId1); - - EasyMock.replay(context, tableConf); - - KeyExtent extent1 = new KeyExtent(tableId1, null, null); - - var dfv = new DataFileValue(1000, 100); - - var cid1 = ExternalCompactionId.generate(UUID.randomUUID()); - var cid2 = ExternalCompactionId.generate(UUID.randomUUID()); - - var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, - SteadyTime.from(100, TimeUnit.SECONDS)); - var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, 1, - SteadyTime.from(100, TimeUnit.SECONDS)); - - var time = SteadyTime.from(1000, TimeUnit.SECONDS); - - // should not be able to compact an offline table - var tabletOffline = TabletMetadata.builder(new KeyExtent(tableId2, null, null)) - .putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv) - .build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); - assertFalse(canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, Set.of(file1, file2), - context, time)); - - // nothing should prevent this compaction - var tablet1 = - TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) - .putFile(file4, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); - assertTrue( - canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); - - // should not be able to do a user compaction unless selected files are present - assertFalse( - canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, file2), context, time)); - - // should not be able to compact a tablet with user compaction request in place - var tablet3 = - TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) - .putFile(file4, dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED); - assertFalse( - canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); - - // should not be able to compact a tablet when the job has files not present in the tablet - var tablet4 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); - assertFalse(canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4), - context, time)); - - // should not be able to compact a tablet with an operation id present - TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); - var tablet5 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid) - .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED); - assertFalse( - canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); - - // should not be able to compact a tablet if the job files overlaps with running compactions - var tablet6 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, cm1) - .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED, SELECTED); - assertFalse( - canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); - // should be able to compact the file that is outside of the set of files currently compacting - assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file4), context, time)); - - // create a tablet with a selected set of files - var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp) - .build(OPID, USER_COMPACTION_REQUESTED, ECOMP); - // 0 completed jobs - var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, dfv) - .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv) - .putSelectedFiles(selectedWithoutComp).build(OPID, USER_COMPACTION_REQUESTED, ECOMP); - - // Should be able to start if no completed and overlap - assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM, - Set.of(file1, file2), context, time)); - assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM, - Set.of(file3, file4), context, time)); - - // should not be able to start a system compaction if the set of files overlaps with the - // selected files - assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file1, file2), - context, time)); - assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file3, file4), - context, time)); - // should be able to start a system compaction on the set of files not in the selected set - assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file4), - context, time)); - // should be able to start user compactions on files that are selected - assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2), - context, time)); - assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file2, file3), - context, time)); - assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, - Set.of(file1, file2, file3), context, time)); - // should not be able to start user compactions on files that fall outside of the selected set - assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file4), - context, time)); - assertFalse( - canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file4), context, time)); - assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, - Set.of(file1, file2, file3, file4), context, time)); - - // test selected files and running compaction - var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp) - .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED); - // should be able to compact files that are in the selected set and not in the running set - assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file1, file2), - context, time)); - // should not be able to compact because files overlap the running set - assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file2, file3), - context, time)); - // should not be able to start a system compaction if the set of files overlaps with the - // selected files and/or the running set - assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file1, file2), - context, time)); - assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file3, file4), - context, time)); - // should be able to start a system compaction on the set of files not in the selected set - assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4), context, - time)); - - // should not be able to compact a tablet that does not exists - assertFalse( - canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); - assertFalse( - canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), context, time)); - - EasyMock.verify(context); - } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionReservationCheckTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionReservationCheckTest.java new file mode 100644 index 00000000000..65540deea0b --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionReservationCheckTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.time.Duration; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.manager.compaction.coordinator.CompactionReservationCheck; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.Test; + +public class CompactionReservationCheckTest { + @Test + public void testCanReserve() throws Exception { + TableId tableId1 = TableId.of("5"); + + var file1 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00001.rf")); + var file2 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00002.rf")); + var file3 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00003.rf")); + var file4 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00004.rf")); + + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + CompactorGroupId cgid = CompactorGroupId.of("G1"); + ReferencedTabletFile tmp1 = + ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00005.rf_tmp")); + CompactionMetadata cm1 = new CompactionMetadata(Set.of(file1, file2), tmp1, "localhost:4444", + CompactionKind.SYSTEM, (short) 5, cgid, false, null); + + ReferencedTabletFile tmp2 = + ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00006.rf_tmp")); + CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, "localhost:5555", + CompactionKind.USER, (short) 5, cgid, false, fateId1); + + KeyExtent extent1 = new KeyExtent(tableId1, null, null); + + var dfv = new DataFileValue(1000, 100); + + var cid1 = ExternalCompactionId.generate(UUID.randomUUID()); + var cid2 = ExternalCompactionId.generate(UUID.randomUUID()); + + var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, + SteadyTime.from(100, TimeUnit.SECONDS)); + var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, 1, + SteadyTime.from(100, TimeUnit.SECONDS)); + + var time = SteadyTime.from(1000, TimeUnit.SECONDS); + + // nothing should prevent this compaction + var tablet1 = + TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) + .putFile(file4, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertTrue(canReserveSystem(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), false, time)); + + // should not be able to do a user compaction unless selected files are present + assertFalse(canReserveUser(tablet1, CompactionKind.USER, Set.of(file1, file2), fateId1, time)); + + // should not be able to compact a tablet with user compaction request in place + var tablet3 = + TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) + .putFile(file4, dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED); + assertFalse( + canReserveSystem(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), false, time)); + + // should not be able to compact a tablet when the job has files not present in the tablet + var tablet4 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse( + canReserveSystem(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4), false, time)); + + // should not be able to compact a tablet with an operation id present + TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); + var tablet5 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid) + .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse( + canReserveSystem(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), false, time)); + + // should not be able to compact a tablet if the job files overlaps with running compactions + var tablet6 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, cm1) + .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse( + canReserveSystem(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), false, time)); + // should be able to compact the file that is outside of the set of files currently compacting + assertTrue(canReserveSystem(tablet6, CompactionKind.SYSTEM, Set.of(file4), false, time)); + + // create a tablet with a selected set of files + var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp) + .build(OPID, USER_COMPACTION_REQUESTED, ECOMP); + // 0 completed jobs + var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, dfv) + .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv) + .putSelectedFiles(selectedWithoutComp).build(OPID, USER_COMPACTION_REQUESTED, ECOMP); + + // Should be able to start if no completed and overlap + assertTrue(canReserveSystem(selTabletWithoutComp, CompactionKind.SYSTEM, Set.of(file1, file2), + true, time)); + assertTrue(canReserveSystem(selTabletWithoutComp, CompactionKind.SYSTEM, Set.of(file3, file4), + true, time)); + // Should not be able to start a system compaction that overlaps with selected files that have + // not expired. + assertFalse(canReserveSystem(selTabletWithoutComp, CompactionKind.SYSTEM, Set.of(file3, file4), + true, selectedWithoutComp.getSelectedTime().plus(Duration.ofMillis(5)))); + // When not deleting the selected files column can not overlap with selected set at all. It does + // not matter if the selected set if eligible for deletion. In this test case the selected set + // is eligible for deletion. + assertFalse(canReserveSystem(selTabletWithoutComp, CompactionKind.SYSTEM, Set.of(file3, file4), + false, time)); + // When deleting the selected files column the files being compacted must overlap with it. In + // this case there is no overlap and the reservation should fail. This means there was a change + // since the job was generated and the decision it made to delete the selected set is no longer + // valid. + assertFalse( + canReserveSystem(selTabletWithoutComp, CompactionKind.SYSTEM, Set.of(file4), true, time)); + + // should not be able to start a system compaction if the set of files overlaps with the + // selected files + assertFalse(canReserveSystem(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file1, file2), + true, time)); + assertFalse(canReserveSystem(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file3, file4), + true, time)); + // should be able to start a system compaction on the set of files not in the selected set + assertTrue( + canReserveSystem(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file4), false, time)); + // should be able to start user compactions on files that are selected + assertTrue(canReserveUser(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2), fateId1, + time)); + assertTrue(canReserveUser(selTabletWithComp, CompactionKind.USER, Set.of(file2, file3), fateId1, + time)); + assertTrue(canReserveUser(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2, file3), + fateId1, time)); + // should not be able to start user compactions on files that fall outside of the selected set + assertFalse(canReserveUser(selTabletWithComp, CompactionKind.USER, Set.of(file1, file4), + fateId1, time)); + assertFalse( + canReserveUser(selTabletWithComp, CompactionKind.USER, Set.of(file4), fateId1, time)); + assertFalse(canReserveUser(selTabletWithComp, CompactionKind.USER, + Set.of(file1, file2, file3, file4), fateId1, time)); + + // test selected files and running compaction + var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp) + .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED); + // should be able to compact files that are in the selected set and not in the running set + assertTrue( + canReserveUser(selRunningTablet, CompactionKind.USER, Set.of(file1, file2), fateId1, time)); + // should not be able to compact when the fateId used to generate the compaction job does not + // match the fate id stored in the selected file set. + assertFalse( + canReserveUser(selRunningTablet, CompactionKind.USER, Set.of(file1, file2), fateId2, time)); + // should not be able to compact because files overlap the running set + assertFalse( + canReserveUser(selRunningTablet, CompactionKind.USER, Set.of(file2, file3), fateId1, time)); + // should not be able to start a system compaction if the set of files overlaps with the + // selected files and/or the running set + assertFalse(canReserveSystem(selRunningTablet, CompactionKind.SYSTEM, Set.of(file1, file2), + true, time)); + assertFalse(canReserveSystem(selRunningTablet, CompactionKind.SYSTEM, Set.of(file3, file4), + true, time)); + // should be able to start a system compaction on the set of files not in the selected set + assertTrue( + canReserveSystem(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4), false, time)); + + // should not be able to compact a tablet that does not exists + assertThrows(NullPointerException.class, + () -> canReserveSystem(null, CompactionKind.SYSTEM, Set.of(file1, file2), false, time)); + assertThrows(NullPointerException.class, + () -> canReserveUser(null, CompactionKind.USER, Set.of(file1, file2), fateId1, time)); + } + + private boolean canReserveSystem(TabletMetadata tabletMetadata, CompactionKind kind, + Set jobFiles, boolean deletingSelected, SteadyTime steadyTime) { + var check = new CompactionReservationCheck(CompactionKind.SYSTEM, jobFiles, null, + deletingSelected, steadyTime, 100L); + + return check.canUpdate(tabletMetadata); + } + + private boolean canReserveUser(TabletMetadata tabletMetadata, CompactionKind kind, + Set jobFiles, FateId selectedId, SteadyTime steadyTime) { + var check = new CompactionReservationCheck(CompactionKind.USER, jobFiles, selectedId, false, + steadyTime, 100L); + + return check.canUpdate(tabletMetadata); + } + +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index d7b067f049f..82f3f8997b0 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -35,13 +35,11 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue.CompactionJobPriorityQueueStats; -import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob; import org.apache.hadoop.io.Text; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; @@ -59,8 +57,6 @@ public void testTabletFileReplacement() { CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); @@ -72,14 +68,14 @@ public void testTabletFileReplacement() { EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); - EasyMock.replay(tm, cj1, cj2); + EasyMock.replay(cj1, cj2); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); - assertEquals(1, queue.add(tm, List.of(cj1), 1L)); + assertEquals(1, queue.add(extent, List.of(cj1), 1L)); - MetaJob job = queue.peek(); - assertEquals(cj1, job.getJob()); - assertEquals(Set.of(file1), job.getJob().getFiles()); + CompactionJob job = queue.peek(); + assertEquals(cj1, job); + assertEquals(Set.of(file1), job.getFiles()); assertEquals(10L, queue.getLowestPriority()); assertEquals(2, queue.getMaxSize()); @@ -88,12 +84,11 @@ public void testTabletFileReplacement() { assertEquals(1, queue.getQueuedJobs()); // replace the files for the same tablet - assertEquals(1, queue.add(tm, List.of(cj2), 1L)); + assertEquals(1, queue.add(extent, List.of(cj2), 1L)); job = queue.peek(); - assertEquals(cj2, job.getJob()); - assertEquals(Set.of(file2, file3, file4), job.getJob().getFiles()); - assertEquals(tm, job.getTabletMetadata()); + assertEquals(cj2, job); + assertEquals(Set.of(file2, file3, file4), job.getFiles()); assertEquals(5L, queue.getLowestPriority()); assertEquals(2, queue.getMaxSize()); @@ -101,7 +96,7 @@ public void testTabletFileReplacement() { assertEquals(0, queue.getRejectedJobs()); assertEquals(1, queue.getQueuedJobs()); - EasyMock.verify(tm, cj1, cj2); + EasyMock.verify(cj1, cj2); } @@ -114,8 +109,6 @@ public void testAddEqualToMaxSize() { CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); @@ -127,26 +120,24 @@ public void testAddEqualToMaxSize() { EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); - EasyMock.replay(tm, cj1, cj2); + EasyMock.replay(cj1, cj2); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); - assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); + assertEquals(2, queue.add(extent, List.of(cj1, cj2), 1L)); - EasyMock.verify(tm, cj1, cj2); + EasyMock.verify(cj1, cj2); assertEquals(5L, queue.getLowestPriority()); assertEquals(2, queue.getMaxSize()); assertEquals(0, queue.getDequeuedJobs()); assertEquals(0, queue.getRejectedJobs()); assertEquals(2, queue.getQueuedJobs()); - MetaJob job = queue.poll(); - assertEquals(cj1, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); + CompactionJob job = queue.poll(); + assertEquals(cj1, job); assertEquals(1, queue.getDequeuedJobs()); job = queue.poll(); - assertEquals(cj2, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); + assertEquals(cj2, job); assertEquals(2, queue.getDequeuedJobs()); job = queue.poll(); @@ -166,8 +157,6 @@ public void testAddMoreThanMax() { CompactableFile file6 = EasyMock.createMock(CompactableFileImpl.class); KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); @@ -184,12 +173,12 @@ public void testAddMoreThanMax() { EasyMock.expect(cj3.getPriority()).andReturn((short) 1).anyTimes(); EasyMock.expect(cj3.getFiles()).andReturn(Set.of(file5, file6)).anyTimes(); - EasyMock.replay(tm, cj1, cj2, cj3); + EasyMock.replay(cj1, cj2, cj3); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); - assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L)); + assertEquals(2, queue.add(extent, List.of(cj1, cj2, cj3), 1L)); - EasyMock.verify(tm, cj1, cj2, cj3); + EasyMock.verify(cj1, cj2, cj3); assertEquals(5L, queue.getLowestPriority()); assertEquals(2, queue.getMaxSize()); @@ -199,16 +188,15 @@ public void testAddMoreThanMax() { // One tablet was added with jobs assertEquals(1, queue.getJobAges().size()); - MetaJob job = queue.poll(); - assertEquals(cj1, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); + CompactionJob job = queue.poll(); + // assertEquals(cj1, job.getJob()); + // assertEquals(tm, job.getTabletMetadata()); assertEquals(1, queue.getDequeuedJobs()); // still 1 job left so should still have a timer assertEquals(1, queue.getJobAges().size()); job = queue.poll(); - assertEquals(cj2, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); + assertEquals(cj2, job); assertEquals(2, queue.getDequeuedJobs()); // no more jobs so timer should be gone assertTrue(queue.getJobAges().isEmpty()); @@ -220,7 +208,7 @@ public void testAddMoreThanMax() { private static int counter = 1; - private Pair createJob() { + private Pair createJob() { // Use an ever increasing tableId KeyExtent extent = new KeyExtent(TableId.of("" + counter++), new Text("z"), new Text("a")); @@ -231,15 +219,13 @@ private Pair createJob() { } CompactionJob job = EasyMock.createMock(CompactionJob.class); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); EasyMock.expect(job.getGroup()).andReturn(GROUP).anyTimes(); EasyMock.expect(job.getPriority()).andReturn((short) counter).anyTimes(); EasyMock.expect(job.getFiles()).andReturn(files).anyTimes(); - EasyMock.replay(tm, job); + EasyMock.replay(job); - return new Pair<>(tm, job); + return new Pair<>(extent, job); } @Test @@ -251,7 +237,7 @@ public void test() { // create and add 1000 jobs for (int x = 0; x < 1000; x++) { - Pair pair = createJob(); + Pair pair = createJob(); queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L); expected.add(pair.getSecond()); } @@ -272,12 +258,12 @@ public void test() { // matches int matchesSeen = 0; for (CompactionJob expectedJob : expected) { - MetaJob queuedJob = queue.poll(); + CompactionJob queuedJob = queue.poll(); if (queuedJob == null) { break; } - assertEquals(expectedJob.getPriority(), queuedJob.getJob().getPriority()); - assertEquals(expectedJob.getFiles(), queuedJob.getJob().getFiles()); + assertEquals(expectedJob.getPriority(), queuedJob.getPriority()); + assertEquals(expectedJob.getFiles(), queuedJob.getFiles()); matchesSeen++; } @@ -314,7 +300,7 @@ public void test() { public void testAsyncCancelCleanup() { CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1); - List> futures = new ArrayList<>(); + List> futures = new ArrayList<>(); int maxFuturesSize = 0; @@ -349,7 +335,7 @@ public void testResetMaxSize() { // create and add 200 jobs, because of the queue size 100 should be dropped. for (int x = 0; x < 200; x++) { - Pair pair = createJob(); + Pair pair = createJob(); queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L); expected.add(pair.getSecond()); } @@ -371,12 +357,12 @@ public void testResetMaxSize() { // ensure what is left in the queue is the 50 highest priority jobs int matchesSeen = 0; for (CompactionJob expectedJob : expected) { - MetaJob queuedJob = queue.poll(); + var queuedJob = queue.poll(); if (queuedJob == null) { break; } - assertEquals(expectedJob.getPriority(), queuedJob.getJob().getPriority()); - assertEquals(expectedJob.getFiles(), queuedJob.getJob().getFiles()); + assertEquals(expectedJob.getPriority(), queuedJob.getPriority()); + assertEquals(expectedJob.getFiles(), queuedJob.getFiles()); matchesSeen++; } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 9d3592c893e..be1dc41152f 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -30,7 +30,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -47,7 +46,6 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; @@ -61,7 +59,7 @@ private CompactionJob newJob(short prio, int file, CompactorGroupId cgi) throws URISyntaxException { Collection files = List .of(new CompactableFileImpl(new URI("file://accumulo/tables//123/t-0/f" + file), 100, 100)); - return new CompactionJobImpl(prio, cgi, files, CompactionKind.SYSTEM, Optional.empty()); + return new CompactionJobImpl(prio, cgi, files, CompactionKind.SYSTEM); } @Test @@ -73,11 +71,6 @@ public void testFullScanHandling() throws Exception { var extent3 = new KeyExtent(tid, new Text("l"), new Text("c")); var extent4 = new KeyExtent(tid, new Text("c"), new Text("a")); - var tm1 = TabletMetadata.builder(extent1).build(); - var tm2 = TabletMetadata.builder(extent2).build(); - var tm3 = TabletMetadata.builder(extent3).build(); - var tm4 = TabletMetadata.builder(extent4).build(); - var cg1 = CompactorGroupId.of("CG1"); var cg2 = CompactorGroupId.of("CG2"); var cg3 = CompactorGroupId.of("CG3"); @@ -86,15 +79,15 @@ public void testFullScanHandling() throws Exception { jobQueues.beginFullScan(DataLevel.USER); - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); - jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); - jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); - jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(extent2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(extent3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(extent4, List.of(newJob((short) 4, 8, cg1))); - jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2))); - jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2))); - jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2))); - jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2))); + jobQueues.add(extent1, List.of(newJob((short) 4, 1, cg2))); + jobQueues.add(extent2, List.of(newJob((short) 3, 2, cg2))); + jobQueues.add(extent3, List.of(newJob((short) 2, 3, cg2))); + jobQueues.add(extent4, List.of(newJob((short) 1, 4, cg2))); jobQueues.endFullScan(DataLevel.USER); @@ -102,8 +95,8 @@ public void testFullScanHandling() throws Exception { assertEquals(4, jobQueues.getQueuedJobs(cg2)); assertEquals(0, jobQueues.getQueuedJobs(cg3)); - assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent()); - assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(4, jobQueues.poll(cg1).getPriority()); + assertEquals(4, jobQueues.poll(cg2).getPriority()); assertEquals(3, jobQueues.getQueuedJobs(cg1)); assertEquals(3, jobQueues.getQueuedJobs(cg2)); @@ -112,17 +105,17 @@ public void testFullScanHandling() throws Exception { jobQueues.beginFullScan(DataLevel.USER); // should still be able to poll and get things added in the last full scan - assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent()); + assertEquals(3, jobQueues.poll(cg1).getPriority()); assertEquals(2, jobQueues.getQueuedJobs(cg1)); assertEquals(3, jobQueues.getQueuedJobs(cg2)); // add something new during the full scan - jobQueues.add(tm1, List.of(newJob((short) -7, 9, cg2))); + jobQueues.add(extent1, List.of(newJob((short) -7, 9, cg2))); assertEquals(2, jobQueues.getQueuedJobs(cg1)); assertEquals(4, jobQueues.getQueuedJobs(cg2)); // should still be able to poll and get things added in the last full scan - assertEquals(extent2, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(3, jobQueues.poll(cg2).getPriority()); assertEquals(2, jobQueues.getQueuedJobs(cg1)); assertEquals(3, jobQueues.getQueuedJobs(cg2)); @@ -134,18 +127,18 @@ public void testFullScanHandling() throws Exception { assertEquals(0, jobQueues.getQueuedJobs(cg3)); assertNull(jobQueues.poll(cg1)); - assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(-7, jobQueues.poll(cg2).getPriority()); assertEquals(0, jobQueues.getQueuedJobs(cg1)); assertEquals(0, jobQueues.getQueuedJobs(cg2)); assertEquals(0, jobQueues.getQueuedJobs(cg3)); // add some things outside of a begin/end full scan calls - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); - jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(extent2, List.of(newJob((short) 2, 6, cg1))); - jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2))); - jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2))); + jobQueues.add(extent1, List.of(newJob((short) 4, 1, cg2))); + jobQueues.add(extent2, List.of(newJob((short) 3, 2, cg2))); jobQueues.beginFullScan(DataLevel.USER); @@ -154,19 +147,19 @@ public void testFullScanHandling() throws Exception { assertEquals(0, jobQueues.getQueuedJobs(cg3)); // add some things inside the begin/end full scan calls - jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); - jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + jobQueues.add(extent3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(extent4, List.of(newJob((short) 4, 8, cg1))); - jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2))); - jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2))); + jobQueues.add(extent3, List.of(newJob((short) 2, 3, cg2))); + jobQueues.add(extent4, List.of(newJob((short) 1, 4, cg2))); assertEquals(4, jobQueues.getQueuedJobs(cg1)); assertEquals(4, jobQueues.getQueuedJobs(cg2)); assertEquals(0, jobQueues.getQueuedJobs(cg3)); // poll inside the full scan calls - assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent()); - assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(4, jobQueues.poll(cg1).getPriority()); + assertEquals(4, jobQueues.poll(cg2).getPriority()); assertEquals(3, jobQueues.getQueuedJobs(cg1)); assertEquals(3, jobQueues.getQueuedJobs(cg2)); @@ -179,9 +172,9 @@ public void testFullScanHandling() throws Exception { assertEquals(2, jobQueues.getQueuedJobs(cg2)); assertEquals(0, jobQueues.getQueuedJobs(cg3)); - assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent()); - assertEquals(extent3, jobQueues.poll(cg2).getTabletMetadata().getExtent()); - assertEquals(extent4, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(3, jobQueues.poll(cg1).getPriority()); + assertEquals(2, jobQueues.poll(cg2).getPriority()); + assertEquals(1, jobQueues.poll(cg2).getPriority()); assertNull(jobQueues.poll(cg1)); assertNull(jobQueues.poll(cg2)); @@ -192,22 +185,22 @@ public void testFullScanHandling() throws Exception { assertEquals(0, jobQueues.getQueuedJobs(cg3)); // add jobs outside of begin/end full scan - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); - jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); - jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); - jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(extent2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(extent3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(extent4, List.of(newJob((short) 4, 8, cg1))); - jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2))); - jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2))); - jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2))); - jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2))); + jobQueues.add(extent1, List.of(newJob((short) 4, 1, cg2))); + jobQueues.add(extent2, List.of(newJob((short) 3, 2, cg2))); + jobQueues.add(extent3, List.of(newJob((short) 2, 3, cg2))); + jobQueues.add(extent4, List.of(newJob((short) 1, 4, cg2))); jobQueues.beginFullScan(DataLevel.USER); // readd some of the tablets added before the beginFullScan, this should prevent those tablets // from being removed by endFullScan - jobQueues.add(tm4, List.of(newJob((short) 5, 5, cg2))); - jobQueues.add(tm1, List.of(newJob((short) -7, 5, cg2))); + jobQueues.add(extent4, List.of(newJob((short) 5, 5, cg2))); + jobQueues.add(extent1, List.of(newJob((short) -7, 5, cg2))); assertEquals(4, jobQueues.getQueuedJobs(cg1)); assertEquals(4, jobQueues.getQueuedJobs(cg2)); @@ -221,8 +214,8 @@ public void testFullScanHandling() throws Exception { assertEquals(0, jobQueues.getQueuedJobs(cg3)); // make sure we see what was added last for the tablets - assertEquals(5, jobQueues.poll(cg2).getJob().getPriority()); - assertEquals(-7, jobQueues.poll(cg2).getJob().getPriority()); + assertEquals(5, jobQueues.poll(cg2).getPriority()); + assertEquals(-7, jobQueues.poll(cg2).getPriority()); assertEquals(0, jobQueues.getQueuedJobs(cg1)); assertEquals(0, jobQueues.getQueuedJobs(cg2)); @@ -241,19 +234,14 @@ public void testFullScanLevels() throws Exception { var meta = new KeyExtent(AccumuloTable.METADATA.tableId(), new Text("l"), new Text("c")); var root = RootTable.EXTENT; - var tm1 = TabletMetadata.builder(extent1).build(); - var tm2 = TabletMetadata.builder(extent2).build(); - var tmm = TabletMetadata.builder(meta).build(); - var tmr = TabletMetadata.builder(root).build(); - var cg1 = CompactorGroupId.of("CG1"); CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); - jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); - jobQueues.add(tmm, List.of(newJob((short) 3, 7, cg1))); - jobQueues.add(tmr, List.of(newJob((short) 4, 8, cg1))); + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(extent2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(meta, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(root, List.of(newJob((short) 4, 8, cg1))); // verify that a begin and end full scan will only drop tablets in its level @@ -322,8 +310,7 @@ public void testAddPollRaceCondition() throws Exception { // Create unique exents because re-adding the same extent will clobber any jobs already in the // queue for that extent which could throw off the counts KeyExtent extent = new KeyExtent(TableId.of("1"), new Text(i + "z"), new Text(i + "a")); - TabletMetadata tm = TabletMetadata.builder(extent).build(); - jobQueues.add(tm, List.of(newJob((short) (i % 31), i, groups[i % groups.length]))); + jobQueues.add(extent, List.of(newJob((short) (i % 31), i, groups[i % groups.length]))); } // Cause the background threads to exit after polling all data @@ -351,26 +338,26 @@ public void testGetAsync() throws Exception { var extent3 = new KeyExtent(tid, new Text("l"), new Text("c")); var extent4 = new KeyExtent(tid, new Text("c"), new Text("a")); - var tm1 = TabletMetadata.builder(extent1).build(); - var tm2 = TabletMetadata.builder(extent2).build(); - var tm3 = TabletMetadata.builder(extent3).build(); - var tm4 = TabletMetadata.builder(extent4).build(); - var cg1 = CompactorGroupId.of("CG1"); + var job1 = newJob((short) 1, 5, cg1); + var job2 = newJob((short) 2, 6, cg1); + var job3 = newJob((short) 3, 7, cg1); + var job4 = newJob((short) 4, 8, cg1); + var future1 = jobQueues.getAsync(cg1); var future2 = jobQueues.getAsync(cg1); assertFalse(future1.isDone()); assertFalse(future2.isDone()); - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); - jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(extent1, List.of(job1)); + jobQueues.add(extent2, List.of(job2)); // Futures were immediately completed so nothing should be queued assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty()); - jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); - jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + jobQueues.add(extent3, List.of(job3)); + jobQueues.add(extent4, List.of(job4)); // No futures available, so jobAges should exist for 2 tablets assertEquals(2, jobQueues.getQueue(cg1).getJobAges().size()); @@ -385,10 +372,10 @@ public void testGetAsync() throws Exception { assertTrue(future3.isDone()); assertTrue(future4.isDone()); - assertEquals(extent1, future1.get().getTabletMetadata().getExtent()); - assertEquals(extent2, future2.get().getTabletMetadata().getExtent()); - assertEquals(extent4, future3.get().getTabletMetadata().getExtent()); - assertEquals(extent3, future4.get().getTabletMetadata().getExtent()); + assertEquals(job1, future1.get()); + assertEquals(job2, future2.get()); + assertEquals(job4, future3.get()); + assertEquals(job3, future4.get()); // test cancelling a future and having a future timeout var future5 = jobQueues.getAsync(cg1); @@ -404,9 +391,10 @@ public void testGetAsync() throws Exception { var future7 = jobQueues.getAsync(cg1); assertFalse(future7.isDone()); // since future5 was canceled and future6 timed out, this addition should go to future7 - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + var job5 = newJob((short) 1, 5, cg1); + jobQueues.add(extent1, List.of(job5)); assertTrue(future7.isDone()); - assertEquals(extent1, future7.get().getTabletMetadata().getExtent()); + assertEquals(job5, future7.get()); assertTrue(future5.isDone()); assertTrue(future6.isCompletedExceptionally()); assertTrue(future6.isDone()); @@ -419,12 +407,10 @@ public void testResetSize() throws Exception { var tid = TableId.of("1"); var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); - var tm1 = TabletMetadata.builder(extent1).build(); - var cg1 = CompactorGroupId.of("CG1"); var cg2 = CompactorGroupId.of("CG2"); - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); assertEquals(Set.of(cg1), jobQueues.getQueueIds()); assertEquals(1000000, jobQueues.getQueueMaxSize(cg1)); @@ -435,7 +421,7 @@ public void testResetSize() throws Exception { assertEquals(500000, jobQueues.getQueueMaxSize(cg1)); // create a new queue and ensure it uses the updated max size - jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg2))); + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg2))); assertEquals(Set.of(cg1, cg2), jobQueues.getQueueIds()); assertEquals(500000, jobQueues.getQueueMaxSize(cg1)); assertEquals(500000, jobQueues.getQueueMaxSize(cg2)); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 6f937a68adb..a032aeb1918 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -92,9 +92,9 @@ public void run() { // Create tmp output file final TabletMetadata tm = getContext().getAmple() .readTablet(KeyExtent.fromThrift(job.getExtent()), ColumnType.DIR); - ReferencedTabletFile newFile = - TabletNameGenerator.getNextDataFilenameForMajc(job.isPropagateDeletes(), getContext(), - tm, (dir) -> {}, ExternalCompactionId.from(job.getExternalCompactionId())); + ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc( + job.isPropagateDeletes(), getContext(), tm.getExtent(), tm.getDirName(), (dir) -> {}, + ExternalCompactionId.from(job.getExternalCompactionId())); LOG.info("Creating tmp file: {}", newFile.getPath()); getContext().getVolumeManager().createNewFile(newFile.getPath()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index f594f7b9ec7..b965b0b07d2 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -94,6 +94,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder; +import org.apache.accumulo.core.metadata.schema.TabletMetadataCheck; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; @@ -1938,8 +1939,26 @@ public void testRequireAbsentLoaded() { assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); } + public static class TestCheck implements TabletMetadataCheck { + + private Set expectedFiles; + + public TestCheck() {} + + public TestCheck(Set expectedFiles) { + this.expectedFiles = + expectedFiles.stream().map(StoredTabletFile::getMetadata).collect(Collectors.toSet()); + } + + @Override + public boolean canUpdate(TabletMetadata tabletMetadata) { + var files = expectedFiles.stream().map(StoredTabletFile::of).collect(Collectors.toSet()); + return tabletMetadata.getFiles().containsAll(files); + } + } + @Test - public void testRequireNotCompacting() { + public void testMetadataCheck() { var context = cluster.getServerContext(); var stf1 = StoredTabletFile @@ -1948,69 +1967,45 @@ public void testRequireNotCompacting() { .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); var stf3 = StoredTabletFile .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); - var stf4 = StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf")); var dfv = new DataFileValue(100, 100); - // add all four files to tablet try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv) - .putFile(stf3, dfv).putFile(stf4, dfv).submit(tm -> false); + .submit(tm -> false); assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); } + assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles()); - var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); - var ecid2 = ExternalCompactionId.generate(UUID.randomUUID()); - var compaction1 = createCompaction(Set.of(stf1, stf2)); - var compaction2 = createCompaction(Set.of(stf3)); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireCheckSuccess(new TestCheck(Set.of(stf1, stf3))).putFile(stf3, dfv) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles()); - // add first compaction to tablet try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { - ctmi.mutateTablet(e1).requireAbsentOperation().requireNotCompacting(compaction1.getJobFiles()) - .putExternalCompaction(ecid1, compaction1).submit(tm -> false); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireCheckSuccess(new TestCheck(Set.of(stf1, stf2))).putFile(stf3, dfv) + .submit(tm -> false); assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); } - assertEquals(Set.of(ecid1), - context.getAmple().readTablet(e1).getExternalCompactions().keySet()); + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); - // add second compaction to tablet, there is an existing compaction but the files do not overlap try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { - ctmi.mutateTablet(e1).requireAbsentOperation().requireNotCompacting(compaction2.getJobFiles()) - .putExternalCompaction(ecid2, compaction2).submit(tm -> false); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireCheckSuccess(new TestCheck(Set.of(stf1, stf3))).deleteFile(stf2) + .submit(tm -> false); assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); } - assertEquals(Set.of(ecid1, ecid2), - context.getAmple().readTablet(e1).getExternalCompactions().keySet()); + assertEquals(Set.of(stf1, stf3), context.getAmple().readTablet(e1).getFiles()); - // try different adding a compaction for different subsets of files that overlap with the - // existing compacting files - for (var compactingFiles : Sets.powerSet(Set.of(stf1, stf2, stf3))) { - if (compactingFiles.isEmpty()) { - continue; - } - // attempt to add a compaction that overlaps with existing compacting files - try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { - // This compactions overlaps files from compaction1 and compaction2 - var compaction3 = createCompaction(compactingFiles); - var ecid3 = ExternalCompactionId.generate(UUID.randomUUID()); - ctmi.mutateTablet(e1).requireAbsentOperation() - .requireNotCompacting(compaction3.getJobFiles()) - .putExternalCompaction(ecid3, compaction3).submit(tm -> false); - assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); - } - assertEquals(Set.of(ecid1, ecid2), - context.getAmple().readTablet(e1).getExternalCompactions().keySet()); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireCheckSuccess(new TestCheck(Set.of(stf1, stf2))).deleteFile(stf1) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); } - } - - private CompactionMetadata createCompaction(Set jobFiles) { - FateInstanceType type = FateInstanceType.fromTableId(tid); - FateId fateId = FateId.from(type, UUID.randomUUID()); - ReferencedTabletFile tmpFile = - ReferencedTabletFile.of(new Path("file:///accumulo/tables/t-0/b-0/c1.rf")); - CompactorGroupId ceid = CompactorGroupId.of("G1"); - CompactionMetadata ecMeta = new CompactionMetadata(jobFiles, tmpFile, "localhost:4444", - CompactionKind.SYSTEM, (short) 2, ceid, false, fateId); - return ecMeta; + assertEquals(Set.of(stf1, stf3), context.getAmple().readTablet(e1).getFiles()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index b7cf0f4bc87..cec692cda11 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -244,7 +244,7 @@ public CompactionPlan makePlan(PlanningParameters params) { LOG.debug("Plan job priority is {}:{}", job.getKind(), job.getPriority()); return new CompactionJobImpl( job.getKind() == CompactionKind.SYSTEM ? Short.MAX_VALUE : job.getPriority(), - job.getGroup(), job.getFiles(), job.getKind(), Optional.empty()); + job.getGroup(), job.getFiles(), job.getKind()); }).collect(toList()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java index af4bcefe5cd..9721c82a987 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java @@ -65,8 +65,9 @@ private Set generateTmpFilePaths(ServerContext context, TableId tid, Path } for (int i = 0; i < numFiles; i++) { - ReferencedTabletFile rtf = TabletNameGenerator.getNextDataFilenameForMajc(false, context, tm, - (s) -> {}, ExternalCompactionId.generate(UUID.randomUUID())); + ReferencedTabletFile rtf = + TabletNameGenerator.getNextDataFilenameForMajc(false, context, tm.getExtent(), + tm.getDirName(), (s) -> {}, ExternalCompactionId.generate(UUID.randomUUID())); paths.add(rtf.getPath()); } return paths;