Skip to content

Commit

Permalink
Merge pull request #2242 from subspace/autonat-wrapper
Browse files Browse the repository at this point in the history
Autonat and Kademlia settings updates.
  • Loading branch information
nazar-pc authored Nov 17, 2023
2 parents 21b1c75 + 9e5f337 commit e9af82f
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 50 deletions.
16 changes: 8 additions & 8 deletions crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ pub(crate) mod persistent_parameters;
#[cfg(test)]
mod tests;

use crate::protocols::autonat_wrapper::{
Behaviour as AutonatWrapper, Config as AutonatWrapperConfig,
};
use crate::protocols::connected_peers::{
Behaviour as ConnectedPeersBehaviour, Config as ConnectedPeersConfig,
Event as ConnectedPeersEvent,
Expand All @@ -19,7 +22,7 @@ use crate::protocols::subspace_connection_limits::Behaviour as ConnectionLimitsB
use crate::PeerInfoProvider;
use derive_more::From;
use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers};
use libp2p::autonat::{Behaviour as Autonat, Config as AutonatConfig, Event as AutonatEvent};
use libp2p::autonat::Event as AutonatEvent;
use libp2p::connection_limits::ConnectionLimits;
use libp2p::gossipsub::{
Behaviour as Gossipsub, Config as GossipsubConfig, Event as GossipsubEvent, MessageAuthenticity,
Expand Down Expand Up @@ -59,8 +62,8 @@ pub(crate) struct BehaviorConfig<RecordStore> {
pub(crate) general_connected_peers_config: Option<ConnectedPeersConfig>,
/// The configuration for the [`ConnectedPeers`] protocol (special instance).
pub(crate) special_connected_peers_config: Option<ConnectedPeersConfig>,
/// Autonat configuration (optional).
pub(crate) autonat: Option<AutonatConfig>,
/// Autonat configuration.
pub(crate) autonat: AutonatWrapperConfig,
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -85,7 +88,7 @@ pub(crate) struct Behavior<RecordStore> {
Toggle<ConnectedPeersBehaviour<GeneralConnectedPeersInstance>>,
pub(crate) special_connected_peers:
Toggle<ConnectedPeersBehaviour<SpecialConnectedPeersInstance>>,
pub(crate) autonat: Toggle<Autonat>,
pub(crate) autonat: AutonatWrapper,
}

impl<RecordStore> Behavior<RecordStore>
Expand Down Expand Up @@ -137,10 +140,7 @@ where
.special_connected_peers_config
.map(ConnectedPeersBehaviour::new)
.into(),
autonat: config
.autonat
.map(|autonat_config| Autonat::new(config.peer_id, autonat_config))
.into(),
autonat: AutonatWrapper::new(config.autonat),
}
}
}
Expand Down
33 changes: 25 additions & 8 deletions crates/subspace-networking/src/constructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::transport::build_transport;
use crate::node::Node;
use crate::node_runner::{NodeRunner, NodeRunnerConfig};
use crate::protocols::autonat_wrapper::Config as AutonatWrapperConfig;
use crate::protocols::connected_peers::Config as ConnectedPeersConfig;
use crate::protocols::peer_info::PeerInfoProvider;
use crate::protocols::request_response::request_response_factory::RequestHandler;
Expand Down Expand Up @@ -99,6 +100,8 @@ const YAMUX_BUFFER_SIZE: usize = Piece::SIZE + 1024 * 1024;

/// Max confidence for autonat protocol. Could affect Kademlia mode change.
pub(crate) const AUTONAT_MAX_CONFIDENCE: usize = 3;
/// We set a very long pause before autonat initialization (Duration::Max panics).
const AUTONAT_SERVER_PROBE_DELAY: Duration = Duration::from_secs(3600 * 24 * 365);

/// Defines Kademlia mode
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -481,8 +484,18 @@ where

debug!(?connection_limits, "DSN connection limits set.");

let enable_autonat = external_addresses.is_empty() && kademlia_mode.is_dynamic();
debug!(%enable_autonat, ?external_addresses, ?kademlia_mode, "Autonat settings.");
let autonat_boot_delay = if kademlia_mode.is_static() || !external_addresses.is_empty() {
AUTONAT_SERVER_PROBE_DELAY
} else {
AutonatConfig::default().boot_delay
};

debug!(
?autonat_boot_delay,
?kademlia_mode,
?external_addresses,
"Autonat boot delay set."
);

let mut behaviour = Behavior::new(BehaviorConfig {
peer_id: local_peer_id,
Expand Down Expand Up @@ -516,12 +529,16 @@ where
..ConnectedPeersConfig::default()
}
}),
autonat: enable_autonat.then(|| AutonatConfig {
use_connected: true,
only_global_ips: !config.allow_non_global_addresses_in_dht,
confidence_max: AUTONAT_MAX_CONFIDENCE,
..Default::default()
}),
autonat: AutonatWrapperConfig {
inner_config: AutonatConfig {
use_connected: true,
only_global_ips: !config.allow_non_global_addresses_in_dht,
confidence_max: AUTONAT_MAX_CONFIDENCE,
boot_delay: autonat_boot_delay,
..Default::default()
},
local_peer_id,
},
});

match (kademlia_mode, external_addresses.is_empty()) {
Expand Down
70 changes: 36 additions & 34 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,22 +733,35 @@ where
});

