Skip to content

Commit

Permalink
AzureFS: Guarantee listed FileInfos already come in sorted
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecrv committed Nov 30, 2023
1 parent 48f69f6 commit 9ddef84
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 89 deletions.
193 changes: 111 additions & 82 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -837,28 +837,37 @@ class AzureFileSystem::Impl {
return Status::OK();
}

/// \brief The last result name from the same container as the base_location.
///
/// The "<container>/" prefix is stripped from the path, so it can be
/// easily compared with blob names in a listing loop.
std::optional<std::string> LastResultBlobName(const AzureLocation& base_location,
const std::vector<FileInfo>& results) {
if (results.empty()) {
return {};
}
const auto& path = results.back().path();
// if path starts with "<container>/"
if (path.size() > base_location.container.size() &&
arrow::internal::StartsWith(path, base_location.container) &&
path[base_location.container.size()] == internal::kSep) {
auto blob_name = path.substr(base_location.container.size() + 1);
if (!blob_name.empty()) {
return std::move(blob_name);
}
static FileInfo FileInfoFromBlob(const std::string& container,
const Azure::Storage::Blobs::Models::BlobItem& blob) {
if (blob.Name.back() == internal::kSep) {
return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
}
return {};
std::string path;
path.reserve(container.size() + 1 + blob.Name.size());
path += container;
path += internal::kSep;
path += blob.Name;
FileInfo info{std::move(path), FileType::File};
info.set_size(blob.BlobSize);
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
return info;
}

static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
FileType::Directory};
}

static std::string_view BasenameView(std::string_view s) {
auto offset = s.find_last_of(internal::kSep);
auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
}

/// \brief List the blobs at the root of a container or some dir in a container.
///
/// \pre container_client is the client for the container named like the first
/// segment of select.base_dir.
Status GetFileInfoWithSelectorFromContainer(
const Azure::Storage::Blobs::BlobContainerClient& container_client,
const Azure::Core::Context& context, Azure::Nullable<int32_t> page_size_hint,
Expand All @@ -879,7 +888,63 @@ class AzureFileSystem::Impl {
options.PageSizeHint = page_size_hint;
options.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;

std::set<std::string> dir_markers_listed;
// When Prefix.Value() contains a trailing slash and we find a blob that
// matches it completely, it is an empty directory marker blob for the
// directory we're listing from, and we should skip it.
auto is_empty_dir_marker =
[&options](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept -> bool {
return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
};

auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = base_location.container;
sub_select.base_dir += internal::kSep;
sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
return GetFileInfoWithSelectorFromContainer(
container_client, context, page_size_hint, sub_select, acc_results);
}
return Status::OK();
};

// (*acc_results)[*last_dir_reported] is the last FileType::Directory in the results
// produced through this loop over the response pages.
std::optional<size_t> last_dir_reported{};
auto matches_last_dir_reported = [&last_dir_reported,
acc_results](const FileInfo& info) noexcept {
if (!last_dir_reported.has_value() || info.type() != FileType::Directory) {
return false;
}
const auto& last_dir = (*acc_results)[*last_dir_reported];
return BasenameView(info.path()) == BasenameView(last_dir.path());
};

auto process_blob =
[&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
if (!is_empty_dir_marker(blob)) {
const auto& info = acc_results->emplace_back(
FileInfoFromBlob(base_location.container, blob));
if (info.type() == FileType::Directory) {
last_dir_reported = acc_results->size() - 1;
}
}
};
auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
const std::string path = base_location.container + internal::kSep + prefix;
const auto& info = acc_results->emplace_back(DirectoryFileInfoFromPath(path));
if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
acc_results->pop_back();
} else {
last_dir_reported = acc_results->size() - 1;
return recurse(prefix);
}
return Status::OK();
};

