diff --git a/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc b/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc
index dd80c0623..22d5298f0 100644
--- a/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc
+++ b/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded.cc
@@ -584,8 +584,8 @@ class ShardedKeyValueStoreWriteCache
           entry, std::move(read_result));
     }
 
-    absl::Time apply_staleness_bound_;
     ApplyReceiver apply_receiver_;
+    ApplyOptions apply_options_;
     absl::Status apply_status_;
   };
 
@@ -645,7 +645,7 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::WritebackError() {
 
 namespace {
 void StartApply(ShardedKeyValueStoreWriteCache::TransactionNode& node) {
-  RetryAtomicWriteback(node.phases_, node.apply_staleness_bound_);
+  RetryAtomicWriteback(node.phases_, node.apply_options_.staleness_bound);
 }
 
 /// Attempts to compute the new encoded shard state that merges any mutations
@@ -747,7 +747,7 @@ void MergeForWriteback(ShardedKeyValueStoreWriteCache::TransactionNode& node,
     // However, even in that case, the extra retries aren't really an added
     // cost, because those retries would still be needed anyway due to the
    // concurrent modifications.
-    node.apply_staleness_bound_ = absl::Now();
+    node.apply_options_.staleness_bound = absl::Now();
     StartApply(node);
     return;
   }
@@ -769,7 +769,7 @@ void MergeForWriteback(ShardedKeyValueStoreWriteCache::TransactionNode& node,
 void ShardedKeyValueStoreWriteCache::TransactionNode::DoApply(
     ApplyOptions options, ApplyReceiver receiver) {
   apply_receiver_ = std::move(receiver);
-  apply_staleness_bound_ = options.staleness_bound;
+  apply_options_ = options;
   apply_status_ = absl::Status();
 
   GetOwningCache(*this).executor()([this] { StartApply(*this); });
@@ -786,6 +786,7 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
   GetOwningCache(*this).executor()([&self] {
     TimestampedStorageGeneration stamp;
     bool mismatch = false;
+    bool modified = false;
 
     size_t num_chunks = 0;
 
@@ -795,8 +796,12 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
       auto& buffered_entry =
          static_cast<AtomicMultiPhaseMutation::BufferedReadModifyWriteEntry&>(
              entry);
+      if (buffered_entry.read_result_.state !=
+          kvstore::ReadResult::kUnspecified) {
+        modified = true;
+        ++num_chunks;
+      }
       auto& entry_stamp = buffered_entry.read_result_.stamp;
-      ++num_chunks;
       if (StorageGeneration::IsConditional(entry_stamp.generation)) {
         if (!StorageGeneration::IsUnknown(stamp.generation) &&
             StorageGeneration::Clean(stamp.generation) !=
@@ -810,17 +815,26 @@ void ShardedKeyValueStoreWriteCache::TransactionNode::AllEntriesDone(
     }
 
     if (mismatch) {
-      self.apply_staleness_bound_ = absl::Now();
+      self.apply_options_.staleness_bound = absl::Now();
       StartApply(self);
       return;
     }
     auto& cache = GetOwningCache(self);
+    if (!modified && StorageGeneration::IsUnknown(stamp.generation) &&
+        self.apply_options_.apply_mode !=
+            ApplyOptions::ApplyMode::kSpecifyUnchanged) {
+      internal::AsyncCache::ReadState update;
+      update.stamp = TimestampedStorageGeneration::Unconditional();
+      execution::set_value(std::exchange(self.apply_receiver_, {}),
+                           std::move(update));
+      return;
+    }
     if (!StorageGeneration::IsUnknown(stamp.generation) ||
         !cache.get_max_chunks_per_shard_ ||
         cache.get_max_chunks_per_shard_(GetOwningEntry(self).shard()) !=
             num_chunks) {
       self.internal::AsyncCache::TransactionNode::Read(
-              self.apply_staleness_bound_)
+              self.apply_options_.staleness_bound)
           .ExecuteWhenReady([&self](ReadyFuture<const void> future) {
            if (!future.result().ok()) {
              execution::set_error(std::exchange(self.apply_receiver_, {}),
diff --git a/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded_test.cc b/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded_test.cc
index 33a515ca7..9bc7a9644 100644
--- a/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded_test.cc
+++ b/tensorstore/kvstore/neuroglancer_uint64_sharded/neuroglancer_uint64_sharded_test.cc
@@ -892,6 +892,25 @@ TEST_F(UnderlyingKeyValueStoreTest, Read) {
   }
 }
 
+// Verify that a read-only transaction does not do any I/O on commit.
+TEST_F(UnderlyingKeyValueStoreTest, TransactionReadThenCommit) {
+  tensorstore::Transaction txn(tensorstore::isolated);
+  auto memory_store = tensorstore::GetMemoryKeyValueStore();
+  {
+    auto future = kvstore::Read(KvStore{store, txn}, GetChunkKey(0x50), {});
+    {
+      auto req = mock_store->read_requests.pop();
+      req(memory_store);
+      ASSERT_EQ(0, mock_store->read_requests.size());
+    }
+    EXPECT_THAT(future.result(),
+                ::testing::Optional(MatchesKvsReadResultNotFound()));
+  }
+
+  auto commit_future = txn.CommitAsync();
+  TENSORSTORE_ASSERT_OK(commit_future.result());
+}
+
 // Tests issuing read for chunk in uncached minishard index while there is a
 // concurrent modification.
 TEST_F(UnderlyingKeyValueStoreTest,