Skip to content

Commit

Permalink
Add interface and unit tests of worker page info prefix search
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Please outline the changes and how this PR fixes the issue.

### Why are the changes needed?

Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, describe the bug.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
  1. change in user-facing APIs
  2. addition or removal of property keys
  3. webui

			pr-link: #18664
			change-id: cid-5a2565681a810493e49081f7f786215100f1c637
  • Loading branch information
YichuanSun authored Jul 29, 2024
1 parent 5cebbcf commit ff884b8
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import alluxio.metrics.MultiDimensionalMetricsSystem;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.resource.LockResource;
import alluxio.uri.UfsUrl;

import com.codahale.metrics.Counter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -393,4 +394,13 @@ default void invalidate(Predicate<PageInfo> predicate) {
Optional<DataFileChannel> getDataFileChannel(
PageId pageId, int pageOffset, int bytesToRead, CacheContext cacheContext)
throws PageNotFoundException;

/**
* Get pageInfo of a specific prefix.
* @param ufsUrl the prefix of UfsUrl
* @return a list of PageInfos with the same prefix
*/
default List<PageInfo> getPageInfoByPrefix(UfsUrl ufsUrl) {
throw new UnsupportedOperationException("Unsupported method: getPageInfoByPrefix");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.client.file.cache.store.PageStoreDir;
import alluxio.client.quota.CacheScope;
import alluxio.uri.UfsUrl;

import com.google.common.base.MoreObjects;

Expand Down Expand Up @@ -66,6 +67,16 @@ public PageInfo(PageId pageId, long pageSize, CacheScope cacheScope,
mCreatedTimestamp = createdTimestamp;
}

/**
* @return the UfsUrl
*/
public UfsUrl getUfsUrl() {
if (mPageId.getFileId().contains(UfsUrl.SCHEME_SEPARATOR)) {
return UfsUrl.createInstance(mPageId.getFileId());
}
return UfsUrl.createInstance("zzz:///" + mPageId.getFileId());
}

/**
* @return page id
*/
Expand Down
15 changes: 15 additions & 0 deletions dora/core/common/src/main/java/alluxio/util/io/PathUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.InvalidPathException;
import alluxio.uri.UfsUrl;
import alluxio.util.OSUtils;

import com.google.common.base.CharMatcher;
Expand Down Expand Up @@ -526,4 +527,18 @@ public static AlluxioURI convertUfsPathToAlluxioPath(
"UFS root %s is not a prefix of %s", ufsRootPath, ufsPath));
}
}

/**
* Converts the Ufs path string to {@link UfsUrl}.
* If the path has no scheme, regards it as a local file.
*
* @param ufsPath the string representing an ufs path
* @return the UfsUrl instance
*/
public static UfsUrl convertUfsPathToUfsUrl(String ufsPath) {
if (ufsPath.contains(UfsUrl.SCHEME_SEPARATOR)) {
return UfsUrl.createInstance(ufsPath);
}
return UfsUrl.createInstance("file" + UfsUrl.SCHEME_SEPARATOR + ufsPath);
}
}
6 changes: 6 additions & 0 deletions dora/core/server/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.CacheUsage;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageInfo;
import alluxio.client.file.dora.netty.NettyDataReader;
import alluxio.client.file.options.UfsFileSystemOptions;
import alluxio.client.file.ufs.UfsBaseFileSystem;
Expand Down Expand Up @@ -89,6 +90,7 @@
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.DeleteOptions;
import alluxio.underfs.options.MkdirsOptions;
import alluxio.uri.UfsUrl;
import alluxio.util.CommonUtils;
import alluxio.util.ModeUtils;
import alluxio.util.executor.ExecutorServiceFactories;
Expand Down Expand Up @@ -1294,4 +1296,13 @@ public static DoraMeta.FileStatus buildFileStatusFromUfsStatus(
.setTs(System.nanoTime())
.build();
}

/**
* Get page info by a prefix.
* @param ufsUrl the start UfsUrl
* @return the PageInfo set
*/
public List<PageInfo> getPageInfoByPrefix(UfsUrl ufsUrl) {
return mCacheManager.getPageInfoByPrefix(ufsUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.CacheManagerOptions;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageInfo;
import alluxio.client.file.cache.PageMetaStore;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
Expand Down Expand Up @@ -53,7 +54,9 @@
import alluxio.metrics.MetricsSystem;
import alluxio.security.authorization.Mode;
import alluxio.underfs.UfsStatus;
import alluxio.uri.UfsUrl;
import alluxio.util.io.BufferUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.WorkerIdentity;
import alluxio.worker.block.BlockMasterClientPool;

Expand All @@ -62,6 +65,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -74,7 +78,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -87,6 +93,7 @@ public class PagedDoraWorkerTest {
private CacheManager mCacheManager;
private MembershipManager mMembershipManager;
private long mPageSize;
protected String mFileContent = "fileContent";
private static final GetStatusPOptions GET_STATUS_OPTIONS_MUST_SYNC =
GetStatusPOptions.newBuilder().setCommonOptions(
FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(0)).build();
Expand Down Expand Up @@ -1007,4 +1014,58 @@ public int readInternal(long position, ReadTargetBuffer buffer, int length) {
return size;
}
}

@Ignore("Remove this annotation once OS worker supports the prefix search.")
@Test
public void searchPagesByPrefix() throws Exception {
String prefix1 = "prefix1";
String prefix2 = "prefix2";
String prefix3 = "q";
String prefix4 = "~";

UfsUrl rootPrefix = PathUtils.convertUfsPathToUfsUrl(
mTestFolder.getRoot().getAbsolutePath() + "/");

int fileNum = 10;
createFilesWithPrefix(prefix1, fileNum);
createFilesWithPrefix(prefix2, fileNum);
createFilesWithPrefix(prefix3, fileNum);
createFilesWithPrefix(prefix4, fileNum);

checkPrefixSearch(rootPrefix, rootPrefix.join(prefix1));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix2));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix3));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix4));
}

private void checkPrefixSearch(UfsUrl rootUrl, UfsUrl prefixUrl) {
List<PageInfo> rootResult = mWorker.getPageInfoByPrefix(rootUrl);
Set<PageInfo> rootResultSet = new HashSet<>(rootResult);
Assert.assertEquals(rootResult.size(), rootResultSet.size());

List<PageInfo> prefixSearchResult = mWorker.getPageInfoByPrefix(prefixUrl);
Set<PageInfo> prefixSearchSet = new HashSet<>(prefixSearchResult);
Assert.assertEquals(prefixSearchResult.size(), prefixSearchSet.size());

for (PageInfo p : prefixSearchResult) {
Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
}
for (PageInfo p : rootResultSet) {
if (prefixSearchSet.contains(p)) {
Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
} else {
Assert.assertFalse(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
}
}
}

private void createFilesWithPrefix(String prefix, int fileNum)
throws AccessControlException, IOException, ExecutionException, InterruptedException,
TimeoutException {
for (int i = 0; i < fileNum; i++) {
File f = mTestFolder.newFile(prefix + "_" + i);
Files.write(f.toPath(), mFileContent.getBytes());
loadFileData(f.getPath());
}
}
}

0 comments on commit ff884b8

Please sign in to comment.