Skip to content

Commit

Permalink
Merge pull request #2167 from subspace/fix-plotting-stack-overflow
Browse files: browse the repository at this point in the history
Make plotting/replotting thread pools distinct for farms to avoid stack overflow, still protected by semaphores
  • Loading branch information
nazar-pc authored Oct 27, 2023
2 parents fa9bf51 + 0b71ab8 commit e864961
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 39 deletions.
42 changes: 16 additions & 26 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::Mutex;
use rayon::ThreadPoolBuilder;
use std::fs;
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};
Expand All @@ -29,9 +28,7 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{
run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop,
};
use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
Expand Down Expand Up @@ -134,13 +131,21 @@ pub(crate) struct FarmingArgs {
/// the system
#[arg(long, default_value_t = available_parallelism())]
farming_thread_pool_size: usize,
/// Size of thread pool used for plotting, defaults to number of CPU cores available in the
/// system. This thread pool is global for all farms and generally doesn't need to be changed.
/// Size of PER FARM thread pool used for plotting, defaults to number of CPU cores available
/// in the system.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer will plot multiple
/// sectors concurrently, see `--sector-downloading-concurrency` and
/// `--sector-encoding-concurrency` options.
#[arg(long, default_value_t = available_parallelism())]
plotting_thread_pool_size: usize,
/// Size of thread pool used for replotting, typically smaller pool than for plotting to not
/// affect farming as much, defaults to half of the number of CPU cores available in the system.
/// This thread pool is global for all farms and generally doesn't need to be changed.
/// Size of PER FARM thread pool used for replotting, typically smaller pool than for plotting
/// to not affect farming as much, defaults to half of the number of CPU cores available in the
/// system.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer will replot multiple
/// sectors concurrently, see `--sector-downloading-concurrency` and
/// `--sector-encoding-concurrency` options.
#[arg(long, default_value_t = available_parallelism() / 2)]
replotting_thread_pool_size: usize,
}
Expand Down Expand Up @@ -423,21 +428,6 @@ where
None => farmer_app_info.protocol_info.max_pieces_in_sector,
};

let plotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("plotting#{thread_index}"))
.num_threads(plotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);
let replotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("replotting#{thread_index}"))
.num_threads(replotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);

let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get()));
let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));

Expand Down Expand Up @@ -467,8 +457,8 @@ where
encoding_semaphore: Arc::clone(&encoding_semaphore),
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool: Arc::clone(&plotting_thread_pool),
replotting_thread_pool: Arc::clone(&replotting_thread_pool),
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay: Some(plotting_delay_receiver),
},
disk_farm_index,
Expand Down
60 changes: 53 additions & 7 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::{select, FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use rayon::ThreadPoolBuilder;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::fs::{File, OpenOptions};
Expand Down Expand Up @@ -268,11 +268,11 @@ pub struct SingleDiskFarmOptions<NC, PG> {
/// Thread pool size used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving)
pub farming_thread_pool_size: usize,
/// Thread pool used for plotting
pub plotting_thread_pool: Arc<ThreadPool>,
/// Thread pool used for replotting, typically smaller pool than for plotting to not affect
/// Thread pool size used for plotting
pub plotting_thread_pool_size: usize,
/// Thread pool size used for replotting, typically smaller pool than for plotting to not affect
/// farming as much
pub replotting_thread_pool: Arc<ThreadPool>,
pub replotting_thread_pool_size: usize,
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
Expand Down Expand Up @@ -592,8 +592,8 @@ impl SingleDiskFarm {
downloading_semaphore,
encoding_semaphore,
farming_thread_pool_size,
plotting_thread_pool,
replotting_thread_pool,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay,
farm_during_initial_plotting,
} = options;
Expand Down Expand Up @@ -877,6 +877,50 @@ impl SingleDiskFarm {

move || {
let _span_guard = span.enter();
let plotting_thread_pool = match ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("plotting-{disk_farm_index}.{thread_index}")
})
.num_threads(plotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()
.map_err(PlottingError::FailedToCreateThreadPool)
{
Ok(thread_pool) => thread_pool,
Err(error) => {
if let Some(error_sender) = error_sender.lock().take() {
if let Err(error) = error_sender.send(error.into()) {
error!(
%error,
"Plotting failed to send error to background task",
);
}
}
return;
}
};
let replotting_thread_pool = match ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("replotting-{disk_farm_index}.{thread_index}")
})
.num_threads(replotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()
.map_err(PlottingError::FailedToCreateThreadPool)
{
Ok(thread_pool) => thread_pool,
Err(error) => {
if let Some(error_sender) = error_sender.lock().take() {
if let Err(error) = error_sender.send(error.into()) {
error!(
%error,
"Plotting failed to send error to background task",
);
}
}
return;
}
};

let plotting_fut = async move {
if start_receiver.recv().await.is_err() {
Expand Down Expand Up @@ -984,6 +1028,7 @@ impl SingleDiskFarm {
let span = span.clone();

move || {
let _span_guard = span.enter();
let thread_pool = match ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("farming-{disk_farm_index}.{thread_index}")
Expand All @@ -1008,6 +1053,7 @@ impl SingleDiskFarm {
};

let handle = Handle::current();
let span = span.clone();
thread_pool.install(move || {
let _span_guard = span.enter();

Expand Down
15 changes: 9 additions & 6 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::{mpsc, oneshot};
use futures::{select, FutureExt, SinkExt, StreamExt};
use lru::LruCache;
use parity_scale_codec::Encode;
use rayon::ThreadPool;
use rayon::{ThreadPool, ThreadPoolBuildError};
use std::collections::HashMap;
use std::fs::File;
use std::io;
Expand Down Expand Up @@ -81,12 +81,15 @@ pub enum PlottingError {
/// Farm is shutting down
#[error("Farm is shutting down")]
FarmIsShuttingDown,
/// I/O error occurred
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// Low-level plotting error
#[error("Low-level plotting error: {0}")]
LowLevel(#[from] plotting::PlottingError),
/// I/O error occurred
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// Failed to create thread pool
#[error("Failed to create thread pool: {0}")]
FailedToCreateThreadPool(#[from] ThreadPoolBuildError),
}

pub(super) struct PlottingOptions<NC, PG> {
Expand All @@ -111,8 +114,8 @@ pub(super) struct PlottingOptions<NC, PG> {
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub(crate) encoding_semaphore: Arc<Semaphore>,
pub(super) plotting_thread_pool: Arc<ThreadPool>,
pub(super) replotting_thread_pool: Arc<ThreadPool>,
pub(super) plotting_thread_pool: ThreadPool,
pub(super) replotting_thread_pool: ThreadPool,
pub(super) stop_receiver: broadcast::Receiver<()>,
}

Expand Down

0 comments on commit e864961

Please sign in to comment.