Skip to content

Commit

Permalink
kvs_backed_cache: Avoid no-op commit corrupting the cache
Browse files Browse the repository at this point in the history
The recent commit a5ca269 introduced a bug whereby committing an
AsyncCache::TransactionNode that leaves the value unmodified could result in:
- the timestamp unnecessarily being set to `absl::InfinitePast()`,
  leading to unnecessary re-reads; or
- the data value incorrectly being set to nullptr, leading to
  incorrect read results.

PiperOrigin-RevId: 599336761
Change-Id: I75b6e91db691b48e8821ce172b7551cbcf1c2b69
  • Loading branch information
jbms authored and copybara-github committed Jan 18, 2024
1 parent cc3463e commit 70b16ae
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 30 deletions.
4 changes: 4 additions & 0 deletions tensorstore/internal/cache/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ tensorstore_cc_library(
name = "kvs_backed_cache",
srcs = ["kvs_backed_cache.cc"],
hdrs = ["kvs_backed_cache.h"],
defines = select({
":async_cache_debug_setting": ["TENSORSTORE_ASYNC_CACHE_DEBUG"],
"//conditions:default": [],
}),
deps = [
":async_cache",
"//tensorstore:transaction",
Expand Down
44 changes: 29 additions & 15 deletions tensorstore/internal/cache/async_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ using PendingWritebackQueueAccessor =
TransactionNode::PendingWritebackQueueAccessor;
using PrepareForCommitState = TransactionNode::PrepareForCommitState;

// Small positive duration added to a timestamp to obtain a strictly later
// staleness bound; `RequestRead` adds it to the time of the existing cached
// data when that data is known to be stale, so that a new read is requested.
//
// While the real epsilon is `absl::Nanoseconds(1) / 4`, `operator/` is not
// constexpr, and this value is sufficient for use here.
constexpr absl::Duration kEpsilonDuration = absl::Nanoseconds(1);

void AcquireReadRequestReference(Entry& entry) {
// Prevent the entry from being destroyed while the read is in progress.
internal::PinnedCacheEntry<AsyncCache>(&entry).release();
Expand Down Expand Up @@ -229,6 +233,7 @@ void SetReadState(EntryOrNode& entry_or_node, ReadState&& read_state,
return;
}
}
entry_or_node.read_request_state_.known_to_be_stale = false;
entry_or_node.read_request_state_.read_state = std::move(read_state);
size_t change =
read_state_size -
Expand All @@ -245,21 +250,27 @@ void SetReadState(EntryOrNode& entry_or_node, ReadState&& read_state,

template <typename EntryOrNode>
Future<const void> RequestRead(EntryOrNode& entry_or_node,
absl::Time staleness_bound) {
absl::Time staleness_bound,
bool must_not_be_known_to_be_stale) {
static_assert(std::is_same_v<EntryOrNode, Entry> ||
std::is_same_v<EntryOrNode, TransactionNode>);
auto& entry = GetOwningEntry(entry_or_node);
UniqueWriterLock lock(entry);

auto& request_state = entry_or_node.read_request_state_;
const auto existing_time =
GetEffectiveReadRequestState(entry_or_node).read_state.stamp.time;
auto& effective_request_state = GetEffectiveReadRequestState(entry_or_node);
const auto existing_time = effective_request_state.read_state.stamp.time;
if (existing_time != absl::InfinitePast() &&
existing_time >= staleness_bound) {
// `staleness_bound` satisfied by current data.
return MakeReadyFuture();
if (must_not_be_known_to_be_stale &&
effective_request_state.known_to_be_stale) {
staleness_bound = existing_time + kEpsilonDuration;
} else {
// `staleness_bound` satisfied by current data.
return MakeReadyFuture();
}
}

auto& request_state = entry_or_node.read_request_state_;
// `staleness_bound` not satisfied by current data.
request_state.queued_time = std::max(request_state.queued_time,
std::min(staleness_bound, absl::Now()));
Expand Down Expand Up @@ -423,10 +434,12 @@ size_t AsyncCache::DoGetSizeInBytes(Cache::Entry* base_entry) {
entry->read_request_state_.read_state_size;
}

Future<const void> AsyncCache::Entry::Read(absl::Time staleness_bound) {
Future<const void> AsyncCache::Entry::Read(absl::Time staleness_bound,
bool must_not_be_known_to_be_stale) {
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *this << "Read: staleness_bound=" << staleness_bound;
return RequestRead(*this, staleness_bound);
<< *this << "Read: staleness_bound=" << staleness_bound
<< ", must_not_be_known_to_be_stale=" << must_not_be_known_to_be_stale;
return RequestRead(*this, staleness_bound, must_not_be_known_to_be_stale);
}

void AsyncCache::Entry::ReadSuccess(ReadState&& read_state) {
Expand All @@ -448,15 +461,17 @@ AsyncCache::TransactionNode::TransactionNode(Entry& entry)
size_updated_(false) {}

Future<const void> AsyncCache::TransactionNode::Read(
absl::Time staleness_bound) {
absl::Time staleness_bound, bool must_not_be_known_to_be_stale) {
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *this << "Read: staleness_bound=" << staleness_bound;
<< *this << "Read: staleness_bound=" << staleness_bound
<< ", must_not_be_known_to_be_stale=" << must_not_be_known_to_be_stale;
if (reads_committed_ &&
(prepare_for_commit_state_.load(std::memory_order_acquire) !=
PrepareForCommitState::kReadyForCommitCalled)) {
return RequestRead(GetOwningEntry(*this), staleness_bound);
return RequestRead(GetOwningEntry(*this), staleness_bound,
must_not_be_known_to_be_stale);
}
return RequestRead(*this, staleness_bound);
return RequestRead(*this, staleness_bound, must_not_be_known_to_be_stale);
}

void AsyncCache::TransactionNode::ReadSuccess(ReadState&& read_state) {
Expand Down Expand Up @@ -531,8 +546,7 @@ void AsyncCache::TransactionNode::WritebackSuccess(ReadState&& read_state) {
assert(read_state_time >= request_state.read_state.stamp.time);
SetReadState(entry, std::move(read_state), read_state_size);
} else if (read_state_time > request_state.read_state.stamp.time) {
read_state_time = request_state.read_state.stamp.time =
absl::InfinitePast();
request_state.known_to_be_stale = true;
}

QueuedReadHandler queued_read_handler(request_state, read_state_time);
Expand Down
14 changes: 12 additions & 2 deletions tensorstore/internal/cache/async_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ class AsyncCache : public Cache {
/// The most recently-cached read state.
ReadState read_state;

/// `read_state` is known to be out-of-date, due to a local modification not
/// reflected in the cache.
bool known_to_be_stale = false;

/// The size in bytes consumed by `read_state.read_data`. This is the
/// cached result of calling
/// `Entry::ComputeReadDataSizeInBytes(read_state.read_data.get())`.
Expand Down Expand Up @@ -346,10 +350,15 @@ class AsyncCache : public Cache {
/// Requests data no older than `staleness_bound`.
///
/// \param staleness_bound Limit on data staleness.
/// \param must_not_be_known_to_be_stale Requests newer data if the existing
/// data is known to be out-of-date (e.g. due to a local write not
/// reflected in the cache), even if the existing data satisfies
/// `staleness_bound`.
/// \returns A future that resolves to a success state once data no older
/// than `staleness_bound` is available, or to an error state if the
/// request failed.
Future<const void> Read(absl::Time staleness_bound);
Future<const void> Read(absl::Time staleness_bound,
bool must_not_be_known_to_be_stale = true);

/// Obtains an existing or new transaction node for the specified entry and
/// transaction. May also be used to obtain an implicit transaction node.
Expand Down Expand Up @@ -616,7 +625,8 @@ class AsyncCache : public Cache {

/// Requests a read state for this transaction node that is current as of
/// the specified `staleness_bound`.
Future<const void> Read(absl::Time staleness_bound);
Future<const void> Read(absl::Time staleness_bound,
bool must_not_be_known_to_be_stale = true);

/// Requests initial or updated data from persistent storage for a single
/// `Entry`.
Expand Down
38 changes: 25 additions & 13 deletions tensorstore/internal/cache/kvs_backed_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class KvsBackedCache : public Parent {
}
struct EncodeReceiverImpl {
TransactionNode* self_;
AsyncCache::ReadState update_;
TimestampedStorageGeneration update_stamp_;
ReadModifyWriteSource::WritebackReceiver receiver_;
void set_error(absl::Status error) {
error = GetOwningEntry(*self_).AnnotateError(std::move(error),
Expand All @@ -295,12 +295,8 @@ class KvsBackedCache : public Parent {
void set_value(std::optional<absl::Cord> value) {
kvstore::ReadResult read_result =
value ? kvstore::ReadResult::Value(std::move(*value),
std::move(update_.stamp))
: kvstore::ReadResult::Missing(std::move(update_.stamp));

// FIXME: only save if committing, also could do this inside
// ApplyReceiverImpl
self_->new_data_ = std::move(update_.data);
std::move(update_stamp_))
: kvstore::ReadResult::Missing(std::move(update_stamp_));
execution::set_value(receiver_, std::move(read_result));
}
};
Expand Down Expand Up @@ -341,21 +337,32 @@ class KvsBackedCache : public Parent {
if (!StorageGeneration::IsInnerLayerDirty(update.stamp.generation) &&
writeback_mode_ !=
ReadModifyWriteSource::kSpecifyUnchangedWriteback) {
if (self_->transaction()->commit_started()) {
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *self_ << "DoApply: if_not_equal=" << if_not_equal_
<< ", mode=" << writeback_mode_
<< ", unmodified: " << update.stamp;
if (StorageGeneration::IsUnknown(update.stamp.generation)) {
self_->new_data_ = std::nullopt;
} else {
self_->new_data_ = std::move(update.data);
}
return execution::set_value(
receiver_,
kvstore::ReadResult::Unspecified(std::move(update.stamp)));
}
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *self_ << "DoApply: if_not_equal=" << if_not_equal_
<< ", mode=" << writeback_mode_ << ", encoding: " << update.stamp
<< ", commit_started=" << self_->transaction()->commit_started();
self_->new_data_ = update.data;
ABSL_LOG_IF(INFO, TENSORSTORE_ASYNC_CACHE_DEBUG)
<< *self_ << "DoEncode";
auto update_data =
std::static_pointer_cast<const typename Derived::ReadData>(
update.data);
std::move(update.data));
GetOwningEntry(*self_).DoEncode(
std::move(update_data),
EncodeReceiverImpl{self_, std::move(update),
EncodeReceiverImpl{self_, std::move(update.stamp),
std::move(receiver_)});
}
};
Expand All @@ -382,8 +389,13 @@ class KvsBackedCache : public Parent {
}

void KvsWritebackSuccess(TimestampedStorageGeneration new_stamp) override {
return this->WritebackSuccess(
AsyncCache::ReadState{std::move(new_data_), std::move(new_stamp)});
if (new_data_) {
this->WritebackSuccess(
AsyncCache::ReadState{std::move(*new_data_), std::move(new_stamp)});
} else {
// Unmodified.
this->WritebackSuccess(AsyncCache::ReadState{});
}
}
// Forwards the writeback-error notification to `WritebackError()`.
void KvsWritebackError() override {
  this->WritebackError();
}

Expand Down Expand Up @@ -416,7 +428,7 @@ class KvsBackedCache : public Parent {
ReadModifyWriteTarget* target_;

// New data for the cache if the writeback completes successfully.
std::shared_ptr<const void> new_data_;
std::optional<std::shared_ptr<const void>> new_data_;

// If not `StorageGeneration::Unknown()`, requires that the prior generation
// match this generation when the transaction is committed.
Expand Down

0 comments on commit 70b16ae

Please sign in to comment.