diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 0fb7ed4cbf..b6e10f0507 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -3,7 +3,7 @@ use crate::commands::shared::network::{configure_network, NetworkArgs}; use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority}; use crate::utils::shutdown_signal; use anyhow::anyhow; -use async_lock::Mutex as AsyncMutex; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::{Parser, ValueHint}; @@ -287,7 +287,7 @@ where None }; - let plotted_pieces = Arc::new(Mutex::new(None)); + let plotted_pieces = Arc::new(AsyncRwLock::new(PlottedPieces::default())); info!(url = %node_rpc_url, "Connecting to node RPC"); let node_client = NodeRpcClient::new(&node_rpc_url).await?; @@ -643,17 +643,11 @@ where .await; drop(farmer_cache); - // Store piece readers so we can reference them later - let piece_readers = farms - .iter() - .map(|farm| farm.piece_reader()) - .collect::>(); - info!("Collecting already plotted pieces (this will take some time)..."); // Collect already plotted pieces { - let mut future_plotted_pieces = PlottedPieces::new(piece_readers); + let mut plotted_pieces = plotted_pieces.write().await; for (farm_index, farm) in farms.iter().enumerate() { let farm_index = farm_index.try_into().map_err(|_error| { @@ -663,13 +657,15 @@ where ) })?; + plotted_pieces.add_farm(farm_index, farm.piece_reader()); + for (sector_index, mut plotted_sectors) in (0 as SectorIndex..).zip(farm.plotted_sectors().await) { while let Some(plotted_sector_result) = plotted_sectors.next().await { match plotted_sector_result { Ok(plotted_sector) => { - future_plotted_pieces.add_sector(farm_index, &plotted_sector); + plotted_pieces.add_sector(farm_index, &plotted_sector); } Err(error) => { error!( @@ -683,8 +679,6 @@ where } } } - - plotted_pieces.lock().replace(future_plotted_pieces); } info!("Finished collecting already plotted pieces successfully"); @@ -721,10 +715,7 @@ where let _span_guard = span.enter(); { - let mut plotted_pieces = plotted_pieces.lock(); - let plotted_pieces = plotted_pieces - .as_mut() - .expect("Initial value was populated above; qed"); + let mut plotted_pieces = plotted_pieces.write_blocking(); if let Some(old_plotted_sector) = &maybe_old_plotted_sector { plotted_pieces.delete_sector(farm_index, old_plotted_sector); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs index 12f710ad56..9210d12656 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs @@ -1,7 +1,9 @@ +use async_lock::RwLock as AsyncRwLock; use clap::Parser; -use parking_lot::Mutex; use prometheus_client::registry::Registry; use std::collections::HashSet; +use std::fmt; +use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::path::Path; use std::sync::{Arc, Weak}; @@ -68,7 +70,7 @@ pub(in super::super) struct NetworkArgs { } #[allow(clippy::too_many_arguments)] -pub(in super::super) fn configure_network( +pub(in super::super) fn configure_network( protocol_prefix: String, base_path: &Path, keypair: Keypair, @@ -83,11 +85,15 @@ pub(in super::super) fn configure_network( pending_out_connections, external_addresses, }: NetworkArgs, - weak_plotted_pieces: Weak>>, + weak_plotted_pieces: Weak>>, node_client: NodeRpcClient, farmer_cache: FarmerCache, prometheus_metrics_registry: Option<&mut Registry>, -) -> Result<(Node, NodeRunner), anyhow::Error> { +) -> Result<(Node, NodeRunner), anyhow::Error> +where + FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static, + usize: From, +{ let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig { path: Some(base_path.join("known_addresses.bin").into_boxed_path()), ignore_peer_list: strip_peer_id(bootstrap_nodes.clone()) @@ -129,27 +135,16 @@ pub(in super::super) fn configure_network( "No piece in the cache. Trying archival storage..." ); - let read_piece_fut = { - let plotted_pieces = match weak_plotted_pieces.upgrade() { - Some(plotted_pieces) => plotted_pieces, - None => { - debug!("A readers and pieces are already dropped"); - return None; - } - }; - let plotted_pieces = plotted_pieces.lock(); - let plotted_pieces = match plotted_pieces.as_ref() { - Some(plotted_pieces) => plotted_pieces, - None => { - debug!( - ?piece_index, - "Readers and pieces are not initialized yet" - ); - return None; - } - }; - - plotted_pieces.read_piece(piece_index)?.in_current_span() + let read_piece_fut = match weak_plotted_pieces.upgrade() { + Some(plotted_pieces) => plotted_pieces + .read() + .await + .read_piece(piece_index)? + .in_current_span(), + None => { + debug!("A readers and pieces are already dropped"); + return None; + } }; let piece = read_piece_fut.await; diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 4869d5765f..deaa0529ac 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -1,7 +1,7 @@ use crate::farmer_cache::FarmerCache; use crate::utils::plotted_pieces::PlottedPieces; use crate::NodeClient; -use async_lock::Mutex as AsyncMutex; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; use backoff::backoff::Backoff; use backoff::future::retry; @@ -10,6 +10,7 @@ use parking_lot::Mutex; use std::collections::HashMap; use std::error::Error; use std::fmt; +use std::hash::Hash; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; use subspace_core_primitives::{Piece, PieceIndex}; @@ -29,27 +30,27 @@ pub struct DsnCacheRetryPolicy { pub backoff: ExponentialBackoff, } -struct Inner { +struct Inner { piece_provider: PieceProvider, farmer_cache: FarmerCache, node_client: NC, - plotted_pieces: Arc>>, + plotted_pieces: Arc>>, dsn_cache_retry_policy: DsnCacheRetryPolicy, in_progress_pieces: Mutex>>>>, } -pub struct FarmerPieceGetter { - inner: Arc>, +pub struct FarmerPieceGetter { + inner: Arc>, } -impl fmt::Debug for FarmerPieceGetter { +impl fmt::Debug for FarmerPieceGetter { #[inline] fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("FarmerPieceGetter").finish_non_exhaustive() } } -impl Clone for FarmerPieceGetter { +impl Clone for FarmerPieceGetter { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), @@ -57,8 +58,10 @@ impl Clone for FarmerPieceGetter { } } -impl FarmerPieceGetter +impl FarmerPieceGetter where + FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static, + usize: From, PV: PieceValidator + Send + 'static, NC: NodeClient, { @@ -66,7 +69,7 @@ where piece_provider: PieceProvider, farmer_cache: FarmerCache, node_client: NC, - plotted_pieces: Arc>>, + plotted_pieces: Arc>>, dsn_cache_retry_policy: DsnCacheRetryPolicy, ) -> Self { Self { @@ -209,11 +212,7 @@ where let inner = &self.inner; trace!(%piece_index, "Getting piece from local plot"); - let maybe_read_piece_fut = inner - .plotted_pieces - .lock() - .as_ref() - .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index)); + let maybe_read_piece_fut = inner.plotted_pieces.read().await.read_piece(piece_index); if let Some(read_piece_fut) = maybe_read_piece_fut { if let Some(piece) = read_piece_fut.await { @@ -248,7 +247,7 @@ where /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally /// used [`Arc`] - pub fn downgrade(&self) -> WeakFarmerPieceGetter { + pub fn downgrade(&self) -> WeakFarmerPieceGetter { WeakFarmerPieceGetter { inner: Arc::downgrade(&self.inner), } @@ -256,8 +255,10 @@ where } #[async_trait] -impl PieceGetter for FarmerPieceGetter +impl PieceGetter for FarmerPieceGetter where + FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static, + usize: From, PV: PieceValidator + Send + 'static, NC: NodeClient, { @@ -351,11 +352,11 @@ where } /// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`] -pub struct WeakFarmerPieceGetter { - inner: Weak>, +pub struct WeakFarmerPieceGetter { + inner: Weak>, } -impl fmt::Debug for WeakFarmerPieceGetter { +impl fmt::Debug for WeakFarmerPieceGetter { #[inline] fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("WeakFarmerPieceGetter") @@ -363,7 +364,7 @@ impl fmt::Debug for WeakFarmerPieceGetter { } } -impl Clone for WeakFarmerPieceGetter { +impl Clone for WeakFarmerPieceGetter { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -372,8 +373,10 @@ impl Clone for WeakFarmerPieceGetter { } #[async_trait] -impl PieceGetter for WeakFarmerPieceGetter +impl PieceGetter for WeakFarmerPieceGetter where + FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static, + usize: From, PV: PieceValidator + Send + 'static, NC: NodeClient, { @@ -390,9 +393,9 @@ where } } -impl WeakFarmerPieceGetter { +impl WeakFarmerPieceGetter { /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive - pub fn upgrade(&self) -> Option> { + pub fn upgrade(&self) -> Option> { Some(FarmerPieceGetter { inner: self.inner.upgrade()?, }) diff --git a/crates/subspace-farmer/src/utils/plotted_pieces.rs b/crates/subspace-farmer/src/utils/plotted_pieces.rs index c88fab264c..3bc23d7b45 100644 --- a/crates/subspace-farmer/src/utils/plotted_pieces.rs +++ b/crates/subspace-farmer/src/utils/plotted_pieces.rs @@ -1,36 +1,50 @@ -use crate::farm::PieceReader; +use crate::farm::{FarmError, PieceReader}; +use async_trait::async_trait; use rand::prelude::*; +use rayon::prelude::*; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::fmt; use std::future::Future; +use std::hash::Hash; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex}; use subspace_farmer_components::plotting::PlottedSector; use tracing::{trace, warn}; +#[derive(Debug)] +struct DummyReader; + +#[async_trait] +impl PieceReader for DummyReader { + async fn read_piece( + &self, + _sector_index: SectorIndex, + _piece_offset: PieceOffset, + ) -> Result, FarmError> { + Ok(None) + } +} + #[derive(Debug, Copy, Clone, Eq, PartialEq)] -struct PieceDetails { - farm_index: u8, +struct PieceDetails { + farm_index: FarmIndex, sector_index: SectorIndex, piece_offset: PieceOffset, } /// Wrapper data structure for pieces plotted under multiple plots. -#[derive(Debug)] -pub struct PlottedPieces { +#[derive(Debug, Default)] +pub struct PlottedPieces { readers: Vec>, - pieces: HashMap>, + pieces: HashMap>>, } -impl PlottedPieces { - /// Initialize with readers for each farm - pub fn new(readers: Vec>) -> Self { - Self { - readers, - pieces: HashMap::new(), - } - } - +impl PlottedPieces +where + FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static, + usize: From, +{ /// Check if piece is known and can be retrieved pub fn contains_piece(&self, piece_index: &PieceIndex) -> bool { self.pieces.contains_key(piece_index) @@ -60,7 +74,11 @@ impl PlottedPieces { let reader = match self.readers.get(usize::from(piece_details.farm_index)) { Some(reader) => reader.clone(), None => { - warn!(?piece_index, ?piece_details, "Plot offset is invalid"); + warn!( + ?piece_index, + ?piece_details, + "No piece reader for associated farm index" + ); return None; } }; @@ -73,7 +91,7 @@ impl PlottedPieces { warn!( %error, %piece_index, - farm_index = piece_details.farm_index, + farm_index = ?piece_details.farm_index, sector_index = piece_details.sector_index, "Failed to retrieve piece" ); @@ -83,7 +101,7 @@ impl PlottedPieces { } /// Add new sector to collect plotted pieces - pub fn add_sector(&mut self, farm_index: u8, plotted_sector: &PlottedSector) { + pub fn add_sector(&mut self, farm_index: FarmIndex, plotted_sector: &PlottedSector) { for (piece_offset, &piece_index) in (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter()) { @@ -105,7 +123,7 @@ impl PlottedPieces { } /// Add old sector from plotted pieces (happens on replotting) - pub fn delete_sector(&mut self, farm_index: u8, plotted_sector: &PlottedSector) { + pub fn delete_sector(&mut self, farm_index: FarmIndex, plotted_sector: &PlottedSector) { for (piece_offset, &piece_index) in (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter()) { @@ -136,6 +154,41 @@ impl PlottedPieces { } } + /// Add new farm with corresponding piece reader + pub fn add_farm(&mut self, farm_index: FarmIndex, piece_reader: Arc) { + let farm_index = usize::from(farm_index); + + if self.readers.len() <= farm_index { + self.readers.resize(farm_index, Arc::new(DummyReader)); + self.readers.push(piece_reader); + } else { + self.readers[farm_index] = piece_reader; + } + } + + /// Add all sectors of the farm + pub fn delete_farm(&mut self, farm_index: FarmIndex) { + if let Some(reader) = self.readers.get_mut(usize::from(farm_index)) { + // Replace reader with a dummy one to maintain farm order + *reader = Arc::new(DummyReader); + + let piece_indices_to_remove = self + .pieces + .par_iter_mut() + .filter_map(|(&piece_index, piece_details)| { + piece_details.retain(|piece_details| piece_details.farm_index != farm_index); + + piece_details.is_empty().then_some(piece_index) + }) + .collect::>(); + + // Remove pieces for which this was the only farm storing them + for piece_index in piece_indices_to_remove { + self.pieces.remove(&piece_index); + } + } + } + /// Iterator over all unique piece indices plotted pub fn piece_indices(&self) -> impl Iterator { self.pieces.keys()