Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add infrastructure for Subspace networking metrics. #2284

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE};
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::metrics::Metrics;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::strip_peer_id;
Expand Down Expand Up @@ -65,13 +64,14 @@ pub(super) fn configure_dsn(

// Metrics
let mut metrics_registry = Registry::default();
let metrics = initialize_metrics.then(|| Metrics::new(&mut metrics_registry));
let dsn_metric_registry = initialize_metrics.then_some(&mut metrics_registry);

let default_config = Config::new(
protocol_prefix,
keypair,
piece_cache.clone(),
Some(PeerInfoProvider::new_farmer()),
dsn_metric_registry,
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
);
let config = Config {
reserved_peers,
Expand Down Expand Up @@ -203,7 +203,6 @@ pub(super) fn configure_dsn(
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Dynamic,
external_addresses,
metrics,
disable_bootstrap_on_start,
..default_config
};
Expand Down
8 changes: 7 additions & 1 deletion crates/subspace-networking/examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ pub async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client));
let default_config = Config::new(
protocol_prefix,
keypair,
(),
Some(PeerInfoProvider::Client),
None,
);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
metrics: Some(metrics),
external_metrics: Some(metrics),
..Config::default()
};
let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap();
Expand Down
8 changes: 7 additions & 1 deletion crates/subspace-networking/examples/random-walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,13 @@ async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client));
let default_config = Config::new(
protocol_prefix,
keypair,
(),
Some(PeerInfoProvider::Client),
None,
);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use clap::Parser;
use futures::{select, FutureExt};
use libp2p::identity::ed25519::Keypair;
use libp2p::kad::Mode;
use libp2p::metrics::Metrics;
use libp2p::{identity, Multiaddr, PeerId};
use prometheus_client::registry::Registry;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -153,19 +152,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let keypair = identity::Keypair::from(decoded_keypair);

// Metrics
let mut metric_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();
let metrics =
metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry));

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metric_registry),
)
})
.transpose()?;
let mut metrics_registry = Registry::default();
let dsn_metrics_registry =
metrics_endpoints_are_specified.then_some(&mut metrics_registry);

let known_peers_registry_config = KnownPeersManagerConfig {
enable_known_peers_source: false,
Expand Down Expand Up @@ -199,10 +189,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Static(Mode::Server),
external_addresses,
metrics,
networking_parameters_registry: known_peers_registry.boxed(),

..Config::new(protocol_version.to_string(), keypair, (), None)
..Config::new(
protocol_version.to_string(),
keypair,
(),
None,
dsn_metrics_registry,
)
};
let (node, mut node_runner) =
subspace_networking::construct(config).expect("Networking stack creation failed.");
Expand All @@ -220,6 +215,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
.detach();

info!("Subspace Bootstrap Node started");

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
)
})
.transpose()?;
if let Some(prometheus_task) = prometheus_task {
select! {
_ = node_runner.run().fuse() => {},
Expand Down
23 changes: 20 additions & 3 deletions crates/subspace-networking/src/constructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::protocols::request_response::request_response_factory::RequestHandler
use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
use crate::shared::Shared;
use crate::utils::rate_limiter::RateLimiter;
use crate::utils::strip_peer_id;
use crate::utils::{strip_peer_id, SubspaceMetrics};
use crate::{PeerInfo, PeerInfoConfig};
use backoff::{ExponentialBackoff, SystemClock};
use futures::channel::mpsc;
Expand All @@ -35,6 +35,7 @@ use libp2p::multiaddr::Protocol;
use libp2p::yamux::Config as YamuxConfig;
use libp2p::{identity, Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::borrow::Cow;
use std::iter::Empty;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -239,7 +240,9 @@ pub struct Config<LocalRecordProvider> {
/// Backoff policy for temporary banning of unreachable peers.
pub temporary_ban_backoff: ExponentialBackoff,
/// Optional external prometheus metrics. None will disable metrics gathering.
pub metrics: Option<Metrics>,
pub external_metrics: Option<Metrics>,
/// Internal prometheus metrics. None will disable metrics gathering.
pub metrics: Option<SubspaceMetrics>,
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
/// Defines protocol version for the network peers. Affects network partition.
pub protocol_version: String,
/// Specifies a source for peer information. None disables the protocol.
Expand Down Expand Up @@ -292,6 +295,7 @@ impl Default for Config<()> {
keypair,
(),
Some(PeerInfoProvider::new_client()),
None,
)
}
}
Expand All @@ -306,7 +310,17 @@ where
keypair: identity::Keypair,
local_records_provider: LocalRecordProvider,
peer_info_provider: Option<PeerInfoProvider>,
metrics_registry: Option<&mut Registry>,
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
) -> Self {
let (external_metrics, metrics) = metrics_registry
.map(|registry| {
(
Some(Metrics::new(registry)),
Some(SubspaceMetrics::new(registry)),
)
})
.unwrap_or((None, None));

let mut kademlia = KademliaConfig::default();
kademlia
.set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
Expand Down Expand Up @@ -379,7 +393,8 @@ where
max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
temporary_ban_backoff,
metrics: None,
external_metrics,
metrics,
protocol_version,
peer_info_provider,
// Don't need to keep additional connections by default
Expand Down Expand Up @@ -450,6 +465,7 @@ where
max_pending_outgoing_connections,
temporary_bans_cache_size,
temporary_ban_backoff,
external_metrics,
metrics,
protocol_version,
peer_info_provider,
Expand Down Expand Up @@ -639,6 +655,7 @@ where
networking_parameters_registry,
reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
temporary_bans,
external_metrics,
metrics,
protocol_version,
general_connection_decision_handler,
Expand Down
23 changes: 18 additions & 5 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::protocols::request_response::request_response_factory::{
};
use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared};
use crate::utils::rate_limiter::RateLimiterPermit;
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress};
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress, SubspaceMetrics};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
Expand Down Expand Up @@ -123,8 +123,10 @@ where
reserved_peers: HashMap<PeerId, Multiaddr>,
/// Temporarily banned peers.
temporary_bans: Arc<Mutex<TemporaryBans>>,
/// Prometheus metrics.
metrics: Option<Metrics>,
/// External Prometheus metrics.
external_metrics: Option<Metrics>,
/// Subspace Prometheus metrics.
metrics: Option<SubspaceMetrics>,
/// Mapping from specific peer to ip addresses
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
Expand Down Expand Up @@ -178,7 +180,8 @@ where
pub(crate) networking_parameters_registry: Box<dyn KnownPeersRegistry>,
pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
pub(crate) metrics: Option<Metrics>,
pub(crate) external_metrics: Option<Metrics>,
pub(crate) metrics: Option<SubspaceMetrics>,
pub(crate) protocol_version: String,
pub(crate) general_connection_decision_handler: Option<ConnectedPeersHandler>,
pub(crate) special_connection_decision_handler: Option<ConnectedPeersHandler>,
Expand All @@ -201,6 +204,7 @@ where
mut networking_parameters_registry,
reserved_peers,
temporary_bans,
external_metrics,
metrics,
protocol_version,
general_connection_decision_handler,
Expand Down Expand Up @@ -239,6 +243,7 @@ where
networking_parameters_registry,
reserved_peers,
temporary_bans,
external_metrics,
metrics,
peer_ip_addresses: HashMap::new(),
protocol_version,
Expand Down Expand Up @@ -548,6 +553,10 @@ where
if num_established.get() == 1 {
shared.handlers.connected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.inc_established_connections()
}
}
SwarmEvent::ConnectionClosed {
peer_id,
Expand Down Expand Up @@ -592,6 +601,10 @@ where
if num_established == 0 {
shared.handlers.disconnected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.dec_established_connections()
};
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
Expand Down Expand Up @@ -1556,7 +1569,7 @@ where
}

fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
if let Some(ref mut metrics) = self.metrics {
if let Some(ref mut metrics) = self.external_metrics {
match swarm_event {
SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
metrics.record(ping_event);
Expand Down
35 changes: 35 additions & 0 deletions crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use event_listener_primitives::Bag;
use futures::future::{Fuse, FusedFuture, FutureExt};
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
Expand All @@ -21,6 +23,39 @@ use tokio::runtime::Handle;
use tokio::task;
use tracing::warn;

const NETWORKING_REGISTRY_PREFIX: &str = "subspace";

/// Metrics for Subspace networking
pub struct SubspaceMetrics {
established_connections: Gauge,
}

impl SubspaceMetrics {
/// Constructor
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix(NETWORKING_REGISTRY_PREFIX);

let gauge = Gauge::default();
sub_registry.register(
"established_connections",
"The current number of established connections",
gauge.clone(),
);

Self {
established_connections: gauge,
}
}

pub(crate) fn inc_established_connections(&mut self) {
self.established_connections.inc();
}

pub(crate) fn dec_established_connections(&mut self) {
self.established_connections.dec();
}
}

/// Joins async join handle on drop
pub(crate) struct AsyncJoinOnDrop<T>(Option<Fuse<task::JoinHandle<T>>>);

Expand Down
15 changes: 10 additions & 5 deletions crates/subspace-service/src/dsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_networking::libp2p::kad::Mode;
use subspace_networking::libp2p::metrics::Metrics;
use subspace_networking::libp2p::{identity, Multiaddr};
use subspace_networking::utils::strip_peer_id;
use subspace_networking::{
Expand Down Expand Up @@ -80,8 +79,8 @@ pub(crate) fn create_dsn_instance(
) -> Result<(Node, NodeRunner<()>, Option<Registry>), DsnConfigurationError> {
trace!("Subspace networking starting.");

let mut metric_registry = Registry::default();
let metrics = enable_metrics.then(|| Metrics::new(&mut metric_registry));
let mut metrics_registry = Registry::default();
let dsn_metrics_registry = enable_metrics.then_some(&mut metrics_registry);

let networking_parameters_registry = {
// TODO: Make `base_path` point to `network` once we can clean up below migration code
Expand Down Expand Up @@ -122,6 +121,7 @@ pub(crate) fn create_dsn_instance(
keypair,
(),
Some(PeerInfoProvider::new_node()),
dsn_metrics_registry,
);

let networking_config = subspace_networking::Config {
Expand All @@ -147,13 +147,18 @@ pub(crate) fn create_dsn_instance(
bootstrap_addresses: dsn_config.bootstrap_nodes,
external_addresses: dsn_config.external_addresses,
kademlia_mode: KademliaMode::Static(Mode::Client),
metrics,
disable_bootstrap_on_start: dsn_config.disable_bootstrap_on_start,

..default_networking_config
};

subspace_networking::construct(networking_config)
.map(|(node, node_runner)| (node, node_runner, enable_metrics.then_some(metric_registry)))
.map(|(node, node_runner)| {
(
node,
node_runner,
enable_metrics.then_some(metrics_registry),
)
})
.map_err(Into::into)
}
Loading