From f07d0671eb1c227b4ba0c72b379795b90bfa7fd7 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 15 Dec 2023 14:32:09 +0700 Subject: [PATCH 1/5] networking: Add random walking piece provider. --- crates/subspace-networking/src/node.rs | 16 ++++ crates/subspace-networking/src/utils.rs | 1 + .../utils/random_walking_piece_provider.rs | 90 +++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 crates/subspace-networking/src/utils/random_walking_piece_provider.rs diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 5f15dd0106..c3bced8c7d 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -399,6 +399,14 @@ impl Node { pub async fn get_closest_peers( &self, key: Multihash, + ) -> Result, GetClosestPeersError> { + self.get_closest_peers_internal(key).await + } + + /// Get closest peers by multihash key using Kademlia DHT. + async fn get_closest_peers_internal( + &self, + key: Multihash, ) -> Result, GetClosestPeersError> { let permit = self.shared.rate_limiter.acquire_kademlia_permit().await; trace!(?key, "Starting 'GetClosestPeers' request."); @@ -588,6 +596,14 @@ impl NodeRequestsBatchHandle { ) -> Result, GetProvidersError> { self.node.get_providers_internal(key, false).await } + + /// Get closest peers by key. Initiate 'find_node' Kademlia operation. + pub async fn get_closest_peers( + &mut self, + key: Multihash, + ) -> Result, GetClosestPeersError> { + self.node.get_closest_peers_internal(key).await + } /// Sends the generic request to the peer and awaits the result. pub async fn send_generic_request( &mut self, diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 31ecae37a0..09adf161c6 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -2,6 +2,7 @@ pub mod multihash; pub mod piece_provider; +pub mod random_walking_piece_provider; pub(crate) mod rate_limiter; #[cfg(test)] mod tests; diff --git a/crates/subspace-networking/src/utils/random_walking_piece_provider.rs b/crates/subspace-networking/src/utils/random_walking_piece_provider.rs new file mode 100644 index 0000000000..7ac4bb7103 --- /dev/null +++ b/crates/subspace-networking/src/utils/random_walking_piece_provider.rs @@ -0,0 +1,90 @@ +//! Provides methods to retrieve pieces from DSN L1 via random walk. + +use crate::utils::piece_provider::PieceValidator; +use crate::{Node, PieceByIndexRequest, PieceByIndexResponse}; +use futures::StreamExt; +use libp2p::PeerId; +use subspace_core_primitives::{Piece, PieceIndex}; +use tracing::{debug, trace, warn}; + +/// Piece provider with cancellation and optional piece validator. +pub struct RandomWalkingPieceProvider { + node: Node, + piece_validator: Option, +} + +impl RandomWalkingPieceProvider +where + PV: PieceValidator, +{ + /// Creates new piece provider. + pub fn new(node: Node, piece_validator: Option) -> Self { + Self { + node, + piece_validator, + } + } + + /// Get piece from L1 + pub async fn get_piece(&self, piece_index: PieceIndex, walking_rounds: usize) -> Option { + for round in 0..walking_rounds { + debug!(%piece_index, round, "Random walk round"); + + let result = self.get_piece_by_random_walking(piece_index).await; + + if result.is_some() { + return result; + } + } + + debug!(%piece_index, "Random walking piece retrieval failed."); + + None + } + + /// Get piece from L1 by random walking + async fn get_piece_by_random_walking(&self, piece_index: PieceIndex) -> Option { + trace!(%piece_index, "get_piece_by_random_walking round"); + + // Random walk key + let key = PeerId::random(); + + let mut request_batch = self.node.get_requests_batch_handle().await; + let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await; + + match get_closest_peers_result { + Ok(mut get_closest_peers_stream) => { + while let Some(peer_id) = get_closest_peers_stream.next().await { + trace!(%piece_index, %peer_id, "get_closest_peers returned an item"); + + let request_result = request_batch + .send_generic_request(peer_id, PieceByIndexRequest { piece_index }) + .await; + + match request_result { + Ok(PieceByIndexResponse { piece: Some(piece) }) => { + trace!(%peer_id, %piece_index, ?key, "Piece request succeeded."); + + if let Some(validator) = &self.piece_validator { + return validator.validate_piece(peer_id, piece_index, piece).await; + } else { + return Some(piece); + } + } + Ok(PieceByIndexResponse { piece: None }) => { + debug!(%peer_id, %piece_index, ?key, "Piece request returned empty piece."); + } + Err(error) => { + debug!(%peer_id, %piece_index, ?key, ?error, "Piece request failed."); + } + } + } + } + Err(err) => { + warn!(%piece_index, ?key, ?err, "get_closest_peers returned an error"); + } + } + + None + } +} From 1a0038fcb2c38c9486e6da63fe22473e110010ab Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 15 Dec 2023 15:34:23 +0700 Subject: [PATCH 2/5] farmer: Add random walk piece acquisition. --- .../src/bin/subspace-farmer/commands/farm.rs | 22 +++++---- .../src/utils/farmer_piece_getter.rs | 48 ++++++++++++++----- .../src/utils/piece_validator.rs | 6 ++- 3 files changed, 52 insertions(+), 24 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index ce23388eaf..4e9bccf53d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -37,6 +37,7 @@ use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::identity::{ed25519, Keypair}; use subspace_networking::libp2p::Multiaddr; use subspace_networking::utils::piece_provider::PieceProvider; +use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider; use subspace_proof_of_space::Table; use tempfile::TempDir; use tokio::sync::Semaphore; @@ -390,20 +391,23 @@ where .map_err(|error| anyhow::anyhow!(error))?; // TODO: Consider introducing and using global in-memory segment header cache (this comment is // in multiple files) - let segment_commitments_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE)); - let piece_provider = PieceProvider::new( + let segment_commitments_cache = Arc::new(Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE))); + let validator = Some(SegmentCommitmentPieceValidator::new( node.clone(), - Some(SegmentCommitmentPieceValidator::new( - node.clone(), - node_client.clone(), - kzg.clone(), - segment_commitments_cache, - )), - ); + node_client.clone(), + kzg.clone(), + segment_commitments_cache, + )); + + let piece_provider = PieceProvider::new(node.clone(), validator.clone()); + + let random_walking_piece_provider = + RandomWalkingPieceProvider::new(node.clone(), validator.clone()); let piece_getter = Arc::new(FarmerPieceGetter::new( node.clone(), piece_provider, + random_walking_piece_provider, piece_cache.clone(), node_client.clone(), Arc::clone(&readers_and_pieces), diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 81bf56303e..02f00999df 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -12,12 +12,16 @@ use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; +use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider; use subspace_networking::Node; use tracing::{debug, error, trace}; +const MAX_RANDOM_WALK_ROUNDS: usize = 50; + pub struct FarmerPieceGetter { node: Node, piece_provider: PieceProvider, + random_walking_piece_provider: RandomWalkingPieceProvider, piece_cache: PieceCache, node_client: NC, readers_and_pieces: Arc>>, @@ -27,6 +31,7 @@ impl FarmerPieceGetter { pub fn new( node: Node, piece_provider: PieceProvider, + random_walking_piece_provider: RandomWalkingPieceProvider, piece_cache: PieceCache, node_client: NC, readers_and_pieces: Arc>>, @@ -34,6 +39,7 @@ impl FarmerPieceGetter { Self { node, piece_provider, + random_walking_piece_provider, piece_cache, node_client, readers_and_pieces, @@ -118,21 +124,37 @@ where let connected_peers = HashSet::::from_iter(self.node.connected_peers().await?); if connected_peers.is_empty() { debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers"); - - return Ok(None); + } else { + for peer_id in connected_peers.iter() { + let maybe_piece = self + .piece_provider + .get_piece_from_peer(*peer_id, piece_index) + .await; + + if maybe_piece.is_some() { + trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded"); + + return Ok(maybe_piece); + } + } } - for peer_id in connected_peers.iter() { - let maybe_piece = self - .piece_provider - .get_piece_from_peer(*peer_id, piece_index) - .await; - - if maybe_piece.is_some() { - trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded"); - - return Ok(maybe_piece); - } + trace!(%piece_index, "Getting piece from DSN L1 using random walk."); + let random_walk_result = self + .random_walking_piece_provider + .get_piece(piece_index, MAX_RANDOM_WALK_ROUNDS) + .await; + + if random_walk_result.is_some() { + trace!(%piece_index, "DSN L1 lookup via random walk succeeded"); + + return Ok(random_walk_result); + } else { + debug!( + %piece_index, + max_rounds=%MAX_RANDOM_WALK_ROUNDS, + "Cannot acquire piece from DSN L1: random walk failed" + ); } debug!( diff --git a/crates/subspace-farmer/src/utils/piece_validator.rs b/crates/subspace-farmer/src/utils/piece_validator.rs index 089ac16a18..446acf612e 100644 --- a/crates/subspace-farmer/src/utils/piece_validator.rs +++ b/crates/subspace-farmer/src/utils/piece_validator.rs @@ -2,6 +2,7 @@ use crate::NodeClient; use async_trait::async_trait; use lru::LruCache; use parking_lot::Mutex; +use std::sync::Arc; use subspace_archiving::archiver::is_piece_valid; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{Piece, PieceIndex, SegmentCommitment, SegmentIndex}; @@ -10,11 +11,12 @@ use subspace_networking::utils::piece_provider::PieceValidator; use subspace_networking::Node; use tracing::{error, warn}; +#[derive(Clone)] pub struct SegmentCommitmentPieceValidator { dsn_node: Node, node_client: NC, kzg: Kzg, - segment_commitment_cache: Mutex>, + segment_commitment_cache: Arc>>, } impl SegmentCommitmentPieceValidator { @@ -22,7 +24,7 @@ impl SegmentCommitmentPieceValidator { dsn_node: Node, node_client: NC, kzg: Kzg, - segment_commitment_cache: Mutex>, + segment_commitment_cache: Arc>>, ) -> Self { Self { dsn_node, From 9d6a43231d4537fd4b9a1688573a17dd35e57e49 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 20 Dec 2023 13:42:09 +0700 Subject: [PATCH 3/5] Refactor piece acquisition from archival storage. --- .../src/bin/subspace-farmer/commands/farm.rs | 7 - .../src/utils/farmer_piece_getter.rs | 50 ++----- crates/subspace-networking/src/utils.rs | 1 - .../src/utils/piece_provider.rs | 131 ++++++++++++++++++ .../utils/random_walking_piece_provider.rs | 90 ------------ 5 files changed, 139 insertions(+), 140 deletions(-) delete mode 100644 crates/subspace-networking/src/utils/random_walking_piece_provider.rs diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 4e9bccf53d..52e2b93685 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -37,7 +37,6 @@ use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::identity::{ed25519, Keypair}; use subspace_networking::libp2p::Multiaddr; use subspace_networking::utils::piece_provider::PieceProvider; -use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider; use subspace_proof_of_space::Table; use tempfile::TempDir; use tokio::sync::Semaphore; @@ -398,16 +397,10 @@ where kzg.clone(), segment_commitments_cache, )); - let piece_provider = PieceProvider::new(node.clone(), validator.clone()); - let random_walking_piece_provider = - RandomWalkingPieceProvider::new(node.clone(), validator.clone()); - let piece_getter = Arc::new(FarmerPieceGetter::new( - node.clone(), piece_provider, - random_walking_piece_provider, piece_cache.clone(), node_client.clone(), Arc::clone(&readers_and_pieces), diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 02f00999df..b245a43a09 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -3,25 +3,19 @@ use crate::utils::readers_and_pieces::ReadersAndPieces; use crate::NodeClient; use async_trait::async_trait; use parking_lot::Mutex; -use std::collections::HashSet; use std::error::Error; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex}; use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; use subspace_networking::libp2p::kad::RecordKey; -use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; -use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider; -use subspace_networking::Node; use tracing::{debug, error, trace}; const MAX_RANDOM_WALK_ROUNDS: usize = 50; pub struct FarmerPieceGetter { - node: Node, piece_provider: PieceProvider, - random_walking_piece_provider: RandomWalkingPieceProvider, piece_cache: PieceCache, node_client: NC, readers_and_pieces: Arc>>, @@ -29,17 +23,13 @@ pub struct FarmerPieceGetter { impl FarmerPieceGetter { pub fn new( - node: Node, piece_provider: PieceProvider, - random_walking_piece_provider: RandomWalkingPieceProvider, piece_cache: PieceCache, node_client: NC, readers_and_pieces: Arc>>, ) -> Self { Self { - node, piece_provider, - random_walking_piece_provider, piece_cache, node_client, readers_and_pieces, @@ -119,47 +109,23 @@ where } // L1 piece acquisition - // TODO: consider using retry policy for L1 lookups as well. - trace!(%piece_index, "Getting piece from DSN L1"); - let connected_peers = HashSet::::from_iter(self.node.connected_peers().await?); - if connected_peers.is_empty() { - debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers"); - } else { - for peer_id in connected_peers.iter() { - let maybe_piece = self - .piece_provider - .get_piece_from_peer(*peer_id, piece_index) - .await; - - if maybe_piece.is_some() { - trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded"); - - return Ok(maybe_piece); - } - } - } + trace!(%piece_index, "Getting piece from DSN L1."); - trace!(%piece_index, "Getting piece from DSN L1 using random walk."); - let random_walk_result = self - .random_walking_piece_provider - .get_piece(piece_index, MAX_RANDOM_WALK_ROUNDS) + let archival_storage_search_result = self + .piece_provider + .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS) .await; - if random_walk_result.is_some() { - trace!(%piece_index, "DSN L1 lookup via random walk succeeded"); + if archival_storage_search_result.is_some() { + trace!(%piece_index, "DSN L1 lookup succeeded"); - return Ok(random_walk_result); + return Ok(archival_storage_search_result); } else { - debug!( - %piece_index, - max_rounds=%MAX_RANDOM_WALK_ROUNDS, - "Cannot acquire piece from DSN L1: random walk failed" - ); + debug!(%piece_index, "Cannot acquire piece from DSN L1"); } debug!( %piece_index, - connected_peers=%connected_peers.len(), "Cannot acquire piece: all methods yielded empty result" ); Ok(None) diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 09adf161c6..31ecae37a0 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -2,7 +2,6 @@ pub mod multihash; pub mod piece_provider; -pub mod random_walking_piece_provider; pub(crate) mod rate_limiter; #[cfg(test)] mod tests; diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index ed92f74f04..8fae011dd9 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -7,6 +7,7 @@ use backoff::future::retry; use backoff::ExponentialBackoff; use futures::StreamExt; use libp2p::PeerId; +use std::collections::HashSet; use std::error::Error; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -205,4 +206,134 @@ where None } + + /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently + /// connected peers and falls back to random walking. + pub async fn get_piece_from_archival_storage( + &self, + piece_index: PieceIndex, + max_random_walking_rounds: usize, + ) -> Option { + // TODO: consider using retry policy for L1 lookups as well. + trace!(%piece_index, "Getting piece from archival storage.."); + + let connected_peers = { + let connected_peers = match self.node.connected_peers().await { + Ok(connected_peers) => connected_peers, + Err(err) => { + debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)"); + + Default::default() + } + }; + + HashSet::::from_iter(connected_peers) + }; + + if connected_peers.is_empty() { + debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)"); + } else { + for peer_id in connected_peers.iter() { + let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await; + + if maybe_piece.is_some() { + trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded"); + + return maybe_piece; + } + } + } + + trace!(%piece_index, "Getting piece from DSN L1 using random walk."); + let random_walk_result = self + .get_piece_by_random_walking(piece_index, max_random_walking_rounds) + .await; + + if random_walk_result.is_some() { + trace!(%piece_index, "DSN L1 lookup via random walk succeeded"); + + return random_walk_result; + } else { + debug!( + %piece_index, + %max_random_walking_rounds, + "Cannot acquire piece from DSN L1: random walk failed" + ); + } + + None + } + + /// Get piece from L1 by random walking + async fn get_piece_by_random_walking( + &self, + piece_index: PieceIndex, + walking_rounds: usize, + ) -> Option { + for round in 0..walking_rounds { + debug!(%piece_index, round, "Random walk round"); + + let result = self + .get_piece_by_random_walking_from_single_round(piece_index, round) + .await; + + if result.is_some() { + return result; + } + } + + debug!(%piece_index, "Random walking piece retrieval failed."); + + None + } + + /// Get piece from L1 by random walking (single round) + async fn get_piece_by_random_walking_from_single_round( + &self, + piece_index: PieceIndex, + round: usize, + ) -> Option { + trace!(%piece_index, "get_piece_by_random_walking round"); + + // Random walk key + let key = PeerId::random(); + + let mut request_batch = self.node.get_requests_batch_handle().await; + let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await; + + match get_closest_peers_result { + Ok(mut get_closest_peers_stream) => { + while let Some(peer_id) = get_closest_peers_stream.next().await { + trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item"); + + let request_result = request_batch + .send_generic_request(peer_id, PieceByIndexRequest { piece_index }) + .await; + + match request_result { + Ok(PieceByIndexResponse { piece: Some(piece) }) => { + trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded."); + + if let Some(validator) = &self.piece_validator { + return validator.validate_piece(peer_id, piece_index, piece).await; + } else { + return Some(piece); + } + } + Ok(PieceByIndexResponse { piece: None }) => { + debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece."); + } + Err(error) => { + debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."); + } + } + } + } + Err(err) => { + warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"); + } + } + + None + } } diff --git a/crates/subspace-networking/src/utils/random_walking_piece_provider.rs b/crates/subspace-networking/src/utils/random_walking_piece_provider.rs deleted file mode 100644 index 7ac4bb7103..0000000000 --- a/crates/subspace-networking/src/utils/random_walking_piece_provider.rs +++ /dev/null @@ -1,90 +0,0 @@ -//! Provides methods to retrieve pieces from DSN L1 via random walk. - -use crate::utils::piece_provider::PieceValidator; -use crate::{Node, PieceByIndexRequest, PieceByIndexResponse}; -use futures::StreamExt; -use libp2p::PeerId; -use subspace_core_primitives::{Piece, PieceIndex}; -use tracing::{debug, trace, warn}; - -/// Piece provider with cancellation and optional piece validator. -pub struct RandomWalkingPieceProvider { - node: Node, - piece_validator: Option, -} - -impl RandomWalkingPieceProvider -where - PV: PieceValidator, -{ - /// Creates new piece provider. - pub fn new(node: Node, piece_validator: Option) -> Self { - Self { - node, - piece_validator, - } - } - - /// Get piece from L1 - pub async fn get_piece(&self, piece_index: PieceIndex, walking_rounds: usize) -> Option { - for round in 0..walking_rounds { - debug!(%piece_index, round, "Random walk round"); - - let result = self.get_piece_by_random_walking(piece_index).await; - - if result.is_some() { - return result; - } - } - - debug!(%piece_index, "Random walking piece retrieval failed."); - - None - } - - /// Get piece from L1 by random walking - async fn get_piece_by_random_walking(&self, piece_index: PieceIndex) -> Option { - trace!(%piece_index, "get_piece_by_random_walking round"); - - // Random walk key - let key = PeerId::random(); - - let mut request_batch = self.node.get_requests_batch_handle().await; - let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await; - - match get_closest_peers_result { - Ok(mut get_closest_peers_stream) => { - while let Some(peer_id) = get_closest_peers_stream.next().await { - trace!(%piece_index, %peer_id, "get_closest_peers returned an item"); - - let request_result = request_batch - .send_generic_request(peer_id, PieceByIndexRequest { piece_index }) - .await; - - match request_result { - Ok(PieceByIndexResponse { piece: Some(piece) }) => { - trace!(%peer_id, %piece_index, ?key, "Piece request succeeded."); - - if let Some(validator) = &self.piece_validator { - return validator.validate_piece(peer_id, piece_index, piece).await; - } else { - return Some(piece); - } - } - Ok(PieceByIndexResponse { piece: None }) => { - debug!(%peer_id, %piece_index, ?key, "Piece request returned empty piece."); - } - Err(error) => { - debug!(%peer_id, %piece_index, ?key, ?error, "Piece request failed."); - } - } - } - } - Err(err) => { - warn!(%piece_index, ?key, ?err, "get_closest_peers returned an error"); - } - } - - None - } -} From a3a19e1953e59c670ab2313411f1ea074ff23b62 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 20 Dec 2023 16:10:21 +0700 Subject: [PATCH 4/5] networking: Fix get_closest_peers in batches. --- crates/subspace-networking/src/node.rs | 12 +++++++++--- crates/subspace-networking/src/node_runner.rs | 2 +- crates/subspace-networking/src/shared.rs | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index c3bced8c7d..6e038170fa 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -400,15 +400,21 @@ impl Node { &self, key: Multihash, ) -> Result, GetClosestPeersError> { - self.get_closest_peers_internal(key).await + self.get_closest_peers_internal(key, true).await } /// Get closest peers by multihash key using Kademlia DHT. async fn get_closest_peers_internal( &self, key: Multihash, + acquire_permit: bool, ) -> Result, GetClosestPeersError> { - let permit = self.shared.rate_limiter.acquire_kademlia_permit().await; + let permit = if acquire_permit { + Some(self.shared.rate_limiter.acquire_kademlia_permit().await) + } else { + None + }; + trace!(?key, "Starting 'GetClosestPeers' request."); let (result_sender, result_receiver) = mpsc::unbounded(); @@ -602,7 +608,7 @@ impl NodeRequestsBatchHandle { &mut self, key: Multihash, ) -> Result, GetClosestPeersError> { - self.node.get_closest_peers_internal(key).await + self.node.get_closest_peers_internal(key, false).await } /// Sends the generic request to the peer and awaits the result. pub async fn send_generic_request( diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index ef6e17683c..22bcdbbbe4 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -60,7 +60,7 @@ enum QueryResultSender { ClosestPeers { sender: mpsc::UnboundedSender, // Just holding onto permit while data structure is not dropped - _permit: RateLimiterPermit, + _permit: Option, }, Providers { sender: mpsc::UnboundedSender, diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 8ab72c54f5..3121bb9a41 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -79,7 +79,7 @@ pub(crate) enum Command { GetClosestPeers { key: Multihash, result_sender: mpsc::UnboundedSender, - permit: RateLimiterPermit, + permit: Option, }, GenericRequest { peer_id: PeerId, From 015674ad65db17adfa8763a443ed2347a75d78ee Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Thu, 21 Dec 2023 14:45:24 +0700 Subject: [PATCH 5/5] Refactor farmer piece provider. --- crates/subspace-farmer/src/utils/farmer_piece_getter.rs | 6 ++---- crates/subspace-networking/examples/benchmark.rs | 4 ++-- crates/subspace-networking/src/utils/piece_provider.rs | 5 +++-- crates/subspace-service/src/sync_from_dsn/import_blocks.rs | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index b245a43a09..4111f96280 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -12,7 +12,7 @@ use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; use tracing::{debug, error, trace}; -const MAX_RANDOM_WALK_ROUNDS: usize = 50; +const MAX_RANDOM_WALK_ROUNDS: usize = 35; pub struct FarmerPieceGetter { piece_provider: PieceProvider, @@ -67,7 +67,7 @@ where trace!(%piece_index, "Getting piece from DSN L2 cache"); let maybe_piece = self .piece_provider - .get_piece(piece_index, Self::convert_retry_policy(retry_policy)) + .get_piece_from_dsn_cache(piece_index, Self::convert_retry_policy(retry_policy)) .await?; if maybe_piece.is_some() { @@ -120,8 +120,6 @@ where trace!(%piece_index, "DSN L1 lookup succeeded"); return Ok(archival_storage_search_result); - } else { - debug!(%piece_index, "Cannot acquire piece from DSN L1"); } debug!( diff --git a/crates/subspace-networking/examples/benchmark.rs b/crates/subspace-networking/examples/benchmark.rs index 80d478b86d..c18e2f4f41 100644 --- a/crates/subspace-networking/examples/benchmark.rs +++ b/crates/subspace-networking/examples/benchmark.rs @@ -157,7 +157,7 @@ async fn simple_benchmark(node: Node, max_pieces: usize, start_with: usize, retr let piece_index = PieceIndex::from(i as u64); let start = Instant::now(); let piece = piece_provider - .get_piece(piece_index, RetryPolicy::Limited(retries)) + .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(retries)) .await; let end = Instant::now(); let duration = end.duration_since(start); @@ -220,7 +220,7 @@ async fn parallel_benchmark( .expect("Semaphore cannot be closed."); let semaphore_acquired = Instant::now(); let maybe_piece = piece_provider - .get_piece(piece_index, RetryPolicy::Limited(retries)) + .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(retries)) .await; let end = Instant::now(); diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 8fae011dd9..c3e0e6901c 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -120,8 +120,9 @@ where None } - /// Returns piece by its index. Uses retry policy for error handling. - pub async fn get_piece( + /// Returns piece by its index from farmer's piece cache (L2). + /// Uses retry policy for error handling. + pub async fn get_piece_from_dsn_cache( &self, piece_index: PieceIndex, retry_policy: RetryPolicy, diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs index 8a0c8eb363..a3b271250e 100644 --- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs +++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs @@ -280,7 +280,7 @@ where } }; let maybe_piece = match piece_provider - .get_piece( + .get_piece_from_dsn_cache( piece_index, RetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), )