diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
index 8e785dd687..4e868f87c3 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
@@ -12,6 +12,7 @@ use clap::{Parser, ValueHint};
 use futures::{select, FutureExt};
 use prometheus_client::registry::Registry;
 use std::future::Future;
+use std::num::NonZeroUsize;
 use std::path::PathBuf;
 use std::pin::{pin, Pin};
 use std::sync::Arc;
@@ -41,6 +42,11 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
 /// Arguments for controller
 #[derive(Debug, Parser)]
 pub(super) struct ControllerArgs {
+    /// Piece getter concurrency.
+    ///
+    /// Increase will result in higher memory usage.
+    #[arg(long, default_value = "128")]
+    piece_getter_concurrency: NonZeroUsize,
     /// Base path where to store P2P network identity
     #[arg(long, value_hint = ValueHint::DirPath)]
     base_path: Option<PathBuf>,
@@ -75,6 +81,7 @@ pub(super) async fn controller(
     controller_args: ControllerArgs,
 ) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> {
     let ControllerArgs {
+        piece_getter_concurrency,
         base_path,
         node_rpc_url,
         cache_group,
@@ -164,6 +171,7 @@ pub(super) async fn controller(
                 ..ExponentialBackoff::default()
             },
         },
+        piece_getter_concurrency,
     );

     let farmer_cache_worker_fut = run_future_in_dedicated_thread(
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
index 4ce94a2db1..07caa52a14 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
@@ -28,7 +28,7 @@ pub(super) struct PlotterArgs {
     /// Increase can result in NATS communication issues if too many messages arrive via NATS, but
     /// are not processed quickly enough for some reason and might require increasing cluster-level
     /// `--nats-pool-size` parameter.
-    #[arg(long, default_value = "100")]
+    #[arg(long, default_value = "32")]
     piece_getter_concurrency: NonZeroUsize,
     /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of
     /// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index 323e1533a8..67a550c147 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -116,6 +116,11 @@ pub(crate) struct FarmingArgs {
     /// one specified endpoint. Format: 127.0.0.1:8080
     #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
     prometheus_listen_on: Vec<SocketAddr>,
+    /// Piece getter concurrency.
+    ///
+    /// Increase will result in higher memory usage.
+    #[arg(long, default_value = "128")]
+    piece_getter_concurrency: NonZeroUsize,
     /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of
     /// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future
     /// sector ahead of time.
@@ -242,6 +247,7 @@ where
         tmp,
         mut disk_farms,
         prometheus_listen_on,
+        piece_getter_concurrency,
         sector_downloading_concurrency,
         sector_encoding_concurrency,
         record_encoding_concurrency,
@@ -398,6 +404,7 @@ where
                 ..ExponentialBackoff::default()
             },
         },
+        piece_getter_concurrency,
     );

     let farmer_cache_worker_fut = run_future_in_dedicated_thread(
diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
index daabded59e..7da4c94045 100644
--- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
+++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,7 +1,9 @@
 use crate::farmer_cache::FarmerCache;
 use crate::node_client::NodeClient;
 use crate::utils::plotted_pieces::PlottedPieces;
-use async_lock::{Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock};
+use async_lock::{
+    Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
+};
 use async_trait::async_trait;
 use backoff::backoff::Backoff;
 use backoff::future::retry;
@@ -11,6 +13,7 @@ use std::collections::HashMap;
 use std::error::Error;
 use std::fmt;
 use std::hash::Hash;
+use std::num::NonZeroUsize;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Weak};
 use subspace_core_primitives::{Piece, PieceIndex};
@@ -102,6 +105,7 @@ struct Inner<FarmIndex, PV, NC> {
     plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
     dsn_cache_retry_policy: DsnCacheRetryPolicy,
     in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
+    request_semaphore: Arc<Semaphore>,
 }

 pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
@@ -137,7 +141,9 @@ where
         node_client: NC,
         plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
         dsn_cache_retry_policy: DsnCacheRetryPolicy,
+        request_concurrency: NonZeroUsize,
     ) -> Self {
+        let request_semaphore = Arc::new(Semaphore::new(request_concurrency.get()));
         Self {
             inner: Arc::new(Inner {
                 piece_provider,
@@ -146,12 +152,15 @@ where
                 plotted_pieces,
                 dsn_cache_retry_policy,
                 in_progress_pieces: Mutex::default(),
+                request_semaphore,
             }),
         }
     }

     /// Fast way to get piece using various caches
     pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
+        let _guard = self.inner.request_semaphore.acquire().await;
+
         match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
             InProgressPiece::Getting(in_progress_piece_getting) => {
                 // Try to get the piece without releasing lock to make sure successfully
@@ -221,6 +230,8 @@

     /// Slow way to get piece using archival storage
     pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
+        let _guard = self.inner.request_semaphore.acquire().await;
+
         match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
             InProgressPiece::Getting(in_progress_piece_getting) => {
                 // Try to get the piece without releasing lock to make sure successfully
@@ -358,6 +369,8 @@
         &self,
         piece_index: PieceIndex,
     ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
+        let _guard = self.inner.request_semaphore.acquire().await;
+
         match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
             InProgressPiece::Getting(in_progress_piece_getting) => {
                 // Try to get the piece without releasing lock to make sure successfully
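Note: the mechanism above is a plain semaphore guard at the top of each retrieval path (`get_piece_fast`, `get_piece_slow`, and the `PieceGetter::get_piece` impl), sized by the new `--piece-getter-concurrency` flag (default 128 for the farmer and cluster controller, lowered from 100 to 32 for the cluster plotter). Below is a minimal standalone sketch of the same pattern, assuming only the `async-lock` and `futures` crates; `LimitedGetter` and `fetch_piece` are made-up names for illustration, not the crate's actual API.

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_lock::Semaphore;

/// Hypothetical stand-in for the real piece download path; the name and
/// return type are illustrative only.
async fn fetch_piece(_piece_index: u64) -> Option<Vec<u8>> {
    Some(vec![0u8; 32])
}

struct LimitedGetter {
    // At most `concurrency` requests get past `acquire()` at a time; the
    // rest park on the semaphore, bounding memory held by in-flight pieces.
    request_semaphore: Arc<Semaphore>,
}

impl LimitedGetter {
    fn new(concurrency: NonZeroUsize) -> Self {
        Self {
            request_semaphore: Arc::new(Semaphore::new(concurrency.get())),
        }
    }

    async fn get_piece(&self, piece_index: u64) -> Option<Vec<u8>> {
        // The guard is released on drop at the end of the call, so a permit
        // is held for the full duration of the request, as in the diff.
        let _guard = self.request_semaphore.acquire().await;
        fetch_piece(piece_index).await
    }
}

fn main() {
    let getter = LimitedGetter::new(NonZeroUsize::new(128).expect("not zero; qed"));
    // 1024 logical requests, but never more than 128 in flight at once.
    let pieces = futures::executor::block_on(futures::future::join_all(
        (0..1024u64).map(|i| getter.get_piece(i)),
    ));
    assert_eq!(pieces.len(), 1024);
}
```

One detail worth noting in the diff: the guard is bound to `_guard` rather than `_`. Binding to `_` would drop the guard (and release the permit) immediately, whereas `_guard` keeps it alive until the end of the function, which is what makes the limit apply to the whole request.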