From cb4bad4fb042952c3f0034871a114457fe157163 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 31 Jan 2024 10:09:44 +0200 Subject: [PATCH] Do not pause plotting when node isn't synced --- crates/sc-consensus-subspace-rpc/src/lib.rs | 68 +-------------- crates/subspace-farmer/src/node_client.rs | 8 +- .../src/node_client/node_rpc_client.rs | 20 +---- .../subspace-farmer/src/piece_cache/tests.rs | 9 +- .../src/single_disk_farm/plotting.rs | 84 +------------------ crates/subspace-rpc-primitives/src/lib.rs | 17 ---- 6 files changed, 6 insertions(+), 200 deletions(-) diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 1fec7d81b96..80007d60b2a 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -64,8 +64,8 @@ use subspace_core_primitives::{ use subspace_farmer_components::FarmerProtocolInfo; use subspace_networking::libp2p::Multiaddr; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, + MAX_SEGMENT_HEADERS_PER_REQUEST, }; use tracing::{debug, error, warn}; @@ -73,7 +73,6 @@ use tracing::{debug, error, warn}; /// the fact that channel sender exists const SOLUTION_SENDER_CHANNEL_CAPACITY: usize = 9; const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500); -const NODE_SYNC_STATUS_CHECK_INTERVAL: Duration = Duration::from_secs(1); /// Provides rpc methods for interacting with Subspace. #[rpc(client, server)] @@ -112,14 +111,6 @@ pub trait SubspaceRpcApi { )] fn subscribe_archived_segment_header(&self); - /// Archived segment header subscription - #[subscription( - name = "subspace_subscribeNodeSyncStatusChange" => "subspace_node_sync_status_change", - unsubscribe = "subspace_unsubscribeNodeSyncStatusChange", - item = NodeSyncStatus, - )] - fn subscribe_node_sync_status_change(&self); - #[method(name = "subspace_segmentHeaders")] async fn segment_headers( &self, @@ -650,61 +641,6 @@ where Ok(()) } - fn subscribe_node_sync_status_change(&self, mut sink: SubscriptionSink) -> SubscriptionResult { - let sync_oracle = self.sync_oracle.clone(); - let fut = async move { - let mut last_is_major_syncing = None; - loop { - let is_major_syncing = sync_oracle.is_major_syncing(); - - // Update subscriber if value has changed - if last_is_major_syncing != Some(is_major_syncing) { - // In case change is detected, wait for another interval to confirm. - // TODO: This is primarily because Substrate seems to lose peers for brief - // periods of time sometimes that needs to be investigated separately - futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await; - - // If status returned back to what it was, ignore - if last_is_major_syncing == Some(sync_oracle.is_major_syncing()) { - futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await; - continue; - } - - // Otherwise save new status - last_is_major_syncing.replace(is_major_syncing); - - let node_sync_status = if is_major_syncing { - NodeSyncStatus::MajorSyncing - } else { - NodeSyncStatus::Synced - }; - match sink.send(&node_sync_status) { - Ok(true) => { - // Success - } - Ok(false) => { - // Subscription closed - return; - } - Err(error) => { - error!("Failed to serialize node sync status: {}", error); - } - } - } - - futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await; - } - }; - - self.subscription_executor.spawn( - "subspace-node-sync-status-change-subscription", - Some("rpc"), - fut.boxed(), - ); - - Ok(()) - } - async fn acknowledge_archived_segment_header( &self, segment_index: SegmentIndex, diff --git a/crates/subspace-farmer/src/node_client.rs b/crates/subspace-farmer/src/node_client.rs index 4559e377657..f0cf926cf5e 100644 --- a/crates/subspace-farmer/src/node_client.rs +++ b/crates/subspace-farmer/src/node_client.rs @@ -6,8 +6,7 @@ use std::fmt; use std::pin::Pin; use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; /// To become error type agnostic @@ -46,11 +45,6 @@ pub trait NodeClient: Clone + fmt::Debug + Send + Sync + 'static { &self, ) -> Result + Send + 'static>>, Error>; - /// Subscribe to node sync status change - async fn subscribe_node_sync_status_change( - &self, - ) -> Result + Send + 'static>>, Error>; - /// Get segment headers for the segments async fn segment_headers( &self, diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index b49853192a6..684cf1101e5 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -9,8 +9,7 @@ use std::pin::Pin; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; use tokio::sync::Semaphore; @@ -132,23 +131,6 @@ impl NodeClient for NodeRpcClient { ))) } - async fn subscribe_node_sync_status_change( - &self, - ) -> Result + Send + 'static>>, RpcError> { - let subscription = self - .client - .subscribe( - "subspace_subscribeNodeSyncStatusChange", - rpc_params![], - "subspace_unsubscribeNodeSyncStatusChange", - ) - .await?; - - Ok(Box::pin(subscription.filter_map( - |node_sync_status_result| async move { node_sync_status_result.ok() }, - ))) - } - async fn segment_headers( &self, segment_indexes: Vec, diff --git a/crates/subspace-farmer/src/piece_cache/tests.rs b/crates/subspace-farmer/src/piece_cache/tests.rs index c5d00ef9724..6b11150e0d8 100644 --- a/crates/subspace-farmer/src/piece_cache/tests.rs +++ b/crates/subspace-farmer/src/piece_cache/tests.rs @@ -21,8 +21,7 @@ use subspace_networking::libp2p::identity; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::utils::multihash::ToMultihash; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; use tempfile::tempdir; @@ -99,12 +98,6 @@ impl NodeClient for MockNodeClient { Ok(Box::pin(stream)) } - async fn subscribe_node_sync_status_change( - &self, - ) -> Result + Send + 'static>>, Error> { - unimplemented!() - } - async fn segment_headers( &self, _segment_indexes: Vec, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 7a4872a4ee5..e6e3602f0ba 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -586,14 +586,6 @@ where new_segment_processing_delay, ); - let (sectors_to_plot_proxy_sender, sectors_to_plot_proxy_receiver) = mpsc::channel(0); - - let pause_plotting_if_node_not_synced_fut = pause_plotting_if_node_not_synced( - &node_client, - sectors_to_plot_proxy_receiver, - sectors_to_plot_sender, - ); - let send_plotting_notifications_fut = send_plotting_notifications( public_key_hash, sectors_indices_left_to_plot, @@ -604,14 +596,11 @@ where sectors_metadata, &last_archived_segment, archived_segments_receiver, - sectors_to_plot_proxy_sender, + sectors_to_plot_sender, initial_plotting_finished, ); select! { - result = pause_plotting_if_node_not_synced_fut.fuse() => { - result - } result = read_archived_segments_notifications_fut.fuse() => { result } @@ -663,77 +652,6 @@ where Ok(()) } -async fn pause_plotting_if_node_not_synced( - node_client: &NC, - sectors_to_plot_proxy_receiver: mpsc::Receiver, - mut sectors_to_plot_sender: mpsc::Sender, -) -> Result<(), BackgroundTaskError> -where - NC: NodeClient, -{ - let mut node_sync_status_change_notifications = node_client - .subscribe_node_sync_status_change() - .await - .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?; - - let Some(mut node_sync_status) = node_sync_status_change_notifications.next().await else { - return Ok(()); - }; - - let mut sectors_to_plot_proxy_receiver = sectors_to_plot_proxy_receiver.fuse(); - let mut node_sync_status_change_notifications = node_sync_status_change_notifications.fuse(); - - 'outer: loop { - // Pause proxying of sectors to plot until we get notification that node is synced - if !node_sync_status.is_synced() { - info!("Node is not synced yet, pausing plotting until sync status changes"); - - loop { - match node_sync_status_change_notifications.next().await { - Some(new_node_sync_status) => { - node_sync_status = new_node_sync_status; - - if node_sync_status.is_synced() { - info!("Node is synced, resuming plotting"); - continue 'outer; - } - } - None => { - // Subscription ended, nothing left to do - return Ok(()); - } - } - } - } - - select! { - maybe_sector_to_plot = sectors_to_plot_proxy_receiver.next() => { - let Some(sector_to_plot) = maybe_sector_to_plot else { - // Subscription ended, nothing left to do - return Ok(()); - }; - - if let Err(_error) = sectors_to_plot_sender.send(sector_to_plot).await { - // Receiver disconnected, nothing left to do - return Ok(()); - } - }, - - maybe_node_sync_status = node_sync_status_change_notifications.next() => { - match maybe_node_sync_status { - Some(new_node_sync_status) => { - node_sync_status = new_node_sync_status; - } - None => { - // Subscription ended, nothing left to do - return Ok(()); - } - } - }, - } - } -} - struct SectorToReplot { sector_index: SectorIndex, expires_at: SegmentIndex, diff --git a/crates/subspace-rpc-primitives/src/lib.rs b/crates/subspace-rpc-primitives/src/lib.rs index f5e7657c26b..166af22842f 100644 --- a/crates/subspace-rpc-primitives/src/lib.rs +++ b/crates/subspace-rpc-primitives/src/lib.rs @@ -92,20 +92,3 @@ pub struct RewardSignatureResponse { /// Pre-header or vote hash signature. pub signature: Option, } - -/// Information about new slot that just arrived -#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum NodeSyncStatus { - /// Node is fully synced - Synced, - /// Node is major syncing - MajorSyncing, -} - -impl NodeSyncStatus { - /// Whether node is synced - pub fn is_synced(&self) -> bool { - matches!(self, Self::Synced) - } -}