Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed May 1, 2024
1 parent 34462da commit 5bac925
Show file tree
Hide file tree
Showing 19 changed files with 2,913 additions and 10 deletions.
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod benchmark;
pub(crate) mod cluster;
pub(crate) mod farm;
mod info;
mod scrub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn audit(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
.map_err(|error| anyhow::anyhow!(error))?;
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let table_generator = Mutex::new(PosTable::generator());

let sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm)
Expand Down Expand Up @@ -276,7 +276,7 @@ fn prove(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
.map_err(|error| anyhow::anyhow!(error))?;
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let table_generator = Mutex::new(PosTable::generator());

let mut sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm)
Expand Down
111 changes: 111 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
mod cache;
mod controller;
mod farmer;
mod plotter;

use crate::commands::cluster::cache::{cache, CacheArgs};
use crate::commands::cluster::controller::{controller, ControllerArgs};
use crate::commands::cluster::farmer::{farmer, FarmerArgs};
use crate::commands::cluster::plotter::{plotter, PlotterArgs};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_nats::ServerAddr;
use clap::{Parser, Subcommand};
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
use std::net::SocketAddr;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_proof_of_space::Table;

/// Arguments for farmer
#[derive(Debug, Parser)]
pub(crate) struct ClusterArgs {
/// Shared arguments for all subcommands
#[clap(flatten)]
shared_args: SharedArgs,
/// Cluster subcommands
#[clap(subcommand)]
subcommand: ClusterSubcommand,
}

/// Shared arguments
#[derive(Debug, Parser)]
struct SharedArgs {
/// NATS server address, typically in `nats://server1:port1` format, can be specified multiple
/// times.
///
/// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default).
#[arg(long, alias = "nats-server", required = true)]
nats_servers: Vec<ServerAddr>,
/// Defines endpoints for the prometheus metrics server. It doesn't start without at least
/// one specified endpoint. Format: 127.0.0.1:8080
#[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
prometheus_listen_on: Vec<SocketAddr>,
}

/// Arguments for cluster
#[derive(Debug, Subcommand)]
enum ClusterSubcommand {
/// Farming cluster controller
Controller(ControllerArgs),
/// Farming cluster farmer
Farmer(FarmerArgs),
/// Farming cluster plotter
Plotter(PlotterArgs),
/// Farming cluster cache
Cache(CacheArgs),
}

pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()>
where
PosTable: Table,
{
let signal = shutdown_signal();

let nats_client = NatsClient::new(cluster_args.shared_args.nats_servers)
.await
.map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?;
let mut registry = Registry::default();

let run_fut = async move {
match cluster_args.subcommand {
ClusterSubcommand::Controller(controller_args) => {
controller(nats_client, &mut registry, controller_args).await
}
ClusterSubcommand::Farmer(farmer_args) => {
farmer::<PosTable>(nats_client, &mut registry, farmer_args).await
}
ClusterSubcommand::Plotter(plotter_args) => {
plotter::<PosTable>(nats_client, &mut registry, plotter_args).await
}
ClusterSubcommand::Cache(cache_args) => {
cache(nats_client, &mut registry, cache_args).await
}
}
};

// TODO: Run
// let _prometheus_worker = if !cluster_args.shared_args.prometheus_listen_on.is_empty() {
// let prometheus_task = start_prometheus_metrics_server(
// cluster_args.shared_args.prometheus_listen_on,
// RegistryAdapter::PrometheusClient(registry),
// )?;
//
// let join_handle = tokio::spawn(prometheus_task);
// Some(AsyncJoinOnDrop::new(join_handle, true))
// } else {
// None
// };

select! {
// Signal future
_ = signal.fuse() => {
Ok(())
},

// Run future
result = run_fut.fuse() => {
result
},
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use clap::Parser;
use prometheus_client::registry::Registry;
use subspace_farmer::cluster::nats_client::NatsClient;

/// Arguments for cache
#[derive(Debug, Parser)]
pub(super) struct CacheArgs {
// TODO: Paths
/// Cache group to use, the same cache group must be also specified on corresponding controller
#[arg(long, default_value = "default")]
cache_group: String,
}

pub(super) async fn cache(
_nats_client: NatsClient,
_registry: &mut Registry,
_cache_args: CacheArgs,
) -> anyhow::Result<()> {
todo!()
}
Loading

0 comments on commit 5bac925

Please sign in to comment.