Skip to content

Commit

Permalink
Merge pull request #2295 from subspace/disable-connected-peers
Browse files Browse the repository at this point in the history
Disable "connected peers" and "peer info" protocols.
  • Loading branch information
nazar-pc authored Dec 11, 2023
2 parents ed93719 + e49138e commit 8e3e13b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 115 deletions.
61 changes: 34 additions & 27 deletions crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,16 @@ mod tests;
use crate::protocols::autonat_wrapper::{
Behaviour as AutonatWrapper, Config as AutonatWrapperConfig,
};
use crate::protocols::connected_peers::{
Behaviour as ConnectedPeersBehaviour, Config as ConnectedPeersConfig,
Event as ConnectedPeersEvent,
};
use crate::protocols::peer_info::{
Behaviour as PeerInfoBehaviour, Config as PeerInfoConfig, Event as PeerInfoEvent,
};
use crate::protocols::connected_peers::Config as ConnectedPeersConfig;
use crate::protocols::peer_info::Event as PeerInfoEvent;
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, RequestHandler, RequestResponseFactoryBehaviour,
};
use crate::protocols::reserved_peers::{
Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent,
};
use crate::protocols::subspace_connection_limits::Behaviour as ConnectionLimitsBehaviour;
use crate::PeerInfoProvider;
use crate::{PeerInfoConfig, PeerInfoProvider};
use derive_more::From;
use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers};
use libp2p::autonat::Event as AutonatEvent;
Expand Down Expand Up @@ -54,12 +49,20 @@ pub(crate) struct BehaviorConfig<RecordStore> {
pub(crate) connection_limits: ConnectionLimits,
/// The configuration for the [`ReservedPeersBehaviour`].
pub(crate) reserved_peers: ReservedPeersConfig,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// The configuration for the [`PeerInfo`] protocol.
pub(crate) peer_info_config: PeerInfoConfig,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Provides peer-info for local peer.
pub(crate) peer_info_provider: Option<PeerInfoProvider>,
/// The configuration for the [`ConnectedPeers`] protocol (general instance).
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
pub(crate) general_connected_peers_config: Option<ConnectedPeersConfig>,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// The configuration for the [`ConnectedPeers`] protocol (special instance).
pub(crate) special_connected_peers_config: Option<ConnectedPeersConfig>,
/// Autonat configuration.
Expand All @@ -85,11 +88,12 @@ pub(crate) struct Behavior<RecordStore> {
pub(crate) request_response: RequestResponseFactoryBehaviour,
pub(crate) block_list: BlockListBehaviour,
pub(crate) reserved_peers: ReservedPeersBehaviour,
pub(crate) peer_info: Toggle<PeerInfoBehaviour>,
pub(crate) general_connected_peers:
Toggle<ConnectedPeersBehaviour<GeneralConnectedPeersInstance>>,
pub(crate) special_connected_peers:
Toggle<ConnectedPeersBehaviour<SpecialConnectedPeersInstance>>,
// TODO: Restore or remove connected peer later
// pub(crate) peer_info: Toggle<PeerInfoBehaviour>,
// pub(crate) general_connected_peers:
// Toggle<ConnectedPeersBehaviour<GeneralConnectedPeersInstance>>,
// pub(crate) special_connected_peers:
// Toggle<ConnectedPeersBehaviour<SpecialConnectedPeersInstance>>,
pub(crate) autonat: AutonatWrapper,
}

Expand All @@ -116,9 +120,10 @@ where
})
.into();

let peer_info = config
.peer_info_provider
.map(|provider| PeerInfoBehaviour::new(config.peer_info_config, provider));
// TODO: Restore or remove connected peer later
// let peer_info = config
// .peer_info_provider
// .map(|provider| PeerInfoBehaviour::new(config.peer_info_config, provider));

