diff --git a/tensorstore/internal/cache/BUILD b/tensorstore/internal/cache/BUILD
index 813971589..6e3aa78d5 100644
--- a/tensorstore/internal/cache/BUILD
+++ b/tensorstore/internal/cache/BUILD
@@ -104,6 +104,10 @@ tensorstore_cc_library(
     name = "kvs_backed_cache",
     srcs = ["kvs_backed_cache.cc"],
     hdrs = ["kvs_backed_cache.h"],
+    defines = select({
+        ":async_cache_debug_setting": ["TENSORSTORE_ASYNC_CACHE_DEBUG"],
+        "//conditions:default": [],
+    }),
     deps = [
         ":async_cache",
         "//tensorstore:transaction",
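This BUILD change plumbs the existing `:async_cache_debug_setting` config into `kvs_backed_cache`, so the `TENSORSTORE_ASYNC_CACHE_DEBUG` define also gates the new `ABSL_LOG_IF` statements added to kvs_backed_cache.h below. A minimal sketch of the pattern, assuming a zero fallback when the define is absent (the fallback is an illustration, not taken from the tensorstore headers):

```cpp
#include "absl/log/absl_log.h"

// Assumed fallback so the macro is always defined; the real headers may
// handle the undefined case differently.
#ifndef TENSORSTORE_ASYNC_CACHE_DEBUG
#define TENSORSTORE_ASYNC_CACHE_DEBUG 0
#endif

void LogDebugExample() {
  // Logs only in builds where the BUILD `select` injected the define
  // (Bazel's `defines = ["X"]` compiles as -DX, i.e. X == 1).
  ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG) << "debug logging enabled";
}
```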
diff --git a/tensorstore/internal/cache/async_cache.cc b/tensorstore/internal/cache/async_cache.cc
index 4b9e21d57..dac565def 100644
--- a/tensorstore/internal/cache/async_cache.cc
+++ b/tensorstore/internal/cache/async_cache.cc
@@ -50,6 +50,10 @@ using PendingWritebackQueueAccessor =
     TransactionNode::PendingWritebackQueueAccessor;
 using PrepareForCommitState = TransactionNode::PrepareForCommitState;
 
+// While the real epsilon is `absl::Nanoseconds(1) / 4`, `operator/` is not
+// constexpr, and this value is sufficient for use here.
+constexpr absl::Duration kEpsilonDuration = absl::Nanoseconds(1);
+
 void AcquireReadRequestReference(Entry& entry) {
   // Prevent the entry from being destroyed while the read is in progress.
   internal::PinnedCacheEntry<AsyncCache>(&entry).release();
@@ -229,6 +233,7 @@ void SetReadState(EntryOrNode& entry_or_node, ReadState&& read_state,
       return;
     }
   }
+  entry_or_node.read_request_state_.known_to_be_stale = false;
   entry_or_node.read_request_state_.read_state = std::move(read_state);
   size_t change = read_state_size -
                   entry_or_node.read_request_state_.read_state_size;
@@ -245,21 +250,27 @@ template <typename EntryOrNode>
 Future<const void> RequestRead(EntryOrNode& entry_or_node,
-                               absl::Time staleness_bound) {
+                               absl::Time staleness_bound,
+                               bool must_not_be_known_to_be_stale) {
   static_assert(std::is_same_v<EntryOrNode, Entry> ||
                 std::is_same_v<EntryOrNode, TransactionNode>);
   auto& entry = GetOwningEntry(entry_or_node);
   UniqueWriterLock lock(entry);
-  auto& request_state = entry_or_node.read_request_state_;
-  const auto existing_time =
-      GetEffectiveReadRequestState(entry_or_node).read_state.stamp.time;
+  auto& effective_request_state = GetEffectiveReadRequestState(entry_or_node);
+  const auto existing_time = effective_request_state.read_state.stamp.time;
   if (existing_time != absl::InfinitePast() &&
       existing_time >= staleness_bound) {
-    // `staleness_bound` satisfied by current data.
-    return MakeReadyFuture();
+    if (must_not_be_known_to_be_stale &&
+        effective_request_state.known_to_be_stale) {
+      staleness_bound = existing_time + kEpsilonDuration;
+    } else {
+      // `staleness_bound` satisfied by current data.
+      return MakeReadyFuture();
+    }
   }
+  auto& request_state = entry_or_node.read_request_state_;
   // `staleness_bound` not satisfied by current data.
   request_state.queued_time = std::max(request_state.queued_time,
                                        std::min(staleness_bound, absl::Now()));
@@ -423,10 +434,12 @@ size_t AsyncCache::DoGetSizeInBytes(Cache::Entry* base_entry) {
          entry->read_request_state_.read_state_size;
 }
 
-Future<const void> AsyncCache::Entry::Read(absl::Time staleness_bound) {
+Future<const void> AsyncCache::Entry::Read(absl::Time staleness_bound,
+                                           bool must_not_be_known_to_be_stale) {
   ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
-      << *this << "Read: staleness_bound=" << staleness_bound;
-  return RequestRead(*this, staleness_bound);
+      << *this << "Read: staleness_bound=" << staleness_bound
+      << ", must_not_be_known_to_be_stale=" << must_not_be_known_to_be_stale;
+  return RequestRead(*this, staleness_bound, must_not_be_known_to_be_stale);
 }
 
 void AsyncCache::Entry::ReadSuccess(ReadState&& read_state) {
@@ -448,15 +461,17 @@ AsyncCache::TransactionNode::TransactionNode(Entry& entry)
       size_updated_(false) {}
 
 Future<const void> AsyncCache::TransactionNode::Read(
-    absl::Time staleness_bound) {
+    absl::Time staleness_bound, bool must_not_be_known_to_be_stale) {
   ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
-      << *this << "Read: staleness_bound=" << staleness_bound;
+      << *this << "Read: staleness_bound=" << staleness_bound
+      << ", must_not_be_known_to_be_stale=" << must_not_be_known_to_be_stale;
   if (reads_committed_ &&
       (prepare_for_commit_state_.load(std::memory_order_acquire) !=
        PrepareForCommitState::kReadyForCommitCalled)) {
-    return RequestRead(GetOwningEntry(*this), staleness_bound);
+    return RequestRead(GetOwningEntry(*this), staleness_bound,
+                       must_not_be_known_to_be_stale);
   }
-  return RequestRead(*this, staleness_bound);
+  return RequestRead(*this, staleness_bound, must_not_be_known_to_be_stale);
 }
 
 void AsyncCache::TransactionNode::ReadSuccess(ReadState&& read_state) {
@@ -531,8 +546,7 @@ void AsyncCache::TransactionNode::WritebackSuccess(ReadState&& read_state) {
     assert(read_state_time >= request_state.read_state.stamp.time);
     SetReadState(entry, std::move(read_state), read_state_size);
   } else if (read_state_time > request_state.read_state.stamp.time) {
-    read_state_time = request_state.read_state.stamp.time =
-        absl::InfinitePast();
+    request_state.known_to_be_stale = true;
   }
 
   QueuedReadHandler queued_read_handler(request_state, read_state_time);
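The core of the async_cache.cc change: instead of invalidating a cached read state by rolling its timestamp back to `absl::InfinitePast()` after a writeback observes newer data, the state is now flagged `known_to_be_stale`, and `RequestRead` bumps the effective `staleness_bound` one representable tick past the cached timestamp so that a strictly newer read is issued. A distilled, self-contained sketch of that decision, with an illustrative `ReadRequestStateView` struct standing in for tensorstore's internal state:

```cpp
#include "absl/time/time.h"

// Illustrative stand-in for the cached read request state; not tensorstore's
// actual types.
struct ReadRequestStateView {
  absl::Time existing_time;  // read_state.stamp.time of the cached data.
  bool known_to_be_stale;    // Set when a writeback observed newer data.
};

// Smallest representable increment, used to force a strictly newer read.
constexpr absl::Duration kEpsilonDuration = absl::Nanoseconds(1);

// Returns true if the cached state can be served directly; otherwise adjusts
// `staleness_bound` so that a new read must be issued.
bool CanServeFromCache(const ReadRequestStateView& state,
                       bool must_not_be_known_to_be_stale,
                       absl::Time& staleness_bound) {
  if (state.existing_time == absl::InfinitePast() ||
      state.existing_time < staleness_bound) {
    return false;  // Cached data is too old for the caller's bound.
  }
  if (must_not_be_known_to_be_stale && state.known_to_be_stale) {
    // Cached data nominally satisfies the bound but is known to be
    // out-of-date: require a timestamp strictly newer than what we have.
    staleness_bound = state.existing_time + kEpsilonDuration;
    return false;
  }
  return true;  // `staleness_bound` satisfied by current data.
}
```

Keeping the cached timestamp intact means concurrent readers with looser bounds can still be served from cache; only callers that demand not-known-to-be-stale data pay for a new read.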
diff --git a/tensorstore/internal/cache/async_cache.h b/tensorstore/internal/cache/async_cache.h
index 184b3aa6b..06961d9d0 100644
--- a/tensorstore/internal/cache/async_cache.h
+++ b/tensorstore/internal/cache/async_cache.h
@@ -195,6 +195,10 @@ class AsyncCache : public Cache {
     /// The most recently-cached read state.
     ReadState read_state;
 
+    /// `read_state` is known to be out-of-date, due to a local modification not
+    /// reflected in the cache.
+    bool known_to_be_stale = false;
+
     /// The size in bytes consumed by `read_state.read_data`. This is the
     /// cached result of calling
     /// `Entry::ComputeReadDataSizeInBytes(read_state.read_data.get())`.
@@ -346,10 +350,15 @@ class AsyncCache : public Cache {
     /// Requests data no older than `staleness_bound`.
     ///
     /// \param staleness_bound Limit on data staleness.
+    /// \param must_not_be_known_to_be_stale Requests newer data if the existing
+    ///     data is known to be out-of-date (e.g. due to a local write not
+    ///     reflected in the cache), even if the existing data satisfies
+    ///     `staleness_bound`.
     /// \returns A future that resolves to a success state once data no older
     ///     than `staleness_bound` is available, or to an error state if the
     ///     request failed.
-    Future<const void> Read(absl::Time staleness_bound);
+    Future<const void> Read(absl::Time staleness_bound,
+                            bool must_not_be_known_to_be_stale = true);
 
     /// Obtains an existing or new transaction node for the specified entry and
     /// transaction. May also be used to obtain an implicit transaction node.
@@ -616,7 +625,8 @@ class AsyncCache : public Cache {
 
     /// Requests a read state for this transaction node that is current as of
     /// the specified `staleness_bound`.
-    Future<const void> Read(absl::Time staleness_bound);
+    Future<const void> Read(absl::Time staleness_bound,
+                            bool must_not_be_known_to_be_stale = true);
 
   /// Requests initial or updated data from persistent storage for a single
   /// `Entry`.
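Because the new parameter defaults to `true` in both `Entry::Read` and `TransactionNode::Read`, existing call sites get the stronger freshness guarantee without modification, and callers can pass `false` to opt back into the old behavior. A hypothetical caller sketch (the wrapper function and the way the entry is obtained are assumptions, not part of this diff):

```cpp
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "tensorstore/internal/cache/async_cache.h"

namespace {
using ::tensorstore::internal::AsyncCache;

void ExampleReads(AsyncCache::Entry& entry) {
  // Default (must_not_be_known_to_be_stale=true): even if the cached data is
  // newer than the bound, a read is issued when a writeback has marked the
  // cached state as known-to-be-stale.
  auto fresh = entry.Read(absl::Now() - absl::Seconds(10));

  // Explicit false: any cached state satisfying the bound is served,
  // matching the pre-change behavior.
  auto relaxed = entry.Read(absl::Now() - absl::Seconds(10),
                            /*must_not_be_known_to_be_stale=*/false);
  (void)fresh;  // Silence unused-variable warnings in this sketch.
  (void)relaxed;
}
}  // namespace
```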
diff --git a/tensorstore/internal/cache/kvs_backed_cache.h b/tensorstore/internal/cache/kvs_backed_cache.h
index fa68235f0..34af95839 100644
--- a/tensorstore/internal/cache/kvs_backed_cache.h
+++ b/tensorstore/internal/cache/kvs_backed_cache.h
@@ -284,7 +284,7 @@ class KvsBackedCache : public Parent {
     }
     struct EncodeReceiverImpl {
       TransactionNode* self_;
-      AsyncCache::ReadState update_;
+      TimestampedStorageGeneration update_stamp_;
       ReadModifyWriteSource::WritebackReceiver receiver_;
       void set_error(absl::Status error) {
         error = GetOwningEntry(*self_).AnnotateError(std::move(error),
@@ -295,12 +295,8 @@ class KvsBackedCache : public Parent {
       void set_value(std::optional<absl::Cord> value) {
         kvstore::ReadResult read_result =
             value ? kvstore::ReadResult::Value(std::move(*value),
-                                               std::move(update_.stamp))
-                  : kvstore::ReadResult::Missing(std::move(update_.stamp));
-
-        // FIXME: only save if committing, also could do this inside
-        // ApplyReceiverImpl
-        self_->new_data_ = std::move(update_.data);
+                                               std::move(update_stamp_))
+                  : kvstore::ReadResult::Missing(std::move(update_stamp_));
         execution::set_value(receiver_, std::move(read_result));
       }
     };
@@ -341,21 +337,32 @@ class KvsBackedCache : public Parent {
         if (!StorageGeneration::IsInnerLayerDirty(update.stamp.generation) &&
             writeback_mode_ !=
                 ReadModifyWriteSource::kSpecifyUnchangedWriteback) {
-          if (self_->transaction()->commit_started()) {
+          ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
+              << *self_ << "DoApply: if_not_equal=" << if_not_equal_
+              << ", mode=" << writeback_mode_
+              << ", unmodified: " << update.stamp;
+          if (StorageGeneration::IsUnknown(update.stamp.generation)) {
+            self_->new_data_ = std::nullopt;
+          } else {
             self_->new_data_ = std::move(update.data);
           }
           return execution::set_value(
               receiver_,
              kvstore::ReadResult::Unspecified(std::move(update.stamp)));
         }
+        ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
+            << *self_ << "DoApply: if_not_equal=" << if_not_equal_
+            << ", mode=" << writeback_mode_ << ", encoding: " << update.stamp
+            << ", commit_started=" << self_->transaction()->commit_started();
+        self_->new_data_ = update.data;
         ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG) << *self_ << "DoEncode";
         auto update_data =
             std::static_pointer_cast<const typename Derived::ReadData>(
-                update.data);
+                std::move(update.data));
         GetOwningEntry(*self_).DoEncode(
             std::move(update_data),
-            EncodeReceiverImpl{self_, std::move(update),
+            EncodeReceiverImpl{self_, std::move(update.stamp),
                                std::move(receiver_)});
       }
     };
@@ -382,8 +389,13 @@ class KvsBackedCache : public Parent {
     }
     void KvsWritebackSuccess(TimestampedStorageGeneration new_stamp) override {
-      return this->WritebackSuccess(
-          AsyncCache::ReadState{std::move(new_data_), std::move(new_stamp)});
+      if (new_data_) {
+        this->WritebackSuccess(
+            AsyncCache::ReadState{std::move(*new_data_), std::move(new_stamp)});
+      } else {
+        // Unmodified.
+        this->WritebackSuccess(AsyncCache::ReadState{});
+      }
     }
     void KvsWritebackError() override { this->WritebackError(); }
 
@@ -416,7 +428,7 @@ class KvsBackedCache : public Parent {
     ReadModifyWriteTarget* target_;
 
     // New data for the cache if the writeback completes successfully.
-    std::shared_ptr<const void> new_data_;
+    std::optional<std::shared_ptr<const void>> new_data_;
 
     // If not `StorageGeneration::Unknown()`, requires that the prior generation
     // match this generation when the transaction is committed.
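Switching `new_data_` to `std::optional<std::shared_ptr<const void>>` separates "this writeback left the stored value unmodified" (`std::nullopt`, reported to `WritebackSuccess` as an empty `ReadState`) from "this writeback produced a value", where the contained pointer may itself be null to represent a missing key. A small illustration of the three cases (the enum and helper are hypothetical, for exposition only):

```cpp
#include <memory>
#include <optional>

// Hypothetical classification helper; not part of tensorstore.
enum class WritebackOutcome {
  kUnmodified,  // std::nullopt: nothing new to install in the cache.
  kMissing,     // Engaged but null pointer: the key has no value.
  kNewValue,    // Engaged and non-null: decoded data to cache.
};

WritebackOutcome Classify(
    const std::optional<std::shared_ptr<const void>>& new_data) {
  if (!new_data.has_value()) return WritebackOutcome::kUnmodified;
  if (*new_data == nullptr) return WritebackOutcome::kMissing;
  return WritebackOutcome::kNewValue;
}
```

The previous representation could not express the first case, which is why the old code had to stash `update_.data` unconditionally (the removed FIXME) and why an unmodified writeback used to clobber the cached read state.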