Skip to content

Commit

Permalink
Merge pull request #2448 from subspace/exact-plotting-cpu-cores
Browse files Browse the repository at this point in the history
Allow specifying exact plotting/replotting CPU cores
  • Loading branch information
nazar-pc authored Jan 24, 2024
2 parents cbd11c2 + 34dfbec commit ecfb6bc
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 15 deletions.
72 changes: 58 additions & 14 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{
all_cpu_cores, create_plotting_thread_pool_manager, run_future_in_dedicated_thread,
thread_pool_core_indices, AsyncJoinOnDrop,
all_cpu_cores, create_plotting_thread_pool_manager, parse_cpu_cores_sets,
run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop,
};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
Expand Down Expand Up @@ -138,6 +138,17 @@ pub(crate) struct FarmingArgs {
/// Threads will be pinned to corresponding CPU cores at creation.
#[arg(long)]
plotting_thread_pool_size: Option<NonZeroUsize>,
/// Specify exact CPU cores to be used for plotting bypassing any custom logic farmer might use
/// otherwise. It replaces both `--sector-encoding-concurrency` and
/// `--plotting-thread-pool-size` options if specified. Requires `--replotting-cpu-cores` to be
/// specified with the same number of CPU cores groups (or not specified at all, in which case
/// it'll use the same thread pool as plotting).
///
/// Cores are coma-separated, with whitespace separating different thread pools/encoding
/// instances. For example "0,1 2,3" will result in two sectors being encoded at the same time,
/// each with a pair of CPU cores.
#[arg(long, conflicts_with_all = &["sector_encoding_concurrency", "plotting_thread_pool_size"])]
plotting_cpu_cores: Option<String>,
/// Size of one thread pool used for replotting, typically smaller pool than for plotting
/// to not affect farming as much, defaults to half of the number of logical CPUs available on
/// UMA system and number of logical CPUs available in NUMA node on NUMA system.
Expand All @@ -148,6 +159,15 @@ pub(crate) struct FarmingArgs {
/// Threads will be pinned to corresponding CPU cores at creation.
#[arg(long)]
replotting_thread_pool_size: Option<NonZeroUsize>,
/// Specify exact CPU cores to be used for replotting bypassing any custom logic farmer might
/// use otherwise. It replaces `--replotting-thread_pool_size` options if specified. Requires
/// `--plotting-cpu-cores` to be specified with the same number of CPU cores groups.
///
/// Cores are coma-separated, with whitespace separating different thread pools/encoding
/// instances. For example "0,1 2,3" will result in two sectors being encoded at the same time,
/// each with a pair of CPU cores.
#[arg(long, conflicts_with_all = &["sector_encoding_concurrency", "replotting_thread_pool_size"])]
replotting_cpu_cores: Option<String>,
}

fn cache_percentage_parser(s: &str) -> anyhow::Result<NonZeroU8> {
Expand Down Expand Up @@ -286,7 +306,9 @@ where
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_size,
plotting_cpu_cores,
replotting_thread_pool_size,
replotting_cpu_cores,
} = farming_args;

// Override flags with `--dev`
Expand Down Expand Up @@ -431,19 +453,41 @@ where
None => farmer_app_info.protocol_info.max_pieces_in_sector,
};

let plotting_thread_pool_core_indices =
thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency);
let replotting_thread_pool_core_indices = {
let mut replotting_thread_pool_core_indices =
thread_pool_core_indices(replotting_thread_pool_size, sector_encoding_concurrency);
if replotting_thread_pool_size.is_none() {
// The default behavior is to use all CPU cores, but for replotting we just want half
replotting_thread_pool_core_indices
.iter_mut()
.for_each(|set| set.truncate(set.cpu_cores().len() / 2));
let plotting_thread_pool_core_indices;
let replotting_thread_pool_core_indices;
if let Some(plotting_cpu_cores) = plotting_cpu_cores {
plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores)
.map_err(|error| anyhow::anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
replotting_thread_pool_core_indices = match replotting_cpu_cores {
Some(replotting_cpu_cores) => {
parse_cpu_cores_sets(&replotting_cpu_cores).map_err(|error| {
anyhow::anyhow!("Failed to parse `--replotting-cpu-cores`: {error}")
})?
}
None => plotting_thread_pool_core_indices.clone(),
};
if plotting_thread_pool_core_indices.len() != replotting_thread_pool_core_indices.len() {
return Err(anyhow::anyhow!(
"Number of plotting thread pools ({}) is not the same as for replotting ({})",
plotting_thread_pool_core_indices.len(),
replotting_thread_pool_core_indices.len()
));
}
replotting_thread_pool_core_indices
};
} else {
plotting_thread_pool_core_indices =
thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency);
replotting_thread_pool_core_indices = {
let mut replotting_thread_pool_core_indices =
thread_pool_core_indices(replotting_thread_pool_size, sector_encoding_concurrency);
if replotting_thread_pool_size.is_none() {
// The default behavior is to use all CPU cores, but for replotting we just want half
replotting_thread_pool_core_indices
.iter_mut()
.for_each(|set| set.truncate(set.cpu_cores().len() / 2));
}
replotting_thread_pool_core_indices
};
}

let downloading_semaphore = Arc::new(Semaphore::new(
sector_downloading_concurrency
Expand Down
22 changes: 21 additions & 1 deletion crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use futures::channel::oneshot::Canceled;
use futures::future::Either;
use rayon::{ThreadBuilder, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder};
use std::future::Future;
use std::num::NonZeroUsize;
use std::num::{NonZeroUsize, ParseIntError};
use std::ops::Deref;
use std::pin::{pin, Pin};
use std::str::FromStr;
use std::task::{Context, Poll};
use std::{io, thread};
use tokio::runtime::Handle;
Expand Down Expand Up @@ -223,6 +224,25 @@ pub fn all_cpu_cores() -> Vec<CpuCoreSet> {
}]
}

/// Parse space-separated set of groups of CPU cores (individual cores are coma-separated) into
/// vector of CPU core sets that can be used for creation of plotting/replotting thread pools.
pub fn parse_cpu_cores_sets(s: &str) -> Result<Vec<CpuCoreSet>, ParseIntError> {
s.split(' ')
.map(|s| {
let cores = s
.split(',')
.map(usize::from_str)
.collect::<Result<Vec<usize>, _>>()?;

Ok(CpuCoreSet {
cores,
#[cfg(feature = "numa")]
topology: hwlocality::Topology::new().map(std::sync::Arc::new).ok(),
})
})
.collect()
}

/// Thread indices for each thread pool
pub fn thread_pool_core_indices(
thread_pool_size: Option<NonZeroUsize>,
Expand Down

0 comments on commit ecfb6bc

Please sign in to comment.