diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index f1c0484b96..80007d60b2 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -64,8 +64,8 @@ use subspace_core_primitives::{ use subspace_farmer_components::FarmerProtocolInfo; use subspace_networking::libp2p::Multiaddr; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, + MAX_SEGMENT_HEADERS_PER_REQUEST, }; use tracing::{debug, error, warn}; @@ -73,7 +73,6 @@ use tracing::{debug, error, warn}; /// the fact that channel sender exists const SOLUTION_SENDER_CHANNEL_CAPACITY: usize = 9; const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500); -const NODE_SYNC_STATUS_CHECK_INTERVAL: Duration = Duration::from_secs(1); /// Provides rpc methods for interacting with Subspace. #[rpc(client, server)] @@ -112,14 +111,6 @@ pub trait SubspaceRpcApi { )] fn subscribe_archived_segment_header(&self); - /// Archived segment header subscription - #[subscription( - name = "subspace_subscribeNodeSyncStatusChange" => "subspace_node_sync_status_change", - unsubscribe = "subspace_unsubscribeNodeSyncStatusChange", - item = NodeSyncStatus, - )] - fn subscribe_node_sync_status_change(&self); - #[method(name = "subspace_segmentHeaders")] async fn segment_headers( &self, @@ -322,6 +313,7 @@ where FarmerAppInfo { genesis_hash: self.genesis_hash, dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(), + syncing: self.sync_oracle.is_major_syncing(), farming_timeout: chain_constants .slot_duration() .as_duration() @@ -649,61 +641,6 @@ where Ok(()) } - fn subscribe_node_sync_status_change(&self, mut sink: SubscriptionSink) -> SubscriptionResult { - let sync_oracle = self.sync_oracle.clone(); - let fut = async move { - let mut last_is_major_syncing = None; - loop { - let is_major_syncing = sync_oracle.is_major_syncing(); - - // Update subscriber if value has changed - if last_is_major_syncing != Some(is_major_syncing) { - // In case change is detected, wait for another interval to confirm. - // TODO: This is primarily because Substrate seems to lose peers for brief - // periods of time sometimes that needs to be investigated separately - futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await; - - // If status returned back to what it was, ignore - if last_is_major_syncing == Some(sync_oracle.is_major_syncing()) { - futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await; - continue; - } - - // Otherwise save new status - last_is_major_syncing.replace(is_major_syncing); - - let node_sync_status = if is_major_syncing { - NodeSyncStatus::MajorSyncing - } else { - NodeSyncStatus::Synced - }; - match sink.send(&node_sync_status) { - Ok(true) => { - // Success - } - Ok(false) => { - // Subscription closed - return; - } - Err(error) => { - error!("Failed to serialize node sync status: {}", error); - } - } - } - - futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await; - } - }; - - self.subscription_executor.spawn( - "subspace-node-sync-status-change-subscription", - Some("rpc"), - fut.boxed(), - ); - - Ok(()) - } - async fn acknowledge_archived_segment_header( &self, segment_index: SegmentIndex, diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 985739ffe9..fdd1d717f7 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -56,7 +56,7 @@ substrate-bip39 = "0.4.5" supports-color = "2.1.0" tempfile = "3.9.0" thiserror = "1.0.56" -tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt-multi-thread", "signal"] } +tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } ulid = { version = "1.0.0", features = ["serde"] } diff --git a/crates/subspace-farmer/src/node_client.rs b/crates/subspace-farmer/src/node_client.rs index 4559e37765..f0cf926cf5 100644 --- a/crates/subspace-farmer/src/node_client.rs +++ b/crates/subspace-farmer/src/node_client.rs @@ -6,8 +6,7 @@ use std::fmt; use std::pin::Pin; use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; /// To become error type agnostic @@ -46,11 +45,6 @@ pub trait NodeClient: Clone + fmt::Debug + Send + Sync + 'static { &self, ) -> Result + Send + 'static>>, Error>; - /// Subscribe to node sync status change - async fn subscribe_node_sync_status_change( - &self, - ) -> Result + Send + 'static>>, Error>; - /// Get segment headers for the segments async fn segment_headers( &self, diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index b49853192a..684cf1101e 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -9,8 +9,7 @@ use std::pin::Pin; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; use tokio::sync::Semaphore; @@ -132,23 +131,6 @@ impl NodeClient for NodeRpcClient { ))) } - async fn subscribe_node_sync_status_change( - &self, - ) -> Result + Send + 'static>>, RpcError> { - let subscription = self - .client - .subscribe( - "subspace_subscribeNodeSyncStatusChange", - rpc_params![], - "subspace_unsubscribeNodeSyncStatusChange", - ) - .await?; - - Ok(Box::pin(subscription.filter_map( - |node_sync_status_result| async move { node_sync_status_result.ok() }, - ))) - } - async fn segment_headers( &self, segment_indexes: Vec, diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs index 5d6291a182..f11e7b0a4c 100644 --- a/crates/subspace-farmer/src/piece_cache.rs +++ b/crates/subspace-farmer/src/piece_cache.rs @@ -12,6 +12,7 @@ use parking_lot::RwLock; use std::collections::HashMap; use std::num::NonZeroU16; use std::sync::Arc; +use std::time::Duration; use std::{fmt, mem}; use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; @@ -30,6 +31,7 @@ const CONCURRENT_PIECES_TO_DOWNLOAD: usize = 1_000; const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100; /// Get piece retry attempts number. const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed"); +const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1); type HandlerFn = Arc; type Handler = Bag, A>; @@ -303,21 +305,36 @@ where info!("Synchronizing piece cache"); - // TODO: Query from the DSN too such that we don't build outdated cache at start if node is - // not synced fully - let last_segment_index = match self.node_client.farmer_app_info().await { - Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(), - Err(error) => { - error!( - %error, - "Failed to get farmer app info from node, keeping old cache state without \ - updates" - ); + let last_segment_index = loop { + match self.node_client.farmer_app_info().await { + Ok(farmer_app_info) => { + let last_segment_index = + farmer_app_info.protocol_info.history_size.segment_index(); + // Wait for node to be either fully synced or to be aware of non-zero segment + // index, which would indicate it has started DSN sync and knows about + // up-to-date archived history. + // + // While this doesn't account for situations where node was offline for a long + // time and is aware of old segment headers, this is good enough for piece cache + // sync to proceed and should result in better user experience on average. + if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO { + break last_segment_index; + } + } + Err(error) => { + error!( + %error, + "Failed to get farmer app info from node, keeping old cache state without \ + updates" + ); - // Not the latest, but at least something - *self.caches.write() = caches; - return; + // Not the latest, but at least something + *self.caches.write() = caches; + return; + } } + + tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await; }; debug!(%last_segment_index, "Identified last segment index"); diff --git a/crates/subspace-farmer/src/piece_cache/tests.rs b/crates/subspace-farmer/src/piece_cache/tests.rs index 83287dbed8..6b11150e0d 100644 --- a/crates/subspace-farmer/src/piece_cache/tests.rs +++ b/crates/subspace-farmer/src/piece_cache/tests.rs @@ -21,8 +21,7 @@ use subspace_networking::libp2p::identity; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::utils::multihash::ToMultihash; use subspace_rpc_primitives::{ - FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; use tempfile::tempdir; @@ -42,6 +41,7 @@ impl NodeClient for MockNodeClient { Ok(FarmerAppInfo { genesis_hash: [0; 32], dsn_bootstrap_nodes: Vec::new(), + syncing: false, farming_timeout: Duration::default(), protocol_info: FarmerProtocolInfo { history_size: HistorySize::from(SegmentIndex::from( @@ -98,12 +98,6 @@ impl NodeClient for MockNodeClient { Ok(Box::pin(stream)) } - async fn subscribe_node_sync_status_change( - &self, - ) -> Result + Send + 'static>>, Error> { - unimplemented!() - } - async fn segment_headers( &self, _segment_indexes: Vec, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 7a4872a4ee..e6e3602f0b 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -586,14 +586,6 @@ where new_segment_processing_delay, ); - let (sectors_to_plot_proxy_sender, sectors_to_plot_proxy_receiver) = mpsc::channel(0); - - let pause_plotting_if_node_not_synced_fut = pause_plotting_if_node_not_synced( - &node_client, - sectors_to_plot_proxy_receiver, - sectors_to_plot_sender, - ); - let send_plotting_notifications_fut = send_plotting_notifications( public_key_hash, sectors_indices_left_to_plot, @@ -604,14 +596,11 @@ where sectors_metadata, &last_archived_segment, archived_segments_receiver, - sectors_to_plot_proxy_sender, + sectors_to_plot_sender, initial_plotting_finished, ); select! { - result = pause_plotting_if_node_not_synced_fut.fuse() => { - result - } result = read_archived_segments_notifications_fut.fuse() => { result } @@ -663,77 +652,6 @@ where Ok(()) } -async fn pause_plotting_if_node_not_synced( - node_client: &NC, - sectors_to_plot_proxy_receiver: mpsc::Receiver, - mut sectors_to_plot_sender: mpsc::Sender, -) -> Result<(), BackgroundTaskError> -where - NC: NodeClient, -{ - let mut node_sync_status_change_notifications = node_client - .subscribe_node_sync_status_change() - .await - .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?; - - let Some(mut node_sync_status) = node_sync_status_change_notifications.next().await else { - return Ok(()); - }; - - let mut sectors_to_plot_proxy_receiver = sectors_to_plot_proxy_receiver.fuse(); - let mut node_sync_status_change_notifications = node_sync_status_change_notifications.fuse(); - - 'outer: loop { - // Pause proxying of sectors to plot until we get notification that node is synced - if !node_sync_status.is_synced() { - info!("Node is not synced yet, pausing plotting until sync status changes"); - - loop { - match node_sync_status_change_notifications.next().await { - Some(new_node_sync_status) => { - node_sync_status = new_node_sync_status; - - if node_sync_status.is_synced() { - info!("Node is synced, resuming plotting"); - continue 'outer; - } - } - None => { - // Subscription ended, nothing left to do - return Ok(()); - } - } - } - } - - select! { - maybe_sector_to_plot = sectors_to_plot_proxy_receiver.next() => { - let Some(sector_to_plot) = maybe_sector_to_plot else { - // Subscription ended, nothing left to do - return Ok(()); - }; - - if let Err(_error) = sectors_to_plot_sender.send(sector_to_plot).await { - // Receiver disconnected, nothing left to do - return Ok(()); - } - }, - - maybe_node_sync_status = node_sync_status_change_notifications.next() => { - match maybe_node_sync_status { - Some(new_node_sync_status) => { - node_sync_status = new_node_sync_status; - } - None => { - // Subscription ended, nothing left to do - return Ok(()); - } - } - }, - } - } -} - struct SectorToReplot { sector_index: SectorIndex, expires_at: SegmentIndex, diff --git a/crates/subspace-rpc-primitives/src/lib.rs b/crates/subspace-rpc-primitives/src/lib.rs index 615566fe13..166af22842 100644 --- a/crates/subspace-rpc-primitives/src/lib.rs +++ b/crates/subspace-rpc-primitives/src/lib.rs @@ -35,6 +35,8 @@ pub struct FarmerAppInfo { pub genesis_hash: [u8; 32], /// Bootstrap nodes for DSN pub dsn_bootstrap_nodes: Vec, + /// Whether node is syncing right now + pub syncing: bool, /// How much time farmer has to audit sectors and generate a solution pub farming_timeout: Duration, /// Protocol info for farmer @@ -90,20 +92,3 @@ pub struct RewardSignatureResponse { /// Pre-header or vote hash signature. pub signature: Option, } - -/// Information about new slot that just arrived -#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum NodeSyncStatus { - /// Node is fully synced - Synced, - /// Node is major syncing - MajorSyncing, -} - -impl NodeSyncStatus { - /// Whether node is synced - pub fn is_synced(&self) -> bool { - matches!(self, Self::Synced) - } -}