if full_kademlia_support {
let old_addresses = kademlia
.kbucket(peer_id)
.and_then(|peers| {
let key = peer_id.into();
peers.iter().find_map(|peer| {
(peer.node.key == &key).then_some(
peer.node
.value
.iter()
.filter(|address| info.listen_addrs.contains(address))
.cloned()
.collect::<Vec<_>>(),
)
})
})
.unwrap_or_default();
//TODO: Consider restoring obsolete address removal
// let old_addresses = kademlia
// .kbucket(peer_id)
// .and_then(|peers| {
// let key = peer_id.into();
// peers.iter().find_map(|peer| {
// (peer.node.key == &key).then_some(
// peer.node
// .value
// .iter()
// .filter(|address| info.listen_addrs.contains(address))
// .cloned()
// .collect::<Vec<_>>(),
// )
// })
// })
// .unwrap_or_default();

// for old_address in old_addresses {
// trace!(
// %local_peer_id,
// %peer_id,
// %old_address,
// "Removing old self-reported address from Kademlia DHT",
// );
//
// kademlia.remove_address(&peer_id, &old_address);
// }

for address in info.listen_addrs {
if !self.allow_non_global_addresses_in_dht
&& !is_global_address_or_dns(&address)
Expand All @@ -762,7 +775,7 @@ where
continue;
}

trace!(
debug!(
%local_peer_id,
%peer_id,
%address,
Expand All @@ -772,16 +785,6 @@ where

kademlia.add_address(&peer_id, address);
}
for old_address in old_addresses {
trace!(
%local_peer_id,
%peer_id,
%old_address,
"Removing old self-reported address from Kademlia DHT",
);

kademlia.remove_address(&peer_id, &old_address);
}
} else {
debug!(
%local_peer_id,
Expand Down Expand Up @@ -1190,13 +1193,12 @@ where

async fn handle_autonat_event(&mut self, event: AutonatEvent) {
trace!(?event, "Autonat event received.");
if let Some(autonat) = self.swarm.behaviour().autonat.as_ref() {
debug!(
public_address=?autonat.public_address(),
confidence=%autonat.confidence(),
"Current public address confidence."
);
}
let autonat = &self.swarm.behaviour().autonat;
debug!(
public_address=?autonat.public_address(),
confidence=%autonat.confidence(),
"Current public address confidence."
);

match event {
AutonatEvent::InboundProbe(_inbound_probe_event) => {
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/protocols.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod autonat_wrapper;
pub(crate) mod connected_peers;
pub mod peer_info;
pub mod request_response;
Expand Down
150 changes: 150 additions & 0 deletions crates/subspace-networking/src/protocols/autonat_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use crate::utils::is_global_address_or_dns;
use libp2p::autonat::{Behaviour as Autonat, Config as AutonatConfig, Event as AutonatEvent};
use libp2p::core::Endpoint;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use std::collections::HashSet;
use std::task::{Context, Poll};
use tracing::debug;

pub(crate) struct Config {
pub(crate) inner_config: AutonatConfig,
pub(crate) local_peer_id: PeerId,
}

pub(crate) struct Behaviour {
inner: Autonat,
config: Config,
listen_addresses: HashSet<Multiaddr>,
}

impl Behaviour {
pub(crate) fn new(config: Config) -> Self {
Self {
inner: Autonat::new(config.local_peer_id, config.inner_config.clone()),
config,
listen_addresses: Default::default(),
}
}

fn private_ips_enabled(&self) -> bool {
!self.config.inner_config.only_global_ips
}

fn address_corresponds_to_listening_addresses(&self, addr: &Multiaddr) -> bool {
let candidate_protocol = addr
.iter()
.find_map(|protocol| match protocol {
udp @ Protocol::Udp(_) => Some(udp),
tcp @ Protocol::Tcp(_) => Some(tcp),
_ => None,
})
.expect("Either TCP or UDP protocol should be enabled.");

let address_result = self
.listen_addresses
.iter()
.any(|addr| addr.iter().any(|protocol| protocol == candidate_protocol));

debug!(
%address_result,
?addr,
listen_addresses=?self.listen_addresses,
"Address candidate corresponds to listening addresses."
);

address_result
}

pub(crate) fn public_address(&self) -> Option<&Multiaddr> {
self.inner.public_address()
}

pub(crate) fn confidence(&self) -> usize {
self.inner.confidence()
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = <Autonat as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = AutonatEvent;

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}

fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
new_listen_addr_event @ FromSwarm::NewListenAddr(_) => {
if let FromSwarm::NewListenAddr(addr) = new_listen_addr_event {
//TODO: handle listener address change
self.listen_addresses.insert(addr.addr.clone());

if self.private_ips_enabled() || is_global_address_or_dns(addr.addr) {
self.inner.on_swarm_event(new_listen_addr_event);
} else {
debug!(addr=?addr.addr, "Skipped listening address in AutonatWrapper.");
}
}
}
new_external_addr_event @ FromSwarm::NewExternalAddrCandidate(_) => {
if let FromSwarm::NewExternalAddrCandidate(addr) = new_external_addr_event {
if self.address_corresponds_to_listening_addresses(addr.addr) {
self.inner.on_swarm_event(new_external_addr_event);
} else {
debug!(
addr=?addr.addr,
"Skipped external address candidate in AutonatWrapper."
);
}
}
}
event => {
self.inner.on_swarm_event(event);
}
}
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.inner
.on_connection_handler_event(peer_id, connection_id, event)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
self.inner.poll(cx)
}
}

0 comments on commit e9af82f

Please sign in to comment.