Skip to content

Commit

Permalink
networking: Fix tests of reserved-peers protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed Nov 3, 2023
1 parent 733ad8f commit c9a15f3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 44 deletions.
5 changes: 5 additions & 0 deletions crates/subspace-networking/src/constructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ const TEMPORARY_BANS_DEFAULT_BACKOFF_RANDOMIZATION_FACTOR: f64 = 0.1;
const TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER: f64 = 1.5;
const TEMPORARY_BANS_DEFAULT_MAX_INTERVAL: Duration = Duration::from_secs(30 * 60);

/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers
/// wasting resources and producing a ton of log records.
const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

/// Specific YAMUX settings for Subspace applications: additional buffer space for pieces and
/// substream's limit.
///
Expand Down Expand Up @@ -479,6 +483,7 @@ where
reserved_peers: ReservedPeersConfig {
reserved_peers: reserved_peers.clone(),
protocol_name: RESERVED_PEERS_PROTOCOL_NAME,
dialing_interval: DIALING_INTERVAL_IN_SECS,
},
peer_info_config: PeerInfoConfig::new(PEER_INFO_PROTOCOL_NAME),
peer_info_provider,
Expand Down
94 changes: 52 additions & 42 deletions crates/subspace-networking/src/protocols/reserved_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod handler;
#[cfg(test)]
mod tests;

use futures::FutureExt;
use futures_timer::Delay;
use handler::Handler;
use libp2p::core::{Endpoint, Multiaddr};
use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm};
Expand All @@ -12,9 +14,8 @@ use libp2p::swarm::{
};
use libp2p::PeerId;
use std::collections::HashMap;
use std::ops::Add;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use tracing::{debug, trace};

use crate::utils::strip_peer_id;
Expand All @@ -36,7 +37,7 @@ use crate::utils::strip_peer_id;
/// 3. `Connected`: This state signals that the peer is currently connected.
///
/// The protocol will attempt to establish a connection to a `NotConnected` peer after a set delay,
/// specified by `DIALING_INTERVAL_IN_SECS`, to prevent multiple simultaneous connection attempts
/// specified by configurable dialing interval, to prevent multiple simultaneous connection attempts
/// to offline peers. This delay not only conserves resources, but also reduces the amount of
/// log output.
///
Expand All @@ -51,11 +52,15 @@ use crate::utils::strip_peer_id;
///
#[derive(Debug)]
pub struct Behaviour {
/// Protocol name.
protocol_name: &'static str,
/// Protocol configuration.
config: Config,
/// A mapping from `PeerId` to `ReservedPeerState`, where each `ReservedPeerState`
/// represents the current state of the connection to a reserved peer.
reserved_peers_state: HashMap<PeerId, ReservedPeerState>,
/// Delay between dialing attempts.
dialing_delay: Delay,
/// Future waker.
waker: Option<Waker>,
}

/// Reserved peers protocol configuration.
Expand All @@ -65,29 +70,21 @@ pub struct Config {
pub protocol_name: &'static str,
/// Predefined set of reserved peers with addresses.
pub reserved_peers: Vec<Multiaddr>,
/// Interval between new dialing attempts.
pub dialing_interval: Duration,
}

/// Reserved peer connection status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
/// Reserved peer is not connected. The next connection attempt is scheduled.
NotConnected { scheduled_at: Instant },
/// Reserved peer is not connected.
NotConnected,
/// Reserved peer dialing is in progress.
PendingConnection,
/// Reserved peer is connected.
Connected,
}

/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers
/// wasting resources and producing a ton of log records.
const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

/// Helper-function to schedule a connection attempt.
#[inline]
fn schedule_connection() -> Instant {
Instant::now().add(DIALING_INTERVAL_IN_SECS)
}

/// Defines the state of a reserved peer connection state.
#[derive(Debug, Clone)]
struct ReservedPeerState {
Expand All @@ -110,7 +107,8 @@ impl Behaviour {
"Reserved peers protocol initialization...."
);

let peer_addresses = strip_peer_id(config.reserved_peers);
let peer_addresses = strip_peer_id(config.reserved_peers.clone());
let dialing_delay = Delay::new(config.dialing_interval);

let reserved_peers_state = peer_addresses
.into_iter()
Expand All @@ -120,28 +118,34 @@ impl Behaviour {
ReservedPeerState {
peer_id,
address,
connection_status: ConnectionStatus::NotConnected {
scheduled_at: schedule_connection(),
},
connection_status: ConnectionStatus::NotConnected,
},
)
})
.collect();

Self {
protocol_name: config.protocol_name,
config,
reserved_peers_state,
waker: None,
dialing_delay,
}
}

