diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index 793042202f1..6f8a974c452 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -1,6 +1,6 @@ use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; use crate::utils::AsyncJoinOnDrop; -use async_lock::RwLock as AsyncRwLock; +use async_lock::{RwLock as AsyncRwLock, Semaphore}; use async_trait::async_trait; use futures::{Stream, StreamExt}; use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT}; @@ -15,10 +15,15 @@ use subspace_rpc_primitives::{ }; use tracing::{info, trace, warn}; +/// Node is having a hard time responding for many piece requests +// TODO: Remove this once https://github.com/paritytech/jsonrpsee/issues/1189 is resolved +const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10; + /// `WsClient` wrapper. #[derive(Debug, Clone)] pub struct NodeRpcClient { client: Arc, + piece_request_semaphore: Arc, segment_headers: Arc>>, _background_task: Arc>, } @@ -32,6 +37,7 @@ impl NodeRpcClient { .build(url) .await?, ); + let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS)); let mut segment_headers = Vec::::new(); let mut archived_segments_notifications = Box::pin( @@ -123,6 +129,7 @@ impl NodeRpcClient { let node_client = Self { client, + piece_request_semaphore, segment_headers, _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)), }; @@ -234,6 +241,7 @@ impl NodeClient for NodeRpcClient { } async fn piece(&self, piece_index: PieceIndex) -> Result, RpcError> { + let _permit = self.piece_request_semaphore.acquire().await; let client = Arc::clone(&self.client); // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes // issues for jsonrpsee