diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 00dbd00910..2916023b60 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -11,7 +11,6 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; use parking_lot::Mutex; -use rayon::ThreadPoolBuilder; use std::fs; use std::net::SocketAddr; use std::num::{NonZeroU8, NonZeroUsize}; @@ -29,9 +28,7 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; -use subspace_farmer::utils::{ - run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop, -}; +use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop}; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; @@ -134,13 +131,21 @@ pub(crate) struct FarmingArgs { /// the system #[arg(long, default_value_t = available_parallelism())] farming_thread_pool_size: usize, - /// Size of thread pool used for plotting, defaults to number of CPU cores available in the - /// system. This thread pool is global for all farms and generally doesn't need to be changed. + /// Size of PER FARM thread pool used for plotting, defaults to number of CPU cores available + /// in the system. + /// + /// NOTE: The fact that this parameter is per farm doesn't mean farmer will plot multiple + /// sectors concurrently, see `--sector-downloading-concurrency` and + /// `--sector-encoding-concurrency` options. #[arg(long, default_value_t = available_parallelism())] plotting_thread_pool_size: usize, - /// Size of thread pool used for replotting, typically smaller pool than for plotting to not - /// affect farming as much, defaults to half of the number of CPU cores available in the system. - /// This thread pool is global for all farms and generally doesn't need to be changed. + /// Size of PER FARM thread pool used for replotting, typically smaller pool than for plotting + /// to not affect farming as much, defaults to half of the number of CPU cores available in the + /// system. + /// + /// NOTE: The fact that this parameter is per farm doesn't mean farmer will replot multiple + /// sectors concurrently, see `--sector-downloading-concurrency` and + /// `--sector-encoding-concurrency` options. #[arg(long, default_value_t = available_parallelism() / 2)] replotting_thread_pool_size: usize, } @@ -423,21 +428,6 @@ where None => farmer_app_info.protocol_info.max_pieces_in_sector, }; - let plotting_thread_pool = Arc::new( - ThreadPoolBuilder::new() - .thread_name(move |thread_index| format!("plotting#{thread_index}")) - .num_threads(plotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build()?, - ); - let replotting_thread_pool = Arc::new( - ThreadPoolBuilder::new() - .thread_name(move |thread_index| format!("replotting#{thread_index}")) - .num_threads(replotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build()?, - ); - let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get())); let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get())); @@ -467,8 +457,8 @@ where encoding_semaphore: Arc::clone(&encoding_semaphore), farm_during_initial_plotting, farming_thread_pool_size, - plotting_thread_pool: Arc::clone(&plotting_thread_pool), - replotting_thread_pool: Arc::clone(&replotting_thread_pool), + plotting_thread_pool_size, + replotting_thread_pool_size, plotting_delay: Some(plotting_delay_receiver), }, disk_farm_index, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index b6699666ef..9f033f4f15 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -26,7 +26,7 @@ use futures::{select, FutureExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use rayon::prelude::*; -use rayon::{ThreadPool, ThreadPoolBuilder}; +use rayon::ThreadPoolBuilder; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; use std::fs::{File, OpenOptions}; @@ -268,11 +268,11 @@ pub struct SingleDiskFarmOptions { /// Thread pool size used for farming (mostly for blocking I/O, but also for some /// compute-intensive operations during proving) pub farming_thread_pool_size: usize, - /// Thread pool used for plotting - pub plotting_thread_pool: Arc, - /// Thread pool used for replotting, typically smaller pool than for plotting to not affect + /// Thread pool size used for plotting + pub plotting_thread_pool_size: usize, + /// Thread pool size used for replotting, typically smaller pool than for plotting to not affect /// farming as much - pub replotting_thread_pool: Arc, + pub replotting_thread_pool_size: usize, /// Notification for plotter to start, can be used to delay plotting until some initialization /// has happened externally pub plotting_delay: Option>, @@ -592,8 +592,8 @@ impl SingleDiskFarm { downloading_semaphore, encoding_semaphore, farming_thread_pool_size, - plotting_thread_pool, - replotting_thread_pool, + plotting_thread_pool_size, + replotting_thread_pool_size, plotting_delay, farm_during_initial_plotting, } = options; @@ -877,6 +877,50 @@ impl SingleDiskFarm { move || { let _span_guard = span.enter(); + let plotting_thread_pool = match ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("plotting-{disk_farm_index}.{thread_index}") + }) + .num_threads(plotting_thread_pool_size) + .spawn_handler(tokio_rayon_spawn_handler()) + .build() + .map_err(PlottingError::FailedToCreateThreadPool) + { + Ok(thread_pool) => thread_pool, + Err(error) => { + if let Some(error_sender) = error_sender.lock().take() { + if let Err(error) = error_sender.send(error.into()) { + error!( + %error, + "Plotting failed to send error to background task", + ); + } + } + return; + } + }; + let replotting_thread_pool = match ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("replotting-{disk_farm_index}.{thread_index}") + }) + .num_threads(replotting_thread_pool_size) + .spawn_handler(tokio_rayon_spawn_handler()) + .build() + .map_err(PlottingError::FailedToCreateThreadPool) + { + Ok(thread_pool) => thread_pool, + Err(error) => { + if let Some(error_sender) = error_sender.lock().take() { + if let Err(error) = error_sender.send(error.into()) { + error!( + %error, + "Plotting failed to send error to background task", + ); + } + } + return; + } + }; let plotting_fut = async move { if start_receiver.recv().await.is_err() { @@ -984,6 +1028,7 @@ impl SingleDiskFarm { let span = span.clone(); move || { + let _span_guard = span.enter(); let thread_pool = match ThreadPoolBuilder::new() .thread_name(move |thread_index| { format!("farming-{disk_farm_index}.{thread_index}") @@ -1008,6 +1053,7 @@ impl SingleDiskFarm { }; let handle = Handle::current(); + let span = span.clone(); thread_pool.install(move || { let _span_guard = span.enter(); diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index a23d1a4199..3b6146f6a4 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -9,7 +9,7 @@ use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, SinkExt, StreamExt}; use lru::LruCache; use parity_scale_codec::Encode; -use rayon::ThreadPool; +use rayon::{ThreadPool, ThreadPoolBuildError}; use std::collections::HashMap; use std::fs::File; use std::io; @@ -81,12 +81,15 @@ pub enum PlottingError { /// Farm is shutting down #[error("Farm is shutting down")] FarmIsShuttingDown, - /// I/O error occurred - #[error("I/O error: {0}")] - Io(#[from] io::Error), /// Low-level plotting error #[error("Low-level plotting error: {0}")] LowLevel(#[from] plotting::PlottingError), + /// I/O error occurred + #[error("I/O error: {0}")] + Io(#[from] io::Error), + /// Failed to create thread pool + #[error("Failed to create thread pool: {0}")] + FailedToCreateThreadPool(#[from] ThreadPoolBuildError), } pub(super) struct PlottingOptions { @@ -111,8 +114,8 @@ pub(super) struct PlottingOptions { /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically /// allow one permit at a time for efficient CPU utilization pub(crate) encoding_semaphore: Arc, - pub(super) plotting_thread_pool: Arc, - pub(super) replotting_thread_pool: Arc, + pub(super) plotting_thread_pool: ThreadPool, + pub(super) replotting_thread_pool: ThreadPool, pub(super) stop_receiver: broadcast::Receiver<()>, }