Skip to content

Commit

Permalink
neuroglancer_uint64_sharded: efficiently handle unmodified writeback …
Browse files Browse the repository at this point in the history
…case

Previously, neuroglancer_uint64_sharded performed an unnecessary read
when committing a transaction that reads but does not modify a shard.

PiperOrigin-RevId: 599337271
Change-Id: Ieed40bc14fafd663b242fe72aa81c955b02d8690
  • Loading branch information
jbms authored and copybara-github committed Jan 18, 2024
1 parent 956d0a2 commit ebccd9a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ class ShardedKeyValueStoreWriteCache
entry, std::move(read_result));
}

absl::Time apply_staleness_bound_;
ApplyReceiver apply_receiver_;
ApplyOptions apply_options_;
absl::Status apply_status_;
};

Expand Down Expand Up @@ -645,7 +645,7 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::WritebackError() {

namespace {
void StartApply(ShardedKeyValueStoreWriteCache::TransactionNode& node) {
RetryAtomicWriteback(node.phases_, node.apply_staleness_bound_);
RetryAtomicWriteback(node.phases_, node.apply_options_.staleness_bound);
}

/// Attempts to compute the new encoded shard state that merges any mutations
Expand Down Expand Up @@ -747,7 +747,7 @@ void MergeForWriteback(ShardedKeyValueStoreWriteCache::TransactionNode& node,
// However, even in that case, the extra retries aren't really an added
// cost, because those retries would still be needed anyway due to the
// concurrent modifications.
node.apply_staleness_bound_ = absl::Now();
node.apply_options_.staleness_bound = absl::Now();
StartApply(node);
return;
}
Expand All @@ -769,7 +769,7 @@ void MergeForWriteback(ShardedKeyValueStoreWriteCache::TransactionNode& node,
void ShardedKeyValueStoreWriteCache::TransactionNode::DoApply(
ApplyOptions options, ApplyReceiver receiver) {
apply_receiver_ = std::move(receiver);
apply_staleness_bound_ = options.staleness_bound;
apply_options_ = options;
apply_status_ = absl::Status();

GetOwningCache(*this).executor()([this] { StartApply(*this); });
Expand All @@ -786,6 +786,7 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
GetOwningCache(*this).executor()([&self] {
TimestampedStorageGeneration stamp;
bool mismatch = false;
bool modified = false;

size_t num_chunks = 0;

Expand All @@ -795,8 +796,12 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
auto& buffered_entry =
static_cast<AtomicMultiPhaseMutation::BufferedReadModifyWriteEntry&>(
entry);
if (buffered_entry.read_result_.state !=
kvstore::ReadResult::kUnspecified) {
modified = true;
++num_chunks;
}
auto& entry_stamp = buffered_entry.read_result_.stamp;
++num_chunks;
if (StorageGeneration::IsConditional(entry_stamp.generation)) {
if (!StorageGeneration::IsUnknown(stamp.generation) &&
StorageGeneration::Clean(stamp.generation) !=
Expand All @@ -810,17 +815,26 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
}

if (mismatch) {
self.apply_staleness_bound_ = absl::Now();
self.apply_options_.staleness_bound = absl::Now();
StartApply(self);
return;
}
auto& cache = GetOwningCache(self);
if (!modified && StorageGeneration::IsUnknown(stamp.generation) &&
self.apply_options_.apply_mode !=
ApplyOptions::ApplyMode::kSpecifyUnchanged) {
internal::AsyncCache::ReadState update;
update.stamp = TimestampedStorageGeneration::Unconditional();
execution::set_value(std::exchange(self.apply_receiver_, {}),
std::move(update));
return;
}
if (!StorageGeneration::IsUnknown(stamp.generation) ||
!cache.get_max_chunks_per_shard_ ||
cache.get_max_chunks_per_shard_(GetOwningEntry(self).shard()) !=
num_chunks) {
self.internal::AsyncCache::TransactionNode::Read(
self.apply_staleness_bound_)
self.apply_options_.staleness_bound)
.ExecuteWhenReady([&self](ReadyFuture<const void> future) {
if (!future.result().ok()) {
execution::set_error(std::exchange(self.apply_receiver_, {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,25 @@ TEST_F(UnderlyingKeyValueStoreTest, Read) {
}
}

// Verify that a read-only transaction does not do any I/O on commit.
TEST_F(UnderlyingKeyValueStoreTest, TransactionReadThenCommit) {
tensorstore::Transaction txn(tensorstore::isolated);
auto memory_store = tensorstore::GetMemoryKeyValueStore();
{
auto future = kvstore::Read(KvStore{store, txn}, GetChunkKey(0x50), {});
{
auto req = mock_store->read_requests.pop();
req(memory_store);
ASSERT_EQ(0, mock_store->read_requests.size());
}
EXPECT_THAT(future.result(),
::testing::Optional(MatchesKvsReadResultNotFound()));
}

auto commit_future = txn.CommitAsync();
TENSORSTORE_ASSERT_OK(commit_future.result());
}

// Tests issuing read for chunk in uncached minishard index while there is a
// concurrent modification.
TEST_F(UnderlyingKeyValueStoreTest,
Expand Down

0 comments on commit ebccd9a

Please sign in to comment.