Skip to content

Commit

Permalink
Merge pull request #2781 from subspace/improve-piece-retrieval-debugging
Browse files Browse the repository at this point in the history
Improve piece retrieval debugging by consistently logging short hex key
  • Loading branch information
nazar-pc authored May 22, 2024
2 parents 76aa651 + 87b364d commit 211e666
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 20 deletions.
10 changes: 5 additions & 5 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::mpsc::SendError;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, Stream, StreamExt};
use libp2p::gossipsub::{Sha256Topic, SubscriptionError};
use libp2p::kad::PeerRecord;
use libp2p::kad::{PeerRecord, RecordKey};
use libp2p::{Multiaddr, PeerId};
use parity_scale_codec::Decode;
use std::pin::Pin;
Expand Down Expand Up @@ -436,14 +436,14 @@ impl Node {
/// Get item providers by its key. Initiate 'providers' Kademlia operation.
pub async fn get_providers(
&self,
key: Multihash,
key: RecordKey,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
self.get_providers_internal(key, true).await
}

async fn get_providers_internal(
&self,
key: Multihash,
key: RecordKey,
acquire_permit: bool,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
let permit = if acquire_permit {
Expand All @@ -454,7 +454,7 @@ impl Node {

let (result_sender, result_receiver) = mpsc::unbounded();

trace!(?key, "Starting 'get_providers' request.");
trace!(key = hex::encode(&key), "Starting 'get_providers' request");

self.shared
.command_sender
Expand Down Expand Up @@ -598,7 +598,7 @@ impl NodeRequestsBatchHandle {
/// Get item providers by its key. Initiate 'providers' Kademlia operation.
pub async fn get_providers(
&mut self,
key: Multihash,
key: RecordKey,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
self.node.get_providers_internal(key, false).await
}
Expand Down
15 changes: 11 additions & 4 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use libp2p::kad::{
Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
InboundRequest, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult, Quorum, Record,
RecordKey,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
Expand Down Expand Up @@ -57,6 +58,7 @@ enum QueryResultSender {
_permit: Option<OwnedSemaphorePermit>,
},
Providers {
key: RecordKey,
sender: mpsc::UnboundedSender<PeerId>,
// Just holding onto permit while data structure is not dropped
_permit: Option<OwnedSemaphorePermit>,
Expand Down Expand Up @@ -963,7 +965,7 @@ where
..
} => {
let mut cancelled = false;
if let Some(QueryResultSender::Providers { sender, .. }) =
if let Some(QueryResultSender::Providers { key, sender, .. }) =
self.query_id_receivers.get(&id)
{
match result {
Expand All @@ -984,8 +986,12 @@ where
) || cancelled;
}
}
Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { .. }) => {
trace!("Get providers query yielded no results");
Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
trace!(
key = hex::encode(key),
closest_peers = %closest_peers.len(),
"Get providers query yielded no results"
);
}
Err(error) => {
let GetProvidersError::Timeout { key, .. } = error;
Expand Down Expand Up @@ -1375,11 +1381,12 @@ where
.swarm
.behaviour_mut()
.kademlia
.get_providers(key.into());
.get_providers(key.clone());

self.query_id_receivers.insert(
query_id,
QueryResultSender::Providers {
key,
sender: result_sender,
_permit: permit,
},
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::utils::Handler;
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use libp2p::gossipsub::{PublishError, Sha256Topic, SubscriptionError};
use libp2p::kad::PeerRecord;
use libp2p::kad::{PeerRecord, RecordKey};
use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -89,7 +89,7 @@ pub(crate) enum Command {
result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
GetProviders {
key: Multihash,
key: RecordKey,
result_sender: mpsc::UnboundedSender<PeerId>,
permit: Option<OwnedSemaphorePermit>,
},
Expand Down
40 changes: 31 additions & 9 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::utils::multihash::ToMultihash;
use crate::{Node, PieceByIndexRequest, PieceByIndexResponse};
use async_trait::async_trait;
use futures::StreamExt;
use libp2p::kad::RecordKey;
use libp2p::PeerId;
use std::collections::HashSet;
use std::fmt;
Expand Down Expand Up @@ -59,23 +60,33 @@ where

/// Returns piece by its index from farmer's piece cache (L2)
pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
let key = piece_index.to_multihash();
let key = RecordKey::from(piece_index.to_multihash());

let mut request_batch = self.node.get_requests_batch_handle().await;
let get_providers_result = request_batch.get_providers(key).await;
let get_providers_result = request_batch.get_providers(key.clone()).await;

match get_providers_result {
Ok(mut get_providers_stream) => {
while let Some(provider_id) = get_providers_stream.next().await {
trace!(%piece_index, %provider_id, "get_providers returned an item");
trace!(
%piece_index,
key = hex::encode(&key),
%provider_id,
"get_providers returned an item"
);

let request_result = request_batch
.send_generic_request(provider_id, PieceByIndexRequest { piece_index })
.await;

match request_result {
Ok(PieceByIndexResponse { piece: Some(piece) }) => {
trace!(%provider_id, %piece_index, ?key, "Piece request succeeded.");
trace!(
%piece_index,
key = hex::encode(&key),
%provider_id,
"Piece request succeeded"
);

if let Some(validator) = &self.piece_validator {
return validator
Expand All @@ -86,10 +97,21 @@ where
}
}
Ok(PieceByIndexResponse { piece: None }) => {
debug!(%provider_id, %piece_index, ?key, "Piece request returned empty piece.");
debug!(
%piece_index,
key = hex::encode(&key),
%provider_id,
"Piece request returned empty piece"
);
}
Err(error) => {
debug!(%provider_id, %piece_index, ?key, ?error, "Piece request failed.");
debug!(
%piece_index,
key = hex::encode(&key),
%provider_id,
?error,
"Piece request failed"
);
}
}
}
Expand All @@ -115,7 +137,7 @@ where

match request_result {
Ok(PieceByIndexResponse { piece: Some(piece) }) => {
trace!(%peer_id, %piece_index, "Piece request succeeded.");
trace!(%peer_id, %piece_index, "Piece request succeeded");

if let Some(validator) = &self.piece_validator {
return validator.validate_piece(peer_id, piece_index, piece).await;
Expand All @@ -124,10 +146,10 @@ where
}
}
Ok(PieceByIndexResponse { piece: None }) => {
debug!(%peer_id, %piece_index, "Piece request returned empty piece.");
debug!(%peer_id, %piece_index, "Piece request returned empty piece");
}
Err(error) => {
debug!(%peer_id, %piece_index, ?error, "Piece request failed.");
debug!(%peer_id, %piece_index, ?error, "Piece request failed");
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/subspace-service/src/sync_from_dsn/import_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::{
ArchivedHistorySegment, BlockNumber, Piece, PieceIndex, RecordedHistorySegment, SegmentIndex,
};
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use tokio::sync::Semaphore;
use tracing::warn;
Expand Down Expand Up @@ -327,8 +328,11 @@ where
}
};

let key =
subspace_networking::libp2p::kad::RecordKey::from(piece_index.to_multihash());
trace!(
?piece_index,
key = hex::encode(&key),
piece_found = maybe_piece.is_some(),
"Piece request succeeded",
);
Expand Down

0 comments on commit 211e666

Please sign in to comment.