Skip to content

Commit

Permalink
Fix bugs in kvs_backed_cache / kvstore transaction mechanism
Browse files Browse the repository at this point in the history
- Previously, if within a single transaction, the same key was written
by one ChunkCache entry and then read (but not modified) by another
ChunkCache entry, then the write would be lost.

- Previously, if one cache entry performed a transactional read that
was handled by a writeback request to a prior cache entry for the same
key in the same transaction, and an `if_not_equal` condition was
specified and not satisfied, a `set_cancel` callback that was supposed
to be unreachable would incorrectly be invoked, leading to a crash.

PiperOrigin-RevId: 595784166
Change-Id: Ie060614623c9541a31934e037bc757dab5e53695
  • Loading branch information
jbms authored and copybara-github committed Jan 4, 2024
1 parent eaa7371 commit a5ca269
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 71 deletions.
36 changes: 28 additions & 8 deletions tensorstore/internal/cache/async_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,16 +753,34 @@ class AsyncCache : public Cache {
using ApplyReceiver = AnyReceiver<absl::Status, ReadState>;

// Options passed to `DoApply`.
struct ApplyOptions {
/// Returned `ReadStateUpdate` must reflect an existing read state that is
/// current as of `staleness_bound`.
// Returned `ReadStateUpdate` must reflect an existing read state that is
// current as of `staleness_bound`.
absl::Time staleness_bound;

/// If `true`, the `data` returned in the `ReadState` will be ignored.
/// This option is used if `DoApply` is called solely to validate the read
/// state. If no validation is necessary, the `stamp` field of the
/// `ReadState` may be set to
/// `TimestampedStorageGeneration::Unconditional()`.
bool validate_only = false;
enum ApplyMode {
// The `stamp` field of the `ReadState` may be set to
// `TimestampedStorageGeneration::Unconditional()` to indicate that this
// transaction node does nothing (e.g. it is read-only and does not
// perform validation). In this case the `data` returned in the
// `ReadState` will be ignored. Otherwise, the `stamp` must not be
// `StorageGeneration::Unknown()`, and should be marked "dirty" if the
// data has been modified by this transaction node.
kNormal,

// The `data` returned in the `ReadState` must be valid even if this
// transaction node does not modify it. The `stamp` field of the
// `ReadState` must not be
// `TimestampedStorageGeneration::Unconditional()`.
kSpecifyUnchanged,

// The `data` returned in the `ReadState` will be ignored. This option
// is used if `DoApply` is called solely to validate the read state. If
// no validation is necessary, the `stamp` field of the `ReadState` may
// be set to `TimestampedStorageGeneration::Unconditional()`.
kValidateOnly,
};

// Selects which of the `ApplyMode` contracts above `DoApply` must follow;
// defaults to `kNormal`.
ApplyMode apply_mode = kNormal;
};

/// Requests an updated read state that reflects the modifications made by
Expand All @@ -778,6 +796,8 @@ class AsyncCache : public Cache {
/// This is not invoked directly by `AsyncCache` (and therefore does not
/// necessarily need to be implemented), but is required by
/// `KvsBackedCache`.
///
/// Must not call `set_cancel`.
virtual void DoApply(ApplyOptions option, ApplyReceiver receiver);

/// Invoked by the `TransactionState` implementation to commit this node.
Expand Down
24 changes: 11 additions & 13 deletions tensorstore/internal/cache/chunk_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,33 +557,29 @@ absl::Status ChunkCache::TransactionNode::OnModified() {
return absl::OkStatus();
}

namespace {
// Returns `true` if `node` is unconditional or has not been modified.
bool IsCommitUnconditional(ChunkCache::TransactionNode& node) {
  if (node.IsUnconditional()) {
    return true;
  }
  return !node.is_modified;
}
} // namespace

void ChunkCache::TransactionNode::DoApply(ApplyOptions options,
ApplyReceiver receiver) {
if (options.validate_only) {
if (options.apply_mode == ApplyOptions::kValidateOnly) {
execution::set_value(
receiver, ReadState{{}, TimestampedStorageGeneration::Unconditional()});
return;
}
auto continuation = WithExecutor(
GetOwningCache(*this).executor(),
[this, receiver = std::move(receiver)](
[this, receiver = std::move(receiver),
specify_unchanged =
options.apply_mode == ApplyOptions::kSpecifyUnchanged](
tensorstore::ReadyFuture<const void> future) mutable {
if (!future.result().ok()) {
return execution::set_error(receiver, future.result().status());
}
AsyncCache::ReadState read_state;
if (!IsCommitUnconditional(*this)) {
read_state = AsyncCache::ReadLock<void>(*this).read_state();
} else {
if (this->IsUnconditional() ||
(!this->is_modified && !specify_unchanged)) {
read_state.stamp = TimestampedStorageGeneration::Unconditional();
} else {
read_state = AsyncCache::ReadLock<void>(*this).read_state();
}
std::shared_ptr<const void> new_data;
if (is_modified) {
// Protect against concurrent calls to `DoApply`, since this may
// modify the write arrays to incorporate the read state.
Expand All @@ -595,7 +591,9 @@ void ChunkCache::TransactionNode::DoApply(ApplyOptions options,
}
execution::set_value(receiver, std::move(read_state));
});
if (IsCommitUnconditional(*this)) {
if (this->IsUnconditional() ||
(!this->is_modified &&
options.apply_mode != ApplyOptions::kSpecifyUnchanged)) {
continuation(MakeReadyFuture());
} else {
this->Read(options.staleness_bound)
Expand Down
127 changes: 127 additions & 0 deletions tensorstore/internal/cache/chunk_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1595,4 +1595,131 @@ TEST_F(ChunkCacheTest, SelfCopySameChunkSeparateCachesWithExistingData) {
EXPECT_THAT(GetChunk({0}), ElementsAre(MakeArray<int>({42, 42})));
}

// Verifies that, within a single transaction, a read of a chunk through one
// cache followed by a write to the same chunk through a second cache results
// in the written data being committed.
TEST_F(ChunkCacheTest, SeparateCachesTransactionalReadThenWrite) {
// Dimension 0 is chunked with a size of 2.
grid = ChunkGridSpecification({ChunkGridSpecification::Component{
SharedArray<const void>(MakeArray<int>({1, 2})), Box<>(1)}});
// NOTE(review): presumably both caches are backed by the same `mock_store`,
// since every request below is popped from it -- confirm against
// `MakeChunkCache`.
auto cache1 = MakeChunkCache();
auto cache2 = MakeChunkCache();

Transaction transaction(tensorstore::isolated);

// Transactional read of chunk 0 through `cache1`.
auto read_future =
tensorstore::Read(GetTensorStore(cache1, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2));
{
// The read must reach the mock store as a request for chunk key 0;
// presumably `r(memory_store)` satisfies it from `memory_store`.
auto r = mock_store->read_requests.pop();
EXPECT_THAT(ParseKey(r.key), ElementsAre(0));
r(memory_store);
}

// NOTE(review): {1, 2} matches the array given in the grid component above,
// presumably the fill value, as nothing has been written yet -- confirm.
EXPECT_THAT(read_future.result(),
::testing::Optional(MakeArray<int>({1, 2})));

// Transactional write to the same chunk through `cache2`.
TENSORSTORE_ASSERT_OK(tensorstore::Write(
MakeArray<int>({42, 43}),
GetTensorStore(cache2, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2)));

// Start the commit; completion is observed via `transaction.future()` below.
transaction.CommitAsync().IgnoreFuture();

{
// The commit must issue a write request for chunk key 0.
auto r = mock_store->write_requests.pop();
EXPECT_THAT(ParseKey(r.key), ElementsAre(0));
r(memory_store);
}

TENSORSTORE_EXPECT_OK(transaction.future());

// The stored chunk must now hold the written values.
EXPECT_THAT(GetChunk({0}), ElementsAre(MakeArray<int>({42, 43})));
}

// Regression test for a write being lost when, within one transaction, the
// same key is written through one cache and then read (but not modified)
// through another cache.
TEST_F(ChunkCacheTest, SeparateCachesTransactionalWriteThenRead) {
// Dimension 0 is chunked with a size of 2.
grid = ChunkGridSpecification({ChunkGridSpecification::Component{
SharedArray<const void>(MakeArray<int>({1, 2})), Box<>(1)}});
auto cache1 = MakeChunkCache();
auto cache2 = MakeChunkCache();

Transaction transaction(tensorstore::isolated);

// Transactional write of chunk 0 through `cache1`.
TENSORSTORE_ASSERT_OK(tensorstore::Write(
MakeArray<int>({42, 43}),
GetTensorStore(cache1, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2)));

// Transactional read of the same chunk through `cache2` must observe the
// values written above. No read request is popped from `mock_store` here:
// the test expects the read to complete without one being handled.
EXPECT_THAT(
tensorstore::Read(GetTensorStore(cache2, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2))
.result(),
::testing::Optional(MakeArray<int>({42, 43})));

// Start the commit; completion is observed via `transaction.future()` below.
transaction.CommitAsync().IgnoreFuture();

{
// The commit must still issue a write request for chunk key 0, even though
// the last access to the chunk in the transaction was a read.
auto r = mock_store->write_requests.pop();
EXPECT_THAT(ParseKey(r.key), ElementsAre(0));
r(memory_store);
}

TENSORSTORE_EXPECT_OK(transaction.future());

// The stored chunk must hold the written values.
EXPECT_THAT(GetChunk({0}), ElementsAre(MakeArray<int>({42, 43})));
}

// Exercises repeated transactional reads of the same chunk through two caches,
// where the later reads carry an `if_not_equal` condition. The transaction is
// never committed in this test.
// NOTE(review): presumably the regression test for the crash caused by
// `set_cancel` being invoked when `if_not_equal` is not satisfied -- confirm.
TEST_F(ChunkCacheTest, SeparateCachesReadIfNotEqualAbort) {
// Dimension 0 is chunked with a size of 2.
grid = ChunkGridSpecification({ChunkGridSpecification::Component{
SharedArray<const void>(MakeArray<int>({1, 2})), Box<>(1)}});
auto cache1 = MakeChunkCache();
auto cache2 = MakeChunkCache();

Transaction transaction(tensorstore::isolated);

// Read to populate entry in cache1.
{
auto read_future =
tensorstore::Read(GetTensorStore(cache1, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2));
{
auto r = mock_store->read_requests.pop();
EXPECT_THAT(ParseKey(r.key), ElementsAre(0));
// First read of the key: no generation condition is expected.
EXPECT_EQ(StorageGeneration::Unknown(), r.options.if_not_equal);
r(memory_store);
}
EXPECT_THAT(read_future.result(),
::testing::Optional(MakeArray<int>({1, 2})));
}

// Read to populate read state of transaction node in cache2.
{
auto read_future =
tensorstore::Read(GetTensorStore(cache2, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2));
{
auto r = mock_store->read_requests.pop();
EXPECT_THAT(ParseKey(r.key), ElementsAre(0));
// The request is now expected to be conditional on the generation not
// being `NoValue`.
EXPECT_EQ(StorageGeneration::NoValue(), r.options.if_not_equal);
r(memory_store);
}
EXPECT_THAT(read_future.result(),
::testing::Optional(MakeArray<int>({1, 2})));
}

// Re-read transaction node in cache2.
{
auto read_future =
tensorstore::Read(GetTensorStore(cache2, {}, 0, transaction) |
tensorstore::Dims(0).TranslateSizedInterval(0, 2));
{
auto r = mock_store->read_requests.pop();
EXPECT_THAT(ParseKey(r.key), ElementsAre(0));
// Same condition as the previous read; the read must still complete
// with the expected data below.
EXPECT_EQ(StorageGeneration::NoValue(), r.options.if_not_equal);
r(memory_store);
}
EXPECT_THAT(read_future.result(),
::testing::Optional(MakeArray<int>({1, 2})));
}
}

} // namespace
27 changes: 22 additions & 5 deletions tensorstore/internal/cache/kvs_backed_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class KvsBackedCache : public Parent {
// Default implementation: control is never expected to reach this body, which
// is marked with `ABSL_UNREACHABLE()`.
// NOTE(review): presumably derived classes that perform writeback must
// override this -- confirm against the class documentation.
virtual void DoEncode(
std::shared_ptr<const typename Derived::ReadData> read_data,
EncodeReceiver receiver) {
ABSL_UNREACHABLE(); // COV_NF_LINE;
ABSL_UNREACHABLE(); // COV_NF_LINE
}

absl::Status AnnotateError(const absl::Status& error, bool reading) {
Expand Down Expand Up @@ -303,7 +303,9 @@ class KvsBackedCache : public Parent {
void set_value(AsyncCache::ReadState update) {
if (!StorageGeneration::NotEqualOrUnspecified(update.stamp.generation,
if_not_equal_)) {
return execution::set_cancel(receiver_);
return execution::set_value(
receiver_,
kvstore::ReadResult::Unspecified(std::move(update.stamp)));
}
if (!StorageGeneration::IsInnerLayerDirty(update.stamp.generation) &&
writeback_mode_ !=
Expand All @@ -312,7 +314,8 @@ class KvsBackedCache : public Parent {
self_->new_data_ = std::move(update.data);
}
return execution::set_value(
receiver_, kvstore::ReadResult::Unspecified(update.stamp));
receiver_,
kvstore::ReadResult::Unspecified(std::move(update.stamp)));
}
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *self_ << "DoEncode";
Expand All @@ -327,8 +330,20 @@ class KvsBackedCache : public Parent {
};
AsyncCache::TransactionNode::ApplyOptions apply_options;
apply_options.staleness_bound = options.staleness_bound;
apply_options.validate_only =
options.writeback_mode == ReadModifyWriteSource::kValidateOnly;
switch (options.writeback_mode) {
case ReadModifyWriteSource::kValidateOnly:
apply_options.apply_mode =
AsyncCache::TransactionNode::ApplyOptions::kValidateOnly;
break;
case ReadModifyWriteSource::kSpecifyUnchangedWriteback:
apply_options.apply_mode =
AsyncCache::TransactionNode::ApplyOptions::kSpecifyUnchanged;
break;
case ReadModifyWriteSource::kNormalWriteback:
apply_options.apply_mode =
AsyncCache::TransactionNode::ApplyOptions::kNormal;
break;
}
this->DoApply(
std::move(apply_options),
ApplyReceiverImpl{this, std::move(options.if_not_equal),
Expand All @@ -348,6 +363,8 @@ class KvsBackedCache : public Parent {

// Target to which this `ReadModifyWriteSource` is bound.
ReadModifyWriteTarget* target_;

// New data for the cache if the writeback completes successfully.
std::shared_ptr<const void> new_data_;
};

Expand Down
2 changes: 1 addition & 1 deletion tensorstore/internal/cache/kvs_backed_cache_testutil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Future<absl::Cord> KvsBackedTestCache::Entry::ReadValue(

void KvsBackedTestCache::TransactionNode::DoApply(ApplyOptions options,
ApplyReceiver receiver) {
if (options.validate_only && validators.empty()) {
if (options.apply_mode == ApplyOptions::kValidateOnly && validators.empty()) {
execution::set_value(
receiver, ReadState{{}, TimestampedStorageGeneration::Unconditional()});
return;
Expand Down
5 changes: 5 additions & 0 deletions tensorstore/kvstore/read_modify_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ class ReadModifyWriteSource {
/// This is called by the `ReadModifyWriteTarget` either during commit, or
/// in response to a read request by a subsequent read-modify-write
/// operation layered on top of this operation.
///
/// If `options.if_not_equal` is not satisfied, invokes `set_value` on
/// `receiver` with a `ReadResult` with a state of `ReadResult::kUnspecified`.
///
/// Must not invoke `set_cancel` on `receiver`.
virtual void KvsWriteback(WritebackOptions options,
WritebackReceiver receiver) = 0;

Expand Down
Loading

0 comments on commit a5ca269

Please sign in to comment.