From e741cb175e91d665516cf031174c5d551532967f Mon Sep 17 00:00:00 2001 From: tedio5 Date: Tue, 20 Aug 2024 18:32:25 +0800 Subject: [PATCH 1/6] chore: move PieceCacheState into submodule --- crates/subspace-farmer/src/farmer_cache.rs | 140 ++++------------ .../src/farmer_cache/piece_cache_state.rs | 150 ++++++++++++++++++ 2 files changed, 182 insertions(+), 108 deletions(-) create mode 100644 crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index fac97939d0..8dc58b6336 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -4,11 +4,13 @@ //! persist pieces in a way that is easy to retrieve comparing to decoding pieces from plots. mod metrics; +mod piece_cache_state; #[cfg(test)] mod tests; use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache}; use crate::farmer_cache::metrics::FarmerCacheMetrics; +use crate::farmer_cache::piece_cache_state::PieceCachesState; use crate::node_client::NodeClient; use crate::utils::run_future_in_dedicated_thread; use async_lock::RwLock as AsyncRwLock; @@ -17,7 +19,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{select, FutureExt, StreamExt}; use prometheus_client::registry::Registry; use rayon::prelude::*; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::hash::Hash; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -111,60 +113,6 @@ impl CacheBackend { } } -#[derive(Debug, Clone)] -struct PieceCachesState { - stored_pieces: HashMap>, - dangling_free_offsets: VecDeque>, - backends: Vec, -} - -impl PieceCachesState -where - CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static, - usize: From, - CacheIndex: TryFrom, -{ - fn pop_free_offset(&mut self) -> Option> { - match self.dangling_free_offsets.pop_front() { - Some(free_offset) => { - debug!(?free_offset, "Popped dangling free offset"); - Some(free_offset) - } - None => { - // Sort piece caches by number of stored pieces to fill those that are less - // populated first - let mut sorted_backends = self - .backends - .iter_mut() - .enumerate() - .filter_map(|(cache_index, backend)| { - Some((CacheIndex::try_from(cache_index).ok()?, backend)) - }) - .collect::>(); - sorted_backends.sort_unstable_by_key(|(_, backend)| backend.free_size()); - sorted_backends - .into_iter() - .rev() - .find_map(|(cache_index, backend)| { - backend - .next_free() - .map(|free_offset| FarmerCacheOffset::new(cache_index, free_offset)) - }) - } - } - } -} - -impl Default for PieceCachesState { - fn default() -> Self { - Self { - stored_pieces: HashMap::default(), - dangling_free_offsets: VecDeque::default(), - backends: Vec::default(), - } - } -} - #[derive(Debug)] struct CacheState { cache_stored_pieces: HashMap>, @@ -293,18 +241,19 @@ where // TODO: Consider implementing optional re-sync of the piece instead of just forgetting WorkerCommand::ForgetKey { key } => { let mut caches = self.piece_caches.write().await; - let Some(offset) = caches.stored_pieces.remove(&key) else { + let Some(offset) = caches.remove_stored_piece(&key) else { // Key not exist return; }; - let cache_index = usize::from(offset.cache_index); + + let cache_index = offset.cache_index; let piece_offset = offset.piece_offset; - let Some(backend) = caches.backends.get(cache_index).cloned() else { + let Some(backend) = caches.get_backend(cache_index).cloned() else { // Cache backend not exist return; }; - caches.dangling_free_offsets.push_front(offset); + caches.push_dangling_free_offset(offset); match backend.read_piece_index(piece_offset).await { Ok(Some(piece_index)) => { worker_state.heap.remove(KeyWrapper(piece_index)); @@ -342,13 +291,8 @@ where info!("Initializing piece cache"); // Pull old cache state since it will be replaced with a new one and reuse its allocations - let PieceCachesState { - mut stored_pieces, - mut dangling_free_offsets, - backends: _, - } = mem::take(&mut *self.piece_caches.write().await); - stored_pieces.clear(); - dangling_free_offsets.clear(); + let (mut stored_pieces, mut dangling_free_offsets) = + mem::take(&mut *self.piece_caches.write().await).clear(); debug!("Collecting pieces that were in the cache before"); @@ -487,11 +431,7 @@ where }; } - let mut caches = PieceCachesState { - stored_pieces, - dangling_free_offsets, - backends, - }; + let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends); info!("Synchronizing piece cache"); @@ -530,8 +470,7 @@ where debug!(%last_segment_index, "Identified last segment index"); let limit = caches - .backends - .iter() + .backends() .fold(0usize, |acc, backend| acc + backend.total_capacity as usize); worker_state.heap.clear(); // Change limit to number of pieces @@ -554,21 +493,14 @@ where }) .collect::>(); - let mut piece_caches_capacity_used = vec![0u32; caches.backends.len()]; + let mut piece_caches_capacity_used = vec![0u32; caches.backends_count()]; // Filter-out piece indices that are stored, but should not be as well as clean // `inserted_piece_indices` from already stored piece indices, leaving just those that are // still missing in cache - caches - .stored_pieces - .extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none()) - .for_each(|(_piece_index, offset)| { - // There is no need to adjust the `last_stored_offset` of the `backend` here, - // as the free_offset will be preferentially taken from the dangling free offsets - caches.dangling_free_offsets.push_front(offset); - }); + caches.free_stored_piece_if(|key, _offset| piece_indices_to_store.remove(key).is_none()); if let Some(metrics) = &self.metrics { - for offset in caches.stored_pieces.values() { + for offset in caches.stored_pieces() { piece_caches_capacity_used[usize::from(offset.cache_index)] += 1; } @@ -643,9 +575,9 @@ where break; }; - let cache_index = usize::from(offset.cache_index); + let cache_index = offset.cache_index; let piece_offset = offset.piece_offset; - if let Some(backend) = caches.backends.get(cache_index) + if let Some(backend) = caches.get_backend(cache_index) && let Err(error) = backend.write_piece(piece_offset, *piece_index, piece).await { // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly @@ -658,9 +590,7 @@ where ); continue; } - caches - .stored_pieces - .insert(RecordKey::from(piece_index.to_multihash()), offset); + caches.push_stored_piece(RecordKey::from(piece_index.to_multihash()), offset); downloaded_pieces_count += 1; // Do not print anything or send progress notification after last piece until piece @@ -896,7 +826,7 @@ where // Entry is already occupied, we need to find and replace old piece with new one Some(KeyWrapper(old_piece_index)) => { let old_record_key = RecordKey::from(old_piece_index.to_multihash()); - let Some(offset) = caches.stored_pieces.remove(&old_record_key) else { + let Some(offset) = caches.remove_stored_piece(&old_record_key) else { // Not this disk farm warn!( %old_piece_index, @@ -906,9 +836,10 @@ where ); return; }; - let cache_index = usize::from(offset.cache_index); + + let cache_index = offset.cache_index; let piece_offset = offset.piece_offset; - let Some(backend) = caches.backends.get(cache_index) else { + let Some(backend) = caches.get_backend(cache_index) else { // Cache backend not exist warn!( %cache_index, @@ -934,7 +865,7 @@ where %piece_offset, "Successfully replaced old cached piece" ); - caches.stored_pieces.insert(record_key, offset); + caches.push_stored_piece(record_key, offset); } } // There is free space in cache, need to find a free spot and place piece there @@ -947,9 +878,9 @@ where ); return; }; - let cache_index = usize::from(offset.cache_index); + let cache_index = offset.cache_index; let piece_offset = offset.piece_offset; - let Some(backend) = caches.backends.get(cache_index) else { + let Some(backend) = caches.get_backend(cache_index) else { // Cache backend not exist warn!( %cache_index, @@ -978,7 +909,7 @@ where if let Some(metrics) = &self.metrics { metrics.piece_cache_capacity_used.inc(); } - caches.stored_pieces.insert(record_key, offset); + caches.push_stored_piece(record_key, offset); } } }; @@ -1134,14 +1065,13 @@ where let maybe_piece_found = { let caches = self.piece_caches.read().await; - caches.stored_pieces.get(&key).and_then(|offset| { - let cache_index = usize::from(offset.cache_index); + caches.get_stored_piece(&key).and_then(|offset| { + let cache_index = offset.cache_index; let piece_offset = offset.piece_offset; - Some(( piece_offset, cache_index, - caches.backends.get(cache_index)?.clone(), + caches.get_backend(cache_index)?.clone(), )) }) }; @@ -1212,17 +1142,16 @@ where let key = RecordKey::from(piece_index.to_multihash()); let caches = self.piece_caches.read().await; - let Some(offset) = caches.stored_pieces.get(&key) else { + let Some(offset) = caches.get_stored_piece(&key) else { if let Some(metrics) = &self.metrics { metrics.cache_find_miss.inc(); } return None; }; - let cache_index = usize::from(offset.cache_index); let piece_offset = offset.piece_offset; - if let Some(backend) = caches.backends.get(cache_index) { + if let Some(backend) = caches.get_backend(offset.cache_index) { if let Some(metrics) = &self.metrics { metrics.cache_find_hit.inc(); } @@ -1280,12 +1209,7 @@ where CacheIndex: TryFrom, { fn record(&self, key: &RecordKey) -> Option { - if self - .piece_caches - .try_read()? - .stored_pieces - .contains_key(key) - { + if self.piece_caches.try_read()?.contains_stored_piece(key) { // Note: We store our own provider records locally without local addresses // to avoid redundant storage and outdated addresses. Instead, these are // acquired on demand when returning a `ProviderRecord` for the local node. diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs new file mode 100644 index 0000000000..46c87dc0e0 --- /dev/null +++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs @@ -0,0 +1,150 @@ +use crate::farmer_cache::{CacheBackend, FarmerCacheOffset}; +use std::collections::hash_map::Values; +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::hash::Hash; +use subspace_networking::libp2p::kad::RecordKey; +use tracing::{debug, trace}; + +#[derive(Debug, Clone)] +pub(super) struct PieceCachesState { + stored_pieces: HashMap>, + dangling_free_offsets: VecDeque>, + backends: Vec, +} + +impl PieceCachesState +where + CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static, + usize: From, + CacheIndex: TryFrom, +{ + pub(super) fn new( + stored_pieces: HashMap>, + dangling_free_offsets: VecDeque>, + backends: Vec, + ) -> Self { + Self { + stored_pieces, + dangling_free_offsets, + backends, + } + } + + pub(super) fn pop_free_offset(&mut self) -> Option> { + match self.dangling_free_offsets.pop_front() { + Some(free_offset) => { + debug!(?free_offset, "Popped dangling free offset"); + Some(free_offset) + } + None => { + // Sort piece caches by number of stored pieces to fill those that are less + // populated first + let mut sorted_backends = self + .backends + .iter_mut() + .enumerate() + .filter_map(|(cache_index, backend)| { + Some((CacheIndex::try_from(cache_index).ok()?, backend)) + }) + .collect::>(); + sorted_backends.sort_unstable_by_key(|(_, backend)| backend.free_size()); + sorted_backends + .into_iter() + .rev() + .find_map(|(cache_index, backend)| { + backend + .next_free() + .map(|free_offset| FarmerCacheOffset::new(cache_index, free_offset)) + }) + } + } + } + + pub(super) fn get_stored_piece( + &self, + key: &RecordKey, + ) -> Option<&FarmerCacheOffset> { + self.stored_pieces.get(key) + } + + pub(super) fn contains_stored_piece(&self, key: &RecordKey) -> bool { + self.stored_pieces.contains_key(key) + } + + pub(super) fn push_stored_piece( + &mut self, + key: RecordKey, + cache_offset: FarmerCacheOffset, + ) -> Option> { + self.stored_pieces.insert(key, cache_offset) + } + + pub(super) fn stored_pieces(&self) -> Values<'_, RecordKey, FarmerCacheOffset> { + self.stored_pieces.values() + } + + pub(super) fn remove_stored_piece( + &mut self, + key: &RecordKey, + ) -> Option> { + self.stored_pieces.remove(key) + } + + pub(super) fn free_stored_piece_if(&mut self, pred: F) + where + F: FnMut(&RecordKey, &mut FarmerCacheOffset) -> bool, + { + self.stored_pieces + .extract_if(pred) + .for_each(|(_piece_index, offset)| { + // There is no need to adjust the `last_stored_offset` of the `backend` here, + // as the free_offset will be preferentially taken from the dangling free offsets + self.dangling_free_offsets.push_back(offset); + }) + } + + pub(super) fn push_dangling_free_offset(&mut self, offset: FarmerCacheOffset) { + trace!(?offset, "Pushing dangling free offset"); + self.dangling_free_offsets.push_back(offset); + } + + pub(super) fn get_backend(&self, cache_index: CacheIndex) -> Option<&CacheBackend> { + self.backends.get(usize::from(cache_index)) + } + + pub(super) fn backends(&self) -> impl Iterator { + self.backends.iter() + } + + pub(super) fn backends_count(&self) -> usize { + self.backends.len() + } + + pub(super) fn clear( + self, + ) -> ( + HashMap>, + VecDeque>, + ) { + let Self { + mut stored_pieces, + mut dangling_free_offsets, + backends: _, + } = self; + + stored_pieces.clear(); + dangling_free_offsets.clear(); + (stored_pieces, dangling_free_offsets) + } +} + +impl Default for PieceCachesState { + fn default() -> Self { + Self { + stored_pieces: HashMap::default(), + dangling_free_offsets: VecDeque::default(), + backends: Vec::default(), + } + } +} From c2de7d916d2519ed44c6288bab3c631504710a32 Mon Sep 17 00:00:00 2001 From: tedio5 Date: Wed, 21 Aug 2024 09:10:44 +0800 Subject: [PATCH 2/6] chore: rename PieceCacheState::clear => reuse to avoid ambiguity --- crates/subspace-farmer/src/farmer_cache.rs | 2 +- crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 8dc58b6336..0d9c461f48 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -292,7 +292,7 @@ where // Pull old cache state since it will be replaced with a new one and reuse its allocations let (mut stored_pieces, mut dangling_free_offsets) = - mem::take(&mut *self.piece_caches.write().await).clear(); + mem::take(&mut *self.piece_caches.write().await).reuse(); debug!("Collecting pieces that were in the cache before"); diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs index 46c87dc0e0..8d07ee7c73 100644 --- a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs +++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs @@ -121,7 +121,7 @@ where self.backends.len() } - pub(super) fn clear( + pub(super) fn reuse( self, ) -> ( HashMap>, From ae2007a7b397808e5184d6676f06fcb886b309ae Mon Sep 17 00:00:00 2001 From: tedio5 Date: Fri, 23 Aug 2024 09:40:57 +0800 Subject: [PATCH 3/6] chore: replace backends Iterator with ExactSizeIterator --- crates/subspace-farmer/src/farmer_cache.rs | 2 +- .../subspace-farmer/src/farmer_cache/piece_cache_state.rs | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 0d9c461f48..535da84707 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -493,7 +493,7 @@ where }) .collect::>(); - let mut piece_caches_capacity_used = vec![0u32; caches.backends_count()]; + let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()]; // Filter-out piece indices that are stored, but should not be as well as clean // `inserted_piece_indices` from already stored piece indices, leaving just those that are // still missing in cache diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs index 8d07ee7c73..0a8959ef5c 100644 --- a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs +++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs @@ -113,14 +113,10 @@ where self.backends.get(usize::from(cache_index)) } - pub(super) fn backends(&self) -> impl Iterator { + pub(super) fn backends(&self) -> impl ExactSizeIterator { self.backends.iter() } - pub(super) fn backends_count(&self) -> usize { - self.backends.len() - } - pub(super) fn reuse( self, ) -> ( From f33bf45f6260a4f2199331ffd1904f757d1d4738 Mon Sep 17 00:00:00 2001 From: tedio5 Date: Fri, 23 Aug 2024 09:59:21 +0800 Subject: [PATCH 4/6] chore: add total_capacity fn for PieceCacheState --- crates/subspace-farmer/src/farmer_cache.rs | 4 +--- crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs | 5 +++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 535da84707..a2ea1a6719 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -469,9 +469,7 @@ where debug!(%last_segment_index, "Identified last segment index"); - let limit = caches - .backends() - .fold(0usize, |acc, backend| acc + backend.total_capacity as usize); + let limit = caches.total_capacity(); worker_state.heap.clear(); // Change limit to number of pieces worker_state.heap.set_limit(limit); diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs index 0a8959ef5c..25f480b53e 100644 --- a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs +++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs @@ -31,6 +31,11 @@ where } } + pub(super) fn total_capacity(&self) -> usize { + self.backends() + .fold(0usize, |acc, backend| acc + backend.total_capacity as usize) + } + pub(super) fn pop_free_offset(&mut self) -> Option> { match self.dangling_free_offsets.pop_front() { Some(free_offset) => { From 4f744c92bb93768a95c74b3e2fd0c4464550ae37 Mon Sep 17 00:00:00 2001 From: tedio5 Date: Fri, 23 Aug 2024 10:03:28 +0800 Subject: [PATCH 5/6] chore: rename PieceCacheState::stored_pieces => stored_pieces_offests to avoid ambiguty --- crates/subspace-farmer/src/farmer_cache.rs | 2 +- crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index a2ea1a6719..508708a746 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -498,7 +498,7 @@ where caches.free_stored_piece_if(|key, _offset| piece_indices_to_store.remove(key).is_none()); if let Some(metrics) = &self.metrics { - for offset in caches.stored_pieces() { + for offset in caches.stored_pieces_offests() { piece_caches_capacity_used[usize::from(offset.cache_index)] += 1; } diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs index 25f480b53e..6cdac58a82 100644 --- a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs +++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs @@ -85,7 +85,7 @@ where self.stored_pieces.insert(key, cache_offset) } - pub(super) fn stored_pieces(&self) -> Values<'_, RecordKey, FarmerCacheOffset> { + pub(super) fn stored_pieces_offests(&self) -> Values<'_, RecordKey, FarmerCacheOffset> { self.stored_pieces.values() } From de70bc1e2f9d9613e834e35afd4370f7c4d58324 Mon Sep 17 00:00:00 2001 From: tedio5 Date: Fri, 23 Aug 2024 10:20:34 +0800 Subject: [PATCH 6/6] chore: replace PieceCacheState::free_stored_piece_if with free_unneeded_stored_pieces --- crates/subspace-farmer/src/farmer_cache.rs | 2 +- .../src/farmer_cache/piece_cache_state.rs | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 508708a746..53c0568d4b 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -495,7 +495,7 @@ where // Filter-out piece indices that are stored, but should not be as well as clean // `inserted_piece_indices` from already stored piece indices, leaving just those that are // still missing in cache - caches.free_stored_piece_if(|key, _offset| piece_indices_to_store.remove(key).is_none()); + caches.free_unneeded_stored_pieces(&mut piece_indices_to_store); if let Some(metrics) = &self.metrics { for offset in caches.stored_pieces_offests() { diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs index 6cdac58a82..2e2982a05b 100644 --- a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs +++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs @@ -3,6 +3,7 @@ use std::collections::hash_map::Values; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::hash::Hash; +use subspace_core_primitives::PieceIndex; use subspace_networking::libp2p::kad::RecordKey; use tracing::{debug, trace}; @@ -85,7 +86,9 @@ where self.stored_pieces.insert(key, cache_offset) } - pub(super) fn stored_pieces_offests(&self) -> Values<'_, RecordKey, FarmerCacheOffset> { + pub(super) fn stored_pieces_offests( + &self, + ) -> Values<'_, RecordKey, FarmerCacheOffset> { self.stored_pieces.values() } @@ -96,12 +99,12 @@ where self.stored_pieces.remove(key) } - pub(super) fn free_stored_piece_if(&mut self, pred: F) - where - F: FnMut(&RecordKey, &mut FarmerCacheOffset) -> bool, - { + pub(super) fn free_unneeded_stored_pieces( + &mut self, + piece_indices_to_store: &mut HashMap, + ) { self.stored_pieces - .extract_if(pred) + .extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none()) .for_each(|(_piece_index, offset)| { // There is no need to adjust the `last_stored_offset` of the `backend` here, // as the free_offset will be preferentially taken from the dangling free offsets