try {
auto list_response =
container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, context);
Expand All @@ -888,70 +953,34 @@ class AzureFileSystem::Impl {
continue;
}
found = true;
// Emit the blobs first, so that potential BlobPrefix recursions are at the
// function tail.
for (const auto& blob : list_response.Blobs) {
if (ARROW_PREDICT_FALSE(options.Prefix.HasValue() &&
blob.Name == options.Prefix.Value())) {
// Prefix.Value() contains a trailing slash, so if we found a blob that
// matches it completely, then it is an empty directory marker blob for the
// directory we're listing from, so we should skip it.
continue;
}
if (blob.Name.back() == internal::kSep) {
// Defense against empty '/'-separated segments in the blob name.
if (ARROW_PREDICT_FALSE(blob.Name == "/")) {
continue;
}
// We include these now as they might be empty directories and not appear in
// the BlobPrefixes listing.
std::string out_path;
out_path.reserve(base_location.container.size() + blob.Name.size());
out_path += base_location.container;
out_path += internal::kSep;
out_path += internal::RemoveTrailingSlash(blob.Name, false);
FileInfo info{std::move(out_path), FileType::Directory};
acc_results->push_back(std::move(info));
// Remember the listed directory markers, so we don't duplicate
// them in the BlobPrefixes processing loop.
dir_markers_listed.emplace_hint(dir_markers_listed.end(), blob.Name);
continue;
// Blob and BlobPrefixes are sorted by name, so we can merge-iterate
// them to ensure returned results are all sorted.
size_t blob_index = 0;
size_t blob_prefix_index = 0;
while (blob_index < list_response.Blobs.size() &&
blob_prefix_index < list_response.BlobPrefixes.size()) {
const auto& blob = list_response.Blobs[blob_index];
const auto& prefix = list_response.BlobPrefixes[blob_prefix_index];
const int cmp = blob.Name.compare(prefix);
if (cmp < 0) {
process_blob(blob);
blob_index += 1;
} else if (cmp > 0) {
RETURN_NOT_OK(process_prefix(prefix));
blob_prefix_index += 1;
} else { // there is a blob (empty dir marker) and a prefix with the same name
DCHECK_EQ(blob.Name, prefix);
RETURN_NOT_OK(process_prefix(prefix));
blob_index += 1;
blob_prefix_index += 1;
}
// Add file path to results.
std::string out_path;
out_path.reserve(base_location.container.size() + 1 + blob.Name.size());
out_path += base_location.container;
out_path += internal::kSep;
out_path += blob.Name;
FileInfo info{std::move(out_path), FileType::File};
info.set_size(blob.BlobSize);
info.set_mtime(
std::chrono::system_clock::time_point{blob.Details.LastModified});
acc_results->push_back(std::move(info));
}
// Process the BlobPrefixes.
for (const auto& blob_prefix : list_response.BlobPrefixes) {
// Defense against empty '/'-separated segments in the blob name.
if (ARROW_PREDICT_FALSE(blob_prefix == "/")) {
continue;
}
auto path = base_location.container + internal::kSep + blob_prefix;
if (dir_markers_listed.find(path) == dir_markers_listed.end()) {
// Directory not added to the results yet, add it now.
FileInfo info{std::string{internal::RemoveTrailingSlash(path)},
FileType::Directory};
acc_results->push_back(std::move(info));
}
// Recurse into the directory.
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = internal::RemoveTrailingSlash(path);
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
RETURN_NOT_OK(GetFileInfoWithSelectorFromContainer(
container_client, context, page_size_hint, sub_select, acc_results));
}
for (; blob_index < list_response.Blobs.size(); blob_index++) {
process_blob(list_response.Blobs[blob_index]);
}
for (; blob_prefix_index < list_response.BlobPrefixes.size();
blob_prefix_index++) {
RETURN_NOT_OK(process_prefix(list_response.BlobPrefixes[blob_prefix_index]));
}
}
} catch (const Azure::Storage::StorageException& exception) {
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ TEST_F(AzuriteFileSystemTest, GetFileInfoSelector) {
select.base_dir = "";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
ASSERT_EQ(infos.size(), 3);
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
AssertFileInfo(infos[0], "container", FileType::Directory);
AssertFileInfo(infos[1], "empty-container", FileType::Directory);
AssertFileInfo(infos[2], container_name_, FileType::Directory);
Expand All @@ -551,7 +551,7 @@ TEST_F(AzuriteFileSystemTest, GetFileInfoSelector) {
// Non-empty container
select.base_dir = "container";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
ASSERT_EQ(infos.size(), 4);
AssertFileInfo(infos[0], "container/emptydir", FileType::Directory);
AssertFileInfo(infos[1], "container/otherdir", FileType::Directory);
Expand Down Expand Up @@ -587,7 +587,7 @@ TEST_F(AzuriteFileSystemTest, GetFileInfoSelector) {
ASSERT_RAISES(IOError, fs_->GetFileInfo(select));
select.base_dir = "container/";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
ASSERT_EQ(infos.size(), 4);
}

Expand All @@ -602,7 +602,7 @@ TEST_F(AzuriteFileSystemTest, GetFileInfoSelectorRecursive) {
select.base_dir = "";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
ASSERT_EQ(infos.size(), 14);
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
AssertInfoAllContainersRecursive(infos);

// Empty container
Expand All @@ -613,7 +613,7 @@ TEST_F(AzuriteFileSystemTest, GetFileInfoSelectorRecursive) {
// Non-empty container
select.base_dir = "container";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
ASSERT_EQ(infos.size(), 10);
AssertFileInfo(infos[0], "container/emptydir", FileType::Directory);
AssertFileInfo(infos[1], "container/otherdir", FileType::Directory);
Expand All @@ -634,14 +634,14 @@ TEST_F(AzuriteFileSystemTest, GetFileInfoSelectorRecursive) {
// Non-empty "directories"
select.base_dir = "container/somedir";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
ASSERT_EQ(infos.size(), 2);
AssertFileInfo(infos[0], "container/somedir/subdir", FileType::Directory);
AssertFileInfo(infos[1], "container/somedir/subdir/subfile", FileType::File, 8);

select.base_dir = "container/otherdir";
ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select));
SortInfos(&infos);
ASSERT_EQ(infos, SortedInfos(infos));
ASSERT_EQ(infos.size(), 4);
AssertFileInfo(infos[0], "container/otherdir/1", FileType::Directory);
AssertFileInfo(infos[1], "container/otherdir/1/2", FileType::Directory);
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/filesystem/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ void SortInfos(std::vector<FileInfo>* infos) {
std::sort(infos->begin(), infos->end(), FileInfo::ByPath{});
}

std::vector<FileInfo> SortedInfos(const std::vector<FileInfo>& infos) {
auto sorted = infos;
SortInfos(&sorted);
return sorted;
}

void CollectFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos) {
auto fut = CollectAsyncGenerator(gen);
ASSERT_FINISHES_OK_AND_ASSIGN(auto nested_infos, fut);
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/filesystem/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ void CreateFile(FileSystem* fs, const std::string& path, const std::string& data
ARROW_TESTING_EXPORT
void SortInfos(FileInfoVector* infos);

// Create a copy of a FileInfo vector sorted by lexicographic path order
ARROW_TESTING_EXPORT
std::vector<FileInfo> SortedInfos(const std::vector<FileInfo>& infos);

ARROW_TESTING_EXPORT
void CollectFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos);

Expand Down

0 comments on commit 9ddef84

Please sign in to comment.