Merge pull request #2341 from subspace/l1-random-walker
Introduce piece acquisition by L1 random-walking algorithm.
nazar-pc authored Dec 21, 2023
2 parents c9fc868 + 9ff1733 commit 8f53e8b
Showing 9 changed files with 184 additions and 45 deletions.
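
Before the per-file diffs, here is a minimal caller-side sketch of the retrieval order this commit establishes. The method names get_piece_from_dsn_cache and get_piece_from_archival_storage, the RetryPolicy type, and the 35-round limit come from the diffs below; the wrapper function, the retry count, and the exact trait bounds are illustrative assumptions, not code from this commit.

use subspace_core_primitives::{Piece, PieceIndex};
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};

// Matches the constant FarmerPieceGetter uses in this commit.
const MAX_RANDOM_WALK_ROUNDS: usize = 35;

// Hypothetical helper, shown only to illustrate the L2 -> L1 fallback.
async fn fetch_piece<PV: PieceValidator>(
    piece_provider: &PieceProvider<PV>,
    piece_index: PieceIndex,
) -> Option<Piece> {
    // L2: ask the DSN piece cache first (this commit renames `get_piece` to
    // `get_piece_from_dsn_cache`).
    if let Ok(Some(piece)) = piece_provider
        .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(3))
        .await
    {
        return Some(piece);
    }

    // L1: fall back to archival storage -- currently connected peers first,
    // then up to MAX_RANDOM_WALK_ROUNDS rounds of Kademlia random walking.
    piece_provider
        .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
        .await
}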
17 changes: 7 additions & 10 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -393,19 +393,16 @@ where
         .map_err(|error| anyhow::anyhow!(error))?;
     // TODO: Consider introducing and using global in-memory segment header cache (this comment is
     // in multiple files)
-    let segment_commitments_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE));
-    let piece_provider = PieceProvider::new(
+    let segment_commitments_cache = Arc::new(Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE)));
+    let validator = Some(SegmentCommitmentPieceValidator::new(
         node.clone(),
-        Some(SegmentCommitmentPieceValidator::new(
-            node.clone(),
-            node_client.clone(),
-            kzg.clone(),
-            segment_commitments_cache,
-        )),
-    );
+        node_client.clone(),
+        kzg.clone(),
+        segment_commitments_cache,
+    ));
+    let piece_provider = PieceProvider::new(node.clone(), validator.clone());

     let piece_getter = Arc::new(FarmerPieceGetter::new(
-        node.clone(),
         piece_provider,
         piece_cache.clone(),
         node_client.clone(),
36 changes: 11 additions & 25 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -3,20 +3,18 @@ use crate::utils::readers_and_pieces::ReadersAndPieces
 use crate::NodeClient;
 use async_trait::async_trait;
 use parking_lot::Mutex;
-use std::collections::HashSet;
 use std::error::Error;
 use std::sync::Arc;
 use subspace_core_primitives::{Piece, PieceIndex};
 use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
 use subspace_networking::libp2p::kad::RecordKey;
-use subspace_networking::libp2p::PeerId;
 use subspace_networking::utils::multihash::ToMultihash;
 use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};
-use subspace_networking::Node;
 use tracing::{debug, error, trace};

+const MAX_RANDOM_WALK_ROUNDS: usize = 35;
+
 pub struct FarmerPieceGetter<PV, NC> {
-    node: Node,
     piece_provider: PieceProvider<PV>,
     piece_cache: PieceCache,
     node_client: NC,
@@ -25,14 +23,12 @@ pub struct FarmerPieceGetter<PV, NC> {

 impl<PV, NC> FarmerPieceGetter<PV, NC> {
     pub fn new(
-        node: Node,
         piece_provider: PieceProvider<PV>,
         piece_cache: PieceCache,
         node_client: NC,
         readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
     ) -> Self {
         Self {
-            node,
             piece_provider,
             piece_cache,
             node_client,
@@ -71,7 +67,7 @@ where
         trace!(%piece_index, "Getting piece from DSN L2 cache");
         let maybe_piece = self
             .piece_provider
-            .get_piece(piece_index, Self::convert_retry_policy(retry_policy))
+            .get_piece_from_dsn_cache(piece_index, Self::convert_retry_policy(retry_policy))
             .await?;

         if maybe_piece.is_some() {
@@ -113,31 +109,21 @@ where
         }

         // L1 piece acquisition
-        // TODO: consider using retry policy for L1 lookups as well.
-        trace!(%piece_index, "Getting piece from DSN L1");
-        let connected_peers = HashSet::<PeerId>::from_iter(self.node.connected_peers().await?);
-        if connected_peers.is_empty() {
-            debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers");
-
-            return Ok(None);
-        }
+        trace!(%piece_index, "Getting piece from DSN L1.");

-        for peer_id in connected_peers.iter() {
-            let maybe_piece = self
-                .piece_provider
-                .get_piece_from_peer(*peer_id, piece_index)
-                .await;
+        let archival_storage_search_result = self
+            .piece_provider
+            .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
+            .await;

-            if maybe_piece.is_some() {
-                trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded");
+        if archival_storage_search_result.is_some() {
+            trace!(%piece_index, "DSN L1 lookup succeeded");

-                return Ok(maybe_piece);
-            }
+            return Ok(archival_storage_search_result);
         }

         debug!(
             %piece_index,
-            connected_peers=%connected_peers.len(),
             "Cannot acquire piece: all methods yielded empty result"
         );
         Ok(None)
6 changes: 4 additions & 2 deletions crates/subspace-farmer/src/utils/piece_validator.rs
@@ -2,6 +2,7 @@ use crate::NodeClient;
 use async_trait::async_trait;
 use lru::LruCache;
 use parking_lot::Mutex;
+use std::sync::Arc;
 use subspace_archiving::archiver::is_piece_valid;
 use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::{Piece, PieceIndex, SegmentCommitment, SegmentIndex};
@@ -10,19 +11,20 @@ use subspace_networking::utils::piece_provider::PieceValidator;
 use subspace_networking::Node;
 use tracing::{error, warn};

+#[derive(Clone)]
 pub struct SegmentCommitmentPieceValidator<NC> {
     dsn_node: Node,
     node_client: NC,
     kzg: Kzg,
-    segment_commitment_cache: Mutex<LruCache<SegmentIndex, SegmentCommitment>>,
+    segment_commitment_cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
 }

 impl<NC> SegmentCommitmentPieceValidator<NC> {
     pub fn new(
         dsn_node: Node,
         node_client: NC,
         kzg: Kzg,
-        segment_commitment_cache: Mutex<LruCache<SegmentIndex, SegmentCommitment>>,
+        segment_commitment_cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
     ) -> Self {
         Self {
             dsn_node,
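
The Arc in the two hunks above is what makes the new #[derive(Clone)] useful: farm.rs now builds one SegmentCommitmentPieceValidator and passes validator.clone() into PieceProvider::new, so every clone has to share a single segment-commitment LRU cache rather than owning its own Mutex. A standalone sketch of that sharing pattern (simplified stand-in types, and it assumes an lru crate version whose LruCache::new takes NonZeroUsize; this is not the crate's actual code):

use std::num::NonZeroUsize;
use std::sync::Arc;

use lru::LruCache;
use parking_lot::Mutex;

// Stand-ins for SegmentIndex/SegmentCommitment, just for this sketch.
type SegmentIndex = u64;
type SegmentCommitment = [u8; 48];

#[derive(Clone)]
struct Validator {
    // Arc makes every clone point at the same LruCache; a bare
    // Mutex<LruCache<..>> would be owned per instance and is not Clone.
    cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
}

fn main() {
    let cache = Arc::new(Mutex::new(LruCache::new(
        NonZeroUsize::new(100).expect("Not zero"),
    )));
    let a = Validator { cache: Arc::clone(&cache) };
    let b = a.clone();

    // An entry cached through one clone is visible to the other.
    a.cache.lock().put(0, [0u8; 48]);
    assert!(b.cache.lock().get(&0).is_some());
}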
4 changes: 2 additions & 2 deletions crates/subspace-networking/examples/benchmark.rs
@@ -157,7 +157,7 @@ async fn simple_benchmark(node: Node, max_pieces: usize, start_with: usize, retr
         let piece_index = PieceIndex::from(i as u64);
         let start = Instant::now();
         let piece = piece_provider
-            .get_piece(piece_index, RetryPolicy::Limited(retries))
+            .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(retries))
             .await;
         let end = Instant::now();
         let duration = end.duration_since(start);
@@ -220,7 +220,7 @@ async fn parallel_benchmark(
                 .expect("Semaphore cannot be closed.");
             let semaphore_acquired = Instant::now();
             let maybe_piece = piece_provider
-                .get_piece(piece_index, RetryPolicy::Limited(retries))
+                .get_piece_from_dsn_cache(piece_index, RetryPolicy::Limited(retries))
                 .await;

             let end = Instant::now();
24 changes: 23 additions & 1 deletion crates/subspace-networking/src/node.rs
@@ -400,7 +400,21 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        self.get_closest_peers_internal(key, true).await
+    }
+
+    /// Get closest peers by multihash key using Kademlia DHT.
+    async fn get_closest_peers_internal(
+        &self,
+        key: Multihash,
+        acquire_permit: bool,
+    ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
+        let permit = if acquire_permit {
+            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+        } else {
+            None
+        };
+
         trace!(?key, "Starting 'GetClosestPeers' request.");

         let (result_sender, result_receiver) = mpsc::unbounded();
@@ -588,6 +602,14 @@ impl NodeRequestsBatchHandle {
     ) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
         self.node.get_providers_internal(key, false).await
     }
+
+    /// Get closest peers by key. Initiate 'find_node' Kademlia operation.
+    pub async fn get_closest_peers(
+        &mut self,
+        key: Multihash,
+    ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
+        self.node.get_closest_peers_internal(key, false).await
+    }
     /// Sends the generic request to the peer and awaits the result.
     pub async fn send_generic_request<Request>(
         &mut self,
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/node_runner.rs
@@ -60,7 +60,7 @@ enum QueryResultSender {
     ClosestPeers {
         sender: mpsc::UnboundedSender<PeerId>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: Option<RateLimiterPermit>,
     },
     Providers {
         sender: mpsc::UnboundedSender<PeerId>,
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/shared.rs
@@ -79,7 +79,7 @@ pub(crate) enum Command {
     GetClosestPeers {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerId>,
-        permit: RateLimiterPermit,
+        permit: Option<RateLimiterPermit>,
     },
     GenericRequest {
         peer_id: PeerId,
136 changes: 134 additions & 2 deletions crates/subspace-networking/src/utils/piece_provider.rs
@@ -7,6 +7,7 @@ use backoff::future::retry;
 use backoff::ExponentialBackoff;
 use futures::StreamExt;
 use libp2p::PeerId;
+use std::collections::HashSet;
 use std::error::Error;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::Duration;
@@ -119,8 +120,9 @@ where
         None
     }

-    /// Returns piece by its index. Uses retry policy for error handling.
-    pub async fn get_piece(
+    /// Returns piece by its index from farmer's piece cache (L2).
+    /// Uses retry policy for error handling.
+    pub async fn get_piece_from_dsn_cache(
         &self,
         piece_index: PieceIndex,
         retry_policy: RetryPolicy,
@@ -205,4 +207,134 @@ where

         None
     }
+
+    /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently
+    /// connected peers and falls back to random walking.
+    pub async fn get_piece_from_archival_storage(
+        &self,
+        piece_index: PieceIndex,
+        max_random_walking_rounds: usize,
+    ) -> Option<Piece> {
+        // TODO: consider using retry policy for L1 lookups as well.
+        trace!(%piece_index, "Getting piece from archival storage..");
+
+        let connected_peers = {
+            let connected_peers = match self.node.connected_peers().await {
+                Ok(connected_peers) => connected_peers,
+                Err(err) => {
+                    debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");
+
+                    Default::default()
+                }
+            };
+
+            HashSet::<PeerId>::from_iter(connected_peers)
+        };
+
+        if connected_peers.is_empty() {
+            debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
+        } else {
+            for peer_id in connected_peers.iter() {
+                let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;
+
+                if maybe_piece.is_some() {
+                    trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");
+
+                    return maybe_piece;
+                }
+            }
+        }
+
+        trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
+        let random_walk_result = self
+            .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
+            .await;
+
+        if random_walk_result.is_some() {
+            trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
+
+            return random_walk_result;
+        } else {
+            debug!(
+                %piece_index,
+                %max_random_walking_rounds,
+                "Cannot acquire piece from DSN L1: random walk failed"
+            );
+        }
+
+        None
+    }
+
+    /// Get piece from L1 by random walking
+    async fn get_piece_by_random_walking(
+        &self,
+        piece_index: PieceIndex,
+        walking_rounds: usize,
+    ) -> Option<Piece> {
+        for round in 0..walking_rounds {
+            debug!(%piece_index, round, "Random walk round");
+
+            let result = self
+                .get_piece_by_random_walking_from_single_round(piece_index, round)
+                .await;
+
+            if result.is_some() {
+                return result;
+            }
+        }
+
+        debug!(%piece_index, "Random walking piece retrieval failed.");
+
+        None
+    }
+
+    /// Get piece from L1 by random walking (single round)
+    async fn get_piece_by_random_walking_from_single_round(
+        &self,
+        piece_index: PieceIndex,
+        round: usize,
+    ) -> Option<Piece> {
+        trace!(%piece_index, "get_piece_by_random_walking round");
+
+        // Random walk key
+        let key = PeerId::random();
+
+        let mut request_batch = self.node.get_requests_batch_handle().await;
+        let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await;
+
+        match get_closest_peers_result {
+            Ok(mut get_closest_peers_stream) => {
+                while let Some(peer_id) = get_closest_peers_stream.next().await {
+                    trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");
+
+                    let request_result = request_batch
+                        .send_generic_request(peer_id, PieceByIndexRequest { piece_index })
+                        .await;
+
+                    match request_result {
+                        Ok(PieceByIndexResponse { piece: Some(piece) }) => {
+                            trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");
+
+                            if let Some(validator) = &self.piece_validator {
+                                return validator.validate_piece(peer_id, piece_index, piece).await;
+                            } else {
+                                return Some(piece);
+                            }
+                        }
+                        Ok(PieceByIndexResponse { piece: None }) => {
+                            debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
+                        }
+                        Err(error) => {
+                            debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed.");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error");
+            }
+        }
+
+        None
+    }
 }
2 changes: 1 addition & 1 deletion crates/subspace-service/src/sync_from_dsn/import_blocks.rs
@@ -280,7 +280,7 @@ where
             }
         };
         let maybe_piece = match piece_provider
-            .get_piece(
+            .get_piece_from_dsn_cache(
                 piece_index,
                 RetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()),
             )
