diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index fac97939d0..53c0568d4b 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -4,11 +4,13 @@
 //! persist pieces in a way that is easy to retrieve comparing to decoding pieces from plots.

 mod metrics;
+mod piece_cache_state;
 #[cfg(test)]
 mod tests;

 use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
 use crate::farmer_cache::metrics::FarmerCacheMetrics;
+use crate::farmer_cache::piece_cache_state::PieceCachesState;
 use crate::node_client::NodeClient;
 use crate::utils::run_future_in_dedicated_thread;
 use async_lock::RwLock as AsyncRwLock;
@@ -17,7 +19,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{select, FutureExt, StreamExt};
 use prometheus_client::registry::Registry;
 use rayon::prelude::*;
-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::hash::Hash;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -111,60 +113,6 @@ impl CacheBackend {
     }
 }

-#[derive(Debug, Clone)]
-struct PieceCachesState<CacheIndex> {
-    stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
-    dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
-    backends: Vec<CacheBackend>,
-}
-
-impl<CacheIndex> PieceCachesState<CacheIndex>
-where
-    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-    usize: From<CacheIndex>,
-    CacheIndex: TryFrom<usize>,
-{
-    fn pop_free_offset(&mut self) -> Option<FarmerCacheOffset<CacheIndex>> {
-        match self.dangling_free_offsets.pop_front() {
-            Some(free_offset) => {
-                debug!(?free_offset, "Popped dangling free offset");
-                Some(free_offset)
-            }
-            None => {
-                // Sort piece caches by number of stored pieces to fill those that are less
-                // populated first
-                let mut sorted_backends = self
-                    .backends
-                    .iter_mut()
-                    .enumerate()
-                    .filter_map(|(cache_index, backend)| {
-                        Some((CacheIndex::try_from(cache_index).ok()?, backend))
-                    })
-                    .collect::<Vec<_>>();
-                sorted_backends.sort_unstable_by_key(|(_, backend)| backend.free_size());
-                sorted_backends
-                    .into_iter()
-                    .rev()
-                    .find_map(|(cache_index, backend)| {
-                        backend
-                            .next_free()
-                            .map(|free_offset| FarmerCacheOffset::new(cache_index, free_offset))
-                    })
-            }
-        }
-    }
-}
-
-impl<CacheIndex> Default for PieceCachesState<CacheIndex> {
-    fn default() -> Self {
-        Self {
-            stored_pieces: HashMap::default(),
-            dangling_free_offsets: VecDeque::default(),
-            backends: Vec::default(),
-        }
-    }
-}
-
 #[derive(Debug)]
 struct CacheState<CacheIndex> {
     cache_stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
@@ -293,18 +241,19 @@ where
             // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
             WorkerCommand::ForgetKey { key } => {
                 let mut caches = self.piece_caches.write().await;
-                let Some(offset) = caches.stored_pieces.remove(&key) else {
+                let Some(offset) = caches.remove_stored_piece(&key) else {
                     // Key not exist
                     return;
                 };
-                let cache_index = usize::from(offset.cache_index);
+
+                let cache_index = offset.cache_index;
                 let piece_offset = offset.piece_offset;
-                let Some(backend) = caches.backends.get(cache_index).cloned() else {
+                let Some(backend) = caches.get_backend(cache_index).cloned() else {
                     // Cache backend not exist
                     return;
                 };

-                caches.dangling_free_offsets.push_front(offset);
+                caches.push_dangling_free_offset(offset);
                 match backend.read_piece_index(piece_offset).await {
                     Ok(Some(piece_index)) => {
                         worker_state.heap.remove(KeyWrapper(piece_index));
@@ -342,13 +291,8 @@ where
         info!("Initializing piece cache");

         // Pull old cache state since it will be replaced with a new one and reuse its allocations
-        let PieceCachesState {
-            mut stored_pieces,
-            mut dangling_free_offsets,
-            backends: _,
-        } = mem::take(&mut *self.piece_caches.write().await);
-        stored_pieces.clear();
-        dangling_free_offsets.clear();
+        let (mut stored_pieces, mut dangling_free_offsets) =
+            mem::take(&mut *self.piece_caches.write().await).reuse();

         debug!("Collecting pieces that were in the cache before");

@@ -487,11 +431,7 @@ where
             };
         }

-        let mut caches = PieceCachesState {
-            stored_pieces,
-            dangling_free_offsets,
-            backends,
-        };
+        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);

         info!("Synchronizing piece cache");

@@ -529,10 +469,7 @@ where

         debug!(%last_segment_index, "Identified last segment index");

-        let limit = caches
-            .backends
-            .iter()
-            .fold(0usize, |acc, backend| acc + backend.total_capacity as usize);
+        let limit = caches.total_capacity();
         worker_state.heap.clear();
         // Change limit to number of pieces
         worker_state.heap.set_limit(limit);
@@ -554,21 +491,14 @@ where
             })
             .collect::<HashMap<_, _>>();

-        let mut piece_caches_capacity_used = vec![0u32; caches.backends.len()];
+        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
         // Filter-out piece indices that are stored, but should not be as well as clean
         // `inserted_piece_indices` from already stored piece indices, leaving just those that are
         // still missing in cache
-        caches
-            .stored_pieces
-            .extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none())
-            .for_each(|(_piece_index, offset)| {
-                // There is no need to adjust the `last_stored_offset` of the `backend` here,
-                // as the free_offset will be preferentially taken from the dangling free offsets
-                caches.dangling_free_offsets.push_front(offset);
-            });
+        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);

         if let Some(metrics) = &self.metrics {
-            for offset in caches.stored_pieces.values() {
+            for offset in caches.stored_pieces_offsets() {
                 piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
             }

@@ -643,9 +573,9 @@ where
                 break;
             };

-            let cache_index = usize::from(offset.cache_index);
+            let cache_index = offset.cache_index;
             let piece_offset = offset.piece_offset;
-            if let Some(backend) = caches.backends.get(cache_index)
+            if let Some(backend) = caches.get_backend(cache_index)
                 && let Err(error) = backend.write_piece(piece_offset, *piece_index, piece).await
             {
                 // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
@@ -658,9 +588,7 @@ where
                 );
                 continue;
             }
-            caches
-                .stored_pieces
-                .insert(RecordKey::from(piece_index.to_multihash()), offset);
+            caches.push_stored_piece(RecordKey::from(piece_index.to_multihash()), offset);

             downloaded_pieces_count += 1;
             // Do not print anything or send progress notification after last piece until piece
@@ -896,7 +824,7 @@ where
             // Entry is already occupied, we need to find and replace old piece with new one
             Some(KeyWrapper(old_piece_index)) => {
                 let old_record_key = RecordKey::from(old_piece_index.to_multihash());
-                let Some(offset) = caches.stored_pieces.remove(&old_record_key) else {
+                let Some(offset) = caches.remove_stored_piece(&old_record_key) else {
                     // Not this disk farm
                     warn!(
                         %old_piece_index,
@@ -906,9 +834,10 @@ where
                     );
                     return;
                 };
-                let cache_index = usize::from(offset.cache_index);
+
+                let cache_index = offset.cache_index;
                 let piece_offset = offset.piece_offset;
-                let Some(backend) = caches.backends.get(cache_index) else {
+                let Some(backend) = caches.get_backend(cache_index) else {
                     // Cache backend not exist
                     warn!(
                         %cache_index,
@@ -934,7 +863,7 @@ where
                            %piece_offset,
                            "Successfully replaced old cached piece"
                        );
-                        caches.stored_pieces.insert(record_key, offset);
+                        caches.push_stored_piece(record_key, offset);
                    }
                }
            }
            // There is free space in cache, need to find a free spot and place piece there
            None => {
@@ -947,9 +876,9 @@ where
                    );
                    return;
                };
-                let cache_index = usize::from(offset.cache_index);
+                let cache_index = offset.cache_index;
                 let piece_offset = offset.piece_offset;
-                let Some(backend) = caches.backends.get(cache_index) else {
+                let Some(backend) = caches.get_backend(cache_index) else {
                     // Cache backend not exist
                     warn!(
                         %cache_index,
@@ -978,7 +907,7 @@ where
                 if let Some(metrics) = &self.metrics {
                     metrics.piece_cache_capacity_used.inc();
                 }
-                caches.stored_pieces.insert(record_key, offset);
+                caches.push_stored_piece(record_key, offset);
             }
         }
     };
@@ -1134,14 +1063,13 @@ where
         let maybe_piece_found = {
             let caches = self.piece_caches.read().await;

-            caches.stored_pieces.get(&key).and_then(|offset| {
-                let cache_index = usize::from(offset.cache_index);
+            caches.get_stored_piece(&key).and_then(|offset| {
+                let cache_index = offset.cache_index;
                 let piece_offset = offset.piece_offset;
-
                 Some((
                     piece_offset,
                     cache_index,
-                    caches.backends.get(cache_index)?.clone(),
+                    caches.get_backend(cache_index)?.clone(),
                 ))
             })
         };
@@ -1212,17 +1140,16 @@ where
         let key = RecordKey::from(piece_index.to_multihash());

         let caches = self.piece_caches.read().await;
-        let Some(offset) = caches.stored_pieces.get(&key) else {
+        let Some(offset) = caches.get_stored_piece(&key) else {
             if let Some(metrics) = &self.metrics {
                 metrics.cache_find_miss.inc();
             }

             return None;
         };
-        let cache_index = usize::from(offset.cache_index);
         let piece_offset = offset.piece_offset;

-        if let Some(backend) = caches.backends.get(cache_index) {
+        if let Some(backend) = caches.get_backend(offset.cache_index) {
             if let Some(metrics) = &self.metrics {
                 metrics.cache_find_hit.inc();
             }
@@ -1280,12 +1207,7 @@ where
     CacheIndex: TryFrom<usize>,
 {
     fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
-        if self
-            .piece_caches
-            .try_read()?
-            .stored_pieces
-            .contains_key(key)
-        {
+        if self.piece_caches.try_read()?.contains_stored_piece(key) {
             // Note: We store our own provider records locally without local addresses
             // to avoid redundant storage and outdated addresses. Instead, these are
             // acquired on demand when returning a `ProviderRecord` for the local node.
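The `reuse()` call introduced above replaces the manual destructure-and-clear at the init call site. A minimal standalone sketch (hypothetical, not part of this patch) of why returning the cleared collections is worthwhile: `HashMap::clear` and `VecDeque::clear` drop the entries but keep the backing allocation, so re-initializing the cache state avoids re-allocating for a similar number of pieces:

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in for `stored_pieces`; the real map holds RecordKey -> FarmerCacheOffset
    let mut stored_pieces: HashMap<u64, u64> = HashMap::with_capacity(1 << 16);
    stored_pieces.insert(1, 1);

    let capacity_before = stored_pieces.capacity();
    // `clear` drops the entries but keeps the allocation, which is what `reuse()` hands back
    stored_pieces.clear();
    assert!(stored_pieces.is_empty());
    assert!(stored_pieces.capacity() >= capacity_before);
}
```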
diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
new file mode 100644
index 0000000000..2e2982a05b
--- /dev/null
+++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
@@ -0,0 +1,154 @@
+use crate::farmer_cache::{CacheBackend, FarmerCacheOffset};
+use std::collections::hash_map::Values;
+use std::collections::{HashMap, VecDeque};
+use std::fmt;
+use std::hash::Hash;
+use subspace_core_primitives::PieceIndex;
+use subspace_networking::libp2p::kad::RecordKey;
+use tracing::{debug, trace};
+
+#[derive(Debug, Clone)]
+pub(super) struct PieceCachesState<CacheIndex> {
+    stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
+    dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
+    backends: Vec<CacheBackend>,
+}
+
+impl<CacheIndex> PieceCachesState<CacheIndex>
+where
+    CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
+    usize: From<CacheIndex>,
+    CacheIndex: TryFrom<usize>,
+{
+    pub(super) fn new(
+        stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
+        dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
+        backends: Vec<CacheBackend>,
+    ) -> Self {
+        Self {
+            stored_pieces,
+            dangling_free_offsets,
+            backends,
+        }
+    }
+
+    pub(super) fn total_capacity(&self) -> usize {
+        self.backends()
+            .fold(0usize, |acc, backend| acc + backend.total_capacity as usize)
+    }
+
+    pub(super) fn pop_free_offset(&mut self) -> Option<FarmerCacheOffset<CacheIndex>> {
+        match self.dangling_free_offsets.pop_front() {
+            Some(free_offset) => {
+                debug!(?free_offset, "Popped dangling free offset");
+                Some(free_offset)
+            }
+            None => {
+                // Sort piece caches by number of stored pieces to fill those that are less
+                // populated first
+                let mut sorted_backends = self
+                    .backends
+                    .iter_mut()
+                    .enumerate()
+                    .filter_map(|(cache_index, backend)| {
+                        Some((CacheIndex::try_from(cache_index).ok()?, backend))
+                    })
+                    .collect::<Vec<_>>();
+                sorted_backends.sort_unstable_by_key(|(_, backend)| backend.free_size());
+                sorted_backends
+                    .into_iter()
+                    .rev()
+                    .find_map(|(cache_index, backend)| {
+                        backend
+                            .next_free()
+                            .map(|free_offset| FarmerCacheOffset::new(cache_index, free_offset))
+                    })
+            }
+        }
+    }
+
+    pub(super) fn get_stored_piece(
+        &self,
+        key: &RecordKey,
+    ) -> Option<&FarmerCacheOffset<CacheIndex>> {
+        self.stored_pieces.get(key)
+    }
+
+    pub(super) fn contains_stored_piece(&self, key: &RecordKey) -> bool {
+        self.stored_pieces.contains_key(key)
+    }
+
+    pub(super) fn push_stored_piece(
+        &mut self,
+        key: RecordKey,
+        cache_offset: FarmerCacheOffset<CacheIndex>,
+    ) -> Option<FarmerCacheOffset<CacheIndex>> {
+        self.stored_pieces.insert(key, cache_offset)
+    }
+
+    pub(super) fn stored_pieces_offsets(
+        &self,
+    ) -> Values<'_, RecordKey, FarmerCacheOffset<CacheIndex>> {
+        self.stored_pieces.values()
+    }
+
+    pub(super) fn remove_stored_piece(
+        &mut self,
+        key: &RecordKey,
+    ) -> Option<FarmerCacheOffset<CacheIndex>> {
+        self.stored_pieces.remove(key)
+    }
+
+    pub(super) fn free_unneeded_stored_pieces(
+        &mut self,
+        piece_indices_to_store: &mut HashMap<RecordKey, PieceIndex>,
+    ) {
+        self.stored_pieces
+            .extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none())
+            .for_each(|(_piece_index, offset)| {
+                // There is no need to adjust the `last_stored_offset` of the `backend` here,
+                // as the free_offset will be preferentially taken from the dangling free offsets
+                self.dangling_free_offsets.push_back(offset);
+            })
+    }
+
+    pub(super) fn push_dangling_free_offset(&mut self, offset: FarmerCacheOffset<CacheIndex>) {
+        trace!(?offset, "Pushing dangling free offset");
+        self.dangling_free_offsets.push_back(offset);
+    }
+
+    pub(super) fn get_backend(&self, cache_index: CacheIndex) -> Option<&CacheBackend> {
+        self.backends.get(usize::from(cache_index))
+    }
+
+    pub(super) fn backends(&self) -> impl ExactSizeIterator<Item = &CacheBackend> {
+        self.backends.iter()
+    }
+
+    pub(super) fn reuse(
+        self,
+    ) -> (
+        HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
+        VecDeque<FarmerCacheOffset<CacheIndex>>,
+    ) {
+        let Self {
+            mut stored_pieces,
+            mut dangling_free_offsets,
+            backends: _,
+        } = self;
+
+        stored_pieces.clear();
+        dangling_free_offsets.clear();
+        (stored_pieces, dangling_free_offsets)
+    }
+}
+
+impl<CacheIndex> Default for PieceCachesState<CacheIndex> {
+    fn default() -> Self {
+        Self {
+            stored_pieces: HashMap::default(),
+            dangling_free_offsets: VecDeque::default(),
+            backends: Vec::default(),
+        }
+    }
+}
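For reference, the allocation policy behind `pop_free_offset` can be summarized with a self-contained sketch. The types below are simplified stand-ins for `CacheBackend` and `FarmerCacheOffset`, not the real ones: dangling free offsets are reused FIFO first, and only then is the backend with the most free space (i.e. the least populated cache) asked for its next free offset:

```rust
use std::collections::VecDeque;

// Simplified stand-in for `CacheBackend`: tracks how many offsets are in use
struct Backend {
    used: u32,
    total: u32,
}

impl Backend {
    fn free_size(&self) -> u32 {
        self.total - self.used
    }

    fn next_free(&mut self) -> Option<u32> {
        (self.used < self.total).then(|| {
            let offset = self.used;
            self.used += 1;
            offset
        })
    }
}

// Mirrors the two-step policy: dangling offsets first (FIFO), then the backend
// with the largest free size, so less populated caches fill up first
fn pop_free_offset(
    dangling: &mut VecDeque<(usize, u32)>,
    backends: &mut [Backend],
) -> Option<(usize, u32)> {
    if let Some(offset) = dangling.pop_front() {
        return Some(offset);
    }
    let mut sorted: Vec<_> = backends.iter_mut().enumerate().collect();
    sorted.sort_unstable_by_key(|(_, backend)| backend.free_size());
    sorted
        .into_iter()
        .rev()
        .find_map(|(index, backend)| backend.next_free().map(|offset| (index, offset)))
}

fn main() {
    let mut dangling = VecDeque::new();
    let mut backends = vec![
        Backend { used: 5, total: 8 },
        Backend { used: 1, total: 8 },
    ];
    // No dangling offsets, so the emptier backend (index 1) is filled first
    assert_eq!(pop_free_offset(&mut dangling, &mut backends), Some((1, 1)));
}
```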