Merge pull request #2849 from subspace/multithreaded-farming-cluster-services

Multithreaded farming cluster services
nazar-pc authored Jun 14, 2024
2 parents c36dab3 + d1242d9 commit d1af0db
Showing 18 changed files with 162 additions and 63 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace/Cargo.toml
@@ -44,7 +44,7 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
 subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" }
 subspace-verification = { version = "0.1.0", path = "../subspace-verification" }
 thiserror = "1.0.59"
-tokio = { version = "1.37.0", features = ["sync", "time"] }
+tokio = { version = "1.38.0", features = ["sync", "time"] }
 tracing = "0.1.40"

 [dev-dependencies]
2 changes: 1 addition & 1 deletion crates/sc-proof-of-time/Cargo.toml
@@ -33,5 +33,5 @@ sp-runtime = { git = "https://github.com/subspace/polkadot-sdk", rev = "6da3c45e
 subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
 subspace-proof-of-time = { version = "0.1.0", path = "../subspace-proof-of-time" }
 thread-priority = "1.1.0"
-tokio = { version = "1.37.0", features = ["sync"] }
+tokio = { version = "1.38.0", features = ["sync"] }
 tracing = "0.1.40"
2 changes: 1 addition & 1 deletion crates/sp-domains-fraud-proof/Cargo.toml
@@ -64,7 +64,7 @@ subspace-test-client = { version = "0.1.0", path = "../../test/subspace-test-client" }
 subspace-test-service = { version = "0.1.0", path = "../../test/subspace-test-service" }
 subspace-runtime-primitives = { version = "0.1.0", path = "../../crates/subspace-runtime-primitives" }
 tempfile = "3.10.1"
-tokio = "1.37.0"
+tokio = "1.38.0"

 [features]
 default = ["std"]
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/Cargo.toml
@@ -38,7 +38,7 @@ subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
 subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space", features = ["parallel"] }
 subspace-verification = { version = "0.1.0", path = "../subspace-verification" }
 thiserror = "1.0.59"
-tokio = { version = "1.37.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync"] }
+tokio = { version = "1.38.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync"] }
 tracing = "0.1.40"

 [target.'cfg(windows)'.dependencies]
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
@@ -60,7 +60,7 @@ supports-color = "3.0.0"
 tempfile = "3.10.1"
 thiserror = "1.0.59"
 thread-priority = "1.1.0"
-tokio = { version = "1.37.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
+tokio = { version = "1.38.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 ulid = { version = "1.1.2", features = ["serde"] }
(File header not rendered in the capture; this is the cluster cache command of subspace-farmer, per the imports below.)

@@ -1,16 +1,20 @@
 use anyhow::anyhow;
 use bytesize::ByteSize;
 use clap::Parser;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use prometheus_client::registry::Registry;
 use std::fs;
 use std::future::Future;
+use std::num::NonZeroUsize;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::time::Duration;
 use subspace_farmer::cluster::cache::cache_service;
 use subspace_farmer::cluster::nats_client::NatsClient;
 use subspace_farmer::piece_cache::PieceCache;
+use subspace_farmer::utils::AsyncJoinOnDrop;

 /// Interval between cache self-identification broadcast messages
 pub(super) const CACHE_IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_secs(5);
@@ -98,6 +102,12 @@ pub(super) struct CacheArgs {
     /// Cache group to use, the same cache group must be also specified on corresponding controller
     #[arg(long, default_value = "default")]
     cache_group: String,
+    /// Number of service instances.
+    ///
+    /// Increasing number of services allows to process more concurrent requests, but increasing
+    /// beyond number of CPU cores doesn't make sense and will likely hurt performance instead.
+    #[arg(long, default_value = "32")]
+    service_instances: NonZeroUsize,
     /// Additional cluster components
     #[clap(raw = true)]
     pub(super) additional_components: Vec<String>,
@@ -112,6 +122,7 @@ pub(super) async fn cache(
         mut disk_caches,
         tmp,
         cache_group,
+        service_instances,
         additional_components: _,
     } = cache_args;

@@ -165,15 +176,39 @@
         })
         .collect::<Result<Vec<_>, _>>()?;

+    let mut cache_services = (0..service_instances.get())
+        .map(|index| {
+            let nats_client = nats_client.clone();
+            let caches = caches.clone();
+            let cache_group = cache_group.clone();
+
+            AsyncJoinOnDrop::new(
+                tokio::spawn(async move {
+                    cache_service(
+                        nats_client,
+                        &caches,
+                        &cache_group,
+                        // Only one of the tasks needs to send periodic broadcast
+                        if index == 0 {
+                            CACHE_IDENTIFICATION_BROADCAST_INTERVAL
+                        } else {
+                            Duration::MAX
+                        },
+                    )
+                    .await
+                }),
+                true,
+            )
+        })
+        .collect::<FuturesUnordered<_>>();
+
     Ok(Box::pin(async move {
-        cache_service(
-            nats_client,
-            &caches,
-            &cache_group,
-            CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
-        )
-        .await
-        .map_err(|error| anyhow!("Cache service failed: {error}"))?;
+        cache_services
+            .next()
+            .await
+            .expect("Not empty; qed")
+            .map_err(|error| anyhow!("Cache service failed: {error}"))?
+            .map_err(|error| anyhow!("Cache service failed: {error}"))?;

         drop(tmp_directory);

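The shape of this change repeats in all three cluster services in this PR: spawn `service_instances` copies of the request-handling future, give each its own clones of the shared handles, collect the join handles into a `FuturesUnordered`, and treat the first instance to exit as fatal for the whole service. Only instance 0 keeps the periodic identification broadcast; the others get `Duration::MAX`, which effectively disables the timer. Below is a minimal, self-contained sketch of that pattern; `serve` and its error type are placeholders rather than the real `cache_service` signature, and it assumes the `tokio` (with `macros` and `rt-multi-thread` features) and `futures` crates.

use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::StreamExt;

// Stand-in for `cache_service`: the real service subscribes to NATS subjects
// and processes requests until an error occurs
async fn serve(index: usize, broadcast_interval: Duration) -> Result<(), String> {
    let _ = (index, broadcast_interval);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let service_instances = 4;

    let mut services = (0..service_instances)
        .map(|index| {
            tokio::spawn(serve(
                index,
                // Only the first task sends periodic broadcasts
                if index == 0 {
                    Duration::from_secs(5)
                } else {
                    Duration::MAX
                },
            ))
        })
        .collect::<FuturesUnordered<_>>();

    // `FuturesUnordered` yields task results in completion order, so whichever
    // instance exits first (error or not) is observed here and ends the service
    services
        .next()
        .await
        .expect("Not empty; qed")
        .map_err(|join_error| join_error.to_string())?
}

Since the new field is declared as `#[arg(long)] service_instances`, clap's usual kebab-case naming should expose it as `--service-instances <N>` on the cache, controller and farmer cluster subcommands (an assumption based on the derive attributes visible above, not on the PR's docs).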
(File header not rendered in the capture; this is the cluster controller command of subspace-farmer.)

@@ -9,7 +9,8 @@ use anyhow::anyhow;
 use async_lock::RwLock as AsyncRwLock;
 use backoff::ExponentialBackoff;
 use clap::{Parser, ValueHint};
-use futures::{select, FutureExt};
+use futures::stream::FuturesUnordered;
+use futures::{select, FutureExt, StreamExt};
 use prometheus_client::registry::Registry;
 use std::future::Future;
 use std::num::NonZeroUsize;
@@ -26,7 +27,7 @@ use subspace_farmer::node_client::NodeClient;
 use subspace_farmer::utils::farmer_piece_getter::{DsnCacheRetryPolicy, FarmerPieceGetter};
 use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
 use subspace_farmer::utils::plotted_pieces::PlottedPieces;
-use subspace_farmer::utils::run_future_in_dedicated_thread;
+use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
 use subspace_farmer::Identity;
 use subspace_networking::utils::piece_provider::PieceProvider;
 use tracing::info;
@@ -61,6 +62,12 @@ pub(super) struct ControllerArgs {
     /// must be also specified on corresponding caches.
     #[arg(long, default_value = "default")]
     cache_group: String,
+    /// Number of service instances.
+    ///
+    /// Increasing number of services allows to process more concurrent requests, but increasing
+    /// beyond number of CPU cores doesn't make sense and will likely hurt performance instead.
+    #[arg(long, default_value = "32")]
+    service_instances: NonZeroUsize,
     /// Network parameters
     #[clap(flatten)]
     network_args: NetworkArgs,
@@ -85,6 +92,7 @@ pub(super) async fn controller(
         base_path,
         node_rpc_url,
         cache_group,
+        service_instances,
         mut network_args,
         dev,
         tmp,
@@ -187,26 +195,38 @@
         "controller-cache-worker".to_string(),
     )?;

-    let controller_service_fut = run_future_in_dedicated_thread(
-        {
+    let mut controller_services = (0..service_instances.get())
+        .map(|_| {
             let nats_client = nats_client.clone();
-            let instance = instance.clone();
             let node_client = node_client.clone();
             let piece_getter = piece_getter.clone();
             let farmer_cache = farmer_cache.clone();
+            let instance = instance.clone();

-            move || async move {
-                controller_service(
-                    &nats_client,
-                    &node_client,
-                    &piece_getter,
-                    &farmer_cache,
-                    &instance,
-                )
-                .await
-                .map_err(|error| anyhow!("Controller service failed: {error}"))
-            }
-        },
-        "controller-service".to_string(),
-    )?;
+            AsyncJoinOnDrop::new(
+                tokio::spawn(async move {
+                    controller_service(
+                        &nats_client,
+                        &node_client,
+                        &piece_getter,
+                        &farmer_cache,
+                        &instance,
+                    )
+                    .await
+                }),
+                true,
+            )
+        })
+        .collect::<FuturesUnordered<_>>();
+
+    let controller_service_fut = async move {
+        controller_services
+            .next()
+            .await
+            .expect("Not empty; qed")
+            .map_err(|error| anyhow!("Controller service failed: {error}"))?
+            .map_err(|error| anyhow!("Controller service failed: {error}"))
+    };

     let farms_fut = run_future_in_dedicated_thread(
         {
@@ -264,7 +284,7 @@

         // Controller service future
         result = controller_service_fut.fuse() => {
-            result??;
+            result?;
         },
     }

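Each spawned instance is wrapped in `AsyncJoinOnDrop::new(handle, true)` before being collected. Going by how it is used here, the wrapper is a join handle that also owns the task's lifetime: when the `FuturesUnordered` is dropped (for example because another branch of the `select!` above wins), the remaining instances are torn down instead of leaking, which bare `tokio::task::JoinHandle`s would not do. The real type lives in `subspace_farmer::utils`; the following is only an assumed sketch of its shape, including the meaning of the boolean flag.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::{JoinError, JoinHandle};

// Join handle that aborts its task when dropped (sketch, not the real
// implementation from `subspace_farmer::utils`)
struct AsyncJoinOnDrop<T> {
    handle: Option<JoinHandle<T>>,
    abort_on_drop: bool,
}

impl<T> AsyncJoinOnDrop<T> {
    fn new(handle: JoinHandle<T>, abort_on_drop: bool) -> Self {
        Self {
            handle: Some(handle),
            abort_on_drop,
        }
    }
}

impl<T> Drop for AsyncJoinOnDrop<T> {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            if self.abort_on_drop {
                // Request cancellation; the task stops at its next await point
                handle.abort();
            }
        }
    }
}

impl<T> Future for AsyncJoinOnDrop<T> {
    type Output = Result<T, JoinError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `JoinHandle` is `Unpin`, so re-pinning the inner handle is fine
        let this = self.get_mut();
        Pin::new(this.handle.as_mut().expect("Only taken in Drop")).poll(cx)
    }
}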
(File header not rendered in the capture; this is the cluster farmer command of subspace-farmer.)

@@ -105,6 +105,12 @@ pub(super) struct FarmerArgs {
     /// By default, farmer will continue running if there are still other working farms.
     #[arg(long)]
     exit_on_farm_error: bool,
+    /// Number of service instances.
+    ///
+    /// Increasing number of services allows to process more concurrent requests, but increasing
+    /// beyond number of CPU cores doesn't make sense and will likely hurt performance instead.
+    #[arg(long, default_value = "32")]
+    service_instances: NonZeroUsize,
     /// Additional cluster components
     #[clap(raw = true)]
     pub(super) additional_components: Vec<String>,
@@ -129,6 +135,7 @@ where
         disable_farm_locking,
         create,
         exit_on_farm_error,
+        service_instances,
         additional_components: _,
     } = farmer_args;

@@ -354,19 +361,32 @@
         .try_collect::<Vec<_>>()
         .await?;

-    let farmer_service_fut = farmer_service(
-        nats_client,
-        farms.as_slice(),
-        FARMER_IDENTIFICATION_BROADCAST_INTERVAL,
-    );
-    let farmer_service_fut = run_future_in_dedicated_thread(
-        move || async move {
-            farmer_service_fut
-                .await
-                .map_err(|error| anyhow!("Farmer service failed: {error}"))
-        },
-        "controller-service".to_string(),
-    )?;
+    let mut farmer_services = (0..service_instances.get())
+        .map(|index| {
+            AsyncJoinOnDrop::new(
+                tokio::spawn(farmer_service(
+                    nats_client.clone(),
+                    farms.as_slice(),
+                    // Only one of the tasks needs to send periodic broadcast
+                    if index == 0 {
+                        FARMER_IDENTIFICATION_BROADCAST_INTERVAL
+                    } else {
+                        Duration::MAX
+                    },
+                )),
+                true,
+            )
+        })
+        .collect::<FuturesUnordered<_>>();
+
+    let farmer_service_fut = async move {
+        farmer_services
+            .next()
+            .await
+            .expect("Not empty; qed")
+            .map_err(|error| anyhow!("Farmer service failed: {error}"))?
+            .map_err(|error| anyhow!("Farmer service failed: {error}"))
+    };

     let mut farms_stream = (0u8..)
         .zip(farms)
@@ -512,7 +532,7 @@ where

         // Piece cache worker future
         result = farmer_service_fut.fuse() => {
-            result??;
+            result?;
         },
     }

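The `result??;` to `result?;` tweak in the `select!` blocks of both the controller and the farmer falls out of the restructuring rather than being a separate fix: the future produced by `run_future_in_dedicated_thread` resolved to a `Result` wrapping the service's own `Result`, hence the two `?`, while the new `async move` wrappers already flatten the `tokio::spawn` join error and the service error into a single `Result` via the two stacked `map_err` calls. A toy illustration of the two layers, with placeholder names:

use anyhow::anyhow;

// Stand-in for a service future that can fail on its own
async fn service_stub() -> Result<(), String> {
    Err("service failed".to_string())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // `tokio::spawn` adds a second failure layer: awaiting the handle yields
    // `Result<Result<(), String>, JoinError>`
    let handle = tokio::spawn(service_stub());

    // Flattening both layers inside the wrapper future, as the new
    // `*_service_fut` blocks do...
    let service_fut = async move {
        handle
            .await
            .map_err(|error| anyhow!("Task panicked or was cancelled: {error}"))?
            .map_err(|error| anyhow!("Service failed: {error}"))
    };

    // ...leaves a single layer for the caller: `result?` instead of `result??`
    let result = service_fut.await;
    assert!(result.is_err());
    Ok(())
}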
(Diffs for the remaining changed files were not rendered in the capture.)