diff --git a/crates/subspace-networking/src/constructor.rs b/crates/subspace-networking/src/constructor.rs index 446e2e6827..f2aec2ae17 100644 --- a/crates/subspace-networking/src/constructor.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -84,6 +84,10 @@ const TEMPORARY_BANS_DEFAULT_BACKOFF_RANDOMIZATION_FACTOR: f64 = 0.1; const TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER: f64 = 1.5; const TEMPORARY_BANS_DEFAULT_MAX_INTERVAL: Duration = Duration::from_secs(30 * 60); +/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers +/// wasting resources and producing a ton of log records. +const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1); + /// Specific YAMUX settings for Subspace applications: additional buffer space for pieces and /// substream's limit. /// @@ -479,6 +483,7 @@ where reserved_peers: ReservedPeersConfig { reserved_peers: reserved_peers.clone(), protocol_name: RESERVED_PEERS_PROTOCOL_NAME, + dialing_interval: DIALING_INTERVAL_IN_SECS, }, peer_info_config: PeerInfoConfig::new(PEER_INFO_PROTOCOL_NAME), peer_info_provider, diff --git a/crates/subspace-networking/src/protocols/reserved_peers.rs b/crates/subspace-networking/src/protocols/reserved_peers.rs index fff49548b1..5aa88086b8 100644 --- a/crates/subspace-networking/src/protocols/reserved_peers.rs +++ b/crates/subspace-networking/src/protocols/reserved_peers.rs @@ -2,6 +2,8 @@ mod handler; #[cfg(test)] mod tests; +use futures::FutureExt; +use futures_timer::Delay; use handler::Handler; use libp2p::core::{Endpoint, Multiaddr}; use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm}; @@ -12,9 +14,8 @@ use libp2p::swarm::{ }; use libp2p::PeerId; use std::collections::HashMap; -use std::ops::Add; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; use tracing::{debug, trace}; use crate::utils::strip_peer_id; @@ -36,7 +37,7 @@ use crate::utils::strip_peer_id; /// 3. `Connected`: This state signals that the peer is currently connected. /// /// The protocol will attempt to establish a connection to a `NotConnected` peer after a set delay, -/// specified by `DIALING_INTERVAL_IN_SECS`, to prevent multiple simultaneous connection attempts +/// specified by configurable dialing interval, to prevent multiple simultaneous connection attempts /// to offline peers. This delay not only conserves resources, but also reduces the amount of /// log output. /// @@ -51,11 +52,15 @@ use crate::utils::strip_peer_id; /// #[derive(Debug)] pub struct Behaviour { - /// Protocol name. - protocol_name: &'static str, + /// Protocol configuration. + config: Config, /// A mapping from `PeerId` to `ReservedPeerState`, where each `ReservedPeerState` /// represents the current state of the connection to a reserved peer. reserved_peers_state: HashMap, + /// Delay between dialing attempts. + dialing_delay: Delay, + /// Future waker. + waker: Option, } /// Reserved peers protocol configuration. @@ -65,29 +70,21 @@ pub struct Config { pub protocol_name: &'static str, /// Predefined set of reserved peers with addresses. pub reserved_peers: Vec, + /// Interval between new dialing attempts. + pub dialing_interval: Duration, } /// Reserved peer connection status. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ConnectionStatus { - /// Reserved peer is not connected. The next connection attempt is scheduled. - NotConnected { scheduled_at: Instant }, + /// Reserved peer is not connected. + NotConnected, /// Reserved peer dialing is in progress. PendingConnection, /// Reserved peer is connected. Connected, } -/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers -/// wasting resources and producing a ton of log records. -const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1); - -/// Helper-function to schedule a connection attempt. -#[inline] -fn schedule_connection() -> Instant { - Instant::now().add(DIALING_INTERVAL_IN_SECS) -} - /// Defines the state of a reserved peer connection state. #[derive(Debug, Clone)] struct ReservedPeerState { @@ -110,7 +107,8 @@ impl Behaviour { "Reserved peers protocol initialization...." ); - let peer_addresses = strip_peer_id(config.reserved_peers); + let peer_addresses = strip_peer_id(config.reserved_peers.clone()); + let dialing_delay = Delay::new(config.dialing_interval); let reserved_peers_state = peer_addresses .into_iter() @@ -120,17 +118,17 @@ impl Behaviour { ReservedPeerState { peer_id, address, - connection_status: ConnectionStatus::NotConnected { - scheduled_at: schedule_connection(), - }, + connection_status: ConnectionStatus::NotConnected, }, ) }) .collect(); Self { - protocol_name: config.protocol_name, + config, reserved_peers_state, + waker: None, + dialing_delay, } } @@ -138,10 +136,16 @@ impl Behaviour { #[inline] fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler { Handler::new( - self.protocol_name, + self.config.protocol_name, self.reserved_peers_state.contains_key(peer_id), ) } + + fn wake(&self) { + if let Some(waker) = &self.waker { + waker.wake_by_ref() + } + } } impl NetworkBehaviour for Behaviour { @@ -175,6 +179,7 @@ impl NetworkBehaviour for Behaviour { state.connection_status = ConnectionStatus::Connected; debug!(peer_id=%state.peer_id, "Reserved peer connected."); + self.wake(); } } FromSwarm::ConnectionClosed(ConnectionClosed { @@ -184,11 +189,10 @@ impl NetworkBehaviour for Behaviour { }) => { if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) { if remaining_established == 0 { - state.connection_status = ConnectionStatus::NotConnected { - scheduled_at: schedule_connection(), - }; + state.connection_status = ConnectionStatus::NotConnected; debug!(%state.peer_id, "Reserved peer disconnected."); + self.wake(); } } } @@ -196,12 +200,11 @@ impl NetworkBehaviour for Behaviour { if let Some(peer_id) = peer_id { if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) { if state.connection_status == ConnectionStatus::PendingConnection { - state.connection_status = ConnectionStatus::NotConnected { - scheduled_at: schedule_connection(), - }; + state.connection_status = ConnectionStatus::NotConnected; }; debug!(peer_id=%state.peer_id, "Reserved peer dialing failed."); + self.wake(); } } } @@ -228,28 +231,35 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, - _: &mut Context<'_>, + cx: &mut Context<'_>, _: &mut impl PollParameters, ) -> Poll>> { - for (_, state) in self.reserved_peers_state.iter_mut() { - trace!(?state, "Reserved peer state."); + // Schedule new peer dialing. + match self.dialing_delay.poll_unpin(cx) { + Poll::Pending => {} + Poll::Ready(()) => { + self.dialing_delay.reset(self.config.dialing_interval); - if let ConnectionStatus::NotConnected { scheduled_at } = state.connection_status { - if Instant::now() > scheduled_at { - state.connection_status = ConnectionStatus::PendingConnection; + for (_, state) in self.reserved_peers_state.iter_mut() { + trace!(?state, "Reserved peer state."); - debug!(peer_id=%state.peer_id, "Dialing the reserved peer...."); + if let ConnectionStatus::NotConnected = state.connection_status { + state.connection_status = ConnectionStatus::PendingConnection; - let dial_opts = - DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]); + debug!(peer_id=%state.peer_id, "Dialing the reserved peer...."); - return Poll::Ready(ToSwarm::Dial { - opts: dial_opts.build(), - }); + let dial_opts = + DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]); + + return Poll::Ready(ToSwarm::Dial { + opts: dial_opts.build(), + }); + } } } } + self.waker.replace(cx.waker().clone()); Poll::Pending } } diff --git a/crates/subspace-networking/src/protocols/reserved_peers/tests.rs b/crates/subspace-networking/src/protocols/reserved_peers/tests.rs index 1a3e366f7e..19c8f99a03 100644 --- a/crates/subspace-networking/src/protocols/reserved_peers/tests.rs +++ b/crates/subspace-networking/src/protocols/reserved_peers/tests.rs @@ -16,6 +16,8 @@ struct ReservedPeersInstance; const PROTOCOL_NAME: &str = "/reserved-peers"; +const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1); + #[tokio::test()] async fn test_connection_breaks_after_timeout_without_reservation() { let connection_timeout = Duration::from_millis(300); @@ -28,6 +30,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: Vec::new(), + dialing_interval: DIALING_INTERVAL_IN_SECS, }), ); @@ -38,6 +41,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: Vec::new(), + dialing_interval: DIALING_INTERVAL_IN_SECS, }), ); @@ -77,6 +81,7 @@ async fn test_connection_reservation() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: vec![peer2_address.parse().unwrap()], + dialing_interval: DIALING_INTERVAL_IN_SECS, }), ); @@ -86,6 +91,7 @@ async fn test_connection_reservation() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: vec![peer1_address.parse().unwrap()], + dialing_interval: DIALING_INTERVAL_IN_SECS, }), ); @@ -124,6 +130,7 @@ async fn test_connection_reservation_symmetry() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: vec![peer2_address.parse().unwrap()], + dialing_interval: DIALING_INTERVAL_IN_SECS, }), ); @@ -133,6 +140,7 @@ async fn test_connection_reservation_symmetry() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: Vec::new(), + dialing_interval: DIALING_INTERVAL_IN_SECS, }), ); @@ -157,8 +165,8 @@ async fn test_connection_reservation_symmetry() { #[tokio::test()] async fn test_reserved_peers_dial_event() { - let connection_timeout = Duration::from_millis(300); - let long_delay = Duration::from_millis(1000); + let connection_timeout = Duration::from_millis(1300); + let long_delay = Duration::from_millis(2000); let identity1 = Keypair::generate_ed25519(); let identity2 = Keypair::generate_ed25519(); @@ -172,6 +180,7 @@ async fn test_reserved_peers_dial_event() { Behaviour::new(Config { protocol_name: PROTOCOL_NAME, reserved_peers: vec![peer2_address.parse().unwrap()], + dialing_interval: DIALING_INTERVAL_IN_SECS, }), );