/// Create a connection handler for the reserved peers protocol.
#[inline]
fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler {
Handler::new(
self.protocol_name,
self.config.protocol_name,
self.reserved_peers_state.contains_key(peer_id),
)
}

fn wake(&self) {
if let Some(waker) = &self.waker {
waker.wake_by_ref()
}
}
}

impl NetworkBehaviour for Behaviour {
Expand Down Expand Up @@ -175,6 +179,7 @@ impl NetworkBehaviour for Behaviour {
state.connection_status = ConnectionStatus::Connected;

debug!(peer_id=%state.peer_id, "Reserved peer connected.");
self.wake();
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
Expand All @@ -184,24 +189,22 @@ impl NetworkBehaviour for Behaviour {
}) => {
if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
if remaining_established == 0 {
state.connection_status = ConnectionStatus::NotConnected {
scheduled_at: schedule_connection(),
};
state.connection_status = ConnectionStatus::NotConnected;

debug!(%state.peer_id, "Reserved peer disconnected.");
self.wake();
}
}
}
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
if let Some(peer_id) = peer_id {
if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
if state.connection_status == ConnectionStatus::PendingConnection {
state.connection_status = ConnectionStatus::NotConnected {
scheduled_at: schedule_connection(),
};
state.connection_status = ConnectionStatus::NotConnected;
};

debug!(peer_id=%state.peer_id, "Reserved peer dialing failed.");
self.wake();
}
}
}
Expand All @@ -228,28 +231,35 @@ impl NetworkBehaviour for Behaviour {

fn poll(
&mut self,
_: &mut Context<'_>,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
for (_, state) in self.reserved_peers_state.iter_mut() {
trace!(?state, "Reserved peer state.");
// Schedule new peer dialing.
match self.dialing_delay.poll_unpin(cx) {
Poll::Pending => {}
Poll::Ready(()) => {
self.dialing_delay.reset(self.config.dialing_interval);

if let ConnectionStatus::NotConnected { scheduled_at } = state.connection_status {
if Instant::now() > scheduled_at {
state.connection_status = ConnectionStatus::PendingConnection;
for (_, state) in self.reserved_peers_state.iter_mut() {
trace!(?state, "Reserved peer state.");

debug!(peer_id=%state.peer_id, "Dialing the reserved peer....");
if let ConnectionStatus::NotConnected = state.connection_status {
state.connection_status = ConnectionStatus::PendingConnection;

let dial_opts =
DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]);
debug!(peer_id=%state.peer_id, "Dialing the reserved peer....");

return Poll::Ready(ToSwarm::Dial {
opts: dial_opts.build(),
});
let dial_opts =
DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]);

return Poll::Ready(ToSwarm::Dial {
opts: dial_opts.build(),
});
}
}
}
}

self.waker.replace(cx.waker().clone());
Poll::Pending
}
}
13 changes: 11 additions & 2 deletions crates/subspace-networking/src/protocols/reserved_peers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ struct ReservedPeersInstance;

const PROTOCOL_NAME: &str = "/reserved-peers";

const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

#[tokio::test()]
async fn test_connection_breaks_after_timeout_without_reservation() {
let connection_timeout = Duration::from_millis(300);
Expand All @@ -28,6 +30,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: Vec::new(),
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand All @@ -38,6 +41,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: Vec::new(),
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand Down Expand Up @@ -77,6 +81,7 @@ async fn test_connection_reservation() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: vec![peer2_address.parse().unwrap()],
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand All @@ -86,6 +91,7 @@ async fn test_connection_reservation() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: vec![peer1_address.parse().unwrap()],
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand Down Expand Up @@ -124,6 +130,7 @@ async fn test_connection_reservation_symmetry() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: vec![peer2_address.parse().unwrap()],
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand All @@ -133,6 +140,7 @@ async fn test_connection_reservation_symmetry() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: Vec::new(),
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand All @@ -157,8 +165,8 @@ async fn test_connection_reservation_symmetry() {

#[tokio::test()]
async fn test_reserved_peers_dial_event() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
let connection_timeout = Duration::from_millis(1300);
let long_delay = Duration::from_millis(2000);

let identity1 = Keypair::generate_ed25519();
let identity2 = Keypair::generate_ed25519();
Expand All @@ -172,6 +180,7 @@ async fn test_reserved_peers_dial_event() {
Behaviour::new(Config {
protocol_name: PROTOCOL_NAME,
reserved_peers: vec![peer2_address.parse().unwrap()],
dialing_interval: DIALING_INTERVAL_IN_SECS,
}),
);

Expand Down

0 comments on commit c9a15f3

Please sign in to comment.