Self {
connection_limits: ConnectionLimitsBehaviour::new(config.connection_limits),
Expand All @@ -133,15 +138,16 @@ where
.expect("RequestResponse protocols registration failed."),
block_list: BlockListBehaviour::default(),
reserved_peers: ReservedPeersBehaviour::new(config.reserved_peers),
peer_info: peer_info.into(),
general_connected_peers: config
.general_connected_peers_config
.map(ConnectedPeersBehaviour::new)
.into(),
special_connected_peers: config
.special_connected_peers_config
.map(ConnectedPeersBehaviour::new)
.into(),
// TODO: Restore or remove connected peer later
//peer_info: peer_info.into(),
// general_connected_peers: config
// .general_connected_peers_config
// .map(ConnectedPeersBehaviour::new)
// .into(),
// special_connected_peers: config
// .special_connected_peers_config
// .map(ConnectedPeersBehaviour::new)
// .into(),
autonat: AutonatWrapper::new(config.autonat),
}
}
Expand All @@ -158,7 +164,8 @@ pub(crate) enum Event {
VoidEventStub(VoidEvent),
ReservedPeers(ReservedPeersEvent),
PeerInfo(PeerInfoEvent),
GeneralConnectedPeers(ConnectedPeersEvent<GeneralConnectedPeersInstance>),
SpecialConnectedPeers(ConnectedPeersEvent<SpecialConnectedPeersInstance>),
// TODO: Restore or remove connected peer later
// GeneralConnectedPeers(ConnectedPeersEvent<GeneralConnectedPeersInstance>),
// SpecialConnectedPeers(ConnectedPeersEvent<SpecialConnectedPeersInstance>),
Autonat(AutonatEvent),
}
132 changes: 70 additions & 62 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ use crate::behavior::persistent_parameters::{
append_p2p_suffix, remove_p2p_suffix, KnownPeersRegistry, PeerAddressRemovedEvent,
PEERS_ADDRESSES_BATCH_SIZE,
};
use crate::behavior::{
Behavior, Event, GeneralConnectedPeersInstance, SpecialConnectedPeersInstance,
};
use crate::behavior::{Behavior, Event};
use crate::constructor;
use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::{ConnectedPeersHandler, LocalOnlyRecordStore};
use crate::protocols::connected_peers::Event as ConnectedPeersEvent;
use crate::protocols::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess};
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, IfDisconnected,
Expand Down Expand Up @@ -132,10 +129,16 @@ where
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
protocol_version: String,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Defines whether we maintain a persistent connection for common peers.
general_connection_decision_handler: Option<ConnectedPeersHandler>,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Defines whether we maintain a persistent connection for special peers.
special_connection_decision_handler: Option<ConnectedPeersHandler>,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Randomness generator used for choosing Kademlia addresses.
rng: StdRng,
/// Addresses to bootstrap Kademlia network
Expand Down Expand Up @@ -454,12 +457,13 @@ where
SwarmEvent::Behaviour(Event::PeerInfo(event)) => {
self.handle_peer_info_event(event).await;
}
SwarmEvent::Behaviour(Event::GeneralConnectedPeers(event)) => {
self.handle_general_connected_peers_event(event).await;
}
SwarmEvent::Behaviour(Event::SpecialConnectedPeers(event)) => {
self.handle_special_connected_peers_event(event).await;
}
// TODO: Restore or remove connected peer later
// SwarmEvent::Behaviour(Event::GeneralConnectedPeers(event)) => {
// self.handle_general_connected_peers_event(event).await;
// }
// SwarmEvent::Behaviour(Event::SpecialConnectedPeers(event)) => {
// self.handle_special_connected_peers_event(event).await;
// }
SwarmEvent::Behaviour(Event::Autonat(event)) => {
self.handle_autonat_event(event).await;
}
Expand Down Expand Up @@ -1178,61 +1182,63 @@ where
});
}

if let Some(general_connected_peers) =
self.swarm.behaviour_mut().general_connected_peers.as_mut()
{
let keep_alive = self
.general_connection_decision_handler
.as_ref()
.map(|handler| handler(&peer_info))
.unwrap_or(false);

general_connected_peers.update_keep_alive_status(event.peer_id, keep_alive);
}

if let Some(special_connected_peers) =
self.swarm.behaviour_mut().special_connected_peers.as_mut()
{
let special_keep_alive = self
.special_connection_decision_handler
.as_ref()
.map(|handler| handler(&peer_info))
.unwrap_or(false);

special_connected_peers.update_keep_alive_status(event.peer_id, special_keep_alive);
}
// TODO: Restore or remove connected peer later
// if let Some(general_connected_peers) =
// self.swarm.behaviour_mut().general_connected_peers.as_mut()
// {
// let keep_alive = self
// .general_connection_decision_handler
// .as_ref()
// .map(|handler| handler(&peer_info))
// .unwrap_or(false);
//
// general_connected_peers.update_keep_alive_status(event.peer_id, keep_alive);
// }
//
// if let Some(special_connected_peers) =
// self.swarm.behaviour_mut().special_connected_peers.as_mut()
// {
// let special_keep_alive = self
// .special_connection_decision_handler
// .as_ref()
// .map(|handler| handler(&peer_info))
// .unwrap_or(false);
//
// special_connected_peers.update_keep_alive_status(event.peer_id, special_keep_alive);
// }
}
}

async fn handle_general_connected_peers_event(
&mut self,
event: ConnectedPeersEvent<GeneralConnectedPeersInstance>,
) {
trace!(?event, "General connected peers event.");

let peers = self.get_peers_to_dial().await;

if let Some(general_connected_peers) =
self.swarm.behaviour_mut().general_connected_peers.as_mut()
{
general_connected_peers.add_peers_to_dial(&peers);
}
}

