diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java index ffd3c5a65b10..62c5dce21801 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java @@ -22,6 +22,7 @@ import alluxio.metrics.MultiDimensionalMetricsSystem; import alluxio.network.protocol.databuffer.DataFileChannel; import alluxio.resource.LockResource; +import alluxio.uri.UfsUrl; import com.codahale.metrics.Counter; import org.slf4j.Logger; @@ -393,4 +394,13 @@ default void invalidate(Predicate predicate) { Optional getDataFileChannel( PageId pageId, int pageOffset, int bytesToRead, CacheContext cacheContext) throws PageNotFoundException; + + /** + * Gets page infos by UfsUrl prefix. The default throws {@link UnsupportedOperationException}. + * @param ufsUrl the UfsUrl prefix to look up + * @return a list of PageInfos with the given prefix + */ + default List getPageInfoByPrefix(UfsUrl ufsUrl) { + throw new UnsupportedOperationException("Unsupported method: getPageInfoByPrefix"); + } } diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/PageInfo.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/PageInfo.java index 41ab24eaac0e..e41a5f3c2d17 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/PageInfo.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/PageInfo.java @@ -13,6 +13,7 @@ import alluxio.client.file.cache.store.PageStoreDir; import alluxio.client.quota.CacheScope; +import alluxio.uri.UfsUrl; import com.google.common.base.MoreObjects; @@ -66,6 +67,16 @@ public PageInfo(PageId pageId, long pageSize, CacheScope cacheScope, mCreatedTimestamp = createdTimestamp; } + /** + * @return the UfsUrl built from the file ID, given a zzz scheme prefix if it lacks a separator + */ + public UfsUrl getUfsUrl() { + if (mPageId.getFileId().contains(UfsUrl.SCHEME_SEPARATOR)) { + return UfsUrl.createInstance(mPageId.getFileId()); + } + return 
UfsUrl.createInstance("zzz:///" + mPageId.getFileId()); + } + /** * @return page id */ diff --git a/dora/core/common/src/main/java/alluxio/util/io/PathUtils.java b/dora/core/common/src/main/java/alluxio/util/io/PathUtils.java index 3583a035befe..61aa1c1e5490 100644 --- a/dora/core/common/src/main/java/alluxio/util/io/PathUtils.java +++ b/dora/core/common/src/main/java/alluxio/util/io/PathUtils.java @@ -16,6 +16,7 @@ import alluxio.conf.PropertyKey; import alluxio.exception.ExceptionMessage; import alluxio.exception.InvalidPathException; +import alluxio.uri.UfsUrl; import alluxio.util.OSUtils; import com.google.common.base.CharMatcher; @@ -526,4 +527,18 @@ public static AlluxioURI convertUfsPathToAlluxioPath( "UFS root %s is not a prefix of %s", ufsRootPath, ufsPath)); } } + + /** + * Converts a UFS path string to a {@link UfsUrl}. + * If the path contains no scheme separator, it is treated as a local path with the file scheme. + * + * @param ufsPath the string representing a UFS path + * @return the UfsUrl instance + */ + public static UfsUrl convertUfsPathToUfsUrl(String ufsPath) { + if (ufsPath.contains(UfsUrl.SCHEME_SEPARATOR)) { + return UfsUrl.createInstance(ufsPath); + } + return UfsUrl.createInstance("file" + UfsUrl.SCHEME_SEPARATOR + ufsPath); + } } diff --git a/dora/core/server/worker/pom.xml b/dora/core/server/worker/pom.xml index 1944757aa45b..b8bad3c9b58e 100644 --- a/dora/core/server/worker/pom.xml +++ b/dora/core/server/worker/pom.xml @@ -125,6 +125,12 @@ ${project.version} test + + org.apache.hadoop + hadoop-hdfs + 3.3.1 + test + org.hamcrest hamcrest diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index 9f2ad2c1bfcb..2ed7d3b75ad4 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -24,6 +24,7 @@ import 
alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.CacheUsage; import alluxio.client.file.cache.PageId; +import alluxio.client.file.cache.PageInfo; import alluxio.client.file.dora.netty.NettyDataReader; import alluxio.client.file.options.UfsFileSystemOptions; import alluxio.client.file.ufs.UfsBaseFileSystem; @@ -89,6 +90,7 @@ import alluxio.underfs.options.CreateOptions; import alluxio.underfs.options.DeleteOptions; import alluxio.underfs.options.MkdirsOptions; +import alluxio.uri.UfsUrl; import alluxio.util.CommonUtils; import alluxio.util.ModeUtils; import alluxio.util.executor.ExecutorServiceFactories; @@ -1294,4 +1296,13 @@ public static DoraMeta.FileStatus buildFileStatusFromUfsStatus( .setTs(System.nanoTime()) .build(); } + + /** + * Gets page infos by a UfsUrl prefix by delegating to the cache manager. + * @param ufsUrl the UfsUrl prefix + * @return the list of PageInfos returned by the cache manager + */ + public List getPageInfoByPrefix(UfsUrl ufsUrl) { + return mCacheManager.getPageInfoByPrefix(ufsUrl); + } } diff --git a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java index e0f92fb32f80..e3a951d84916 100644 --- a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java +++ b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java @@ -24,6 +24,7 @@ import alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.CacheManagerOptions; import alluxio.client.file.cache.PageId; +import alluxio.client.file.cache.PageInfo; import alluxio.client.file.cache.PageMetaStore; import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; @@ -53,7 +54,9 @@ import alluxio.metrics.MetricsSystem; import alluxio.security.authorization.Mode; import alluxio.underfs.UfsStatus; +import alluxio.uri.UfsUrl; import alluxio.util.io.BufferUtils; +import alluxio.util.io.PathUtils; import alluxio.wire.WorkerIdentity; import 
alluxio.worker.block.BlockMasterClientPool; @@ -62,6 +65,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -74,7 +78,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,6 +93,7 @@ public class PagedDoraWorkerTest { private CacheManager mCacheManager; private MembershipManager mMembershipManager; private long mPageSize; + protected String mFileContent = "fileContent"; private static final GetStatusPOptions GET_STATUS_OPTIONS_MUST_SYNC = GetStatusPOptions.newBuilder().setCommonOptions( FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(0)).build(); @@ -1007,4 +1014,58 @@ public int readInternal(long position, ReadTargetBuffer buffer, int length) { return size; } } + + @Ignore("Remove this annotation once OS worker supports the prefix search.") + @Test + public void searchPagesByPrefix() throws Exception { + String prefix1 = "prefix1"; + String prefix2 = "prefix2"; + String prefix3 = "q"; + String prefix4 = "~"; + + UfsUrl rootPrefix = PathUtils.convertUfsPathToUfsUrl( + mTestFolder.getRoot().getAbsolutePath() + "/"); + + int fileNum = 10; + createFilesWithPrefix(prefix1, fileNum); + createFilesWithPrefix(prefix2, fileNum); + createFilesWithPrefix(prefix3, fileNum); + createFilesWithPrefix(prefix4, fileNum); + + checkPrefixSearch(rootPrefix, rootPrefix.join(prefix1)); + checkPrefixSearch(rootPrefix, rootPrefix.join(prefix2)); + checkPrefixSearch(rootPrefix, rootPrefix.join(prefix3)); + checkPrefixSearch(rootPrefix, rootPrefix.join(prefix4)); + } + + private void checkPrefixSearch(UfsUrl rootUrl, UfsUrl prefixUrl) { + List rootResult = mWorker.getPageInfoByPrefix(rootUrl); 
+ Set rootResultSet = new HashSet<>(rootResult); + Assert.assertEquals(rootResult.size(), rootResultSet.size()); + + List prefixSearchResult = mWorker.getPageInfoByPrefix(prefixUrl); + Set prefixSearchSet = new HashSet<>(prefixSearchResult); + Assert.assertEquals(prefixSearchResult.size(), prefixSearchSet.size()); + + for (PageInfo p : prefixSearchResult) { + Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString())); + } + for (PageInfo p : rootResultSet) { + if (prefixSearchSet.contains(p)) { + Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString())); + } else { + Assert.assertFalse(p.getUfsUrl().toString().startsWith(prefixUrl.toString())); + } + } + } + + private void createFilesWithPrefix(String prefix, int fileNum) + throws AccessControlException, IOException, ExecutionException, InterruptedException, + TimeoutException { + for (int i = 0; i < fileNum; i++) { + File f = mTestFolder.newFile(prefix + "_" + i); + Files.write(f.toPath(), mFileContent.getBytes()); + loadFileData(f.getPath()); + } + } }