Commit

Merge pull request #2497 from subspace/connection-limits
Improve the default connection limit management for DSN.
nazar-pc authored Feb 1, 2024
2 parents 9aae023 + 9c74558 commit 5fe09ce
Showing 9 changed files with 39 additions and 390 deletions.
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -43,7 +43,7 @@ const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
 /// Size of the cache of archived segments for the purposes of faster sector expiration checks.
 const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed");
 /// Get piece retry attempts number.
-const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");
+const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed");
 
 /// Details about sector currently being plotted
 #[derive(Debug, Clone, Encode, Decode)]
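
For context, PIECE_GETTER_RETRY_NUMBER is a retry budget for fetching pieces during plotting, and this commit raises it from 4 to 7. A minimal sketch of how such a budget is typically consumed; get_piece here is a hypothetical stand-in for the farmer's real piece getter:

use std::num::NonZeroU16;

const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed");

// Hypothetical fallible fetch standing in for the real piece getter.
fn get_piece(attempt: u16) -> Result<Vec<u8>, String> {
    if attempt < 3 {
        Err(format!("attempt {attempt} failed"))
    } else {
        Ok(vec![0u8; 32])
    }
}

fn main() {
    // Try up to PIECE_GETTER_RETRY_NUMBER times before giving up.
    let piece = (0..PIECE_GETTER_RETRY_NUMBER.get())
        .find_map(|attempt| get_piece(attempt).ok());
    match piece {
        Some(piece) => println!("fetched {} bytes", piece.len()),
        None => eprintln!("piece not found after {PIECE_GETTER_RETRY_NUMBER} attempts"),
    }
}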
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/constructor.rs
@@ -52,7 +52,7 @@ const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub";
 /// It must be set for large plots.
 const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100000;
 /// How long will connection be allowed to be open without any usage
-const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
+const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
 /// The default maximum established incoming connection number for the swarm.
 const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
 /// The default maximum established incoming connection number for the swarm.
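
The idle timeout above drops from 10 s to 3 s, so unused connections are reclaimed faster and stay within the tighter connection limits. A minimal sketch of idle-timeout semantics using a tokio channel as a stand-in for a connection; this is illustrative, not the crate's actual libp2p wiring:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;

const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);

#[tokio::main]
async fn main() {
    // `_tx` stays alive but sends nothing, so the "connection" sits idle.
    let (_tx, mut rx) = mpsc::channel::<&str>(8);

    // Close the connection once it has been idle for the whole window.
    match timeout(IDLE_CONNECTION_TIMEOUT, rx.recv()).await {
        Ok(Some(frame)) => println!("activity: {frame}"),
        Ok(None) => println!("peer closed the connection"),
        Err(_) => println!("idle for {IDLE_CONNECTION_TIMEOUT:?}, closing"),
    }
}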
22 changes: 11 additions & 11 deletions crates/subspace-networking/src/node.rs
@@ -2,7 +2,6 @@ use crate::protocols::request_response::handlers::generic_request_handler::Gener
 use crate::protocols::request_response::request_response_factory;
 use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
 use crate::utils::multihash::Multihash;
-use crate::utils::rate_limiter::RateLimiterPermit;
 use crate::utils::HandlerFn;
 use bytes::Bytes;
 use event_listener_primitives::HandlerId;
@@ -17,6 +16,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use thiserror::Error;
+use tokio::sync::OwnedSemaphorePermit;
 use tracing::{debug, error, trace};
 
 /// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
@@ -28,7 +28,7 @@ pub struct TopicSubscription {
     command_sender: Option<mpsc::Sender<Command>>,
     #[pin]
     receiver: mpsc::UnboundedReceiver<Bytes>,
-    _permit: RateLimiterPermit,
+    _permit: OwnedSemaphorePermit,
 }
 
 impl Stream for TopicSubscription {
@@ -265,7 +265,7 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        let permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
         self.shared
@@ -288,7 +288,7 @@
         key: Multihash,
         value: Vec<u8>,
     ) -> Result<impl Stream<Item = ()>, PutValueError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        let permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
         self.shared
@@ -308,7 +308,7 @@
 
     /// Subscribe to some topic on the DSN.
     pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
-        let permit = self.shared.rate_limiter.acquire_regular_permit().await;
+        let permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = oneshot::channel();
 
         self.shared
@@ -336,7 +336,7 @@
 
     /// Publish a message to some topic on the DSN.
     pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
-        let _permit = self.shared.rate_limiter.acquire_regular_permit().await;
+        let _permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = oneshot::channel();
 
         self.shared
@@ -362,7 +362,7 @@
         Request: GenericRequest,
     {
         let _permit = if acquire_permit {
-            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+            Some(self.shared.rate_limiter.acquire_permit().await)
         } else {
             None
         };
@@ -410,7 +410,7 @@
         acquire_permit: bool,
     ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
         let permit = if acquire_permit {
-            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+            Some(self.shared.rate_limiter.acquire_permit().await)
         } else {
             None
         };
@@ -447,7 +447,7 @@
         acquire_permit: bool,
     ) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
         let permit = if acquire_permit {
-            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+            Some(self.shared.rate_limiter.acquire_permit().await)
         } else {
             None
         };
@@ -576,7 +576,7 @@
 
     /// Returns the request batch handle with common "connection permit" slot from the shared pool.
    pub async fn get_requests_batch_handle(&self) -> NodeRequestsBatchHandle {
-        let _permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        let _permit = self.shared.rate_limiter.acquire_permit().await;
 
         NodeRequestsBatchHandle {
             _permit,
@@ -591,7 +591,7 @@
 /// we don't need to obtain separate semaphore permits for the operations.
 pub struct NodeRequestsBatchHandle {
     node: Node,
-    _permit: RateLimiterPermit,
+    _permit: OwnedSemaphorePermit,
 }
 
 impl NodeRequestsBatchHandle {
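
The batch handle above acquires one shared-pool permit and keeps it alive for the whole batch of requests, releasing it on drop. A minimal sketch of that lifetime-based pattern, independent of the subspace types:

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Holds one connection permit for as long as a batch of requests lives.
struct RequestsBatchHandle {
    _permit: OwnedSemaphorePermit,
}

impl RequestsBatchHandle {
    async fn request(&self, payload: &str) -> String {
        // All requests issued through this handle share the single permit.
        format!("response to {payload}")
    }
}

#[tokio::main]
async fn main() {
    let connections_semaphore = Arc::new(Semaphore::new(2));

    // Acquire once for the whole batch instead of once per request.
    let permit = connections_semaphore
        .clone()
        .acquire_owned()
        .await
        .expect("We never close semaphore.");
    let batch = RequestsBatchHandle { _permit: permit };

    println!("{}", batch.request("ping").await);
    println!("{}", batch.request("pong").await);
    // Dropping `batch` releases the permit back to the pool.
}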
38 changes: 7 additions & 31 deletions crates/subspace-networking/src/node_runner.rs
@@ -9,7 +9,6 @@ use crate::protocols::request_response::request_response_factory::{
     Event as RequestResponseEvent, IfDisconnected,
 };
 use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
-use crate::utils::rate_limiter::RateLimiterPermit;
 use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics};
 use async_mutex::Mutex as AsyncMutex;
 use bytes::Bytes;
@@ -37,41 +36,35 @@ use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Debug;
 use std::net::IpAddr;
-use std::num::NonZeroUsize;
 use std::pin::Pin;
 use std::sync::atomic::Ordering;
 use std::sync::{Arc, Weak};
 use std::time::Duration;
+use tokio::sync::OwnedSemaphorePermit;
 use tokio::task::yield_now;
 use tokio::time::Sleep;
 use tracing::{debug, error, trace, warn};
 
-/// How many peers should node be connected to before boosting turns on.
-///
-/// 1 means boosting starts with second peer.
-const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize =
-    NonZeroUsize::new(5).expect("Not zero; qed");
-
 enum QueryResultSender {
     Value {
         sender: mpsc::UnboundedSender<PeerRecord>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: OwnedSemaphorePermit,
     },
     ClosestPeers {
         sender: mpsc::UnboundedSender<PeerId>,
         // Just holding onto permit while data structure is not dropped
-        _permit: Option<RateLimiterPermit>,
+        _permit: Option<OwnedSemaphorePermit>,
     },
     Providers {
         sender: mpsc::UnboundedSender<PeerId>,
         // Just holding onto permit while data structure is not dropped
-        _permit: Option<RateLimiterPermit>,
+        _permit: Option<OwnedSemaphorePermit>,
     },
     PutValue {
         sender: mpsc::UnboundedSender<()>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: OwnedSemaphorePermit,
     },
     Bootstrap {
         sender: mpsc::UnboundedSender<()>,
@@ -503,15 +496,7 @@
             .num_established_peer_connections
             .fetch_add(1, Ordering::SeqCst)
             + 1;
-        if num_established_peer_connections > CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get() {
-            // The peer count exceeded the threshold, bump up the quota.
-            if let Err(error) = shared.rate_limiter.expand_kademlia_semaphore() {
-                warn!(%error, "Failed to expand Kademlia concurrent tasks");
-            }
-            if let Err(error) = shared.rate_limiter.expand_regular_semaphore() {
-                warn!(%error, "Failed to expand regular concurrent tasks");
-            }
-        }
 
         shared
             .handlers
             .num_established_peer_connections_change
@@ -550,16 +535,7 @@
             .num_established_peer_connections
             .fetch_sub(1, Ordering::SeqCst)
             - 1;
-        if num_established_peer_connections == CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get()
-        {
-            // The previous peer count was over the threshold, reclaim the quota.
-            if let Err(error) = shared.rate_limiter.shrink_kademlia_semaphore() {
-                warn!(%error, "Failed to shrink Kademlia concurrent tasks");
-            }
-            if let Err(error) = shared.rate_limiter.shrink_regular_semaphore() {
-                warn!(%error, "Failed to shrink regular concurrent tasks");
-            }
-        }
 
         shared
             .handlers
             .num_established_peer_connections_change
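
With the boost/shrink quota logic deleted, what remains is a plain atomic counter of established connections. A minimal sketch of that counter pattern, using only the standard library:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn main() {
    let num_established_peer_connections = Arc::new(AtomicUsize::new(0));

    // Connection established: fetch_add returns the previous value,
    // so add 1 to get the current count.
    let after_open = num_established_peer_connections.fetch_add(1, Ordering::SeqCst) + 1;
    println!("connections after open: {after_open}");

    // Connection closed: fetch_sub returns the previous value,
    // so subtract 1 to get the current count.
    let after_close = num_established_peer_connections.fetch_sub(1, Ordering::SeqCst) - 1;
    println!("connections after close: {after_close}");
}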
11 changes: 6 additions & 5 deletions crates/subspace-networking/src/shared.rs
@@ -3,7 +3,7 @@
 use crate::protocols::request_response::request_response_factory::RequestFailure;
 use crate::utils::multihash::Multihash;
-use crate::utils::rate_limiter::{RateLimiter, RateLimiterPermit};
+use crate::utils::rate_limiter::RateLimiter;
 use crate::utils::Handler;
 use bytes::Bytes;
 use futures::channel::{mpsc, oneshot};
@@ -13,6 +13,7 @@ use libp2p::{Multiaddr, PeerId};
 use parking_lot::Mutex;
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
+use tokio::sync::OwnedSemaphorePermit;
 
 /// Represents Kademlia events (RoutablePeer, PendingRoutablePeer, UnroutablePeer).
 #[derive(Clone, Debug)]
@@ -55,13 +56,13 @@ pub(crate) enum Command {
     GetValue {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerRecord>,
-        permit: RateLimiterPermit,
+        permit: OwnedSemaphorePermit,
     },
     PutValue {
         key: Multihash,
         value: Vec<u8>,
         result_sender: mpsc::UnboundedSender<()>,
-        permit: RateLimiterPermit,
+        permit: OwnedSemaphorePermit,
     },
     Subscribe {
         topic: Sha256Topic,
@@ -79,7 +80,7 @@
     GetClosestPeers {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerId>,
-        permit: Option<RateLimiterPermit>,
+        permit: Option<OwnedSemaphorePermit>,
     },
     GenericRequest {
         peer_id: PeerId,
@@ -90,7 +91,7 @@
     GetProviders {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerId>,
-        permit: Option<RateLimiterPermit>,
+        permit: Option<OwnedSemaphorePermit>,
     },
     BanPeer {
         peer_id: PeerId,
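
Carrying the permit inside the command matters for lifetime: the permit crosses the channel together with the query and is released only when the query state is dropped. A minimal sketch of that hand-off, assuming only tokio:

use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

// A command owns its rate-limiting permit for its whole lifetime.
enum Command {
    GetValue {
        key: u64,
        permit: OwnedSemaphorePermit,
    },
}

#[tokio::main]
async fn main() {
    let rate_limiter = Arc::new(Semaphore::new(1));
    let (tx, mut rx) = mpsc::channel::<Command>(8);

    let permit = rate_limiter.clone().acquire_owned().await.unwrap();
    tx.send(Command::GetValue { key: 42, permit }).await.unwrap();

    // The receiver now owns the permit; it is released when the
    // command (and with it the permit) is dropped.
    if let Some(Command::GetValue { key, permit: _permit }) = rx.recv().await {
        println!("processing query for key {key}");
    } // `_permit` dropped here, the permit returns to the semaphore

    assert_eq!(rate_limiter.available_permits(), 1);
}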
101 changes: 12 additions & 89 deletions crates/subspace-networking/src/utils/rate_limiter.rs
@@ -1,56 +1,17 @@
-pub(crate) mod resizable_semaphore;
 #[cfg(test)]
 mod tests;
 
-use crate::utils::rate_limiter::resizable_semaphore::{
-    ResizableSemaphore, ResizableSemaphorePermit, SemaphoreError,
-};
-use std::num::NonZeroUsize;
-use tracing::{debug, trace};
-
-/// Base limit for number of concurrent tasks initiated towards Kademlia.
-///
-/// We restrict this so we can manage outgoing requests a bit better by cancelling low-priority
-/// requests, but this value will be boosted depending on number of connected peers.
-const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = NonZeroUsize::new(15).expect("Not zero; qed");
-/// Above base limit will be boosted by specified number for every peer connected starting with
-/// second peer, such that it scaled with network connectivity, but the exact coefficient might need
-/// to be tweaked in the future.
-pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 15;
-/// Base limit for number of any concurrent tasks except Kademlia.
-///
-/// We configure total number of streams per connection to 256. Here we assume half of them might be
-/// incoming and half outgoing, we also leave a small buffer of streams just in case.
-///
-/// We restrict this so we don't exceed number of streams for single peer, but this value will be
-/// boosted depending on number of connected peers.
-const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize =
-    NonZeroUsize::new(50 - KADEMLIA_BASE_CONCURRENT_TASKS.get()).expect("Not zero; qed");
-/// Above base limit will be boosted by specified number for every peer connected starting with
-/// second peer, such that it scaled with network connectivity, but the exact coefficient might need
-/// to be tweaked in the future.
-pub(crate) const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 25;
+use std::sync::Arc;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+use tracing::debug;
 
 /// Defines the minimum size of the "connection limit semaphore".
 const MINIMUM_CONNECTIONS_SEMAPHORE_SIZE: usize = 3;
 
 /// Empiric parameter for connection timeout and retry parameters (total retries and backoff time).
-const CONNECTION_TIMEOUT_PARAMETER: usize = 9;
-
-#[derive(Debug)]
-pub(crate) struct RateLimiterPermit {
-    /// Limits Kademlia substreams.
-    _substream_limit_permit: ResizableSemaphorePermit,
-
-    /// Limits outgoing connections.
-    _connection_limit_permit: ResizableSemaphorePermit,
-}
+const CONNECTION_TIMEOUT_PARAMETER: usize = 2;
 
 #[derive(Debug)]
 pub(crate) struct RateLimiter {
-    kademlia_tasks_semaphore: ResizableSemaphore,
-    regular_tasks_semaphore: ResizableSemaphore,
-    connections_semaphore: ResizableSemaphore,
+    connections_semaphore: Arc<Semaphore>,
 }
 
 impl RateLimiter {
@@ -63,9 +24,7 @@ impl RateLimiter {
         debug!(%out_connections, %pending_out_connections, %permits, "Rate limiter was instantiated.");
 
         Self {
-            kademlia_tasks_semaphore: ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS),
-            regular_tasks_semaphore: ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS),
-            connections_semaphore: ResizableSemaphore::new(permits),
+            connections_semaphore: Arc::new(Semaphore::new(permits.get())),
         }
     }
 
@@ -91,47 +50,11 @@
             .max(minimum_semaphore_size)
     }
 
-    pub(crate) async fn acquire_regular_permit(&self) -> RateLimiterPermit {
-        let connections_permit = self.connections_semaphore.acquire().await;
-        let substream_permit = self.regular_tasks_semaphore.acquire().await;
-
-        RateLimiterPermit {
-            _connection_limit_permit: connections_permit,
-            _substream_limit_permit: substream_permit,
-        }
-    }
-
-    pub(crate) fn expand_regular_semaphore(&self) -> Result<(), SemaphoreError> {
-        self.regular_tasks_semaphore
-            .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER)
-            .map(|old_capacity| trace!(%old_capacity, "Expand regular semaphore."))
-    }
-
-    pub(crate) fn shrink_regular_semaphore(&self) -> Result<(), SemaphoreError> {
-        self.regular_tasks_semaphore
-            .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER)
-            .map(|old_capacity| trace!(%old_capacity, "Shrink regular semaphore."))
-    }
-
-    pub(crate) async fn acquire_kademlia_permit(&self) -> RateLimiterPermit {
-        let connections_permit = self.connections_semaphore.acquire().await;
-        let substream_permit = self.kademlia_tasks_semaphore.acquire().await;
-
-        RateLimiterPermit {
-            _connection_limit_permit: connections_permit,
-            _substream_limit_permit: substream_permit,
-        }
-    }
-
-    pub(crate) fn expand_kademlia_semaphore(&self) -> Result<(), SemaphoreError> {
-        self.kademlia_tasks_semaphore
-            .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER)
-            .map(|old_capacity| trace!(%old_capacity, "Expand kademlia semaphore."))
-    }
-
-    pub(crate) fn shrink_kademlia_semaphore(&self) -> Result<(), SemaphoreError> {
-        self.kademlia_tasks_semaphore
-            .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER)
-            .map(|old_capacity| trace!(%old_capacity, "Shrink kademlia semaphore."))
+    pub(crate) async fn acquire_permit(&self) -> OwnedSemaphorePermit {
+        self.connections_semaphore
+            .clone()
+            .acquire_owned()
+            .await
+            .expect("We never close semaphore.")
     }
 }
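
The rewritten rate limiter is now a fixed-size tokio semaphore handing out owned permits, replacing the resizable Kademlia/regular semaphore pair. A minimal standalone sketch of the same pattern (the permit-count calculation involving CONNECTION_TIMEOUT_PARAMETER is left out):

use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

#[derive(Debug)]
struct RateLimiter {
    connections_semaphore: Arc<Semaphore>,
}

impl RateLimiter {
    fn new(permits: NonZeroUsize) -> Self {
        Self {
            connections_semaphore: Arc::new(Semaphore::new(permits.get())),
        }
    }

    // Owned permits can be moved into commands, handles, and spawned tasks,
    // unlike borrowed `SemaphorePermit`s, which are tied to `&Semaphore`.
    async fn acquire_permit(&self) -> OwnedSemaphorePermit {
        self.connections_semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("We never close semaphore.")
    }
}

#[tokio::main]
async fn main() {
    let limiter = RateLimiter::new(NonZeroUsize::new(2).expect("Not zero; qed"));
    let first = limiter.acquire_permit().await;
    let _second = limiter.acquire_permit().await;
    // A third acquire would now wait until a permit is dropped.
    drop(first);
    let _third = limiter.acquire_permit().await; // succeeds immediately
}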