diff --git a/crates/subspace-farmer-components/src/reading.rs b/crates/subspace-farmer-components/src/reading.rs
index 2320af71a6..0c211ec770 100644
--- a/crates/subspace-farmer-components/src/reading.rs
+++ b/crates/subspace-farmer-components/src/reading.rs
@@ -7,9 +7,10 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use parity_scale_codec::Decode;
 use rayon::prelude::*;
-use std::io;
 use std::mem::ManuallyDrop;
 use std::simd::Simd;
+use std::str::FromStr;
+use std::{fmt, io};
 use subspace_core_primitives::crypto::{blake3_hash, Scalar};
 use subspace_core_primitives::{Piece, PieceOffset, Record, SBucket, SectorId};
 use subspace_erasure_coding::ErasureCoding;
@@ -102,6 +103,31 @@ pub enum ReadSectorRecordChunksMode {
     WholeSector,
 }
 
+impl fmt::Display for ReadSectorRecordChunksMode {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::ConcurrentChunks => {
+                write!(f, "ConcurrentChunks")
+            }
+            Self::WholeSector => {
+                write!(f, "WholeSector")
+            }
+        }
+    }
+}
+
+impl FromStr for ReadSectorRecordChunksMode {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
+            "WholeSector" => Ok(Self::WholeSector),
+            s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
+        }
+    }
+}
+
 /// Read sector record chunks, only plotted s-buckets are returned (in decoded form).
 ///
 /// NOTE: This is an async function, but it also does CPU-intensive operation internally, while it
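
Reviewer note: the `Display` and `FromStr` impls above round-trip the enum through its exact
variant names, which is what lets the CLI accept the mode as plain text further down. A minimal
sketch of the expected behaviour (illustrative only, not part of the diff; assumes a dependency
on `subspace-farmer-components`):

    use subspace_farmer_components::reading::ReadSectorRecordChunksMode;

    fn main() {
        // `Display` renders the exact variant name.
        assert_eq!(
            ReadSectorRecordChunksMode::WholeSector.to_string(),
            "WholeSector"
        );
        // `FromStr` accepts the same spelling back.
        assert!(matches!(
            "ConcurrentChunks".parse::<ReadSectorRecordChunksMode>(),
            Ok(ReadSectorRecordChunksMode::ConcurrentChunks)
        ));
        // Any other spelling is rejected with a descriptive error.
        assert!("whole-sector".parse::<ReadSectorRecordChunksMode>().is_err());
    }
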
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index d5fc854343..5ef328f459 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -73,7 +73,9 @@ pub(crate) struct FarmingArgs {
     ///
     /// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that
     /// farmer will make sure to not exceed (and will pre-allocated all the space on startup to
-    /// ensure it will not run out of space in runtime).
+    /// ensure it will not run out of space in runtime). Optionally, `record-chunks-mode` can be
+    /// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during
+    /// startup.
     disk_farms: Vec<DiskFarm>,
     /// WebSocket RPC URL of the Subspace node to connect to
     #[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")]
@@ -265,6 +267,7 @@ where
         disk_farms = vec![DiskFarm {
             directory: tmp_directory.as_ref().to_path_buf(),
             allocated_plotting_space: plot_size.as_u64(),
+            read_sector_record_chunks_mode: None,
         }];
 
         Some(tmp_directory)
@@ -533,6 +536,8 @@ where
                 plotting_delay: Some(plotting_delay_receiver),
                 global_mutex,
                 disable_farm_locking,
+                read_sector_record_chunks_mode: disk_farm
+                    .read_sector_record_chunks_mode,
                 faster_read_sector_record_chunks_mode_barrier,
                 faster_read_sector_record_chunks_mode_concurrency,
                 plotter,
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs
index 9a5b935ac2..05eaf548fb 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs
@@ -7,6 +7,7 @@ use std::fmt;
 use std::path::PathBuf;
 use std::str::FromStr;
 use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary};
+use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
 use subspace_networking::libp2p::identity::{ed25519, Keypair};
 use thread_priority::ThreadPriority;
 use zeroize::Zeroizing;
@@ -61,6 +62,8 @@ pub(in super::super) struct DiskFarm {
     pub(in super::super) directory: PathBuf,
     /// How much space in bytes can farm use for plots (metadata space is not included)
     pub(in super::super) allocated_plotting_space: u64,
+    /// Which mode to use for reading of sector record chunks
+    pub(in super::super) read_sector_record_chunks_mode: Option<ReadSectorRecordChunksMode>,
 }
 
 impl FromStr for DiskFarm {
@@ -68,12 +71,13 @@ impl FromStr for DiskFarm {
 
     fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
         let parts = s.split(',').collect::<Vec<_>>();
-        if parts.len() != 2 {
-            return Err("Must contain 2 coma-separated components".to_string());
+        if parts.len() < 2 {
+            return Err("Must contain 2 or more comma-separated components".to_string());
         }
 
         let mut plot_directory = None;
         let mut allocated_plotting_space = None;
+        let mut read_sector_record_chunks_mode = None;
 
         for part in parts {
             let part = part.splitn(2, '=').collect::<Vec<_>>();
@@ -98,9 +102,19 @@ impl FromStr for DiskFarm {
                             .as_u64(),
                     );
                 }
+                "record-chunks-mode" => {
+                    read_sector_record_chunks_mode.replace(
+                        value
+                            .parse::<ReadSectorRecordChunksMode>()
+                            .map_err(|error| {
+                                format!("Failed to parse `record-chunks-mode` \"{value}\": {error}")
+                            })?,
+                    );
+                }
                 key => {
                     return Err(format!(
-                        "Key \"{key}\" is not supported, only `path` or `size`"
+                        "Key \"{key}\" is not supported, only `path`, `size` or \
+                        `record-chunks-mode` are allowed"
                     ));
                 }
             }
@@ -113,6 +127,7 @@
             allocated_plotting_space: allocated_plotting_space.ok_or({
                 "`size` key is required with path to directory where plots will be stored"
             })?,
+            read_sector_record_chunks_mode,
         })
     }
 }
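
Reviewer note: a test-style sketch of what the extended `DiskFarm` parser accepts, written as if
it sat next to the type in `commands/shared.rs` (the struct is private to the farmer binary);
paths and sizes are illustrative:

    #[cfg(test)]
    mod disk_farm_parsing_tests {
        use super::*;

        #[test]
        fn record_chunks_mode_is_optional() {
            // The previously supported form still parses; the new field stays `None`.
            let farm = DiskFarm::from_str("path=/mnt/subspace-farm,size=2TiB").unwrap();
            assert!(farm.read_sector_record_chunks_mode.is_none());

            // The new key pins the mode so the farm can skip the startup benchmark.
            let farm = DiskFarm::from_str(
                "path=/mnt/subspace-farm,size=2TiB,record-chunks-mode=WholeSector",
            )
            .unwrap();
            assert!(matches!(
                farm.read_sector_record_chunks_mode,
                Some(ReadSectorRecordChunksMode::WholeSector)
            ));

            // Unknown keys are still rejected.
            assert!(
                DiskFarm::from_str("path=/mnt/subspace-farm,size=2TiB,mode=WholeSector").is_err()
            );
        }
    }
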
diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs
index 2e4c8c2eb8..7ce997069a 100644
--- a/crates/subspace-farmer/src/single_disk_farm.rs
+++ b/crates/subspace-farmer/src/single_disk_farm.rs
@@ -290,6 +290,9 @@ pub struct SingleDiskFarmOptions {
     pub global_mutex: Arc<AsyncMutex<()>>,
     /// Disable farm locking, for example if file system doesn't support it
     pub disable_farm_locking: bool,
+    /// Explicit mode to use for reading of sector record chunks instead of doing internal
+    /// benchmarking
+    pub read_sector_record_chunks_mode: Option<ReadSectorRecordChunksMode>,
     /// Barrier before internal benchmarking between different farms
     pub faster_read_sector_record_chunks_mode_barrier: Arc<Barrier>,
     /// Limit concurrency of internal benchmarking between different farms
@@ -710,6 +713,7 @@ impl SingleDiskFarm {
             farming_thread_pool_size,
             plotting_delay,
             global_mutex,
+            read_sector_record_chunks_mode,
             faster_read_sector_record_chunks_mode_barrier,
             faster_read_sector_record_chunks_mode_concurrency,
             ..
@@ -766,32 +770,35 @@ impl SingleDiskFarm {
         faster_read_sector_record_chunks_mode_barrier.wait().await;
 
-        let (read_sector_record_chunks_mode, farming_plot, farming_thread_pool) = {
-            // Error doesn't matter here
-            let _permit = faster_read_sector_record_chunks_mode_concurrency
-                .acquire()
-                .await;
-            let span = span.clone();
-            let plot_file = Arc::clone(&plot_file);
-
-            let read_sector_record_chunks_mode_fut = tokio::task::spawn_blocking(move || {
-                farming_thread_pool
-                    .install(move || {
-                        let _span_guard = span.enter();
-
-                        faster_read_sector_record_chunks_mode(
-                            &*plot_file,
-                            &farming_plot,
-                            sector_size,
-                            metadata_header.plotted_sector_count,
-                        )
-                        .map(|mode| (mode, farming_plot))
-                    })
-                    .map(|(mode, farming_plot)| (mode, farming_plot, farming_thread_pool))
-            });
+        let (read_sector_record_chunks_mode, farming_plot, farming_thread_pool) =
+            if let Some(mode) = read_sector_record_chunks_mode {
+                (mode, farming_plot, farming_thread_pool)
+            } else {
+                // Error doesn't matter here
+                let _permit = faster_read_sector_record_chunks_mode_concurrency
+                    .acquire()
+                    .await;
+                let span = span.clone();
+                let plot_file = Arc::clone(&plot_file);
+
+                let read_sector_record_chunks_mode_fut = tokio::task::spawn_blocking(move || {
+                    farming_thread_pool
+                        .install(move || {
+                            let _span_guard = span.enter();
+
+                            faster_read_sector_record_chunks_mode(
+                                &*plot_file,
+                                &farming_plot,
+                                sector_size,
+                                metadata_header.plotted_sector_count,
+                            )
+                            .map(|mode| (mode, farming_plot))
+                        })
+                        .map(|(mode, farming_plot)| (mode, farming_plot, farming_thread_pool))
+                });
 
-            AsyncJoinOnDrop::new(read_sector_record_chunks_mode_fut, false).await??
-        };
+                AsyncJoinOnDrop::new(read_sector_record_chunks_mode_fut, false).await??
+            };
 
         faster_read_sector_record_chunks_mode_barrier.wait().await;
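
Reviewer note: the last hunk boils down to this pattern: an explicit `read_sector_record_chunks_mode`
from the CLI short-circuits past the startup benchmark, while the `None` case keeps the old behaviour
of benchmarking under the shared concurrency limit. A stripped-down sketch with hypothetical names
(`resolve_read_mode`, `benchmark`), not the actual farmer code:

    use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
    use tokio::sync::Semaphore;

    /// Hypothetical helper mirroring the new control flow: an explicitly configured mode is
    /// returned immediately, only `None` pays for the disk benchmark, and concurrent benchmarks
    /// across farms remain limited by the shared semaphore.
    async fn resolve_read_mode(
        explicit: Option<ReadSectorRecordChunksMode>,
        benchmark_concurrency: &Semaphore,
        benchmark: impl FnOnce() -> ReadSectorRecordChunksMode,
    ) -> ReadSectorRecordChunksMode {
        if let Some(mode) = explicit {
            // `record-chunks-mode` was pinned on the CLI: skip benchmarking entirely.
            return mode;
        }
        // Error doesn't matter here (mirrors the comment in the real code).
        let _permit = benchmark_concurrency.acquire().await;
        // Stand-in for `faster_read_sector_record_chunks_mode(...)`, which the real code runs in
        // the farming thread pool via `tokio::task::spawn_blocking`.
        benchmark()
    }
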