async fn handle_special_connected_peers_event(
&mut self,
event: ConnectedPeersEvent<SpecialConnectedPeersInstance>,
) {
trace!(?event, "Special connected peers event.");

let peers = self.get_peers_to_dial().await;

if let Some(special_connected_peers) =
self.swarm.behaviour_mut().special_connected_peers.as_mut()
{
special_connected_peers.add_peers_to_dial(&peers);
}
}
// TODO: Restore or remove connected peer later
// async fn handle_general_connected_peers_event(
// &mut self,
// event: ConnectedPeersEvent<GeneralConnectedPeersInstance>,
// ) {
// trace!(?event, "General connected peers event.");
//
// let peers = self.get_peers_to_dial().await;
//
// if let Some(general_connected_peers) =
// self.swarm.behaviour_mut().general_connected_peers.as_mut()
// {
// general_connected_peers.add_peers_to_dial(&peers);
// }
// }
//
// async fn handle_special_connected_peers_event(
// &mut self,
// event: ConnectedPeersEvent<SpecialConnectedPeersInstance>,
// ) {
// trace!(?event, "Special connected peers event.");
//
// let peers = self.get_peers_to_dial().await;
//
// if let Some(special_connected_peers) =
// self.swarm.behaviour_mut().special_connected_peers.as_mut()
// {
// special_connected_peers.add_peers_to_dial(&peers);
// }
// }

async fn handle_autonat_event(&mut self, event: AutonatEvent) {
trace!(?event, "Autonat event received.");
Expand Down Expand Up @@ -1575,6 +1581,8 @@ where
}
}

// TODO: Restore or remove connected peer later
#[allow(dead_code)]
async fn get_peers_to_dial(&mut self) -> Vec<PeerAddress> {
let mut result_peers =
Vec::with_capacity(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE + PEERS_ADDRESSES_BATCH_SIZE);
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/src/protocols/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
//! attempts, and manages a cache for candidates for permanent connections. It maintains
//! a single connection for each peer. Multiple protocol instances could be instantiated.
//! TODO: Restore or remove connected peer later
#![allow(dead_code)]

mod handler;

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/src/protocols/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! TODO: Restore or remove connected peer later
#![allow(dead_code)]

mod handler;
mod protocol;

Expand Down
54 changes: 28 additions & 26 deletions crates/subspace-service/src/sync_from_dsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const MIN_OFFLINE_PERIOD: Duration = Duration::from_secs(60);
#[derive(Debug)]
enum NotificationReason {
NoImportedBlocks,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
WentOnlineSubspace,
WentOnlineSubstrate,
}
Expand Down Expand Up @@ -97,38 +99,38 @@ where

async fn create_observer<Block, Client>(
network_service: &NetworkService<Block, <Block as BlockT>::Hash>,
node: &Node,
_node: &Node,
client: &Client,
notifications_sender: mpsc::Sender<NotificationReason>,
) where
Block: BlockT,
Client: BlockchainEvents<Block> + Send + Sync + 'static,
{
// Separate reactive observer for Subspace networking that is not a future
let _handler_id = node.on_num_established_peer_connections_change({
// Assuming node is offline by default
let last_online = Atomic::new(None::<Instant>);
let notifications_sender = notifications_sender.clone();

Arc::new(move |&new_connections| {
let is_online = new_connections > 0;
let was_online = last_online
.load(Ordering::AcqRel)
.map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
.unwrap_or_default();

if is_online && !was_online {
// Doesn't matter if sending failed here
let _ = notifications_sender
.clone()
.try_send(NotificationReason::WentOnlineSubspace);
}

if is_online {
last_online.store(Some(Instant::now()), Ordering::Release);
}
})
});
// // Separate reactive observer for Subspace networking that is not a future
// let _handler_id = node.on_num_established_peer_connections_change({
// // Assuming node is offline by default
// let last_online = Atomic::new(None::<Instant>);
// let notifications_sender = notifications_sender.clone();
//
// Arc::new(move |&new_connections| {
// let is_online = new_connections > 0;
// let was_online = last_online
// .load(Ordering::AcqRel)
// .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
// .unwrap_or_default();
//
// if is_online && !was_online {
// // Doesn't matter if sending failed here
// let _ = notifications_sender
// .clone()
// .try_send(NotificationReason::WentOnlineSubspace);
// }
//
// if is_online {
// last_online.store(Some(Instant::now()), Ordering::Release);
// }
// })
// });
futures::select! {
_ = create_imported_blocks_observer(client, notifications_sender.clone()).fuse() => {
// Runs indefinitely
Expand Down

0 comments on commit 8e3e13b

Please sign in to comment.