Skip to content

Commit

Permalink
Merge pull request #2978 from tediou5/chore/move-piece-cache-state-in…
Browse files Browse the repository at this point in the history
…to-submodule

chore: move `PieceCacheState` into submodule
  • Loading branch information
nazar-pc authored Aug 27, 2024
2 parents 465dd88 + de70bc1 commit 96eb7af
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 110 deletions.
142 changes: 32 additions & 110 deletions crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
//! persist pieces in a way that is easy to retrieve comparing to decoding pieces from plots.
mod metrics;
mod piece_cache_state;
#[cfg(test)]
mod tests;

use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
Expand All @@ -17,7 +19,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
use prometheus_client::registry::Registry;
use rayon::prelude::*;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -111,60 +113,6 @@ impl CacheBackend {
}
}

#[derive(Debug, Clone)]
struct PieceCachesState<CacheIndex> {
stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
backends: Vec<CacheBackend>,
}

impl<CacheIndex> PieceCachesState<CacheIndex>
where
CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
usize: From<CacheIndex>,
CacheIndex: TryFrom<usize>,
{
fn pop_free_offset(&mut self) -> Option<FarmerCacheOffset<CacheIndex>> {
match self.dangling_free_offsets.pop_front() {
Some(free_offset) => {
debug!(?free_offset, "Popped dangling free offset");
Some(free_offset)
}
None => {
// Sort piece caches by number of stored pieces to fill those that are less
// populated first
let mut sorted_backends = self
.backends
.iter_mut()
.enumerate()
.filter_map(|(cache_index, backend)| {
Some((CacheIndex::try_from(cache_index).ok()?, backend))
})
.collect::<Vec<_>>();
sorted_backends.sort_unstable_by_key(|(_, backend)| backend.free_size());
sorted_backends
.into_iter()
.rev()
.find_map(|(cache_index, backend)| {
backend
.next_free()
.map(|free_offset| FarmerCacheOffset::new(cache_index, free_offset))
})
}
}
}
}

impl<CacheIndex> Default for PieceCachesState<CacheIndex> {
fn default() -> Self {
Self {
stored_pieces: HashMap::default(),
dangling_free_offsets: VecDeque::default(),
backends: Vec::default(),
}
}
}

#[derive(Debug)]
struct CacheState<CacheIndex> {
cache_stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
Expand Down Expand Up @@ -293,18 +241,19 @@ where
// TODO: Consider implementing optional re-sync of the piece instead of just forgetting
WorkerCommand::ForgetKey { key } => {
let mut caches = self.piece_caches.write().await;
let Some(offset) = caches.stored_pieces.remove(&key) else {
let Some(offset) = caches.remove_stored_piece(&key) else {
// Key not exist
return;
};
let cache_index = usize::from(offset.cache_index);

let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;
let Some(backend) = caches.backends.get(cache_index).cloned() else {
let Some(backend) = caches.get_backend(cache_index).cloned() else {
// Cache backend not exist
return;
};

caches.dangling_free_offsets.push_front(offset);
caches.push_dangling_free_offset(offset);
match backend.read_piece_index(piece_offset).await {
Ok(Some(piece_index)) => {
worker_state.heap.remove(KeyWrapper(piece_index));
Expand Down Expand Up @@ -342,13 +291,8 @@ where
info!("Initializing piece cache");

// Pull old cache state since it will be replaced with a new one and reuse its allocations
let PieceCachesState {
mut stored_pieces,
mut dangling_free_offsets,
backends: _,
} = mem::take(&mut *self.piece_caches.write().await);
stored_pieces.clear();
dangling_free_offsets.clear();
let (mut stored_pieces, mut dangling_free_offsets) =
mem::take(&mut *self.piece_caches.write().await).reuse();

debug!("Collecting pieces that were in the cache before");

Expand Down Expand Up @@ -487,11 +431,7 @@ where
};
}

let mut caches = PieceCachesState {
stored_pieces,
dangling_free_offsets,
backends,
};
let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);

info!("Synchronizing piece cache");

Expand Down Expand Up @@ -529,10 +469,7 @@ where

debug!(%last_segment_index, "Identified last segment index");

let limit = caches
.backends
.iter()
.fold(0usize, |acc, backend| acc + backend.total_capacity as usize);
let limit = caches.total_capacity();
worker_state.heap.clear();
// Change limit to number of pieces
worker_state.heap.set_limit(limit);
Expand All @@ -554,21 +491,14 @@ where
})
.collect::<HashMap<_, _>>();

let mut piece_caches_capacity_used = vec![0u32; caches.backends.len()];
let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
// Filter-out piece indices that are stored, but should not be as well as clean
// `inserted_piece_indices` from already stored piece indices, leaving just those that are
// still missing in cache
caches
.stored_pieces
.extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none())
.for_each(|(_piece_index, offset)| {
// There is no need to adjust the `last_stored_offset` of the `backend` here,
// as the free_offset will be preferentially taken from the dangling free offsets
caches.dangling_free_offsets.push_front(offset);
});
caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);

