Skip to content

Commit

Permalink
Prevent inconsistencies between concurrent hash table and policy
Browse files Browse the repository at this point in the history
data structures caused by timing issues

- Replace the boolean `is_dirty` with two atomic u16 counters `entry_gen` and
  `policy_gen` to track the number of times a cached entry has been updated and the
  number of times its write log has been applied to the policy data structures.
- Update an internal `handle_upsert` method to check the `entry_gen` counter when
  removing an entry from the hash table.
  • Loading branch information
tatsuya6502 committed Nov 19, 2023
1 parent 3c4e920 commit 5670fef
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 63 deletions.
29 changes: 25 additions & 4 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,6 @@ impl<K, V> ValueEntry<K, V> {
self.info.is_dirty()
}

pub(crate) fn set_dirty(&self, value: bool) {
self.info.set_dirty(value);
}

#[inline]
pub(crate) fn policy_weight(&self) -> u32 {
self.info.policy_weight()
Expand Down Expand Up @@ -314,6 +310,8 @@ pub(crate) enum WriteOp<K, V> {
Upsert {
key_hash: KeyHash<K>,
value_entry: TrioArc<ValueEntry<K, V>>,
/// Entry generation after the operation.
entry_gen: u16,
old_weight: u32,
new_weight: u32,
},
Expand All @@ -328,11 +326,13 @@ impl<K, V> Clone for WriteOp<K, V> {
Self::Upsert {
key_hash,
value_entry,
entry_gen,
old_weight,
new_weight,
} => Self::Upsert {
key_hash: key_hash.clone(),
value_entry: TrioArc::clone(value_entry),
entry_gen: *entry_gen,
old_weight: *old_weight,
new_weight: *new_weight,
},
Expand All @@ -350,6 +350,27 @@ impl<K, V> fmt::Debug for WriteOp<K, V> {
}
}

impl<K, V> WriteOp<K, V> {
    /// Builds a `WriteOp::Upsert` for the given key and value entry.
    ///
    /// `entry_generation` is the entry-generation value observed when the
    /// entry was inserted/updated in the hash table; it travels with the op
    /// so the policy side can later tell whether the op is stale.
    pub(crate) fn new_upsert(
        key: &Arc<K>,
        hash: u64,
        value_entry: &TrioArc<ValueEntry<K, V>>,
        entry_generation: u16,
        old_weight: u32,
        new_weight: u32,
    ) -> Self {
        Self::Upsert {
            key_hash: KeyHash::new(Arc::clone(key), hash),
            value_entry: TrioArc::clone(value_entry),
            entry_gen: entry_generation,
            old_weight,
            new_weight,
        }
    }
}

pub(crate) struct OldEntryInfo<K, V> {
pub(crate) entry: TrioArc<ValueEntry<K, V>>,
pub(crate) last_accessed: Option<Instant>,
Expand Down
49 changes: 40 additions & 9 deletions src/common/concurrent/entry_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};

use super::{AccessTime, KeyHash};
use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};
Expand All @@ -10,10 +10,12 @@ pub(crate) struct EntryInfo<K> {
/// cache. When `false`, it means the entry is _temporary_ admitted to
/// the cache or evicted from the cache (so it should not have LRU nodes).
is_admitted: AtomicBool,
/// `is_dirty` indicates that the entry has been inserted (or updated)
/// in the hash table, but the history of the insertion has not yet
/// been applied to the LRU deques and LFU estimator.
is_dirty: AtomicBool,
/// `entry_gen` (entry generation) is incremented every time the entry is updated
/// in the concurrent hash table.
entry_gen: AtomicU16,
/// `policy_gen` (policy generation) is incremented every time the entry's `WriteOp`
/// is applied to the cache policies including the LRU deque and LFU estimator.
policy_gen: AtomicU16,
last_accessed: AtomicInstant,
last_modified: AtomicInstant,
expiration_time: AtomicInstant,
Expand All @@ -29,7 +31,8 @@ impl<K> EntryInfo<K> {
Self {
key_hash,
is_admitted: Default::default(),
is_dirty: AtomicBool::new(true),
entry_gen: AtomicU16::new(0),
policy_gen: AtomicU16::new(0),
last_accessed: AtomicInstant::new(timestamp),
last_modified: AtomicInstant::new(timestamp),
expiration_time: AtomicInstant::default(),
Expand All @@ -54,12 +57,40 @@ impl<K> EntryInfo<K> {

#[inline]
pub(crate) fn is_dirty(&self) -> bool {
self.is_dirty.load(Ordering::Acquire)
self.entry_gen.load(Ordering::Acquire) != self.policy_gen.load(Ordering::Acquire)
}

#[inline]
pub(crate) fn set_dirty(&self, value: bool) {
self.is_dirty.store(value, Ordering::Release);
pub(crate) fn entry_gen(&self) -> u16 {
self.entry_gen.load(Ordering::Acquire)
}

/// Increments the entry generation counter and returns the updated value.
///
/// NOTE: The counter intentionally wraps around on overflow.
#[inline]
pub(crate) fn incr_entry_gen(&self) -> u16 {
    // `fetch_add` returns the previous value, so add one again (with
    // wrapping) to report the value that is now stored.
    self.entry_gen.fetch_add(1, Ordering::AcqRel).wrapping_add(1)
}

/// Advances the policy generation to the given value.
///
/// The policy generation only moves forward: if another thread has already
/// stored a value greater than or equal to `value`, this call leaves the
/// counter unchanged. `AtomicU16::fetch_max` performs the
/// load/compare/compare-exchange loop atomically for us.
///
/// TODO: Find a way to handle the case that the current value has been
/// wrapped around. In such a case, `current < value` actually means
/// `current > value`.
#[inline]
pub(crate) fn set_policy_gen(&self, value: u16) {
    self.policy_gen.fetch_max(value, Ordering::AcqRel);
}

#[inline]
Expand Down
62 changes: 36 additions & 26 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,7 @@ where
// on_insert
|| {
let entry = self.new_value_entry(&key, hash, value.clone(), ts, weight);
let ins_op = WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
old_weight: 0,
new_weight: weight,
};
let ins_op = WriteOp::new_upsert(&key, hash, &entry, 0, 0, weight);
let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
op1 = Some((cnt, ins_op));
entry
Expand All @@ -532,13 +527,8 @@ where
// that the OldEntryInfo can preserve the old EntryInfo's
// last_accessed and last_modified timestamps.
let old_info = OldEntryInfo::new(old_entry);
let entry = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
let upd_op = WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
old_weight,
new_weight: weight,
};
let (entry, gen) = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
let upd_op = WriteOp::new_upsert(&key, hash, &entry, gen, old_weight, weight);
let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
op2 = Some((cnt, old_info, upd_op));
entry
Expand Down Expand Up @@ -784,15 +774,15 @@ impl<K, V, S> BaseCache<K, V, S> {
timestamp: Instant,
policy_weight: u32,
other: &ValueEntry<K, V>,
) -> TrioArc<ValueEntry<K, V>> {
) -> (TrioArc<ValueEntry<K, V>>, u16) {
let info = TrioArc::clone(other.entry_info());
// To prevent this updated ValueEntry from being evicted by an expiration policy,
// set the dirty flag to true. It will be reset to false when the write is applied.
info.set_dirty(true);
// increment the entry generation.
let gen = info.incr_entry_gen();
info.set_last_accessed(timestamp);
info.set_last_modified(timestamp);
info.set_policy_weight(policy_weight);
TrioArc::new(ValueEntry::new_from(value, info, other))
(TrioArc::new(ValueEntry::new_from(value, info, other)), gen)
}

fn expire_after_create(
Expand Down Expand Up @@ -1626,12 +1616,14 @@ where
Ok(Upsert {
key_hash: kh,
value_entry: entry,
entry_gen: gen,
old_weight,
new_weight,
}) => {
self.handle_upsert(
kh,
entry,
gen,
old_weight,
new_weight,
deqs,
Expand All @@ -1654,6 +1646,7 @@ where
&self,
kh: KeyHash<K>,
entry: TrioArc<ValueEntry<K, V>>,
gen: u16,
old_weight: u32,
new_weight: u32,
deqs: &mut Deques<K>,
Expand All @@ -1663,8 +1656,6 @@ where
) where
V: Clone,
{
entry.set_dirty(false);

{
let counters = &mut eviction_state.counters;

Expand All @@ -1675,13 +1666,15 @@ where
self.update_timer_wheel(&entry, timer_wheel);
deqs.move_to_back_ao(&entry);
deqs.move_to_back_wo(&entry);
entry.entry_info().set_policy_gen(gen);
return;
}

if self.has_enough_capacity(new_weight, counters) {
// There are enough room in the cache (or the cache is unbounded).
// Add the candidate to the deques.
self.handle_admit(&entry, new_weight, deqs, timer_wheel, counters);
entry.entry_info().set_policy_gen(gen);
return;
}
}
Expand All @@ -1698,7 +1691,11 @@ where
None
};

let removed = self.cache.remove(kh.hash, |k| k == &kh.key);
let removed = self.cache.remove_if(
kh.hash,
|k| k == &kh.key,
|_, entry| entry.entry_info().entry_gen() == gen,
);
if let Some(entry) = removed {
if eviction_state.is_notifier_enabled() {
let key = Arc::clone(&kh.key);
Expand All @@ -1707,6 +1704,7 @@ where
.await;
}
}
entry.entry_info().set_policy_gen(gen);
return;
}
}
Expand Down Expand Up @@ -1736,6 +1734,7 @@ where
};

if let Some((vic_key, vic_entry)) =
// TODO: Check if the entry generation matches.
self.cache.remove_entry(vic_hash, |k| k == &vic_key)
{
if eviction_state.is_notifier_enabled() {
Expand Down Expand Up @@ -1778,16 +1777,26 @@ where
None
};

// Remove the candidate from the cache (hash map).
// Remove the candidate from the cache (hash map) if the entry
// generation matches.
let key = Arc::clone(&kh.key);
self.cache.remove(kh.hash, |k| k == &key);
if eviction_state.is_notifier_enabled() {
eviction_state
.add_removed_entry(key, &entry, RemovalCause::Size)
.await;
let removed = self.cache.remove_if(
kh.hash,
|k| k == &key,
|_, entry| entry.entry_info().entry_gen() == gen,
);

if let Some(entry) = removed {
if eviction_state.is_notifier_enabled() && entry.entry_info().entry_gen() == gen
{
eviction_state
.add_removed_entry(key, &entry, RemovalCause::Size)
.await;
}
}
}
}
entry.entry_info().set_policy_gen(gen);
}

/// Performs size-aware admission explained in the paper:
Expand Down Expand Up @@ -1840,6 +1849,7 @@ where
if let Some(vic_entry) = cache.get(hash, |k| k == key) {
victims.add_policy_weight(vic_entry.policy_weight());
victims.add_frequency(freq, hash);
// TODO: Record the entry generation too.
victim_keys.push(KeyHash::new(Arc::clone(key), hash));
retries = 0;
} else {
Expand Down
Loading

0 comments on commit 5670fef

Please sign in to comment.