Skip to content

Commit

Permalink
Merge pull request #2854 from subspace/faster-piece-getting-via-rpc
Browse files Browse the repository at this point in the history
Faster piece getting via RPC
  • Loading branch information
nazar-pc authored Jun 17, 2024
2 parents cc8a2b7 + 744d52b commit 1f2d60e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ where
debug!(%segment_index, "Downloading potentially useful pieces");

// We do not insert pieces into cache/heap yet, so we don't know if all of these pieces
// will be included, but there is a good chance they will be and we want to acknowledge
// will be included, but there is a good chance they will be, and we want to acknowledge
// new segment header as soon as possible
let pieces_to_maybe_include = segment_index
.segment_piece_indexes()
Expand Down
23 changes: 18 additions & 5 deletions crates/subspace-farmer/src/node_client/node_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use crate::utils::AsyncJoinOnDrop;
use async_lock::RwLock as AsyncRwLock;
use async_lock::{RwLock as AsyncRwLock, Semaphore};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
Expand All @@ -15,10 +15,15 @@ use subspace_rpc_primitives::{
};
use tracing::{info, trace, warn};

/// TODO: Node is having a hard time responding for many piece requests, specifically this results
/// in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;

/// `WsClient` wrapper.
#[derive(Debug, Clone)]
pub struct NodeRpcClient {
client: Arc<WsClient>,
piece_request_semaphore: Arc<Semaphore>,
segment_headers: Arc<AsyncRwLock<Vec<SegmentHeader>>>,
_background_task: Arc<AsyncJoinOnDrop<()>>,
}
Expand All @@ -32,6 +37,7 @@ impl NodeRpcClient {
.build(url)
.await?,
);
let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));

let mut segment_headers = Vec::<SegmentHeader>::new();
let mut archived_segments_notifications = Box::pin(
Expand Down Expand Up @@ -123,6 +129,7 @@ impl NodeRpcClient {

let node_client = Self {
client,
piece_request_semaphore,
segment_headers,
_background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
};
Expand Down Expand Up @@ -234,10 +241,16 @@ impl NodeClient for NodeRpcClient {
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, RpcError> {
Ok(self
.client
.request("subspace_piece", rpc_params![&piece_index])
.await?)
let _permit = self.piece_request_semaphore.acquire().await;
let client = Arc::clone(&self.client);
// Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
// issues for jsonrpsee
let piece_fut = tokio::task::spawn(async move {
client
.request("subspace_piece", rpc_params![&piece_index])
.await
});
Ok(piece_fut.await??)
}

async fn acknowledge_archived_segment_header(
Expand Down

0 comments on commit 1f2d60e

Please sign in to comment.