if let Some(metrics) = &self.metrics {
for offset in caches.stored_pieces.values() {
for offset in caches.stored_pieces_offests() {
piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
}

Expand Down Expand Up @@ -643,9 +573,9 @@ where
break;
};

let cache_index = usize::from(offset.cache_index);
let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;
if let Some(backend) = caches.backends.get(cache_index)
if let Some(backend) = caches.get_backend(cache_index)
&& let Err(error) = backend.write_piece(piece_offset, *piece_index, piece).await
{
// TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
Expand All @@ -658,9 +588,7 @@ where
);
continue;
}
caches
.stored_pieces
.insert(RecordKey::from(piece_index.to_multihash()), offset);
caches.push_stored_piece(RecordKey::from(piece_index.to_multihash()), offset);

downloaded_pieces_count += 1;
// Do not print anything or send progress notification after last piece until piece
Expand Down Expand Up @@ -896,7 +824,7 @@ where
// Entry is already occupied, we need to find and replace old piece with new one
Some(KeyWrapper(old_piece_index)) => {
let old_record_key = RecordKey::from(old_piece_index.to_multihash());
let Some(offset) = caches.stored_pieces.remove(&old_record_key) else {
let Some(offset) = caches.remove_stored_piece(&old_record_key) else {
// Not this disk farm
warn!(
%old_piece_index,
Expand All @@ -906,9 +834,10 @@ where
);
return;
};
let cache_index = usize::from(offset.cache_index);

let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;
let Some(backend) = caches.backends.get(cache_index) else {
let Some(backend) = caches.get_backend(cache_index) else {
// Cache backend not exist
warn!(
%cache_index,
Expand All @@ -934,7 +863,7 @@ where
%piece_offset,
"Successfully replaced old cached piece"
);
caches.stored_pieces.insert(record_key, offset);
caches.push_stored_piece(record_key, offset);
}
}
// There is free space in cache, need to find a free spot and place piece there
Expand All @@ -947,9 +876,9 @@ where
);
return;
};
let cache_index = usize::from(offset.cache_index);
let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;
let Some(backend) = caches.backends.get(cache_index) else {
let Some(backend) = caches.get_backend(cache_index) else {
// Cache backend not exist
warn!(
%cache_index,
Expand Down Expand Up @@ -978,7 +907,7 @@ where
if let Some(metrics) = &self.metrics {
metrics.piece_cache_capacity_used.inc();
}
caches.stored_pieces.insert(record_key, offset);
caches.push_stored_piece(record_key, offset);
}
}
};
Expand Down Expand Up @@ -1134,14 +1063,13 @@ where
let maybe_piece_found = {
let caches = self.piece_caches.read().await;

caches.stored_pieces.get(&key).and_then(|offset| {
let cache_index = usize::from(offset.cache_index);
caches.get_stored_piece(&key).and_then(|offset| {
let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;

Some((
piece_offset,
cache_index,
caches.backends.get(cache_index)?.clone(),
caches.get_backend(cache_index)?.clone(),
))
})
};
Expand Down Expand Up @@ -1212,17 +1140,16 @@ where
let key = RecordKey::from(piece_index.to_multihash());

let caches = self.piece_caches.read().await;
let Some(offset) = caches.stored_pieces.get(&key) else {
let Some(offset) = caches.get_stored_piece(&key) else {
if let Some(metrics) = &self.metrics {
metrics.cache_find_miss.inc();
}

return None;
};
let cache_index = usize::from(offset.cache_index);
let piece_offset = offset.piece_offset;

if let Some(backend) = caches.backends.get(cache_index) {
if let Some(backend) = caches.get_backend(offset.cache_index) {
if let Some(metrics) = &self.metrics {
metrics.cache_find_hit.inc();
}
Expand Down Expand Up @@ -1280,12 +1207,7 @@ where
CacheIndex: TryFrom<usize>,
{
fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
if self
.piece_caches
.try_read()?
.stored_pieces
.contains_key(key)
{
if self.piece_caches.try_read()?.contains_stored_piece(key) {
// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead, these are
// acquired on demand when returning a `ProviderRecord` for the local node.
Expand Down
Loading

0 comments on commit 96eb7af

Please sign in to comment.