diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 3828bb80c4b..0b82f73f11c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -57,6 +57,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Fault tolerant executor
  */
@@ -334,6 +336,44 @@ public FateId startTransaction() {
     return store.create();
   }
 
+  public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
+      boolean autoCleanUp, String goalMessage) {
+
+    Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey);
+
+    return optTxStore.map(txStore -> {
+      var fateId = txStore.getID();
+      try {
+        Preconditions.checkState(txStore.getStatus() == NEW);
+        seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
+      } finally {
+        txStore.unreserve(0, MILLISECONDS);
+      }
+      return fateId;
+    });
+  }
+
+  private void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
+      String goalMessage, FateTxStore<T> txStore) {
+    if (txStore.top() == null) {
+      try {
+        log.info("Seeding {} {}", fateId, goalMessage);
+        txStore.push(repo);
+      } catch (StackOverflowException e) {
+        // this should not happen
+        throw new IllegalStateException(e);
+      }
+    }
+
+    if (autoCleanUp) {
+      txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
+    }
+
+    txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
+
+    txStore.setStatus(SUBMITTED);
+  }
+
   // start work in the transaction.. it is safe to call this
   // multiple times for a transaction... but it will only seed once
   public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
@@ -341,23 +381,7 @@ public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean
     FateTxStore<T> txStore = store.reserve(fateId);
     try {
       if (txStore.getStatus() == NEW) {
-        if (txStore.top() == null) {
-          try {
-            log.info("Seeding {} {}", fateId, goalMessage);
-            txStore.push(repo);
-          } catch (StackOverflowException e) {
-            // this should not happen
-            throw new IllegalStateException(e);
-          }
-        }
-
-        if (autoCleanUp) {
-          txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
-        }
-
-        txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
-
-        txStore.setStatus(SUBMITTED);
+        seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
       }
     } finally {
       txStore.unreserve(0, TimeUnit.MILLISECONDS);
diff --git a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
index 3c37b68067b..39a96225d41 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
@@ -45,7 +45,6 @@ public enum CacheName {
     COMPRESSION_ALGORITHM,
     CRYPT_PASSWORDS,
     HOST_REGEX_BALANCER_TABLE_REGEX,
-    HOSTING_REQUEST_CACHE,
    INSTANCE_ID,
    NAMESPACE_ID,
    PROP_CACHE,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index cf364f87229..ad5aa4710b5 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -280,9 +280,6 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
             "summary partition", emitThreadPoolMetrics);
      case GC_DELETE_THREADS:
        return
createFixedThreadPool(conf.getCount(p), "deleting", emitThreadPoolMetrics); - case MANAGER_SPLIT_WORKER_THREADS: - return createFixedThreadPool(conf.getCount(p), "tablet split inspection", - emitThreadPoolMetrics); default: throw new IllegalArgumentException("Unhandled thread pool property: " + p); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 9f1deac81e5..e8bdd794a46 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -73,7 +73,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.metrics.ManagerMetrics; -import org.apache.accumulo.manager.split.SplitTask; +import org.apache.accumulo.manager.split.SeedSplitTask; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; @@ -521,17 +521,7 @@ private TableMgmtStats manageTablets(Iterator iter, if (actions.contains(ManagementAction.NEEDS_SPLITTING)) { LOG.debug("{} may need splitting.", tm.getExtent()); - if (manager.getSplitter().isSplittable(tm)) { - if (manager.getSplitter().addSplitStarting(tm.getExtent())) { - LOG.debug("submitting tablet {} for split", tm.getExtent()); - manager.getSplitter().executeSplit(new SplitTask(manager.getContext(), tm, manager)); - } - } else { - LOG.debug("{} is not splittable.", tm.getExtent()); - } - // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this commented out code to - // show where merge used to make a call to split a tablet. - // sendSplitRequest(mergeStats.getMergeInfo(), state, tm); + manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent())); } if (actions.contains(ManagementAction.NEEDS_COMPACTING)) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java new file mode 100644 index 00000000000..f63047ee548 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.accumulo.manager.split;
+
+import java.util.Optional;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.split.FindSplits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SeedSplitTask implements Runnable {
+
+  private static final Logger log = LoggerFactory.getLogger(SeedSplitTask.class);
+  private final Manager manager;
+  private KeyExtent extent;
+
+  public SeedSplitTask(Manager manager, KeyExtent extent) {
+    this.manager = manager;
+    this.extent = extent;
+  }
+
+  @Override
+  public void run() {
+    try {
+      var fateInstanceType = FateInstanceType.fromTableId((extent.tableId()));
+
+      Optional<FateId> optFateId =
+          manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", FateKey.forSplit(extent),
+              new FindSplits(extent), true, "System initiated split of tablet " + extent);
+
+      optFateId.ifPresentOrElse(fateId -> {
+        log.trace("System initiated a split for {} {}", extent, fateId);
+      }, () -> {
+        log.trace("System attempted to initiate a split but one was in progress: {}", extent);
+      });
+
+    } catch (Exception e) {
+      log.error("Failed to split {}", extent, e);
+    }
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java
deleted file mode 100644
index 999de0f7e99..00000000000
--- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.accumulo.manager.split; - -import java.time.Duration; -import java.util.SortedSet; - -import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.split.PreSplit; -import org.apache.accumulo.server.ServerContext; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SplitTask implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(SplitTask.class); - private final Manager manager; - - private final ServerContext context; - private TabletMetadata tablet; - private final long creationTime; - - public SplitTask(ServerContext context, TabletMetadata tablet, Manager manager) { - this.context = context; - this.tablet = tablet; - this.manager = manager; - this.creationTime = System.nanoTime(); - } - - @Override - public void run() { - try { - if (Duration.ofNanos(System.nanoTime() - creationTime).compareTo(Duration.ofMinutes(2)) > 0) { - // the tablet was in the thread pool queue for a bit, lets reread its metadata - tablet = manager.getContext().getAmple().readTablet(tablet.getExtent()); - if (tablet == null) { - // the tablet no longer exists - return; - } - } - - if (tablet.getOperationId() != null) { - // This will be checked in the FATE op, but no need to inspect files and start a FATE op if - // it currently has an operation running against it. - log.debug("Not splitting {} because it has operation id {}", tablet.getExtent(), - tablet.getOperationId()); - manager.getSplitter().removeSplitStarting(tablet.getExtent()); - return; - } - - var extent = tablet.getExtent(); - - SortedSet splits = SplitUtils.findSplits(context, tablet); - - if (tablet.getEndRow() != null) { - splits.remove(tablet.getEndRow()); - } - - if (splits.size() == 0) { - log.info("Tablet {} needs to split, but no split points could be found.", - tablet.getExtent()); - - manager.getSplitter().rememberUnsplittable(tablet); - manager.getSplitter().removeSplitStarting(tablet.getExtent()); - return; - } - - var fateInstanceType = FateInstanceType.fromTableId((tablet.getTableId())); - FateId fateId = manager.fate(fateInstanceType).startTransaction(); - - manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", fateId, - new PreSplit(extent, splits), true, - "System initiated split of tablet " + extent + " into " + splits.size() + " splits"); - } catch (Exception e) { - log.error("Failed to split {}", tablet.getExtent(), e); - } - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index aeeebea780a..3acdfe13bfe 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -18,39 +18,29 @@ */ package org.apache.accumulo.manager.split; -import static java.nio.charset.StandardCharsets.UTF_8; - import java.util.Objects; import java.util.Set; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; -import 
org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TabletFile;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.util.FileUtil;
 import org.apache.accumulo.server.util.FileUtil.FileInfo;
 
-import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
 
 public class Splitter {
 
-  private final ExecutorService splitExecutor;
-
-  Cache<KeyExtent,KeyExtent> splitsStarting;
-
-  Cache<KeyExtent,HashCode> unsplittable;
+  private final ThreadPoolExecutor splitExecutor;
 
   private static class CacheKey {
 
@@ -84,52 +74,34 @@ public int hashCode() {
 
   LoadingCache<CacheKey,FileInfo> splitFileCache;
 
-  public static int weigh(KeyExtent keyExtent) {
-    int size = 0;
-    size += keyExtent.tableId().toString().length();
-    if (keyExtent.endRow() != null) {
-      size += keyExtent.endRow().getLength();
-    }
-    if (keyExtent.prevEndRow() != null) {
-      size += keyExtent.prevEndRow().getLength();
-    }
-    return size;
-  }
-
   public Splitter(ServerContext context) {
-    this.splitExecutor = context.threadPools().createExecutorService(context.getConfiguration(),
-        Property.MANAGER_SPLIT_WORKER_THREADS, true);
+    int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS);
+    // Set up a thread pool that constrains the number of tasks it queues and, when full, discards
+    // tasks. The purpose of this is to avoid reading lots of data into memory if lots of tablets
+    // need to split.
+    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000);
+    this.splitExecutor = context.threadPools().createThreadPool(numThreads, numThreads, 0,
+        TimeUnit.MILLISECONDS, "split_seeder", queue, true);
+
+    // Discard tasks when the queue is full; this allows the TGW to continue processing tasks
+    // other than splits.
+    this.splitExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
 
     Weigher<CacheKey,FileInfo> weigher = (key, info) -> key.tableId.canonical().length()
         + key.tabletFile.getPath().toString().length() + info.getFirstRow().getLength()
         + info.getLastRow().getLength();
 
-    CacheLoader<CacheKey,FileInfo> loader = new CacheLoader<>() {
-      @Override
-      public FileInfo load(CacheKey key) throws Exception {
-        TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
-        return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
-            .get(key.tabletFile);
-      }
+    CacheLoader<CacheKey,FileInfo> loader = key -> {
+      TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
+      return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
+          .get(key.tabletFile);
     };
 
     splitFileCache = context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true)
        .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher)
        .build(loader);
 
-    Weigher<KeyExtent,KeyExtent> weigher2 = (keyExtent, keyExtent2) -> weigh(keyExtent);
-
-    // Tracks splits starting, but not forever in case something in the code does not remove it.
- splitsStarting = context.getCaches().createNewBuilder(CacheName.SPLITTER_STARTING, true) - .expireAfterAccess(3, TimeUnit.HOURS).maximumWeight(10_000_000L).weigher(weigher2).build(); - - Weigher weigher3 = (keyExtent, hc) -> { - return weigh(keyExtent) + hc.bits() / 8; - }; - - unsplittable = context.getCaches().createNewBuilder(CacheName.SPLITTER_UNSPLITTABLE, true) - .expireAfterAccess(24, TimeUnit.HOURS).maximumWeight(10_000_000L).weigher(weigher3).build(); } public synchronized void start() {} @@ -142,59 +114,7 @@ public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) { return splitFileCache.get(new CacheKey(tableId, tabletFile)); } - private HashCode caclulateFilesHash(TabletMetadata tabletMetadata) { - var hasher = Hashing.goodFastHash(128).newHasher(); - tabletMetadata.getFiles().stream().map(StoredTabletFile::getNormalizedPathStr).sorted() - .forEach(path -> hasher.putString(path, UTF_8)); - return hasher.hash(); - } - - /** - * This tablet met the criteria for split but inspection could not find a split point. Remember - * this to avoid wasting time on future inspections until its files change. - */ - public void rememberUnsplittable(TabletMetadata tablet) { - unsplittable.put(tablet.getExtent(), caclulateFilesHash(tablet)); - } - - /** - * If tablet has not been marked as unsplittable, or file set has changed since being marked - * splittable, then return true. Else false. - */ - public boolean isSplittable(TabletMetadata tablet) { - if (splitsStarting.getIfPresent(tablet.getExtent()) != null) { - return false; - } - - var hashCode = unsplittable.getIfPresent(tablet.getExtent()); - - if (hashCode != null) { - if (hashCode.equals(caclulateFilesHash(tablet))) { - return false; - } else { - // We know that the list of files for this tablet have changed - // so we can remove it from the set of unsplittable tablets. - unsplittable.invalidate(tablet.getExtent()); - } - } - - return true; - } - - /** - * Temporarily remember that the process of splitting is starting for this tablet making - * {@link #isSplittable(TabletMetadata)} return false in the future. - */ - public boolean addSplitStarting(KeyExtent extent) { - Objects.requireNonNull(extent); - return splitsStarting.asMap().put(extent, extent) == null; - } - - public void removeSplitStarting(KeyExtent extent) { - splitsStarting.invalidate(extent); - } - - public void executeSplit(SplitTask splitTask) { - splitExecutor.execute(splitTask); + public void initiateSplit(SeedSplitTask seedSplitTask) { + splitExecutor.execute(seedSplitTask); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java new file mode 100644 index 00000000000..695f5650cc3 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.split.SplitUtils;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FindSplits extends ManagerRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger log = LoggerFactory.getLogger(FindSplits.class);
+  private final SplitInfo splitInfo;
+
+  public FindSplits(KeyExtent extent) {
+    this.splitInfo = new SplitInfo(extent, new TreeSet<>());
+  }
+
+  @Override
+  public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
+    var extent = splitInfo.getOriginal();
+    var tabletMetadata = manager.getContext().getAmple().readTablet(extent);
+
+    if (tabletMetadata == null) {
+      log.trace("Tablet {} no longer exists, so not trying to find a split point for it", extent);
+      return null;
+    }
+
+    if (tabletMetadata.getOperationId() != null) {
+      log.debug("Not splitting {} because it has operation id {}", tabletMetadata.getExtent(),
+          tabletMetadata.getOperationId());
+      return null;
+    }
+
+    if (!tabletMetadata.getLogs().isEmpty()) {
+      // This code is only called by system initiated splits, so if walogs are present it probably
+      // makes sense to wait for the data in them to be written to a file before finding split
+      // points.
+      log.debug("Not splitting {} because it has walogs {}", tabletMetadata.getExtent(),
+          tabletMetadata.getLogs().size());
+      return null;
+    }
+
+    SortedSet<Text> splits = SplitUtils.findSplits(manager.getContext(), tabletMetadata);
+
+    if (extent.endRow() != null) {
+      splits.remove(extent.endRow());
+    }
+
+    if (splits.isEmpty()) {
+      log.info("Tablet {} needs to split, but no split points could be found.",
+          tabletMetadata.getExtent());
+      // ELASTICITY_TODO record the fact that the tablet is un-splittable in the metadata table in
+      // a new column. Record the config used to reach this decision and a hash of the file. The
+      // tablet mgmt iterator can inspect this column and only try to split the tablet when
+      // something has changed.
+ return null; + } + + return new PreSplit(extent, splits); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index f94afbdb1ef..d956821d183 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -129,8 +129,6 @@ public long isReady(FateId fateId, Manager manager) throws Exception { @Override public Repo call(FateId fateId, Manager manager) throws Exception { - manager.getSplitter().removeSplitStarting(splitInfo.getOriginal()); - TabletMetadata tabletMetadata = manager.getContext().getAmple() .readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID, LOGS); @@ -171,8 +169,5 @@ public Repo call(FateId fateId, Manager manager) throws Exception { } @Override - public void undo(FateId fateId, Manager manager) throws Exception { - // TODO is this called if isReady fails? - manager.getSplitter().removeSplitStarting(splitInfo.getOriginal()); - } + public void undo(FateId fateId, Manager manager) throws Exception {} } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java deleted file mode 100644 index 75c0f16bea2..00000000000 --- a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.manager.split; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.HashSet; -import java.util.Set; - -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.ReferencedTabletFile; -import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.util.cache.Caches; -import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.server.ServerContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; - -public class SplitterTest { - - @Test - public void testIsSplittable() { - ThreadPools threadPools = createNiceMock(ThreadPools.class); - replay(threadPools); - ServerContext context = createNiceMock(ServerContext.class); - expect(context.threadPools()).andReturn(threadPools).anyTimes(); - expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); - replay(context); - - var splitter = new Splitter(context); - - KeyExtent ke1 = new KeyExtent(TableId.of("1"), new Text("m"), null); - KeyExtent ke2 = new KeyExtent(TableId.of("1"), null, new Text("m")); - - Set files1 = new HashSet<>(); - files1.add(new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")).insert()); - files1.add(new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")).insert()); - - TabletMetadata tabletMeta1 = createMock(TabletMetadata.class); - expect(tabletMeta1.getExtent()).andReturn(ke1).anyTimes(); - expect(tabletMeta1.getFiles()).andReturn(files1).times(3); - replay(tabletMeta1); - - TabletMetadata tabletMeta2 = createMock(TabletMetadata.class); - expect(tabletMeta2.getExtent()).andReturn(ke2).anyTimes(); - replay(tabletMeta2); - - assertTrue(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - splitter.addSplitStarting(ke1); - - assertFalse(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - splitter.removeSplitStarting(ke1); - - assertTrue(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - splitter.rememberUnsplittable(tabletMeta1); - - assertFalse(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - // tabletMeta1 is currently unsplittable. Adding a file - // to it's file set should cause it to be removed from - // the unsplittable set of tablets, becoming splittable - // again. 
- files1.add(new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")).insert()); - assertTrue(splitter.isSplittable(tabletMeta1)); - - // when a tablets files change it should become a candidate for inspection - Set files2 = Set.of( - new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")) - .insert(), - new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")) - .insert(), - new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000073.rf")) - .insert()); - TabletMetadata tabletMeta3 = createMock(TabletMetadata.class); - expect(tabletMeta3.getExtent()).andReturn(ke1).anyTimes(); - expect(tabletMeta3.getFiles()).andReturn(files2).anyTimes(); - replay(tabletMeta3); - - assertTrue(splitter.isSplittable(tabletMeta3)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - verify(threadPools, context, tabletMeta1, tabletMeta2, tabletMeta3); - } - -}