Skip to content

Commit

Permalink
Implement AzureFileSystem::GetFileInfoWithSelector()
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecrv committed Nov 30, 2023
1 parent fb27108 commit 48f69f6
Show file tree
Hide file tree
Showing 2 changed files with 375 additions and 2 deletions.
205 changes: 204 additions & 1 deletion cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,204 @@ class AzureFileSystem::Impl {
}
}

private:
template <typename OnContainer>
Status ListContainers(const Azure::Core::Context& context,
OnContainer&& on_container) const {
Azure::Storage::Blobs::ListBlobContainersOptions options;
// Deleted containers are not returned.
options.Include = Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
try {
auto container_list_response =
blob_service_client_->ListBlobContainers(options, context);
for (; container_list_response.HasPage();
container_list_response.MoveToNextPage(context)) {
for (const auto& container : container_list_response.BlobContainers) {
RETURN_NOT_OK(on_container(container));
}
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus("Failed to list account containers.", exception);
}
return Status::OK();
}

/// \brief The last result name from the same container as the base_location.
///
/// The "<container>/" prefix is stripped from the path, so it can be
/// easily compared with blob names in a listing loop.
std::optional<std::string> LastResultBlobName(const AzureLocation& base_location,
const std::vector<FileInfo>& results) {
if (results.empty()) {
return {};
}
const auto& path = results.back().path();
// if path starts with "<container>/"
if (path.size() > base_location.container.size() &&
arrow::internal::StartsWith(path, base_location.container) &&
path[base_location.container.size()] == internal::kSep) {
auto blob_name = path.substr(base_location.container.size() + 1);
if (!blob_name.empty()) {
return std::move(blob_name);
}
}
return {};
}

Status GetFileInfoWithSelectorFromContainer(
const Azure::Storage::Blobs::BlobContainerClient& container_client,
const Azure::Core::Context& context, Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select, FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

bool found = false;
Azure::Storage::Blobs::ListBlobsOptions options;
if (internal::GetAbstractPathDepth(base_location.path) == 0) {
// If the base_dir is the root of the container, then we want to list all blobs in
// the container and the Prefix should be empty and not even include the trailing
// slash because the container itself represents the `<container>/` directory.
options.Prefix = {};
found = true; // Unless the container itself is not found later!
} else {
options.Prefix = internal::EnsureTrailingSlash(base_location.path);
}
options.PageSizeHint = page_size_hint;
options.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;

std::set<std::string> dir_markers_listed;
try {
auto list_response =
container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, context);
for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
if (list_response.Blobs.empty() && list_response.BlobPrefixes.empty()) {
continue;
}
found = true;
// Emit the blobs first, so that potential BlobPrefix recursions are at the
// function tail.
for (const auto& blob : list_response.Blobs) {
if (ARROW_PREDICT_FALSE(options.Prefix.HasValue() &&
blob.Name == options.Prefix.Value())) {
// Prefix.Value() contains a trailing slash, so if we found a blob that
// matches it completely, then it is an empty directory marker blob for the
// directory we're listing from, so we should skip it.
continue;
}
if (blob.Name.back() == internal::kSep) {
// Defense against empty '/'-separated segments in the blob name.
if (ARROW_PREDICT_FALSE(blob.Name == "/")) {
continue;
}
// We include these now as they might be empty directories and not appear in
// the BlobPrefixes listing.
std::string out_path;
out_path.reserve(base_location.container.size() + blob.Name.size());
out_path += base_location.container;
out_path += internal::kSep;
out_path += internal::RemoveTrailingSlash(blob.Name, false);
FileInfo info{std::move(out_path), FileType::Directory};
acc_results->push_back(std::move(info));
// Remember the listed directory markers, so we don't duplicate
// them in the BlobPrefixes processing loop.
dir_markers_listed.emplace_hint(dir_markers_listed.end(), blob.Name);
continue;
}
// Add file path to results.
std::string out_path;
out_path.reserve(base_location.container.size() + 1 + blob.Name.size());
out_path += base_location.container;
out_path += internal::kSep;
out_path += blob.Name;
FileInfo info{std::move(out_path), FileType::File};
info.set_size(blob.BlobSize);
info.set_mtime(
std::chrono::system_clock::time_point{blob.Details.LastModified});
acc_results->push_back(std::move(info));
}
// Process the BlobPrefixes.
for (const auto& blob_prefix : list_response.BlobPrefixes) {
// Defense against empty '/'-separated segments in the blob name.
if (ARROW_PREDICT_FALSE(blob_prefix == "/")) {
continue;
}
auto path = base_location.container + internal::kSep + blob_prefix;
if (dir_markers_listed.find(path) == dir_markers_listed.end()) {
// Directory not added to the results yet, add it now.
FileInfo info{std::string{internal::RemoveTrailingSlash(path)},
FileType::Directory};
acc_results->push_back(std::move(info));
}
// Recurse into the directory.
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = internal::RemoveTrailingSlash(path);
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
RETURN_NOT_OK(GetFileInfoWithSelectorFromContainer(
container_client, context, page_size_hint, sub_select, acc_results));
}
}
}
} catch (const Azure::Storage::StorageException& exception) {
if (exception.ErrorCode == "ContainerNotFound") {
found = false;
} else {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + select.base_dir + ": " +
container_client.GetUrl(),
exception);
}
}

return found || select.allow_not_found
? Status::OK()
: ::arrow::fs::internal::PathNotFound(select.base_dir);
}

public:
Status GetFileInfoWithSelector(const Azure::Core::Context& context,
Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select,
FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

if (base_location.container.empty()) {
// Without a container, the base_location is equivalent to the filesystem
// root -- `/`. FileSelector::allow_not_found doesn't matter in this case
// because the root always exists.
auto on_container =
[&](const Azure::Storage::Blobs::Models::BlobContainerItem& container) {
// Deleted containers are not listed by ListContainers.
DCHECK(!container.IsDeleted);

// Every container is considered a directory.
FileInfo info{container.Name, FileType::Directory};
info.set_mtime(
std::chrono::system_clock::time_point{container.Details.LastModified});
acc_results->push_back(std::move(info));

// Recurse into containers (subdirectories) if requested.
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = container.Name;
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
ARROW_RETURN_NOT_OK(GetFileInfoWithSelector(context, page_size_hint,
sub_select, acc_results));
}
return Status::OK();
};
return ListContainers(context, std::move(on_container));
}

auto container_client =
blob_service_client_->GetBlobContainerClient(base_location.container);
return GetFileInfoWithSelectorFromContainer(container_client, context, page_size_hint,
select, acc_results);
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const AzureLocation& location,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
Expand Down Expand Up @@ -1103,7 +1301,12 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const std::string& path) {
}

Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& select) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
Azure::Core::Context context;
Azure::Nullable<int32_t> page_size_hint; // unspecified
FileInfoVector results;
RETURN_NOT_OK(
impl_->GetFileInfoWithSelector(context, page_size_hint, select, &results));
return std::move(results);
}

Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
Expand Down
Loading

0 comments on commit 48f69f6

Please sign in to comment.