Constrain farmer piece getter concurrency
nazar-pc committed Jun 7, 2024
1 parent 6d13125 commit e5a03f7
Showing 4 changed files with 30 additions and 2 deletions.
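The change threads a new `--piece-getter-concurrency` limit from the CLI down into `FarmerPieceGetter`, which now takes a permit from an `async_lock::Semaphore` before serving each piece request. A minimal sketch of that pattern, in the spirit of the diff below; `fetch_piece`, `get_piece_bounded` and the `main` driver are illustrative stand-ins, only the `Semaphore` and `NonZeroUsize` usage mirrors the commit:

use async_lock::Semaphore;
use std::num::NonZeroUsize;
use std::sync::Arc;

// Illustrative stand-in for the real cache/DSN piece lookup.
async fn fetch_piece(piece_index: u64) -> Option<Vec<u8>> {
    let _ = piece_index;
    Some(Vec::new())
}

// Every entry point acquires a permit first, so at most `concurrency`
// requests are in flight at any point in time.
async fn get_piece_bounded(semaphore: &Semaphore, piece_index: u64) -> Option<Vec<u8>> {
    // Waits if all permits are taken; the permit is released automatically
    // when `_guard` is dropped at the end of this call.
    let _guard = semaphore.acquire().await;
    fetch_piece(piece_index).await
}

fn main() {
    let concurrency = NonZeroUsize::new(128).expect("128 is non-zero");
    let semaphore = Arc::new(Semaphore::new(concurrency.get()));

    // `block_on` only drives the example; the farmer uses its own runtime.
    futures::executor::block_on(async {
        assert!(get_piece_bounded(&semaphore, 0).await.is_some());
    });
}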
@@ -12,6 +12,7 @@ use clap::{Parser, ValueHint};
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
use std::future::Future;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::pin::{pin, Pin};
use std::sync::Arc;
@@ -41,6 +42,11 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
/// Arguments for controller
#[derive(Debug, Parser)]
pub(super) struct ControllerArgs {
/// Piece getter concurrency.
///
/// Increase will result in higher memory usage.
#[arg(long, default_value = "128")]
piece_getter_concurrency: NonZeroUsize,
/// Base path where to store P2P network identity
#[arg(long, value_hint = ValueHint::DirPath)]
base_path: Option<PathBuf>,
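With clap's derive, the field above surfaces as a `--piece-getter-concurrency` option (default 128), and because it is typed as `NonZeroUsize`, a value of `0` is rejected at argument-parsing time instead of producing a semaphore with no permits.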
@@ -75,6 +81,7 @@ pub(super) async fn controller(
controller_args: ControllerArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> {
let ControllerArgs {
piece_getter_concurrency,
base_path,
node_rpc_url,
cache_group,
@@ -164,6 +171,7 @@ pub(super) async fn controller(
..ExponentialBackoff::default()
},
},
piece_getter_concurrency,
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
@@ -28,7 +28,7 @@ pub(super) struct PlotterArgs {
/// Increase can result in NATS communication issues if too many messages arrive via NATS, but
/// are not processed quickly enough for some reason and might require increasing cluster-level
/// `--nats-pool-size` parameter.
#[arg(long, default_value = "100")]
#[arg(long, default_value = "32")]
piece_getter_concurrency: NonZeroUsize,
/// Defines how many sectors farmer will download concurrently, allows to limit memory usage of
/// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future
@@ -116,6 +116,11 @@ pub(crate) struct FarmingArgs {
/// one specified endpoint. Format: 127.0.0.1:8080
#[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
prometheus_listen_on: Vec<SocketAddr>,
/// Piece getter concurrency.
///
/// Increase will result in higher memory usage.
#[arg(long, default_value = "128")]
piece_getter_concurrency: NonZeroUsize,
/// Defines how many sectors farmer will download concurrently, allows to limit memory usage of
/// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future
/// sector ahead of time.
@@ -242,6 +247,7 @@ where
tmp,
mut disk_farms,
prometheus_listen_on,
piece_getter_concurrency,
sector_downloading_concurrency,
sector_encoding_concurrency,
record_encoding_concurrency,
@@ -398,6 +404,7 @@ where
..ExponentialBackoff::default()
},
},
piece_getter_concurrency,
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
15 changes: 14 additions & 1 deletion crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,7 +1,9 @@
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use crate::utils::plotted_pieces::PlottedPieces;
- use async_lock::{Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock};
+ use async_lock::{
+     Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
+ };
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
@@ -11,6 +13,7 @@ use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use subspace_core_primitives::{Piece, PieceIndex};
@@ -102,6 +105,7 @@ struct Inner<FarmIndex, PV, NC> {
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
request_semaphore: Arc<Semaphore>,
}

pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
@@ -137,7 +141,9 @@
node_client: NC,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
request_concurrency: NonZeroUsize,
) -> Self {
let request_semaphore = Arc::new(Semaphore::new(request_concurrency.get()));
Self {
inner: Arc::new(Inner {
piece_provider,
@@ -146,12 +152,15 @@
plotted_pieces,
dsn_cache_retry_policy,
in_progress_pieces: Mutex::default(),
request_semaphore,
}),
}
}

/// Fast way to get piece using various caches
pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
InProgressPiece::Getting(in_progress_piece_getting) => {
// Try to get the piece without releasing lock to make sure successfully
@@ -221,6 +230,8 @@

/// Slow way to get piece using archival storage
pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
InProgressPiece::Getting(in_progress_piece_getting) => {
// Try to get the piece without releasing lock to make sure successfully
@@ -358,6 +369,8 @@
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
InProgressPiece::Getting(in_progress_piece_getting) => {
// Try to get the piece without releasing lock to make sure successfully
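A note on the guard semantics used in `get_piece_fast`, `get_piece_slow` and the `PieceGetter` implementation above: `acquire().await` returns an RAII permit that is held for the duration of the call and given back when `_guard` is dropped, and since all three entry points acquire from the same `request_semaphore`, the limit applies to the getter as a whole. A small self-contained sketch of that drop behaviour (illustrative, not part of the commit):

use async_lock::Semaphore;

fn main() {
    futures::executor::block_on(async {
        let semaphore = Semaphore::new(1);

        {
            // Hold the only permit; further attempts must wait.
            let _guard = semaphore.acquire().await;
            assert!(semaphore.try_acquire().is_none());
        } // `_guard` dropped here, the permit returns to the semaphore.

        // A new request can proceed once the previous guard is gone.
        assert!(semaphore.try_acquire().is_some());
    });
}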
