diff --git a/Cargo.lock b/Cargo.lock index 7ae4ad091fb..57edf453b51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -946,6 +946,39 @@ dependencies = [ "event-listener 2.5.3", ] +[[package]] +name = "async-nats" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eea7b126ebfa4db78e9e788b2a792b6329f35b4f2fdd56dbc646dedc2beec7a5" +dependencies = [ + "base64 0.22.0", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "portable-atomic", + "rand", + "regex", + "ring 0.17.8", + "rustls-native-certs 0.7.0", + "rustls-pemfile 2.1.2", + "rustls-webpki 0.102.3", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror", + "time", + "tokio", + "tokio-rustls 0.25.0", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-net" version = "1.8.0" @@ -2378,6 +2411,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" dependencies = [ "const-oid", + "pem-rfc7468", "zeroize", ] @@ -2416,6 +2450,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -3013,6 +3048,7 @@ dependencies = [ "rand_core 0.6.4", "serde", "sha2 0.10.8", + "signature", "subtle 2.5.0", "zeroize", ] @@ -7143,6 +7179,21 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc522a19199a0795776406619aa6aa78e1e55690fbeb3181b8db5265fd0e89ce" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.14", + "log", + "rand", + "signatory", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -7187,6 +7238,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand", +] + [[package]] name = "num" version = "0.4.2" @@ -8113,6 +8173,15 @@ dependencies = [ "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -10820,6 +10889,26 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "serde_spanned" version = "0.6.5" @@ -10927,6 +11016,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -12198,6 +12299,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-lock 3.3.0", + "async-nats", "async-trait", "backoff", "base58", @@ -12220,6 +12322,7 @@ dependencies = [ "num_cpus", "parity-scale-codec", "parking_lot 0.12.2", + "pin-project", "prometheus-client 0.22.2", "rand", "rayon", @@ -13577,6 +13680,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9f0a709784e86923586cff0d872dba54cd2d2e116b3bc57587d15737cfce9d" +dependencies = [ + "futures", + "pin-project-lite 0.2.14", + "tokio", +] + [[package]] name = "tt-call" version = "1.0.9" diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index b886de533be..b33c483d2d8 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -14,6 +14,7 @@ include = [ [dependencies] anyhow = "1.0.82" async-lock = "3.3.0" +async-nats = "0.34.0" async-trait = "0.1.80" backoff = { version = "0.4.0", features = ["futures", "tokio"] } base58 = "0.2.0" @@ -36,6 +37,7 @@ mimalloc = "0.1.41" num_cpus = "1.16.0" parity-scale-codec = "3.6.9" parking_lot = "0.12.2" +pin-project = "1.1.5" prometheus-client = "0.22.2" rand = "0.8.5" rayon = "1.10.0" diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs index 5f5ad446651..0d6936e9c83 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs @@ -1,4 +1,5 @@ pub(crate) mod benchmark; +pub(crate) mod cluster; pub(crate) mod farm; mod info; mod scrub; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs new file mode 100644 index 00000000000..23984ff9f92 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs @@ -0,0 +1,111 @@ +mod cache; +mod controller; +mod farmer; +mod plotter; + +use crate::commands::cluster::cache::{cache, CacheArgs}; +use crate::commands::cluster::controller::{controller, ControllerArgs}; +use crate::commands::cluster::farmer::{farmer, FarmerArgs}; +use crate::commands::cluster::plotter::{plotter, PlotterArgs}; +use crate::utils::shutdown_signal; +use anyhow::anyhow; +use async_nats::ServerAddr; +use clap::{Parser, Subcommand}; +use futures::{select, FutureExt}; +use prometheus_client::registry::Registry; +use std::net::SocketAddr; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_proof_of_space::Table; + +/// Arguments for farmer +#[derive(Debug, Parser)] +pub(crate) struct ClusterArgs { + /// Shared arguments for all subcommands + #[clap(flatten)] + shared_args: SharedArgs, + /// Cluster subcommands + #[clap(subcommand)] + subcommand: ClusterSubcommand, +} + +/// Shared arguments +#[derive(Debug, Parser)] +struct SharedArgs { + /// NATS server address, typically in `nats://server1:port1` format, can be specified multiple + /// times. + /// + /// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default). + #[arg(long, alias = "nats-server")] + nats_servers: Vec, + /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// one specified endpoint. Format: 127.0.0.1:8080 + #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])] + prometheus_listen_on: Vec, +} + +/// Arguments for cluster +#[derive(Debug, Subcommand)] +enum ClusterSubcommand { + /// Farming cluster controller + Controller(ControllerArgs), + /// Farming cluster farmer + Farmer(FarmerArgs), + /// Farming cluster plotter + Plotter(PlotterArgs), + /// Farming cluster cache + Cache(CacheArgs), +} + +pub(crate) async fn cluster(cluster_args: ClusterArgs) -> anyhow::Result<()> +where + PosTable: Table, +{ + let signal = shutdown_signal(); + + let nats_client = NatsClient::new(cluster_args.shared_args.nats_servers) + .await + .map_err(|error| anyhow!(error))?; + let mut registry = Registry::default(); + + let run_fut = async move { + match cluster_args.subcommand { + ClusterSubcommand::Controller(controller_args) => { + controller(nats_client, &mut registry, controller_args).await + } + ClusterSubcommand::Farmer(farmer_args) => { + farmer::(nats_client, &mut registry, farmer_args).await + } + ClusterSubcommand::Plotter(plotter_args) => { + plotter::(nats_client, &mut registry, plotter_args).await + } + ClusterSubcommand::Cache(cache_args) => { + cache(nats_client, &mut registry, cache_args).await + } + } + }; + + // TODO: Run + // let _prometheus_worker = if !cluster_args.shared_args.prometheus_listen_on.is_empty() { + // let prometheus_task = start_prometheus_metrics_server( + // cluster_args.shared_args.prometheus_listen_on, + // RegistryAdapter::PrometheusClient(registry), + // )?; + // + // let join_handle = tokio::spawn(prometheus_task); + // Some(AsyncJoinOnDrop::new(join_handle, true)) + // } else { + // None + // }; + + select! { + // Signal future + _ = signal.fuse() => { + Ok(()) + }, + + // Run future + result = run_fut.fuse() => { + result + }, + } +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs new file mode 100644 index 00000000000..50edaf3f8ad --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs @@ -0,0 +1,20 @@ +use clap::Parser; +use prometheus_client::registry::Registry; +use subspace_farmer::cluster::nats_client::NatsClient; + +/// Arguments for cache +#[derive(Debug, Parser)] +pub(super) struct CacheArgs { + // TODO: Paths + /// Cache group to use, the same cache group must be also specified on corresponding controller + #[arg(long, default_value = "default")] + cache_group: String, +} + +pub(super) async fn cache( + _nats_client: NatsClient, + _registry: &mut Registry, + _cache_args: CacheArgs, +) -> anyhow::Result<()> { + todo!() +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs new file mode 100644 index 00000000000..5a530909b39 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -0,0 +1,561 @@ +use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_NOTIFICATION_INTERVAL; +use crate::commands::shared::derive_libp2p_keypair; +use crate::commands::shared::network::{configure_network, NetworkArgs}; +use anyhow::anyhow; +use async_lock::RwLock as AsyncRwLock; +use backoff::ExponentialBackoff; +use clap::{Parser, ValueHint}; +use futures::channel::oneshot; +use futures::future::{Fuse, FusedFuture}; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, StreamExt}; +use parity_scale_codec::Decode; +use parking_lot::Mutex; +use prometheus_client::registry::Registry; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::future::{pending, ready, Future}; +use std::path::PathBuf; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; +use subspace_core_primitives::{Blake3Hash, SectorIndex}; +use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; +use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmNotification}; +use subspace_farmer::cluster::nats_client::{GenericNotification, NatsClient}; +use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; +use subspace_farmer::farmer_cache::FarmerCache; +use subspace_farmer::utils::farmer_piece_getter::{DsnCacheRetryPolicy, FarmerPieceGetter}; +use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; +use subspace_farmer::utils::plotted_pieces::PlottedPieces; +use subspace_farmer::utils::run_future_in_dedicated_thread; +use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; +use subspace_networking::utils::piece_provider::PieceProvider; +use tokio::time::MissedTickBehavior; +use tracing::{error, info, warn}; + +/// Get piece retry attempts number. +const PIECE_GETTER_MAX_RETRIES: u16 = 7; +/// Defines initial duration between get_piece calls. +const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(5); +/// Defines max duration between get_piece calls. +const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40); + +type FarmIndex = u16; + +#[derive(Debug)] +struct KnownFarm { + farm_id: FarmId, + fingerprint: Blake3Hash, + last_identification: Instant, + expired_sender: oneshot::Sender<()>, +} + +enum KnownFarmInsertResult { + Inserted { + farm_index: FarmIndex, + expired_receiver: oneshot::Receiver<()>, + }, + FingerprintUpdated { + farm_index: FarmIndex, + expired_receiver: oneshot::Receiver<()>, + }, + NotInserted, +} + +#[derive(Debug, Default)] +struct KnownFarms { + known_farms: HashMap, +} + +impl KnownFarms { + /// Returns new allocated farm index if farm was not known, `None` otherwise + fn insert_or_update( + &mut self, + farm_id: FarmId, + fingerprint: Blake3Hash, + ) -> KnownFarmInsertResult { + if let Some(existing_result) = + self.known_farms + .iter_mut() + .find_map(|(&farm_index, known_farm)| { + if known_farm.farm_id == farm_id { + if known_farm.fingerprint == fingerprint { + known_farm.last_identification = Instant::now(); + Some(KnownFarmInsertResult::NotInserted) + } else { + let (expired_sender, expired_receiver) = oneshot::channel(); + + known_farm.fingerprint = fingerprint; + known_farm.expired_sender = expired_sender; + + Some(KnownFarmInsertResult::FingerprintUpdated { + farm_index, + expired_receiver, + }) + } + } else { + None + } + }) + { + return existing_result; + } + + for farm_index in FarmIndex::MIN..=FarmIndex::MAX { + if let Entry::Vacant(entry) = self.known_farms.entry(farm_index) { + let (expired_sender, expired_receiver) = oneshot::channel(); + + entry.insert(KnownFarm { + farm_id, + fingerprint, + last_identification: Instant::now(), + expired_sender, + }); + + return KnownFarmInsertResult::Inserted { + farm_index, + expired_receiver, + }; + } + } + + warn!(%farm_id, max_supported_farm_index = %FarmIndex::MAX, "Too many farms, ignoring"); + KnownFarmInsertResult::NotInserted + } + + fn remove_expired(&mut self) -> impl Iterator + '_ { + self.known_farms.extract_if(|&farm_index, known_farm| { + known_farm.last_identification.elapsed() + > FARMER_IDENTIFICATION_NOTIFICATION_INTERVAL * 2 + }) + } + + fn remove(&mut self, farm_index: FarmIndex) { + self.known_farms.remove(&farm_index); + } +} + +// TODO: Piece request processing concurrency +/// Arguments for controller +#[derive(Debug, Parser)] +pub(super) struct ControllerArgs { + /// Base path where to store P2P network identity + #[arg(long, value_hint = ValueHint::DirPath)] + base_path: PathBuf, + /// WebSocket RPC URL of the Subspace node to connect to + #[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")] + node_rpc_url: String, + /// Cache group managed by this controller, each controller must have its dedicated cache group. + /// + /// It is strongly recommended to use alphanumeric values for cache group, the same cache group + /// must be also specified on corresponding caches. + #[arg(long, default_value = "default")] + cache_group: String, + /// Network parameters + #[clap(flatten)] + network_args: NetworkArgs, + /// Sets some flags that are convenient during development, currently `--allow-private-ips` + #[arg(long)] + dev: bool, + /// Run temporary controller identity + #[arg(long, conflicts_with = "base_path")] + tmp: bool, +} + +pub(super) async fn controller( + nats_client: NatsClient, + registry: &mut Registry, + controller_args: ControllerArgs, +) -> anyhow::Result<()> { + let ControllerArgs { + mut base_path, + node_rpc_url, + cache_group, + mut network_args, + dev, + tmp, + } = controller_args; + + // Override flags with `--dev` + network_args.allow_private_ips = network_args.allow_private_ips || dev; + + let _tmp_directory = if tmp { + let tmp_directory = tempfile::Builder::new() + .prefix("subspace-cluster-controller-") + .tempdir()?; + + base_path = tmp_directory.as_ref().to_path_buf(); + + Some(tmp_directory) + } else { + if base_path == PathBuf::default() { + return Err(anyhow!("--base-path must be specified explicitly")); + } + + None + }; + + let plotted_pieces = Arc::new(AsyncRwLock::new(PlottedPieces::::default())); + + info!(url = %node_rpc_url, "Connecting to node RPC"); + let node_client = NodeRpcClient::new(&node_rpc_url).await?; + + let farmer_app_info = node_client + .farmer_app_info() + .await + .map_err(|error| anyhow!(error))?; + + let identity = Identity::open_or_create(&base_path) + .map_err(|error| anyhow!("Failed to open or create identity: {error}"))?; + let keypair = derive_libp2p_keypair(identity.secret_key()); + let peer_id = keypair.public().to_peer_id(); + let instance = peer_id.to_string(); + + let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client.clone(), peer_id); + + // TODO: Metrics + + let (node, mut node_runner) = { + if network_args.bootstrap_nodes.is_empty() { + network_args + .bootstrap_nodes + .clone_from(&farmer_app_info.dsn_bootstrap_nodes); + } + + configure_network( + hex::encode(farmer_app_info.genesis_hash), + &base_path, + keypair, + network_args, + Arc::downgrade(&plotted_pieces), + node_client.clone(), + farmer_cache.clone(), + Some(registry), + )? + }; + + let kzg = Kzg::new(embedded_kzg_settings()); + let validator = Some(SegmentCommitmentPieceValidator::new( + node.clone(), + node_client.clone(), + kzg.clone(), + )); + let piece_provider = PieceProvider::new(node.clone(), validator.clone()); + + let piece_getter = FarmerPieceGetter::new( + piece_provider, + farmer_cache.clone(), + node_client.clone(), + Arc::clone(&plotted_pieces), + DsnCacheRetryPolicy { + max_retries: PIECE_GETTER_MAX_RETRIES, + backoff: ExponentialBackoff { + initial_interval: GET_PIECE_INITIAL_INTERVAL, + max_interval: GET_PIECE_MAX_INTERVAL, + // Try until we get a valid piece + max_elapsed_time: None, + multiplier: 1.75, + ..ExponentialBackoff::default() + }, + }, + ); + + let farmer_cache_worker_fut = run_future_in_dedicated_thread( + { + let future = farmer_cache_worker.run(piece_getter.downgrade()); + + move || future + }, + "controller-cache-worker".to_string(), + )?; + + let farm_fut = run_future_in_dedicated_thread( + move || async move { + let mut known_farms = KnownFarms::default(); + + type AddRemoveFuture<'a> = Pin< + Box< + dyn Future, Box)>> + + 'a, + >, + >; + // Futures that need to be processed sequentially in order to add/remove farms, if farm + // was added, second tuple value will be `Some`, `None` if removed. Outer `Option` is + // simply to be compatible with `farm_add_remove_in_progress` below + let mut farms_to_add_remove = VecDeque::::new(); + // Farm that is being added/removed right now (if any) + let mut farm_add_remove_in_progress: Fuse = + (Box::pin(ready(None)) as AddRemoveFuture).fuse(); + // Initialize with pending future so it never ends + let mut farms = FuturesUnordered::< + Pin)>>>, + >::from_iter([Box::pin(pending()) as Pin>]); + + let farmer_identify_subscription = pin!(nats_client + .subscribe(ClusterFarmerIdentifyFarmNotification::SUBSCRIPTION_SUBJECT) + .await? + .filter_map(move |message| async move { + ClusterFarmerIdentifyFarmNotification::decode(&mut message.payload.as_ref()) + .ok() + })); + + // Request farmer to identify themselves + if let Err(error) = nats_client + .broadcast(&ClusterControllerFarmerIdentifyBroadcast, &instance) + .await + { + warn!(%error, "Failed to send farmer identification broadcast"); + } + + let mut farmer_identify_subscription = farmer_identify_subscription.fuse(); + let mut farm_pruning_interval = tokio::time::interval_at( + (Instant::now() + FARMER_IDENTIFICATION_NOTIFICATION_INTERVAL * 2).into(), + FARMER_IDENTIFICATION_NOTIFICATION_INTERVAL * 2, + ); + farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + if farm_add_remove_in_progress.is_terminated() { + if let Some(fut) = farms_to_add_remove.pop_front() { + farm_add_remove_in_progress = fut.fuse(); + } + } + + select! { + (farm_index, result) = farms.select_next_some() => { + known_farms.remove(farm_index); + { + let plotted_pieces = &plotted_pieces; + + farms_to_add_remove.push_back(Box::pin(async move { + plotted_pieces.write().await.delete_farm(farm_index); + + None + })); + } + + match result { + Ok(()) => { + info!(%farm_index, "Farm exited successfully"); + } + Err(error) => { + error!(%farm_index, %error, "Farm exited with error"); + } + } + } + maybe_identify_message = farmer_identify_subscription.next() => { + if let Some(identify_message) = maybe_identify_message { + let ClusterFarmerIdentifyFarmNotification { + farm_id, + total_sectors_count, + fingerprint, + } = identify_message; + let (farm_index, expired_receiver, add, remove) = match known_farms.insert_or_update(farm_id, fingerprint) { + KnownFarmInsertResult::Inserted { farm_index, expired_receiver } => { + info!( + %farm_index, + %farm_id, + "Discovered new farm, initializing" + ); + + (farm_index, expired_receiver, true, false) + } + KnownFarmInsertResult::FingerprintUpdated { farm_index, expired_receiver } => { + info!( + %farm_index, + %farm_id, + "Farmer fingerprint updated, re-initializing" + ); + + (farm_index, expired_receiver, true, true) + } + KnownFarmInsertResult::NotInserted => { + // Nothing to do here + continue; + } + }; + + if remove { + let plotted_pieces = &plotted_pieces; + + farms_to_add_remove.push_back(Box::pin(async move { + plotted_pieces.write().await.delete_farm(farm_index); + + None + })); + } + + if add { + let plotted_pieces = &plotted_pieces; + let nats_client = &nats_client; + + farms_to_add_remove.push_back(Box::pin(async move { + match initialize_farm( + farm_index, + farm_id, + total_sectors_count, + plotted_pieces, + nats_client, + ).await { + Ok(farm) => { + if remove { + info!( + %farm_index, + %farm_id, + "Farm re-initialized successfully" + ); + } else { + info!( + %farm_index, + %farm_id, + "Farm initialized successfully" + ); + } + + Some((farm_index, expired_receiver, Box::new(farm) as Box<_>)) + } + Err(error) => { + warn!( + %error, + "Failed to initialize farm {farm_id}" + ); + None + } + } + })); + } + } else { + return Err(anyhow!("Farmer identify stream ended")); + } + } + _ = farm_pruning_interval.tick().fuse() => { + for (farm_index, removed_farm) in known_farms.remove_expired() { + if removed_farm.expired_sender.send(()).is_ok() { + warn!( + %farm_index, + farm_id = %removed_farm.farm_id, + "Farm expired and removed" + ); + } else { + warn!( + %farm_index, + farm_id = %removed_farm.farm_id, + "Farm exited before expiration notification" + ); + } + plotted_pieces.write().await.delete_farm(farm_index); + } + } + result = farm_add_remove_in_progress => { + if let Some((farm_index, expired_receiver, farm)) = result { + farms.push(Box::pin(async move { + select! { + result = farm.run().fuse() => { + (farm_index, result) + } + _ = expired_receiver.fuse() => { + // Nothing to do + (farm_index, Ok(())) + } + } + })); + } + } + } + } + }, + "controller-farm".to_string(), + )?; + + let networking_fut = run_future_in_dedicated_thread( + move || async move { node_runner.run().await }, + "controller-networking".to_string(), + )?; + + // This defines order in which things are dropped + let networking_fut = networking_fut; + let farm_fut = farm_fut; + let farmer_cache_worker_fut = farmer_cache_worker_fut; + + let networking_fut = pin!(networking_fut); + let farm_fut = pin!(farm_fut); + let farmer_cache_worker_fut = pin!(farmer_cache_worker_fut); + + select! { + // Networking future + _ = networking_fut.fuse() => { + info!("Node runner exited.") + }, + + // Farm future + result = farm_fut.fuse() => { + result??; + }, + + // Piece cache worker future + _ = farmer_cache_worker_fut.fuse() => { + info!("Farmer cache worker exited.") + }, + } + + anyhow::Ok(()) +} + +async fn initialize_farm( + farm_index: FarmIndex, + farm_id: FarmId, + total_sectors_count: SectorIndex, + plotted_pieces: &AsyncRwLock>, + nats_client: &NatsClient, +) -> anyhow::Result { + let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone()).await?; + + let mut plotted_pieces = plotted_pieces.write().await; + plotted_pieces.add_farm(farm_index, farm.piece_reader()); + + // Buffer sectors that are plotted while already plotted sectors are being iterated over + let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new())); + let sector_update_handler = farm.on_sector_update(Arc::new({ + let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer); + + move |(_sector_index, sector_update)| { + if let SectorUpdate::Plotting(SectorPlottingDetails::Finished { + plotted_sector, + old_plotted_sector, + .. + }) = sector_update + { + plotted_sectors_buffer + .lock() + .push((plotted_sector.clone(), old_plotted_sector.clone())); + } + } + })); + + // Add plotted sectors of the farm to global plotted pieces + let plotted_sectors = farm.plotted_sectors(); + let mut plotted_sectors = plotted_sectors + .get() + .await + .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?; + while let Some(plotted_sector_result) = plotted_sectors.next().await { + let plotted_sector = plotted_sector_result + .map_err(|error| anyhow!("Failed to get plotted sector for farm {farm_id}: {error}"))?; + + plotted_pieces.add_sector(farm_index, &plotted_sector); + } + + // Add sectors that were plotted while above iteration was happening to plotted sectors + // too + drop(sector_update_handler); + for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer.lock().drain(..) { + if let Some(old_plotted_sector) = old_plotted_sector { + plotted_pieces.delete_sector(farm_index, &old_plotted_sector); + } + plotted_pieces.add_sector(farm_index, &plotted_sector); + } + + Ok(farm) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs new file mode 100644 index 00000000000..9902c396ab6 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -0,0 +1,452 @@ +use crate::commands::shared::metrics::{FarmerMetrics, SectorState}; +use crate::commands::shared::DiskFarm; +use anyhow::anyhow; +use async_lock::Mutex as AsyncMutex; +use bytesize::ByteSize; +use clap::Parser; +use futures::stream::{FuturesOrdered, FuturesUnordered}; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use prometheus_client::registry::Registry; +use std::fs; +use std::num::NonZeroUsize; +use std::pin::pin; +use std::sync::Arc; +use std::time::Duration; +use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; +use subspace_core_primitives::{PublicKey, Record}; +use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::cluster::node_client::ClusterNodeClient; +use subspace_farmer::cluster::plotter::ClusterPlotter; +use subspace_farmer::farm::{ + Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, +}; +use subspace_farmer::single_disk_farm::{ + SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, +}; +use subspace_farmer::utils::ss58::parse_ss58_reward_address; +use subspace_farmer::utils::{ + recommended_number_of_farming_threads, run_future_in_dedicated_thread, AsyncJoinOnDrop, +}; +use subspace_farmer::NodeClient; +use subspace_proof_of_space::Table; +use tokio::sync::{Barrier, Semaphore}; +use tracing::{error, info, info_span, warn, Instrument}; + +const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); +/// Interval between farmer self-identification notification messages +pub(super) const FARMER_IDENTIFICATION_NOTIFICATION_INTERVAL: Duration = Duration::from_secs(60); + +/// Arguments for farmer +#[derive(Debug, Parser)] +pub(super) struct FarmerArgs { + /// One or more farm located at specified path, each with its own allocated space. + /// + /// In case of multiple disks, it is recommended to specify them individually rather than using + /// RAID 0, that way farmer will be able to better take advantage of concurrency of individual + /// drives. + /// + /// Format for each farm is coma-separated list of strings like this: + /// + /// path=/path/to/directory,size=5T + /// + /// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that + /// farmer will make sure to not exceed (and will pre-allocated all the space on startup to + /// ensure it will not run out of space in runtime). + disk_farms: Vec, + /// Address for farming rewards + #[arg(long, value_parser = parse_ss58_reward_address)] + reward_address: PublicKey, + /// Run temporary farmer with specified plot size in human-readable format (e.g. 10GB, 2TiB) or + /// just bytes (e.g. 4096), this will create a temporary directory for storing farmer data that + /// will be deleted at the end of the process. + #[arg(long, conflicts_with = "disk_farms")] + tmp: Option, + /// Maximum number of pieces in sector (can override protocol value to something lower). + /// + /// This will make plotting of individual sectors faster, decrease load on CPU proving, but also + /// proportionally increase amount of disk reads during audits since every sector needs to be + /// audited and there will be more of them. + /// + /// This is primarily for development and not recommended to use by regular users. + #[arg(long)] + max_pieces_in_sector: Option, + /// Do not print info about configured farms on startup + #[arg(long)] + no_info: bool, + /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some + /// compute-intensive operations during proving), defaults to number of logical CPUs + /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but + /// not more than 32 threads + #[arg(long)] + farming_thread_pool_size: Option, + /// Disable farm locking, for example if file system doesn't support it + #[arg(long)] + disable_farm_locking: bool, + /// Exit on farm error. + /// + /// By default, farmer will continue running if there are still other working farms. + #[arg(long)] + exit_on_farm_error: bool, +} + +pub(super) async fn farmer( + nats_client: NatsClient, + registry: &mut Registry, + farmer_args: FarmerArgs, +) -> anyhow::Result<()> +where + PosTable: Table, +{ + let FarmerArgs { + mut disk_farms, + reward_address, + tmp, + max_pieces_in_sector, + no_info, + farming_thread_pool_size, + disable_farm_locking, + exit_on_farm_error, + } = farmer_args; + + let _tmp_directory = if let Some(plot_size) = tmp { + let tmp_directory = tempfile::Builder::new() + .prefix("subspace-farmer-") + .tempdir()?; + + disk_farms = vec![DiskFarm { + directory: tmp_directory.as_ref().to_path_buf(), + allocated_plotting_space: plot_size.as_u64(), + }]; + + Some(tmp_directory) + } else { + if disk_farms.is_empty() { + return Err(anyhow!("There must be at least one disk farm provided")); + } + + for farm in &disk_farms { + if !farm.directory.exists() { + if let Err(error) = fs::create_dir(&farm.directory) { + return Err(anyhow!( + "Directory {} doesn't exist and can't be created: {}", + farm.directory.display(), + error + )); + } + } + } + None + }; + + let node_client = ClusterNodeClient::new(nats_client.clone()); + + let farmer_app_info = node_client + .farmer_app_info() + .await + .map_err(|error| anyhow!(error))?; + + let farmer_metrics = FarmerMetrics::new(registry); + + let kzg = Kzg::new(embedded_kzg_settings()); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow!(error))?; + + let max_pieces_in_sector = match max_pieces_in_sector { + Some(max_pieces_in_sector) => { + if max_pieces_in_sector > farmer_app_info.protocol_info.max_pieces_in_sector { + warn!( + protocol_value = farmer_app_info.protocol_info.max_pieces_in_sector, + desired_value = max_pieces_in_sector, + "Can't set max pieces in sector higher than protocol value, using protocol \ + value" + ); + + farmer_app_info.protocol_info.max_pieces_in_sector + } else { + max_pieces_in_sector + } + } + None => farmer_app_info.protocol_info.max_pieces_in_sector, + }; + + let farming_thread_pool_size = farming_thread_pool_size + .map(|farming_thread_pool_size| farming_thread_pool_size.get()) + .unwrap_or_else(recommended_number_of_farming_threads); + + let global_mutex = Arc::default(); + let plotter = Arc::new(ClusterPlotter::new(nats_client.clone())); + + let farms = { + let node_client = node_client.clone(); + let info_mutex = &AsyncMutex::new(()); + let faster_read_sector_record_chunks_mode_barrier = + Arc::new(Barrier::new(disk_farms.len())); + let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1)); + + let mut farms = Vec::with_capacity(disk_farms.len()); + let mut farms_stream = disk_farms + .into_iter() + .enumerate() + .map(|(farm_index, disk_farm)| { + let farmer_app_info = farmer_app_info.clone(); + let node_client = node_client.clone(); + let kzg = kzg.clone(); + let erasure_coding = erasure_coding.clone(); + let plotter = Arc::clone(&plotter); + let global_mutex = Arc::clone(&global_mutex); + let faster_read_sector_record_chunks_mode_barrier = + Arc::clone(&faster_read_sector_record_chunks_mode_barrier); + let faster_read_sector_record_chunks_mode_concurrency = + Arc::clone(&faster_read_sector_record_chunks_mode_concurrency); + + async move { + let farm_fut = SingleDiskFarm::new::<_, _, PosTable>( + SingleDiskFarmOptions { + directory: disk_farm.directory.clone(), + farmer_app_info, + allocated_space: disk_farm.allocated_plotting_space, + max_pieces_in_sector, + node_client, + reward_address, + kzg, + erasure_coding, + // Cache is provided by dedicated caches in farming cluster + cache_percentage: 0, + farming_thread_pool_size, + plotting_delay: None, + global_mutex, + disable_farm_locking, + faster_read_sector_record_chunks_mode_barrier, + faster_read_sector_record_chunks_mode_concurrency, + plotter, + }, + farm_index, + ); + + let farm = match farm_fut.await { + Ok(farm) => farm, + Err(SingleDiskFarmError::InsufficientAllocatedSpace { + min_space, + allocated_space, + }) => { + return ( + farm_index, + Err(anyhow!( + "Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \ + {} bytes to be exact)", + bytesize::to_string(allocated_space, true), + bytesize::to_string(allocated_space, false), + bytesize::to_string(min_space, true), + bytesize::to_string(min_space, false), + min_space + )), + ); + } + Err(error) => { + return (farm_index, Err(error.into())); + } + }; + + if !no_info { + let _info_guard = info_mutex.lock().await; + + let info = farm.info(); + info!("Farm {farm_index}:"); + info!(" ID: {}", info.id()); + info!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash())); + info!(" Public key: 0x{}", hex::encode(info.public_key())); + info!( + " Allocated space: {} ({})", + bytesize::to_string(info.allocated_space(), true), + bytesize::to_string(info.allocated_space(), false) + ); + info!(" Directory: {}", disk_farm.directory.display()); + } + + (farm_index, Ok(Box::new(farm) as Box)) + } + .instrument(info_span!("", %farm_index)) + }) + .collect::>(); + + while let Some((farm_index, farm)) = farms_stream.next().await { + if let Err(error) = &farm { + let span = info_span!("", %farm_index); + let _span_guard = span.enter(); + + error!(%error, "Farm creation failed"); + } + farms.push((farm_index, farm?)); + } + + // Restore order after unordered initialization + farms.sort_unstable_by_key(|(farm_index, _farm)| *farm_index); + + farms + .into_iter() + .map(|(_farm_index, farm)| farm) + .collect::>() + }; + + let total_and_plotted_sectors = farms + .iter() + .enumerate() + .map(|(farm_index, farm)| async move { + let total_sector_count = farm.total_sectors_count(); + let mut plotted_sectors_count = 0; + let plotted_sectors = farm.plotted_sectors(); + let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| { + anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}") + })?; + while let Some(plotted_sector_result) = plotted_sectors.next().await { + plotted_sectors_count += 1; + plotted_sector_result.map_err(|error| { + anyhow!( + "Failed reading plotted sector on startup for farm {farm_index}: {error}" + ) + })?; + } + + anyhow::Ok((total_sector_count, plotted_sectors_count)) + }) + .collect::>() + .try_collect::>() + .await?; + + let mut farms_stream = (0u8..) + .zip(farms) + .zip(total_and_plotted_sectors) + .map(|((farm_index, farm), sector_counts)| { + let (total_sector_count, plotted_sectors_count) = sector_counts; + farmer_metrics.update_sectors_total( + farm.id(), + total_sector_count - plotted_sectors_count, + SectorState::NotPlotted, + ); + farmer_metrics.update_sectors_total( + farm.id(), + plotted_sectors_count, + SectorState::Plotted, + ); + farm.on_sector_update(Arc::new({ + let farm_id = *farm.id(); + let farmer_metrics = farmer_metrics.clone(); + + move |(_sector_index, sector_state)| match sector_state { + SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => { + farmer_metrics.sector_plotting.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { + farmer_metrics.sector_downloading.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { + farmer_metrics.observe_sector_downloading_time(&farm_id, time); + farmer_metrics.sector_downloaded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { + farmer_metrics.sector_encoding.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { + farmer_metrics.observe_sector_encoding_time(&farm_id, time); + farmer_metrics.sector_encoded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { + farmer_metrics.sector_writing.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { + farmer_metrics.observe_sector_writing_time(&farm_id, time); + farmer_metrics.sector_written.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Finished { time, .. }) => { + farmer_metrics.observe_sector_plotting_time(&farm_id, time); + farmer_metrics.sector_plotted.inc(); + farmer_metrics.update_sector_state(&farm_id, SectorState::Plotted); + } + SectorUpdate::Plotting(SectorPlottingDetails::Error(_)) => { + farmer_metrics.sector_plotting_error.inc(); + } + SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { + farmer_metrics.update_sector_state(&farm_id, SectorState::AboutToExpire); + } + SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { + farmer_metrics.update_sector_state(&farm_id, SectorState::Expired); + } + SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. }) => { + // Not interested in here + } + } + })) + .detach(); + + farm.on_farming_notification(Arc::new({ + let farm_id = *farm.id(); + let farmer_metrics = farmer_metrics.clone(); + + move |farming_notification| match farming_notification { + FarmingNotification::Auditing(auditing_details) => { + farmer_metrics.observe_auditing_time(&farm_id, &auditing_details.time); + } + FarmingNotification::Proving(proving_details) => { + farmer_metrics.observe_proving_time( + &farm_id, + &proving_details.time, + proving_details.result, + ); + } + FarmingNotification::NonFatalError(error) => { + farmer_metrics.note_farming_error(&farm_id, error); + } + } + })) + .detach(); + + farm.run().map(move |result| (farm_index, result)) + }) + .collect::>(); + + let mut farm_errors = Vec::new(); + + let farm_fut = run_future_in_dedicated_thread( + move || async move { + while let Some((farm_index, result)) = farms_stream.next().await { + match result { + Ok(()) => { + info!(%farm_index, "Farm exited successfully"); + } + Err(error) => { + error!(%farm_index, %error, "Farm exited with error"); + + if farms_stream.is_empty() || exit_on_farm_error { + return Err(error); + } else { + farm_errors.push(AsyncJoinOnDrop::new( + tokio::spawn(async move { + loop { + tokio::time::sleep(FARM_ERROR_PRINT_INTERVAL).await; + + error!( + %farm_index, + %error, + "Farm errored and stopped" + ); + } + }), + true, + )) + } + } + } + } + anyhow::Ok(()) + }, + "farmer-farm".to_string(), + )?; + + let farm_fut = pin!(farm_fut); + + todo!() +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs new file mode 100644 index 00000000000..453b06f8382 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -0,0 +1,68 @@ +use crate::commands::shared::PlottingThreadPriority; +use clap::Parser; +use prometheus_client::registry::Registry; +use std::num::NonZeroUsize; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_proof_of_space::Table; + +/// Arguments for plotter +#[derive(Debug, Parser)] +pub(super) struct PlotterArgs { + /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of + /// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future + /// sector ahead of time. + /// + /// Increase will result in higher memory usage. + #[arg(long)] + sector_downloading_concurrency: Option, + /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and + /// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further + /// restricted by + /// `--sector-downloading-concurrency` and setting this option higher than + /// `--sector-downloading-concurrency` will have no effect. + /// + /// Increase will result in higher memory usage. + #[arg(long)] + sector_encoding_concurrency: Option, + /// Defines how many record farmer will encode in a single sector concurrently, defaults to one + /// record per 2 cores, but not more than 8 in total. Higher concurrency means higher memory + /// usage and typically more efficient CPU utilization. + #[arg(long)] + record_encoding_concurrency: Option, + /// Size of one thread pool used for plotting, defaults to number of logical CPUs available + /// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache + /// groups on large CPUs. + /// + /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different + /// thread pools might have different number of threads if NUMA nodes do not have the same size. + /// + /// Threads will be pinned to corresponding CPU cores at creation. + #[arg(long)] + plotting_thread_pool_size: Option, + /// Specify exact CPU cores to be used for plotting bypassing any custom logic farmer might use + /// otherwise. It replaces both `--sector-encoding-concurrency` and + /// `--plotting-thread-pool-size` options if specified. Requires `--replotting-cpu-cores` to be + /// specified with the same number of CPU cores groups (or not specified at all, in which case + /// it'll use the same thread pool as plotting). + /// + /// Cores are coma-separated, with whitespace separating different thread pools/encoding + /// instances. For example "0,1 2,3" will result in two sectors being encoded at the same time, + /// each with a pair of CPU cores. + #[arg(long, conflicts_with_all = & ["sector_encoding_concurrency", "plotting_thread_pool_size"])] + plotting_cpu_cores: Option, + /// Plotting thread priority, by default de-prioritizes plotting threads in order to make sure + /// farming is successful and computer can be used comfortably for other things + #[arg(long, default_value_t = PlottingThreadPriority::Min)] + plotting_thread_priority: PlottingThreadPriority, +} + +pub(super) async fn plotter( + _nats_client: NatsClient, + _registry: &mut Registry, + _plotter_args: PlotterArgs, +) -> anyhow::Result<()> +where + PosTable: Table, +{ + todo!() +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index d5fc854343a..60506629cb8 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -656,7 +656,7 @@ where plotted_pieces.add_farm(farm_index, farm.piece_reader()); - let total_sector_count = farm.total_sectors_count(); + let total_sectors_count = farm.total_sectors_count(); let mut plotted_sectors_count = 0; let plotted_sectors = farm.plotted_sectors(); let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| { @@ -675,7 +675,7 @@ where ) } - total_and_plotted_sectors.push((total_sector_count, plotted_sectors_count)); + total_and_plotted_sectors.push((total_sectors_count, plotted_sectors_count)); } info!("Finished collecting already plotted pieces successfully"); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs index 1a77b017055..1502d893ddc 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs @@ -137,8 +137,7 @@ where let read_piece_fut = match weak_plotted_pieces.upgrade() { Some(plotted_pieces) => plotted_pieces - .read() - .await + .try_read()? .read_piece(piece_index)? .in_current_span(), None => { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 2f368efd1f0..a8c946f3002 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -1,4 +1,4 @@ -#![feature(const_option, type_changing_struct_update)] +#![feature(const_option, hash_extract_if, type_changing_struct_update)] mod commands; mod utils; @@ -24,6 +24,8 @@ type PosTable = ChiaTable; enum Command { /// Start a farmer, does plotting and farming Farm(commands::farm::FarmingArgs), + /// Farming cluster + Cluster(commands::cluster::ClusterArgs), /// Run various benchmarks #[clap(subcommand)] Benchmark(commands::benchmark::BenchmarkArgs), @@ -86,6 +88,9 @@ async fn main() -> anyhow::Result<()> { Command::Farm(farming_args) => { commands::farm::farm::(farming_args).await?; } + Command::Cluster(cluster_args) => { + commands::cluster::cluster::(cluster_args).await?; + } Command::Benchmark(benchmark_args) => { commands::benchmark::benchmark(benchmark_args)?; } diff --git a/crates/subspace-farmer/src/cluster.rs b/crates/subspace-farmer/src/cluster.rs new file mode 100644 index 00000000000..ab60e1382eb --- /dev/null +++ b/crates/subspace-farmer/src/cluster.rs @@ -0,0 +1,6 @@ +pub mod cache; +pub mod controller; +pub mod farmer; +pub mod nats_client; +pub mod node_client; +pub mod plotter; diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -0,0 +1 @@ + diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs new file mode 100644 index 00000000000..a95993ccd87 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -0,0 +1,567 @@ +use crate::cluster::nats_client::{ + GenericBroadcast, GenericNotification, GenericRequest, NatsClient, +}; +use crate::NodeClient; +use anyhow::anyhow; +use async_nats::{HeaderValue, Message}; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use std::future::{pending, Future}; +use std::pin::Pin; +use std::time::{Duration, Instant}; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; +use subspace_farmer_components::PieceGetter; +use subspace_rpc_primitives::{ + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, +}; +use tracing::{debug, trace, warn}; + +const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1); + +/// Broadcast sent by controllers requesting farmers to identify themselves +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub struct ClusterControllerFarmerIdentifyBroadcast; + +impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.controller.*.farmer-identify"; +} + +/// Broadcast with slot info sent by controllers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerSlotInfoBroadcast { + pub slot_info: SlotInfo, + pub instance: String, +} + +impl GenericBroadcast for ClusterControllerSlotInfoBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.controller.slot-info"; + + fn deterministic_message_id(&self) -> Option { + // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might + // be simplified to just a slot number + Some(HeaderValue::from( + format!("slot-info-{}", self.slot_info.slot_number).as_str(), + )) + } +} + +/// Broadcast with reward signing info by controllers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerRewardSigningBroadcast { + pub reward_signing_info: RewardSigningInfo, +} + +impl GenericBroadcast for ClusterControllerRewardSigningBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.controller.reward-signing-info"; +} + +/// Broadcast with archived segment headers by controllers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerArchivedSegmentHeaderBroadcast { + pub archived_segment_header: SegmentHeader, +} + +impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.controller.archived-segment-header"; + + fn deterministic_message_id(&self) -> Option { + // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might + // be simplified to just a segment index + Some(HeaderValue::from( + format!( + "archived-segment-{}", + self.archived_segment_header.segment_index() + ) + .as_str(), + )) + } +} + +/// Notification messages with solution by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerSolutionNotification { + pub solution_response: SolutionResponse, +} + +impl GenericNotification for ClusterControllerSolutionNotification { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.controller.*.solution"; +} + +/// Notification messages with reward signature by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerRewardSignatureNotification { + pub reward_signature: RewardSignatureResponse, +} + +impl GenericNotification for ClusterControllerRewardSignatureNotification { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.controller.*.reward-signature"; +} + +/// Request farmer app info from controller +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerFarmerAppInfoRequest; + +impl GenericRequest for ClusterControllerFarmerAppInfoRequest { + const SUBJECT: &'static str = "subspace.controller.farmer-app-info"; + type Response = FarmerAppInfo; +} + +/// Request segment headers with specified segment indices +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerSegmentHeadersRequest { + pub segment_indices: Vec, +} + +impl GenericRequest for ClusterControllerSegmentHeadersRequest { + const SUBJECT: &'static str = "subspace.controller.segment-headers"; + type Response = Vec>; +} + +/// Request piece with specified index +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerPieceRequest { + pub piece_index: PieceIndex, +} + +impl GenericRequest for ClusterControllerPieceRequest { + const SUBJECT: &'static str = "subspace.controller.piece-from-node"; + type Response = Option; +} + +// Create controller service that will be listing processing incoming requests +pub async fn controller_service( + nats_client: &NatsClient, + node_client: &NC, + piece_getter: &PG, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, + PG: PieceGetter + Sync, +{ + select! { + result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = solution_response_forwarder(nats_client, node_client, instance).fuse() => { + result + }, + result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => { + result + }, + result = farmer_app_info_responder(nats_client, node_client).fuse() => { + result + }, + result = segment_headers_responder(nats_client, node_client).fuse() => { + result + }, + result = piece_responder(nats_client, piece_getter).fuse() => { + result + }, + } +} + +async fn slot_info_broadcaster( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut slot_info_notifications = node_client + .subscribe_slot_info() + .await + .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?; + + while let Some(slot_info) = slot_info_notifications.next().await { + debug!(?slot_info, "New slot"); + + let slot = slot_info.slot_number; + + if let Err(error) = nats_client + .broadcast( + &ClusterControllerSlotInfoBroadcast { + slot_info, + instance: instance.to_string(), + }, + instance, + ) + .await + { + warn!(%slot, %error, "Failed to broadcast slot info"); + } + } + + Ok(()) +} + +async fn reward_signing_broadcaster( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut reward_signing_notifications = node_client + .subscribe_reward_signing() + .await + .map_err(|error| anyhow!("Failed to subscribe to reward signing notifications: {error}"))?; + + while let Some(reward_signing_info) = reward_signing_notifications.next().await { + trace!(?reward_signing_info, "New reward signing notification"); + + if let Err(error) = nats_client + .broadcast( + &ClusterControllerRewardSigningBroadcast { + reward_signing_info, + }, + instance, + ) + .await + { + warn!(%error, "Failed to broadcast reward signing info"); + } + } + + Ok(()) +} + +async fn archived_segment_headers_broadcaster( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut archived_segments_notifications = node_client + .subscribe_archived_segment_headers() + .await + .map_err(|error| { + anyhow!("Failed to subscribe to archived segment header notifications: {error}") + })?; + + while let Some(archived_segment_header) = archived_segments_notifications.next().await { + trace!( + ?archived_segment_header, + "New archived archived segment header notification" + ); + + node_client + .acknowledge_archived_segment_header(archived_segment_header.segment_index()) + .await + .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?; + + if let Err(error) = nats_client + .broadcast( + &ClusterControllerArchivedSegmentHeaderBroadcast { + archived_segment_header, + }, + instance, + ) + .await + { + warn!(%error, "Failed to broadcast archived segment header info"); + } + } + + Ok(()) +} + +async fn solution_response_forwarder( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .subscribe( + ClusterControllerSolutionNotification::SUBSCRIPTION_SUBJECT.replace('*', instance), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?; + + while let Some(message) = subscription.next().await { + let notification = + match ClusterControllerSolutionNotification::decode(&mut message.payload.as_ref()) { + Ok(notification) => notification, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode solution notification" + ); + continue; + } + }; + + debug!(?notification, "Solution notification"); + + if let Err(error) = node_client + .submit_solution_response(notification.solution_response) + .await + { + warn!(%error, "Failed to send solution response"); + } + } + + Ok(()) +} + +async fn reward_signature_forwarder( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .subscribe( + ClusterControllerRewardSignatureNotification::SUBSCRIPTION_SUBJECT + .replace('*', instance), + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to reward signature notifications: {error}") + })?; + + while let Some(message) = subscription.next().await { + let notification = match ClusterControllerRewardSignatureNotification::decode( + &mut message.payload.as_ref(), + ) { + Ok(notification) => notification, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode reward signature notification" + ); + continue; + } + }; + + debug!(?notification, "Reward signature notification"); + + if let Err(error) = node_client + .submit_reward_signature(notification.reward_signature) + .await + { + warn!(%error, "Failed to send reward signature"); + } + } + + Ok(()) +} + +async fn farmer_app_info_responder( + nats_client: &NatsClient, + node_client: &NC, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .queue_subscribe( + ClusterControllerFarmerAppInfoRequest::SUBJECT, + "subspace.controller".to_string(), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to farmer app info requests: {error}"))?; + + let mut last_farmer_app_info: ::Response = node_client + .farmer_app_info() + .await + .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; + let mut last_farmer_app_info_request = Instant::now(); + + while let Some(message) = subscription.next().await { + trace!("Farmer app info request"); + + if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW { + match node_client.farmer_app_info().await { + Ok(new_last_farmer_app_info) => { + last_farmer_app_info = new_last_farmer_app_info; + last_farmer_app_info_request = Instant::now(); + } + Err(error) => { + warn!(%error, "Failed to get farmer app info"); + } + } + } + + if let Some(reply_subject) = message.reply { + if let Err(error) = nats_client + .publish(reply_subject, last_farmer_app_info.encode().into()) + .await + { + warn!(%error, "Failed to send farmer app info response"); + } + } + } + + Ok(()) +} + +async fn segment_headers_responder( + nats_client: &NatsClient, + node_client: &NC, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .queue_subscribe( + ClusterControllerSegmentHeadersRequest::SUBJECT, + "subspace.controller".to_string(), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to segment headers requests: {error}"))?; + + let mut last_request_response = None::<( + ClusterControllerSegmentHeadersRequest, + ::Response, + )>; + + while let Some(message) = subscription.next().await { + let request = + match ClusterControllerSegmentHeadersRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode segment headers request" + ); + continue; + } + }; + trace!(?request, "Segment headers request"); + + let response = if let Some((last_request, response)) = &last_request_response + && last_request.segment_indices == request.segment_indices + { + response + } else { + match node_client + .segment_headers(request.segment_indices.clone()) + .await + { + Ok(segment_headers) => &last_request_response.insert((request, segment_headers)).1, + Err(error) => { + warn!( + %error, + segment_indices = ?request.segment_indices, + "Failed to get segment headers" + ); + continue; + } + } + }; + + if let Some(reply_subject) = message.reply { + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send farmer app info response"); + } + } + } + + Ok(()) +} + +// TODO: Smarter piece handling with requests for cached pieces being redirected to cache instances +// instead +async fn piece_responder(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()> +where + PG: PieceGetter + Sync, +{ + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + + let mut subscription = nats_client + .queue_subscribe( + ClusterControllerPieceRequest::SUBJECT, + "subspace.controller".to_string(), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to piece requests: {error}"))? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + if let Some(message) = maybe_message { + // Create background task for concurrent processing + processing.push(Box::pin(process_piece_request( + nats_client, + piece_getter, + message, + ))); + } else { + break; + } + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) +} + +async fn process_piece_request(nats_client: &NatsClient, piece_getter: &PG, message: Message) +where + PG: PieceGetter, +{ + let request = match ClusterControllerPieceRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode piece request" + ); + return; + } + }; + trace!(?request, "Piece request"); + + let maybe_piece: ::Response = + match piece_getter.get_piece(request.piece_index).await { + Ok(maybe_piece) => maybe_piece, + Err(error) => { + warn!( + %error, + piece_index = %request.piece_index, + "Failed to get piece" + ); + return; + } + }; + + if let Some(reply_subject) = message.reply { + if let Err(error) = nats_client + .publish(reply_subject, maybe_piece.encode().into()) + .await + { + warn!(%error, "Failed to send farmer app info response"); + } + } +} diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs new file mode 100644 index 00000000000..bec8c17618c --- /dev/null +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -0,0 +1,907 @@ +use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; +use crate::cluster::nats_client::{ + GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, + GenericStreamResponses, NatsClient, StreamRequest, APPROXIMATE_MAX_MESSAGE_SIZE, +}; +use crate::farm::{ + Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, MaybePieceStoredResult, + PieceCache, PieceCacheOffset, PieceReader, PlotCache, PlottedSectors, SectorUpdate, +}; +use crate::utils::AsyncJoinOnDrop; +use anyhow::anyhow; +use async_nats::Message; +use async_trait::async_trait; +use event_listener_primitives::Bag; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::{select, stream, FutureExt, Stream, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use std::collections::VecDeque; +use std::future::{pending, Future}; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::Duration; +use subspace_core_primitives::crypto::blake3_hash_list; +use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex, PieceOffset, SectorIndex}; +use subspace_farmer_components::plotting::PlottedSector; +use subspace_networking::libp2p::kad::RecordKey; +use subspace_rpc_primitives::SolutionResponse; +use tokio::time::MissedTickBehavior; +use tracing::{debug, error, trace, warn}; + +const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000; + +type Handler = Bag, A>; + +/// Notification with identification details by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerIdentifyFarmNotification { + /// Farm ID + pub farm_id: FarmId, + /// Total number of sectors in the farm + pub total_sectors_count: SectorIndex, + /// Farm fingerprint changes when something about farm changes (like space pledged) + pub fingerprint: Blake3Hash, +} + +impl GenericNotification for ClusterFarmerIdentifyFarmNotification { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.farmer.*.identify"; +} + +/// Broadcast with sector updates by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerSectorUpdateBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Sector index + pub sector_index: SectorIndex, + /// Sector update + pub sector_update: SectorUpdate, +} + +impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.farmer.*.sector-update"; +} + +/// Broadcast with farming notifications by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmingNotificationBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Farming notification + pub farming_notification: FarmingNotification, +} + +impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.farmer.*.farming-notification"; +} + +/// Broadcast with solutions by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerSolutionBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Solution response + pub solution_response: SolutionResponse, +} + +impl GenericBroadcast for ClusterFarmerSolutionBroadcast { + const SUBSCRIPTION_SUBJECT: &'static str = "subspace.farmer.*.solution-response"; +} + +/// Read piece from farm +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterFarmerReadPieceRequest { + sector_index: SectorIndex, + piece_offset: PieceOffset, +} + +impl GenericRequest for ClusterFarmerReadPieceRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece"; + type Response = Result, String>; +} + +/// Request plotted from farmer, request +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterFarmerPlottedSectorsRequest; + +impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors"; + type Response = Result; +} + +#[derive(Debug)] +struct ClusterPlottedSectors { + instance: String, + nats_client: NatsClient, +} + +#[async_trait] +impl PlottedSectors for ClusterPlottedSectors { + async fn get( + &self, + ) -> Result< + Box> + Unpin + Send + '_>, + FarmError, + > { + // TODO: Timeouts + Ok(Box::new( + self.nats_client + .stream_request(ClusterFarmerPlottedSectorsRequest, Some(&self.instance)) + .await? + .map(|response| response.map_err(FarmError::from)), + )) + } +} + +#[derive(Debug)] +struct DummyPieceCache; + +#[async_trait] +impl PieceCache for DummyPieceCache { + fn max_num_elements(&self) -> usize { + 0 + } + + async fn contents( + &self, + ) -> Box)> + Unpin + Send + '_> { + Box::new(stream::empty()) + } + + async fn write_piece( + &self, + _offset: PieceCacheOffset, + _piece_index: PieceIndex, + _piece: &Piece, + ) -> Result<(), FarmError> { + Err("Can't write pieces into empty cache".into()) + } + + async fn read_piece_index( + &self, + _offset: PieceCacheOffset, + ) -> Result, FarmError> { + Ok(None) + } + + async fn read_piece(&self, _offset: PieceCacheOffset) -> Result, FarmError> { + Ok(None) + } +} + +#[derive(Debug)] +struct DummyPlotCache; + +#[async_trait] +impl PlotCache for DummyPlotCache { + async fn is_piece_maybe_stored( + &self, + _key: &RecordKey, + ) -> Result { + Ok(MaybePieceStoredResult::No) + } + + async fn try_store_piece( + &self, + _piece_index: PieceIndex, + _piece: &Piece, + ) -> Result { + Ok(false) + } + + async fn read_piece(&self, _key: &RecordKey) -> Result, FarmError> { + Ok(None) + } +} + +#[derive(Debug)] +struct ClusterPieceReader { + instance: String, + nats_client: NatsClient, +} + +#[async_trait] +impl PieceReader for ClusterPieceReader { + async fn read_piece( + &self, + sector_index: SectorIndex, + piece_offset: PieceOffset, + ) -> Result, FarmError> { + Ok(self + .nats_client + .request( + &ClusterFarmerReadPieceRequest { + sector_index, + piece_offset, + }, + Some(&self.instance), + ) + .await??) + } +} + +#[derive(Default, Debug)] +struct Handlers { + sector_update: Handler<(SectorIndex, SectorUpdate)>, + farming_notification: Handler, + solution: Handler, +} + +#[derive(Debug)] +pub struct ClusterFarm { + farm_id: FarmId, + farm_id_string: String, + total_sectors_count: SectorIndex, + nats_client: NatsClient, + handlers: Arc, + background_tasks: AsyncJoinOnDrop<()>, +} + +#[async_trait(?Send)] +impl Farm for ClusterFarm { + fn id(&self) -> &FarmId { + &self.farm_id + } + + fn total_sectors_count(&self) -> SectorIndex { + self.total_sectors_count + } + + fn plotted_sectors(&self) -> Arc { + Arc::new(ClusterPlottedSectors { + instance: self.farm_id_string.clone(), + nats_client: self.nats_client.clone(), + }) + } + + fn piece_cache(&self) -> Arc { + Arc::new(DummyPieceCache) + } + + fn plot_cache(&self) -> Arc { + Arc::new(DummyPlotCache) + } + + fn piece_reader(&self) -> Arc { + Arc::new(ClusterPieceReader { + instance: self.farm_id_string.clone(), + nats_client: self.nats_client.clone(), + }) + } + + fn on_sector_update( + &self, + callback: HandlerFn<(SectorIndex, SectorUpdate)>, + ) -> Box { + Box::new(self.handlers.sector_update.add(callback)) + } + + fn on_farming_notification( + &self, + callback: HandlerFn, + ) -> Box { + Box::new(self.handlers.farming_notification.add(callback)) + } + + fn on_solution(&self, callback: HandlerFn) -> Box { + Box::new(self.handlers.solution.add(callback)) + } + + fn run(self: Box) -> Pin> + Send>> { + Box::pin(async move { Ok(self.background_tasks.await?) }) + } +} + +impl ClusterFarm { + pub async fn new( + farm_id: FarmId, + total_sectors_count: SectorIndex, + nats_client: NatsClient, + ) -> anyhow::Result { + let farm_id_string = farm_id.to_string(); + + let sector_updates_subscription = nats_client + .subscribe( + ClusterFarmerSectorUpdateBroadcast::SUBSCRIPTION_SUBJECT + .replace('*', &farm_id_string), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))? + .filter_map(move |message| async move { + match ClusterFarmerSectorUpdateBroadcast::decode(&mut message.payload.as_ref()) { + Ok(request) => Some(request), + Err(error) => { + warn!( + %error, + %farm_id, + message = %hex::encode(message.payload), + "Failed to decode sector updates broadcast" + ); + None + } + } + }); + let farming_notifications_subscription = nats_client + .subscribe( + ClusterFarmerFarmingNotificationBroadcast::SUBSCRIPTION_SUBJECT + .replace('*', &farm_id_string), + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to farming notifications broadcast: {error}") + })? + .filter_map(move |message| async move { + match ClusterFarmerFarmingNotificationBroadcast::decode( + &mut message.payload.as_ref(), + ) { + Ok(request) => Some(request), + Err(error) => { + warn!( + %error, + %farm_id, + message = %hex::encode(message.payload), + "Failed to decode farming notifications broadcast" + ); + None + } + } + }); + let solution_subscription = nats_client + .subscribe( + ClusterFarmerSolutionBroadcast::SUBSCRIPTION_SUBJECT.replace('*', &farm_id_string), + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to solution responses broadcast: {error}") + })? + .filter_map(move |message| async move { + match ClusterFarmerSolutionBroadcast::decode(&mut message.payload.as_ref()) { + Ok(request) => Some(request), + Err(error) => { + warn!( + %error, + %farm_id, + message = %hex::encode(message.payload), + "Failed to decode solution responses broadcast" + ); + None + } + } + }); + + let handlers = Arc::::default(); + // Run background tasks and fire corresponding notifications + let background_tasks = AsyncJoinOnDrop::new( + tokio::spawn({ + let handlers = Arc::clone(&handlers); + + async move { + let mut sector_updates_subscription = pin!(sector_updates_subscription); + let mut farming_notifications_subscription = + pin!(farming_notifications_subscription); + let mut solution_subscription = pin!(solution_subscription); + + let sector_updates_fut = async { + while let Some(ClusterFarmerSectorUpdateBroadcast { + sector_index, + sector_update, + .. + }) = sector_updates_subscription.next().await + { + handlers + .sector_update + .call_simple(&(sector_index, sector_update)); + } + }; + let farming_notifications_fut = async { + while let Some(ClusterFarmerFarmingNotificationBroadcast { + farming_notification, + .. + }) = farming_notifications_subscription.next().await + { + handlers + .farming_notification + .call_simple(&farming_notification); + } + }; + let solutions_fut = async { + while let Some(ClusterFarmerSolutionBroadcast { + solution_response, .. + }) = solution_subscription.next().await + { + handlers.solution.call_simple(&solution_response); + } + }; + + select! { + _ = sector_updates_fut.fuse() => {} + _ = farming_notifications_fut.fuse() => {} + _ = solutions_fut.fuse() => {} + } + } + }), + true, + ); + + Ok(Self { + farm_id, + farm_id_string: farm_id.to_string(), + total_sectors_count, + nats_client, + handlers, + background_tasks, + }) + } +} + +/// Details about the farm +#[derive(Debug)] +struct FarmDetails { + farm_id: FarmId, + farm_id_string: String, + total_sectors_count: SectorIndex, + piece_reader: Arc, + plotted_sectors: Arc, + _background_tasks: AsyncJoinOnDrop<()>, +} + +/// Create farmer service for specified farms that will be processing incoming requests and send +/// periodic identify notifications +pub fn farmer_service( + nats_client: NatsClient, + farms: &[F], + identification_notification_interval: Duration, +) -> impl Future> + Send + 'static +where + F: Farm, +{ + // For each farm start forwarding notifications as broadcast messages and create farm details + // that can be used to respond to incoming requests + let farms_details = farms + .iter() + .map(|farm| { + let farm_id = *farm.id(); + let nats_client = nats_client.clone(); + + let (sector_updates_sender, mut sector_updates_receiver) = + mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER); + let (farming_notifications_sender, mut farming_notifications_receiver) = + mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER); + let (solutions_sender, mut solutions_receiver) = + mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER); + + let sector_updates_handler_id = + farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| { + if let Err(error) = + sector_updates_sender + .clone() + .try_send(ClusterFarmerSectorUpdateBroadcast { + farm_id, + sector_index: *sector_index, + sector_update: sector_update.clone(), + }) + { + warn!(%farm_id, %error, "Failed to send sector update notification"); + } + })); + + let farming_notifications_handler_id = + farm.on_farming_notification(Arc::new(move |farming_notification| { + if let Err(error) = farming_notifications_sender.clone().try_send( + ClusterFarmerFarmingNotificationBroadcast { + farm_id, + farming_notification: farming_notification.clone(), + }, + ) { + warn!(%farm_id, %error, "Failed to send farming notification"); + } + })); + + let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| { + if let Err(error) = + solutions_sender + .clone() + .try_send(ClusterFarmerSolutionBroadcast { + farm_id, + solution_response: solution_response.clone(), + }) + { + warn!(%farm_id, %error, "Failed to send solution notification"); + } + })); + + let background_tasks = AsyncJoinOnDrop::new( + tokio::spawn(async move { + let farm_id_string = farm_id.to_string(); + + let sector_updates_fut = async { + while let Some(broadcast) = sector_updates_receiver.next().await { + if let Err(error) = + nats_client.broadcast(&broadcast, &farm_id_string).await + { + warn!(%farm_id, %error, "Failed to broadcast sector update"); + } + } + }; + let farming_notifications_fut = async { + while let Some(broadcast) = farming_notifications_receiver.next().await { + if let Err(error) = + nats_client.broadcast(&broadcast, &farm_id_string).await + { + warn!(%farm_id, %error, "Failed to broadcast farming notification"); + } + } + }; + let solutions_fut = async { + while let Some(broadcast) = solutions_receiver.next().await { + if let Err(error) = + nats_client.broadcast(&broadcast, &farm_id_string).await + { + warn!(%farm_id, %error, "Failed to broadcast solution"); + } + } + }; + + select! { + _ = sector_updates_fut.fuse() => {} + _ = farming_notifications_fut.fuse() => {} + _ = solutions_fut.fuse() => {} + } + + drop(sector_updates_handler_id); + drop(farming_notifications_handler_id); + drop(solutions_handler_id); + }), + true, + ); + + FarmDetails { + farm_id, + farm_id_string: farm_id.to_string(), + total_sectors_count: farm.total_sectors_count(), + piece_reader: farm.piece_reader(), + plotted_sectors: farm.plotted_sectors(), + _background_tasks: background_tasks, + } + }) + .collect::>(); + + async move { + select! { + result = identity_responder(&nats_client, &farms_details, identification_notification_interval).fuse() => { + result + }, + result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { + result + }, + result = read_piece_responder(&nats_client, &farms_details).fuse() => { + result + }, + } + } +} + +// Listen for farmer identification broadcast from controller and publish identification +// broadcast in response, also send periodic notifications reminding that farm exists +async fn identity_responder( + nats_client: &NatsClient, + farms_details: &[FarmDetails], + identification_notification_interval: Duration, +) -> anyhow::Result<()> { + let mut subscription = nats_client + .subscribe(ClusterControllerFarmerIdentifyBroadcast::SUBSCRIPTION_SUBJECT) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}") + })? + .fuse(); + // Also send periodic updates in addition to the subscription response + let mut interval = tokio::time::interval(identification_notification_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + select! { + maybe_message = subscription.next() => { + if let Some(message) = maybe_message { + trace!(?message, "Farmer received identify broadcast message"); + + send_identify_notification(nats_client, farms_details).await; + interval.reset(); + } else { + debug!("Identify broadcast stream ended"); + break; + } + } + _ = interval.tick().fuse() => { + trace!("Farmer self-identification"); + + send_identify_notification(nats_client, farms_details).await; + } + } + } + + Ok(()) +} + +async fn send_identify_notification(nats_client: &NatsClient, farms_details: &[FarmDetails]) { + farms_details + .iter() + .map(|farm_details| async move { + if let Err(error) = nats_client + .notification( + &ClusterFarmerIdentifyFarmNotification { + farm_id: farm_details.farm_id, + total_sectors_count: farm_details.total_sectors_count, + fingerprint: blake3_hash_list(&[ + &farm_details.farm_id.encode(), + &farm_details.total_sectors_count.to_le_bytes(), + ]), + }, + Some(&farm_details.farm_id_string), + ) + .await + { + warn!( + farm_id = %farm_details.farm_id, + %error, + "Failed to send farmer identify notification" + ); + } + }) + .collect::>() + .collect::>() + .await; +} + +async fn plotted_sectors_responder( + nats_client: &NatsClient, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + farms_details + .iter() + .map(|farm_details| async move { + // Initialize with pending future so it never ends + let mut processing = + FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + let mut subscription = nats_client + .subscribe( + ClusterFarmerPlottedSectorsRequest::SUBJECT + .replace('*', &farm_details.farm_id_string), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to plotted sectors requests for farm {}: {}", + farm_details.farm_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + if let Some(message) = maybe_message { + // Create background task for concurrent processing + processing.push(Box::pin(process_plotted_sectors_request( + nats_client, + farm_details, + message, + ))); + } else { + break; + } + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No farms"))? +} + +async fn process_plotted_sectors_request( + nats_client: &NatsClient, + farm_details: &FarmDetails, + message: Message, +) { + type Request = StreamRequest; + type Response = GenericStreamResponses< + ::Response, + >; + + let request = match Request::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + farm_id = %farm_details.farm_id, + message = %hex::encode(message.payload), + "Failed to decode plotted sectors request" + ); + return; + } + }; + + trace!(?request, "Plotted sectors request"); + + let mut plotted_sectors = match farm_details.plotted_sectors.get().await { + Ok(plotted_sectors) => plotted_sectors + .map(|maybe_plotted_sector| maybe_plotted_sector.map_err(|error| error.to_string())), + Err(error) => { + error!( + %error, + farm_id = %farm_details.farm_id, + "Failed to get plotted sectors" + ); + + let error_message = format!("Failed to get plotted sectors: {error}"); + let error_response = Response::Last(VecDeque::from([Err(error_message)])); + + if let Err(error) = nats_client + .publish(request.response_subject, error_response.encode().into()) + .await + { + warn!(%error, "Failed to send plotted sectors response"); + } + return; + } + }; + + // Pull the first plotted sector to measure response size + let first_plotted_sector = match plotted_sectors.next().await { + Some(first_plotted_sector) => first_plotted_sector, + None => { + if let Err(error) = nats_client + .publish( + request.response_subject.clone(), + Response::Last(VecDeque::new()).encode().into(), + ) + .await + { + warn!(%error, "Failed to send plotted sectors response"); + } + + return; + } + }; + let max_responses_per_message = + APPROXIMATE_MAX_MESSAGE_SIZE / first_plotted_sector.encoded_size(); + + // Initialize buffer that will be reused for responses + let mut buffer = VecDeque::with_capacity(max_responses_per_message); + buffer.push_back(first_plotted_sector); + + loop { + // Try to fill the buffer + let mut local_plotted_sectors = plotted_sectors + .by_ref() + .take(max_responses_per_message - buffer.len()); + while let Some(plotted_sector) = local_plotted_sectors.next().await { + buffer.push_back(plotted_sector); + } + + let full = buffer.len() == max_responses_per_message; + let response = if full { + Response::Continue(buffer) + } else { + Response::Last(buffer) + }; + + if let Err(error) = nats_client + .publish(request.response_subject.clone(), response.encode().into()) + .await + { + warn!(%error, "Failed to send plotted sectors response"); + return; + } + + if full { + buffer = response.into(); + } else { + return; + } + } +} + +async fn read_piece_responder( + nats_client: &NatsClient, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + farms_details + .iter() + .map(|farm_details| async move { + // Initialize with pending future so it never ends + let mut processing = + FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + let mut subscription = nats_client + .subscribe( + ClusterFarmerReadPieceRequest::SUBJECT + .replace('*', &farm_details.farm_id_string), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to read piece requests for farm {}: {}", + farm_details.farm_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + if let Some(message) = maybe_message { + // Create background task for concurrent processing + processing.push(Box::pin(process_read_piece_request( + nats_client, + farm_details, + message, + ))); + } else { + break; + } + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No farms"))? +} + +async fn process_read_piece_request( + nats_client: &NatsClient, + farms_details: &FarmDetails, + message: Message, +) { + let Some(reply_subject) = message.reply else { + return; + }; + + let ClusterFarmerReadPieceRequest { + sector_index, + piece_offset, + } = match ClusterFarmerReadPieceRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode read piece request" + ); + return; + } + }; + + let response: ::Response = farms_details + .piece_reader + .read_piece(sector_index, piece_offset) + .await + .map_err(|error| error.to_string()); + + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send read piece response"); + } +} diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs new file mode 100644 index 00000000000..b25476f9671 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -0,0 +1,355 @@ +use async_nats::{ + Client, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind, Subject, + SubscribeError, Subscriber, ToServerAddrs, +}; +use derive_more::{Deref, DerefMut}; +use futures::{Stream, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use std::any::type_name; +use std::collections::VecDeque; +use std::fmt; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; +use thiserror::Error; +use tracing::warn; +use ulid::Ulid; + +/// Approximate max message size (a few more bytes will not hurt), the actual limit is expected to +/// be 2M in NATS +pub const APPROXIMATE_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 * 10 / 8; + +/// Generic request with associated response +pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { + /// Request subject with optional `*` in place of application instance to receive the request + const SUBJECT: &'static str; + /// Response type that corresponds to this request + type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; +} + +/// Generic stream request where response is streamed using [`GenericStreamResponses`] +pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { + /// Request subject with optional `*` in place of application instance to receive the request + const SUBJECT: &'static str; + /// Response type that corresponds to this stream request + type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; +} + +/// Messages sent in response to [`StreamRequest`]. +/// +/// Empty list of responses means the end of the stream. +#[derive(Debug, Encode, Decode)] +pub enum GenericStreamResponses { + Continue(VecDeque), + Last(VecDeque), +} + +impl From> for VecDeque { + fn from(value: GenericStreamResponses) -> Self { + match value { + GenericStreamResponses::Continue(responses) => responses, + GenericStreamResponses::Last(responses) => responses, + } + } +} + +impl GenericStreamResponses { + fn next(&mut self) -> Option { + match self { + GenericStreamResponses::Continue(responses) => responses.pop_front(), + GenericStreamResponses::Last(responses) => responses.pop_front(), + } + } + + fn is_last(&self) -> bool { + matches!(self, Self::Last(_)) + } +} + +/// Generic stream request that expects a stream of responses. +/// +/// Internally it is expected that [`GenericStreamResponses`] messages will be +/// sent to auto-generated subject specified in `response_subject` field. +#[derive(Debug, Encode, Decode)] +#[non_exhaustive] +pub struct StreamRequest +where + Request: GenericStreamRequest, +{ + /// Request + pub request: Request, + /// Topic to stream [`GenericStreamResponses`]s to + pub response_subject: String, +} + +impl StreamRequest +where + Request: GenericStreamRequest, +{ + /// Create new stream request + pub fn new(request: Request) -> Self { + Self { + request, + response_subject: format!("stream-response.{}", Ulid::new()), + } + } +} + +/// Stream request error +#[derive(Debug, Error)] +pub enum StreamRequestError { + /// Subscribe error + #[error("Subscribe error: {0}")] + Subscribe(#[from] SubscribeError), + /// Publish error + #[error("Publish error: {0}")] + Publish(#[from] PublishError), +} + +#[derive(Debug, Deref, DerefMut)] +#[pin_project::pin_project] +pub struct StreamResponseSubscriber { + #[pin] + #[deref] + #[deref_mut] + subscriber: Subscriber, + buffered_responses: GenericStreamResponses, + _phantom: PhantomData, +} + +impl Stream for StreamResponseSubscriber +where + Response: Decode, +{ + type Item = Response; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(response) = self.buffered_responses.next() { + return Poll::Ready(Some(response)); + } else if self.buffered_responses.is_last() { + return Poll::Ready(None); + } + + let mut projected = self.project(); + match projected.subscriber.poll_next_unpin(cx) { + Poll::Ready(Some(message)) => { + match GenericStreamResponses::::decode(&mut message.payload.as_ref()) { + Ok(mut responses) => { + if let Some(response) = responses.next() { + *projected.buffered_responses = responses; + Poll::Ready(Some(response)) + } else { + Poll::Ready(None) + } + } + Err(error) => { + warn!( + %error, + message_type = %type_name::(), + message = %hex::encode(message.payload), + "Failed to decode stream response" + ); + + Poll::Ready(None) + } + } + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +/// Generic one-off notification +pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static { + /// Request subject with optional `*` in place of application instance to receive the request + const SUBSCRIPTION_SUBJECT: &'static str; +} + +/// Generic broadcast message. +/// +/// Broadcast messages are sent by an instance to (potentially) an instance-specific subject that +/// any other app can subscribe to. The same broadcast message can also originate from multiple +/// places and be de-duplicated using [`Self::deterministic_message_id`]. +pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static { + /// Subject that can be used for subscriptions with optional `*` in place of application + /// instance sending broadcast + const SUBSCRIPTION_SUBJECT: &'static str; + + /// Deterministic message ID that is used for de-duplicating messages broadcast by different + /// instances + fn deterministic_message_id(&self) -> Option { + None + } +} + +// TODO: Use or remove +#[derive(Debug, Deref, DerefMut)] +#[pin_project::pin_project] +pub struct SubscriberWrapper { + #[pin] + #[deref] + #[deref_mut] + subscriber: Subscriber, + _phantom: PhantomData, +} + +impl Stream for SubscriberWrapper +where + Message: Decode, +{ + type Item = Message; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project().subscriber.poll_next_unpin(cx) { + Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) { + Ok(message) => Poll::Ready(Some(message)), + Err(error) => { + warn!( + %error, + message_type = %type_name::(), + message = %hex::encode(message.payload), + "Failed to decode stream message" + ); + + Poll::Pending + } + }, + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +/// NATS client wrapper that can be used to interact with other Subspace-specific clients +#[derive(Debug, Clone, Deref)] +pub struct NatsClient { + client: Client, +} + +impl From for NatsClient { + fn from(client: Client) -> Self { + Self { client } + } +} + +impl NatsClient { + /// Create new instance by connecting to specified addresses + pub async fn new(addrs: A) -> Result { + Ok(Self { + client: async_nats::connect(addrs).await?, + }) + } + + /// Make request and wait for response + pub async fn request( + &self, + request: &Request, + instance: Option<&str>, + ) -> Result + where + Request: GenericRequest, + { + let subject = if let Some(instance) = instance { + Subject::from(Request::SUBJECT.replace('*', instance)) + } else { + Subject::from_static(Request::SUBJECT) + }; + let message = self + .client + .request(subject.clone(), request.encode().into()) + .await?; + + let response = + Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| { + warn!( + %subject, + %error, + response_type = %type_name::(), + response = %hex::encode(message.payload), + "Response decoding failed" + ); + + RequestErrorKind::Other + })?; + + Ok(response) + } + + /// Make request that expects stream response + pub async fn stream_request( + &self, + request: Request, + instance: Option<&str>, + ) -> Result, StreamRequestError> + where + Request: GenericStreamRequest, + { + let stream_request = StreamRequest::new(request); + + let subscriber = self + .client + .subscribe(stream_request.response_subject.clone()) + .await?; + + let subject = if let Some(instance) = instance { + Subject::from(Request::SUBJECT.replace('*', instance)) + } else { + Subject::from_static(Request::SUBJECT) + }; + + self.client + .publish(subject, stream_request.encode().into()) + .await?; + + Ok(StreamResponseSubscriber { + subscriber, + buffered_responses: GenericStreamResponses::Continue(VecDeque::new()), + _phantom: PhantomData, + }) + } + + /// Make notification without waiting for response + pub async fn notification( + &self, + notification: &Notification, + instance: Option<&str>, + ) -> Result<(), PublishError> + where + Notification: GenericNotification, + { + let subject = if let Some(instance) = instance { + Subject::from(Notification::SUBSCRIPTION_SUBJECT.replace('*', instance)) + } else { + Subject::from_static(Notification::SUBSCRIPTION_SUBJECT) + }; + + self.client + .publish(subject, notification.encode().into()) + .await + } + + /// Send a broadcast message + pub async fn broadcast( + &self, + message: &Broadcast, + instance: &str, + ) -> Result<(), PublishError> + where + Broadcast: GenericBroadcast, + { + self.client + .publish_with_headers( + Broadcast::SUBSCRIPTION_SUBJECT.replace('*', instance), + { + let mut headers = HeaderMap::new(); + if let Some(message_id) = message.deterministic_message_id() { + headers.insert("Nats-Msg-Id", message_id); + } + headers + }, + message.encode().into(), + ) + .await + } +} diff --git a/crates/subspace-farmer/src/cluster/node_client.rs b/crates/subspace-farmer/src/cluster/node_client.rs new file mode 100644 index 00000000000..f288b8077b7 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/node_client.rs @@ -0,0 +1,165 @@ +use crate::cluster::controller::{ + ClusterControllerArchivedSegmentHeaderBroadcast, ClusterControllerFarmerAppInfoRequest, + ClusterControllerPieceRequest, ClusterControllerRewardSignatureNotification, + ClusterControllerRewardSigningBroadcast, ClusterControllerSegmentHeadersRequest, + ClusterControllerSlotInfoBroadcast, ClusterControllerSolutionNotification, +}; +use crate::cluster::nats_client::{GenericBroadcast, NatsClient}; +use crate::node_client::{Error as RpcError, Error, NodeClient}; +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use parity_scale_codec::Decode; +use parking_lot::Mutex; +use std::pin::Pin; +use std::sync::Arc; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; +use subspace_rpc_primitives::{ + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, +}; + +/// [`NodeClient`] used in cluster environment that connects to node through a controller instead +/// of to the node directly +#[derive(Debug, Clone)] +pub struct ClusterNodeClient { + client: NatsClient, + // Store last slot info instance that can be used to send solution response to (some instances + // may be not synced and not able to receive solution responses) + last_slot_info_instance: Arc>, +} + +impl ClusterNodeClient { + /// Create a new instance + pub fn new(client: NatsClient) -> Self { + Self { + client, + last_slot_info_instance: Arc::default(), + } + } +} + +#[async_trait] +impl NodeClient for ClusterNodeClient { + async fn farmer_app_info(&self) -> Result { + Ok(self + .client + .request(&ClusterControllerFarmerAppInfoRequest, None) + .await?) + } + + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, RpcError> { + let subscription = self + .client + .subscribe(ClusterControllerSlotInfoBroadcast::SUBSCRIPTION_SUBJECT) + .await?; + let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance); + + Ok(Box::pin(subscription.filter_map(move |message| { + let maybe_slot_info = + ClusterControllerSlotInfoBroadcast::decode(&mut message.payload.as_ref()) + .ok() + .map(|message| { + *last_slot_info_instance.lock() = message.instance; + + message.slot_info + }); + + async move { maybe_slot_info } + }))) + } + + async fn submit_solution_response( + &self, + solution_response: SolutionResponse, + ) -> Result<(), RpcError> { + let last_slot_info_instance = self.last_slot_info_instance.lock().clone(); + Ok(self + .client + .notification( + &ClusterControllerSolutionNotification { solution_response }, + Some(&last_slot_info_instance), + ) + .await?) + } + + async fn subscribe_reward_signing( + &self, + ) -> Result + Send + 'static>>, RpcError> { + let subscription = self + .client + .subscribe(ClusterControllerRewardSigningBroadcast::SUBSCRIPTION_SUBJECT) + .await?; + + Ok(Box::pin(subscription.filter_map(|message| { + let maybe_slot_info = + ClusterControllerRewardSigningBroadcast::decode(&mut message.payload.as_ref()) + .ok() + .map(|message| message.reward_signing_info); + + async move { maybe_slot_info } + }))) + } + + /// Submit a block signature + async fn submit_reward_signature( + &self, + reward_signature: RewardSignatureResponse, + ) -> Result<(), RpcError> { + let last_slot_info_instance = self.last_slot_info_instance.lock().clone(); + Ok(self + .client + .notification( + &ClusterControllerRewardSignatureNotification { reward_signature }, + Some(&last_slot_info_instance), + ) + .await?) + } + + async fn subscribe_archived_segment_headers( + &self, + ) -> Result + Send + 'static>>, RpcError> { + let subscription = self + .client + .subscribe(ClusterControllerArchivedSegmentHeaderBroadcast::SUBSCRIPTION_SUBJECT) + .await?; + + Ok(Box::pin(subscription.filter_map(|message| { + let maybe_slot_info = ClusterControllerArchivedSegmentHeaderBroadcast::decode( + &mut message.payload.as_ref(), + ) + .ok() + .map(|message| message.archived_segment_header); + + async move { maybe_slot_info } + }))) + } + + async fn segment_headers( + &self, + segment_indices: Vec, + ) -> Result>, RpcError> { + Ok(self + .client + .request( + &ClusterControllerSegmentHeadersRequest { segment_indices }, + None, + ) + .await?) + } + + async fn piece(&self, piece_index: PieceIndex) -> Result, RpcError> { + Ok(self + .client + .request(&ClusterControllerPieceRequest { piece_index }, None) + .await?) + } + + async fn acknowledge_archived_segment_header( + &self, + _segment_index: SegmentIndex, + ) -> Result<(), Error> { + // Acknowledgement is unnecessary/unsupported + Ok(()) + } +} diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs new file mode 100644 index 00000000000..d27e01202e8 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -0,0 +1,39 @@ +use crate::cluster::nats_client::NatsClient; +use crate::plotter::{Plotter, SectorPlottingProgress}; +use async_trait::async_trait; +use futures::Sink; +use std::error::Error; +use subspace_core_primitives::{PublicKey, SectorIndex}; +use subspace_farmer_components::FarmerProtocolInfo; + +/// Cluster plotter +// TODO: Limit number of sectors that can be plotted concurrently in order to limit memory usage +pub struct ClusterPlotter { + nats_client: NatsClient, +} + +#[async_trait] +impl Plotter for ClusterPlotter { + async fn plot_sector( + &self, + _public_key: PublicKey, + _sector_index: SectorIndex, + _farmer_protocol_info: FarmerProtocolInfo, + _pieces_in_sector: u16, + _replotting: bool, + mut _progress_sender: PS, + ) where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + // TODO + todo!() + } +} + +impl ClusterPlotter { + /// Create new instance + pub fn new(nats_client: NatsClient) -> Self { + Self { nats_client } + } +} diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 6ba2943711c..6495fdf1a6a 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -38,6 +38,7 @@ //! are `target ± ½ * solution range` (while also handing overflow/underflow) when interpreted as //! 64-bit unsigned integers. +pub mod cluster; pub mod farm; pub mod farmer_cache; pub(crate) mod identity; diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index c649dc705ab..181f74a19d1 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -25,6 +25,7 @@ use tracing::{debug, warn}; const MAX_DEFAULT_FARMING_THREADS: usize = 32; /// Joins async join handle on drop +#[derive(Debug)] pub struct AsyncJoinOnDrop { handle: Option>, abort_on_drop: bool, diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index deaa0529acb..210189458a7 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -212,7 +212,10 @@ where let inner = &self.inner; trace!(%piece_index, "Getting piece from local plot"); - let maybe_read_piece_fut = inner.plotted_pieces.read().await.read_piece(piece_index); + let maybe_read_piece_fut = inner + .plotted_pieces + .try_read() + .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index)); if let Some(read_piece_fut) = maybe_read_piece_fut { if let Some(piece) = read_piece_fut.await {