Skip to content

Commit

Permalink
Merge branch 'master' into aws-credentials-expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins committed Nov 17, 2023
2 parents 4295487 + a617bf3 commit 0901374
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 102 deletions.
1 change: 0 additions & 1 deletion tensorstore/kvstore/gcs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ tensorstore_cc_library(
"//tensorstore/proto:parse_text_proto_or_die",
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/log:absl_check",
Expand Down
26 changes: 13 additions & 13 deletions tensorstore/kvstore/gcs/gcs_testbench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "tensorstore/kvstore/gcs/gcs_testbench.h"

#include <memory>
#include <optional>
#include <string>

Expand All @@ -37,7 +38,6 @@
#include "tensorstore/proto/parse_text_proto_or_die.h"
#include "tensorstore/util/future.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/status.h"

// protos
#include "google/storage/v2/storage.grpc.pb.h"
Expand All @@ -48,14 +48,14 @@ ABSL_FLAG(std::string, testbench_binary, "",

namespace gcs_testbench {

using ::google::storage::v2::Storage;
using ::tensorstore::internal::GrpcStatusToAbslStatus;
using ::tensorstore::internal::SpawnSubprocess;
using ::tensorstore::internal::Subprocess;
using ::tensorstore::internal::SubprocessOptions;
using ::tensorstore::internal_http::GetDefaultHttpTransport;
using ::tensorstore::internal_http::HttpRequestBuilder;
using ::tensorstore::transport_test_utils::TryPickUnusedPort;
using ::google::storage::v2::Storage;

StorageTestbench::StorageTestbench() = default;

Expand Down Expand Up @@ -140,15 +140,9 @@ StorageTestbench::~StorageTestbench() {
}
}

void StorageTestbench::CreateBucket(std::string bucket) {
ABSL_CHECK(running);

std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
grpc_address(), grpc::InsecureChannelCredentials()); // NOLINT

auto stub = Storage::NewStub(channel);

grpc::ClientContext client_context;
/* static */
absl::Status StorageTestbench::CreateBucket(std::string grpc_endpoint,
std::string bucket) {
google::storage::v2::CreateBucketRequest bucket_request =
tensorstore::ParseTextProtoOrDie(R"pb(
parent: 'projects/12345'
Expand All @@ -158,11 +152,17 @@ void StorageTestbench::CreateBucket(std::string bucket) {
predefined_default_object_acl: 'publicReadWrite'
)pb");
bucket_request.set_bucket_id(bucket);

google::storage::v2::Bucket bucket_response;

std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
grpc_endpoint, grpc::InsecureChannelCredentials()); // NOLINT

auto stub = Storage::NewStub(channel);

grpc::ClientContext client_context;
grpc::Status status =
stub->CreateBucket(&client_context, bucket_request, &bucket_response);
ABSL_LOG(INFO) << GrpcStatusToAbslStatus(status);
return GrpcStatusToAbslStatus(status);
}

} // namespace gcs_testbench
4 changes: 3 additions & 1 deletion tensorstore/kvstore/gcs/gcs_testbench.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <optional>
#include <string>

#include "absl/status/status.h"
#include "tensorstore/internal/os/subprocess.h"

namespace gcs_testbench {
Expand All @@ -32,7 +33,8 @@ class StorageTestbench {
void SpawnProcess();

// Issues a gRPC CreateBucket request against the testbench.
void CreateBucket(std::string bucket);
static absl::Status CreateBucket(std::string grpc_endpoint,
std::string bucket);

std::string http_address();
std::string grpc_address();
Expand Down
3 changes: 3 additions & 0 deletions tensorstore/kvstore/gcs_grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ tensorstore_cc_test(
"//tensorstore/kvstore:test_util",
"//tensorstore/kvstore/gcs:gcs_testbench",
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
Expand Down
91 changes: 61 additions & 30 deletions tensorstore/kvstore/gcs_grpc/gcs_grpc_testbench_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <vector>

#include <gtest/gtest.h>
#include "absl/flags/flag.h"
#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
Expand All @@ -31,8 +33,12 @@
#include "tensorstore/kvstore/operations.h"
#include "tensorstore/kvstore/test_util.h"
#include "tensorstore/util/future.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/status_testutil.h"

// Connect to an already-running testbench server on the grpc port.
ABSL_FLAG(std::string, testbench_grpc_endpoint, "", "testbench endpoint");

namespace kvstore = ::tensorstore::kvstore;

using ::gcs_testbench::StorageTestbench;
Expand All @@ -42,20 +48,30 @@ using ::tensorstore::internal::NoDestructor;

namespace {

StorageTestbench& GetTestBench() {
std::string GetTestBenchEndpoint() {
static NoDestructor<StorageTestbench> testbench;
testbench->SpawnProcess();
return *testbench;
static std::string endpoint = [&] {
std::string grpc_endpoint = absl::GetFlag(FLAGS_testbench_grpc_endpoint);
if (grpc_endpoint.empty()) {
testbench->SpawnProcess();
grpc_endpoint = testbench->grpc_address();
}
ABSL_LOG(INFO) << "Using " << grpc_endpoint;
ABSL_LOG(INFO) << "Creating bucket: "
<< StorageTestbench::CreateBucket(grpc_endpoint,
"test_bucket");
return grpc_endpoint;
}();

return endpoint;
}

class GcsGrpcTestbenchTest : public testing::Test {
public:
tensorstore::KvStore OpenStore(std::string path = "") {
auto& testbench = GetTestBench();
testbench.CreateBucket("test_bucket");
ABSL_LOG(INFO) << "Using " << testbench.grpc_address();
std::string testbench_grpc_endpoint = GetTestBenchEndpoint();
return kvstore::Open({{"driver", "gcs_grpc"},
{"endpoint", testbench.grpc_address()},
{"endpoint", testbench_grpc_endpoint},
{"timeout", "200ms"},
{"num_channels", 1},
{"bucket", "test_bucket"},
Expand Down Expand Up @@ -130,37 +146,52 @@ TEST_F(GcsGrpcTestbenchTest, CancellationDoesNotCrash) {

struct ConcurrentWriteFn {
static constexpr char kKey[] = "test";
static constexpr size_t kNumIterations = 100;
static constexpr size_t kNumIterations = 0x7f;

const size_t offset;
mutable std::string value;
mutable StorageGeneration generation;
tensorstore::KvStore store;

void operator()() const {
for (size_t i = 0; i < kNumIterations; ++i) {
while (true) {
size_t x;
std::memcpy(&x, &value[offset], sizeof(size_t));
ASSERT_EQ(i, x);
std::string new_value = value;
x = i + 1;
std::memcpy(&new_value[offset], &x, sizeof(size_t));
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
auto write_result,
kvstore::Write(store, kKey, absl::Cord(new_value), {generation})
.result());
if (!StorageGeneration::IsUnknown(write_result.generation)) {
generation = write_result.generation;
value = new_value;
break;
bool read = false;
for (size_t i = 0; i < kNumIterations; /**/) {
if (read) {
auto read_result = kvstore::Read(store, kKey).result();
if (!read_result.ok()) {
// NOTE: This should always be .ok(), however there appears to be a
// corruption bug somewhere. The symptoms are that when verbose
// logging is on (--tensorstore_verbose_logging=gcs_grpc), the
// response.checksummed_data.content differs from the equivalent log
// from storage_testbench.
ABSL_LOG(INFO) << read_result.status();
continue;
}
TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto read_result,
kvstore::Read(store, kKey).result());
ASSERT_FALSE(read_result.aborted() || read_result.not_found());
ASSERT_EQ(read_result.value.size(), value.size());
value = std::string(read_result.value);
generation = read_result.stamp.generation;

ABSL_CHECK(!read_result->aborted());
ABSL_CHECK(!read_result->not_found());
ABSL_CHECK_EQ(read_result->value.size(), value.size());
value = std::string(read_result->value);
generation = read_result->stamp.generation;
}

size_t x;
std::memcpy(&x, &value[offset], sizeof(size_t));
ABSL_CHECK_EQ(i, x);
std::string new_value = value;
x = i + 1;
std::memcpy(&new_value[offset], &x, sizeof(size_t));
TENSORSTORE_CHECK_OK_AND_ASSIGN(
auto write_result,
kvstore::Write(store, kKey, absl::Cord(new_value), {generation})
.result());
if (!StorageGeneration::IsUnknown(write_result.generation)) {
generation = write_result.generation;
value = new_value;
i = x;
read = false;
} else {
read = true;
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions tensorstore/kvstore/gcs_http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ tensorstore_cc_test(
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"@com_google_absl//absl/base",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
Expand Down
Loading

0 comments on commit 0901374

Please sign in to comment.