diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index 01afbcf720..37e00987fe 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -393,19 +393,16 @@ where
         .map_err(|error| anyhow::anyhow!(error))?;
     // TODO: Consider introducing and using global in-memory segment header cache (this comment is
     // in multiple files)
-    let segment_commitments_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE));
-    let piece_provider = PieceProvider::new(
+    let segment_commitments_cache = Arc::new(Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE)));
+    let validator = Some(SegmentCommitmentPieceValidator::new(
         node.clone(),
-        Some(SegmentCommitmentPieceValidator::new(
-            node.clone(),
-            node_client.clone(),
-            kzg.clone(),
-            segment_commitments_cache,
-        )),
-    );
+        node_client.clone(),
+        kzg.clone(),
+        segment_commitments_cache,
+    ));
+    let piece_provider = PieceProvider::new(node.clone(), validator.clone());

     let piece_getter = Arc::new(FarmerPieceGetter::new(
-        node.clone(),
         piece_provider,
         piece_cache.clone(),
         node_client.clone(),
diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
index 81bf56303e..4111f96280 100644
--- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
+++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -3,20 +3,18 @@ use crate::utils::readers_and_pieces::ReadersAndPieces;
 use crate::NodeClient;
 use async_trait::async_trait;
 use parking_lot::Mutex;
-use std::collections::HashSet;
 use std::error::Error;
 use std::sync::Arc;
 use subspace_core_primitives::{Piece, PieceIndex};
 use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
 use subspace_networking::libp2p::kad::RecordKey;
-use subspace_networking::libp2p::PeerId;
 use subspace_networking::utils::multihash::ToMultihash;
 use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};
-use subspace_networking::Node;
 use tracing::{debug, error, trace};

+const MAX_RANDOM_WALK_ROUNDS: usize = 35;
+
 pub struct FarmerPieceGetter {
-    node: Node,
     piece_provider: PieceProvider,
     piece_cache: PieceCache,
     node_client: NC,
@@ -25,14 +23,12 @@ pub struct FarmerPieceGetter {

 impl FarmerPieceGetter {
     pub fn new(
-        node: Node,
         piece_provider: PieceProvider,
         piece_cache: PieceCache,
         node_client: NC,
         readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
     ) -> Self {
         Self {
-            node,
             piece_provider,
             piece_cache,
             node_client,
@@ -71,7 +67,7 @@ where
         trace!(%piece_index, "Getting piece from DSN L2 cache");
         let maybe_piece = self
             .piece_provider
-            .get_piece(piece_index, Self::convert_retry_policy(retry_policy))
+            .get_piece_from_dsn_cache(piece_index, Self::convert_retry_policy(retry_policy))
             .await?;

         if maybe_piece.is_some() {
@@ -113,31 +109,21 @@ where
         }

         // L1 piece acquisition
-        // TODO: consider using retry policy for L1 lookups as well.
-        trace!(%piece_index, "Getting piece from DSN L1");
-        let connected_peers = HashSet::<PeerId>::from_iter(self.node.connected_peers().await?);
-        if connected_peers.is_empty() {
-            debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers");
-
-            return Ok(None);
-        }
+        trace!(%piece_index, "Getting piece from DSN L1.");

-        for peer_id in connected_peers.iter() {
-            let maybe_piece = self
-                .piece_provider
-                .get_piece_from_peer(*peer_id, piece_index)
-                .await;
+        let archival_storage_search_result = self
+            .piece_provider
+            .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
+            .await;

-            if maybe_piece.is_some() {
-                trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded");
+        if archival_storage_search_result.is_some() {
+            trace!(%piece_index, "DSN L1 lookup succeeded");

-                return Ok(maybe_piece);
-            }
+            return Ok(archival_storage_search_result);
         }

         debug!(
             %piece_index,
-            connected_peers=%connected_peers.len(),
             "Cannot acquire piece: all methods yielded empty result"
         );
         Ok(None)
diff --git a/crates/subspace-farmer/src/utils/piece_validator.rs b/crates/subspace-farmer/src/utils/piece_validator.rs
index 089ac16a18..446acf612e 100644
--- a/crates/subspace-farmer/src/utils/piece_validator.rs
+++ b/crates/subspace-farmer/src/utils/piece_validator.rs
@@ -2,6 +2,7 @@ use crate::NodeClient;
 use async_trait::async_trait;
 use lru::LruCache;
 use parking_lot::Mutex;
+use std::sync::Arc;
 use subspace_archiving::archiver::is_piece_valid;
 use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::{Piece, PieceIndex, SegmentCommitment, SegmentIndex};
@@ -10,11 +11,12 @@ use subspace_networking::utils::piece_provider::PieceValidator;
 use subspace_networking::Node;
 use tracing::{error, warn};

+#[derive(Clone)]
 pub struct SegmentCommitmentPieceValidator {
     dsn_node: Node,
     node_client: NC,
     kzg: Kzg,
-    segment_commitment_cache: Mutex<LruCache<SegmentIndex, SegmentCommitment>>,
+    segment_commitment_cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
 }

 impl SegmentCommitmentPieceValidator {
@@ -22,7 +24,7 @@ impl SegmentCommitmentPieceValidator {
         dsn_node: Node,
         node_client: NC,
         kzg: Kzg,
-        segment_commitment_cache: Mutex<LruCache<SegmentIndex, SegmentCommitment>>,
+        segment_commitment_cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
     ) -> Self {
         Self {
             dsn_node,
diff --git a/crates/subspace-networking/examples/benchmark.rs b/crates/subspace-networking/examples/benchmark.rs
index 80d478b86d..c18e2f4f41 100644
--- a/crates/subspace-networking/examples/benchmark.rs
+++ b/crates/subspace-networking/examples/benchmark.rs
@@ -157,7 +157,7 @@ async fn simple_benchmark(node: Node, max_pieces: usize, start_with: usize, retr
         let piece_index = PieceIndex::from(i as u64);
         let start = Instant::now();
         let piece = piece_provider
-            .get_piece(piece_index, RetryPolicy::Limited(retries))
+            .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(retries))
             .await;
         let end = Instant::now();
         let duration = end.duration_since(start);
@@ -220,7 +220,7 @@ async fn parallel_benchmark(
             .expect("Semaphore cannot be closed.");
         let semaphore_acquired = Instant::now();
         let maybe_piece = piece_provider
-            .get_piece(piece_index, RetryPolicy::Limited(retries))
+            .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(retries))
             .await;
         let end = Instant::now();

diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs
index 5f15dd0106..6e038170fa 100644
--- a/crates/subspace-networking/src/node.rs
+++ b/crates/subspace-networking/src/node.rs
@@ -400,7 +400,21 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        self.get_closest_peers_internal(key, true).await
+    }
+
+    /// Get closest peers by multihash key using Kademlia DHT.
+    async fn get_closest_peers_internal(
+        &self,
+        key: Multihash,
+        acquire_permit: bool,
+    ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
+        let permit = if acquire_permit {
+            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+        } else {
+            None
+        };
+
         trace!(?key, "Starting 'GetClosestPeers' request.");
         let (result_sender, result_receiver) = mpsc::unbounded();

@@ -588,6 +602,14 @@
     ) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
         self.node.get_providers_internal(key, false).await
     }
+
+    /// Get closest peers by key. Initiate 'find_node' Kademlia operation.
+    pub async fn get_closest_peers(
+        &mut self,
+        key: Multihash,
+    ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
+        self.node.get_closest_peers_internal(key, false).await
+    }
     /// Sends the generic request to the peer and awaits the result.
     pub async fn send_generic_request(
         &mut self,
diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs
index ef6e17683c..22bcdbbbe4 100644
--- a/crates/subspace-networking/src/node_runner.rs
+++ b/crates/subspace-networking/src/node_runner.rs
@@ -60,7 +60,7 @@ enum QueryResultSender {
     ClosestPeers {
         sender: mpsc::UnboundedSender<PeerId>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: Option<RateLimiterPermit>,
     },
     Providers {
         sender: mpsc::UnboundedSender<PeerId>,
diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs
index 8ab72c54f5..3121bb9a41 100644
--- a/crates/subspace-networking/src/shared.rs
+++ b/crates/subspace-networking/src/shared.rs
@@ -79,7 +79,7 @@ pub(crate) enum Command {
     GetClosestPeers {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerId>,
-        permit: RateLimiterPermit,
+        permit: Option<RateLimiterPermit>,
     },
     GenericRequest {
         peer_id: PeerId,
diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs
index ed92f74f04..c3e0e6901c 100644
--- a/crates/subspace-networking/src/utils/piece_provider.rs
+++ b/crates/subspace-networking/src/utils/piece_provider.rs
@@ -7,6 +7,7 @@ use backoff::future::retry;
 use backoff::ExponentialBackoff;
 use futures::StreamExt;
 use libp2p::PeerId;
+use std::collections::HashSet;
 use std::error::Error;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::Duration;
@@ -119,8 +120,9 @@ where
         None
     }

-    /// Returns piece by its index. Uses retry policy for error handling.
-    pub async fn get_piece(
+    /// Returns piece by its index from farmer's piece cache (L2).
+    /// Uses retry policy for error handling.
+    pub async fn get_piece_from_dsn_cache(
         &self,
         piece_index: PieceIndex,
         retry_policy: RetryPolicy,
@@ -205,4 +207,134 @@ where

         None
     }
+
+    /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently
+    /// connected peers and falls back to random walking.
+    pub async fn get_piece_from_archival_storage(
+        &self,
+        piece_index: PieceIndex,
+        max_random_walking_rounds: usize,
+    ) -> Option<Piece> {
+        // TODO: consider using retry policy for L1 lookups as well.
+        trace!(%piece_index, "Getting piece from archival storage..");
+
+        let connected_peers = {
+            let connected_peers = match self.node.connected_peers().await {
+                Ok(connected_peers) => connected_peers,
+                Err(err) => {
+                    debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");
+
+                    Default::default()
+                }
+            };
+
+            HashSet::<PeerId>::from_iter(connected_peers)
+        };
+
+        if connected_peers.is_empty() {
+            debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
+        } else {
+            for peer_id in connected_peers.iter() {
+                let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;
+
+                if maybe_piece.is_some() {
+                    trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");
+
+                    return maybe_piece;
+                }
+            }
+        }
+
+        trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
+        let random_walk_result = self
+            .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
+            .await;
+
+        if random_walk_result.is_some() {
+            trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
+
+            return random_walk_result;
+        } else {
+            debug!(
+                %piece_index,
+                %max_random_walking_rounds,
+                "Cannot acquire piece from DSN L1: random walk failed"
+            );
+        }
+
+        None
+    }
+
+    /// Get piece from L1 by random walking
+    async fn get_piece_by_random_walking(
+        &self,
+        piece_index: PieceIndex,
+        walking_rounds: usize,
+    ) -> Option<Piece> {
+        for round in 0..walking_rounds {
+            debug!(%piece_index, round, "Random walk round");
+
+            let result = self
+                .get_piece_by_random_walking_from_single_round(piece_index, round)
+                .await;
+
+            if result.is_some() {
+                return result;
+            }
+        }
+
+        debug!(%piece_index, "Random walking piece retrieval failed.");
+
+        None
+    }
+
+    /// Get piece from L1 by random walking (single round)
+    async fn get_piece_by_random_walking_from_single_round(
+        &self,
+        piece_index: PieceIndex,
+        round: usize,
+    ) -> Option<Piece> {
+        trace!(%piece_index, "get_piece_by_random_walking round");
+
+        // Random walk key
+        let key = PeerId::random();
+
+        let mut request_batch = self.node.get_requests_batch_handle().await;
+        let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await;
+
+        match get_closest_peers_result {
+            Ok(mut get_closest_peers_stream) => {
+                while let Some(peer_id) = get_closest_peers_stream.next().await {
+                    trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");
+
+                    let request_result = request_batch
+                        .send_generic_request(peer_id, PieceByIndexRequest { piece_index })
+                        .await;
+
+                    match request_result {
+                        Ok(PieceByIndexResponse { piece: Some(piece) }) => {
+                            trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");
+
+                            if let Some(validator) = &self.piece_validator {
+                                return validator.validate_piece(peer_id, piece_index, piece).await;
+                            } else {
+                                return Some(piece);
+                            }
+                        }
+                        Ok(PieceByIndexResponse { piece: None }) => {
+                            debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
+                        }
+                        Err(error) => {
+                            debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed.");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error");
+            }
+        }
+
+        None
+    }
 }
diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs
index 8a0c8eb363..a3b271250e 100644
--- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs
+++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs
@@ -280,7 +280,7 @@ where
             }
         };
         let maybe_piece = match piece_provider
-            .get_piece(
+            .get_piece_from_dsn_cache(
                 piece_index,
                 RetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()),
             )
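
Note for reviewers: the new L1 path in `PieceProvider::get_piece_from_archival_storage` asks currently connected peers first and only then random-walks the DHT for a bounded number of rounds (`MAX_RANDOM_WALK_ROUNDS` on the farmer side). A minimal, synchronous sketch of that lookup order follows; `PieceSource`, `PeerId`, and `Piece` here are hypothetical stand-ins for illustration, not types or APIs from this repository.

```rust
use std::collections::HashSet;

// Hypothetical stand-ins for libp2p::PeerId and subspace_core_primitives::Piece.
type PeerId = u64;
type Piece = Vec<u8>;

/// Hypothetical abstraction over the two L1 lookup mechanisms used in this PR.
trait PieceSource {
    fn connected_peers(&self) -> HashSet<PeerId>;
    fn request_from_peer(&self, peer: PeerId, piece_index: u64) -> Option<Piece>;
    fn random_walk_round(&self, piece_index: u64, round: usize) -> Option<Piece>;
}

/// Mirrors the control flow of `get_piece_from_archival_storage`: try every currently
/// connected peer first, then fall back to at most `max_rounds` random-walk rounds,
/// returning the first piece found (or `None` if every attempt comes back empty).
fn get_piece_l1<S: PieceSource>(source: &S, piece_index: u64, max_rounds: usize) -> Option<Piece> {
    for peer in source.connected_peers() {
        if let Some(piece) = source.request_from_peer(peer, piece_index) {
            return Some(piece);
        }
    }
    (0..max_rounds).find_map(|round| source.random_walk_round(piece_index, round))
}
```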