
Merge pull request #2487 from subspace/plot-while-syncing
Plot while syncing
nazar-pc authored Feb 1, 2024
2 parents 3c73368 + 9234d40 commit b38f026
Showing 8 changed files with 41 additions and 214 deletions.
69 changes: 3 additions & 66 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
@@ -64,16 +64,15 @@ use subspace_core_primitives::{
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_networking::libp2p::Multiaddr;
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tracing::{debug, error, warn};

/// This is essentially equal to expected number of votes per block, one more is added implicitly by
/// the fact that channel sender exists
const SOLUTION_SENDER_CHANNEL_CAPACITY: usize = 9;
const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500);
const NODE_SYNC_STATUS_CHECK_INTERVAL: Duration = Duration::from_secs(1);

/// Provides rpc methods for interacting with Subspace.
#[rpc(client, server)]
@@ -112,14 +111,6 @@ pub trait SubspaceRpcApi {
)]
fn subscribe_archived_segment_header(&self);

/// Archived segment header subscription
#[subscription(
name = "subspace_subscribeNodeSyncStatusChange" => "subspace_node_sync_status_change",
unsubscribe = "subspace_unsubscribeNodeSyncStatusChange",
item = NodeSyncStatus,
)]
fn subscribe_node_sync_status_change(&self);

#[method(name = "subspace_segmentHeaders")]
async fn segment_headers(
&self,
@@ -322,6 +313,7 @@ where
FarmerAppInfo {
genesis_hash: self.genesis_hash,
dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
syncing: self.sync_oracle.is_major_syncing(),
farming_timeout: chain_constants
.slot_duration()
.as_duration()
@@ -649,61 +641,6 @@ where
Ok(())
}

fn subscribe_node_sync_status_change(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let sync_oracle = self.sync_oracle.clone();
let fut = async move {
let mut last_is_major_syncing = None;
loop {
let is_major_syncing = sync_oracle.is_major_syncing();

// Update subscriber if value has changed
if last_is_major_syncing != Some(is_major_syncing) {
// In case change is detected, wait for another interval to confirm.
// TODO: This is primarily because Substrate seems to lose peers for brief
// periods of time sometimes that needs to be investigated separately
futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await;

// If status returned back to what it was, ignore
if last_is_major_syncing == Some(sync_oracle.is_major_syncing()) {
futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await;
continue;
}

// Otherwise save new status
last_is_major_syncing.replace(is_major_syncing);

let node_sync_status = if is_major_syncing {
NodeSyncStatus::MajorSyncing
} else {
NodeSyncStatus::Synced
};
match sink.send(&node_sync_status) {
Ok(true) => {
// Success
}
Ok(false) => {
// Subscription closed
return;
}
Err(error) => {
error!("Failed to serialize node sync status: {}", error);
}
}
}

futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await;
}
};

self.subscription_executor.spawn(
"subspace-node-sync-status-change-subscription",
Some("rpc"),
fut.boxed(),
);

Ok(())
}

async fn acknowledge_archived_segment_header(
&self,
segment_index: SegmentIndex,
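With the `subspace_subscribeNodeSyncStatusChange` subscription removed, farmers learn about sync state from the new `syncing` field on `FarmerAppInfo`. As a rough illustration only (not code from this PR), a farmer-side caller could poll that flag through the `NodeClient` trait from `crates/subspace-farmer/src/node_client.rs`; the helper name, import path, and one-second interval below are assumptions:

```rust
use std::time::Duration;

use subspace_farmer::node_client::{Error, NodeClient};

/// Hypothetical helper: poll `farmer_app_info` until the node reports that
/// major sync has finished.
async fn wait_until_node_synced<NC>(node_client: &NC) -> Result<(), Error>
where
    NC: NodeClient,
{
    loop {
        let farmer_app_info = node_client.farmer_app_info().await?;
        if !farmer_app_info.syncing {
            // Node is no longer major-syncing, safe to proceed.
            return Ok(());
        }
        // Re-check roughly once per second (needs tokio's "time" feature).
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```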
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
@@ -56,7 +56,7 @@ substrate-bip39 = "0.4.5"
supports-color = "2.1.0"
tempfile = "3.9.0"
thiserror = "1.0.56"
tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt-multi-thread", "signal"] }
tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
ulid = { version = "1.0.0", features = ["serde"] }
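The added "time" feature is what gates `tokio::time::sleep`, which the piece cache change below relies on for its one-second retry interval. A minimal standalone sketch of the call this feature enables (illustrative, not code from this PR):

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // `tokio::time::sleep` is only available with tokio's "time" feature,
    // which this Cargo.toml change enables for subspace-farmer.
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("one second elapsed");
}
```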
8 changes: 1 addition & 7 deletions crates/subspace-farmer/src/node_client.rs
@@ -6,8 +6,7 @@ use std::fmt;
use std::pin::Pin;
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};

/// To become error type agnostic
@@ -46,11 +45,6 @@ pub trait NodeClient: Clone + fmt::Debug + Send + Sync + 'static {
&self,
) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, Error>;

/// Subscribe to node sync status change
async fn subscribe_node_sync_status_change(
&self,
) -> Result<Pin<Box<dyn Stream<Item = NodeSyncStatus> + Send + 'static>>, Error>;

/// Get segment headers for the segments
async fn segment_headers(
&self,
20 changes: 1 addition & 19 deletions crates/subspace-farmer/src/node_client/node_rpc_client.rs
@@ -9,8 +9,7 @@ use std::pin::Pin;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;

@@ -132,23 +131,6 @@ impl NodeClient for NodeRpcClient {
)))
}

async fn subscribe_node_sync_status_change(
&self,
) -> Result<Pin<Box<dyn Stream<Item = NodeSyncStatus> + Send + 'static>>, RpcError> {
let subscription = self
.client
.subscribe(
"subspace_subscribeNodeSyncStatusChange",
rpc_params![],
"subspace_unsubscribeNodeSyncStatusChange",
)
.await?;

Ok(Box::pin(subscription.filter_map(
|node_sync_status_result| async move { node_sync_status_result.ok() },
)))
}

async fn segment_headers(
&self,
segment_indexes: Vec<SegmentIndex>,
43 changes: 30 additions & 13 deletions crates/subspace-farmer/src/piece_cache.rs
@@ -12,6 +12,7 @@ use parking_lot::RwLock;
use std::collections::HashMap;
use std::num::NonZeroU16;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
@@ -30,6 +31,7 @@ const CONCURRENT_PIECES_TO_DOWNLOAD: usize = 1_000;
const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
/// Get piece retry attempts number.
const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");
const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);

type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;
@@ -303,21 +305,36 @@ where

info!("Synchronizing piece cache");

// TODO: Query from the DSN too such that we don't build outdated cache at start if node is
// not synced fully
let last_segment_index = match self.node_client.farmer_app_info().await {
Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
Err(error) => {
error!(
%error,
"Failed to get farmer app info from node, keeping old cache state without \
updates"
);
let last_segment_index = loop {
match self.node_client.farmer_app_info().await {
Ok(farmer_app_info) => {
let last_segment_index =
farmer_app_info.protocol_info.history_size.segment_index();
// Wait for node to be either fully synced or to be aware of non-zero segment
// index, which would indicate it has started DSN sync and knows about
// up-to-date archived history.
//
// While this doesn't account for situations where node was offline for a long
// time and is aware of old segment headers, this is good enough for piece cache
// sync to proceed and should result in better user experience on average.
if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
break last_segment_index;
}
}
Err(error) => {
error!(
%error,
"Failed to get farmer app info from node, keeping old cache state without \
updates"
);

// Not the latest, but at least something
*self.caches.write() = caches;
return;
// Not the latest, but at least something
*self.caches.write() = caches;
return;
}
}

tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
};

debug!(%last_segment_index, "Identified last segment index");
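The loop added above keeps the previous error-handling path (fall back to the old cache state) but now retries every `INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL` until a readiness condition holds. Restated as a standalone predicate for clarity (illustrative only; this function does not exist in the PR):

```rust
use subspace_core_primitives::SegmentIndex;

/// Piece cache sync may proceed once the node is no longer major-syncing, or
/// once it already reports a non-zero segment index, i.e. it has started DSN
/// sync and knows about up-to-date archived history.
fn can_start_piece_cache_sync(syncing: bool, last_segment_index: SegmentIndex) -> bool {
    !syncing || last_segment_index > SegmentIndex::ZERO
}
```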
10 changes: 2 additions & 8 deletions crates/subspace-farmer/src/piece_cache/tests.rs
@@ -21,8 +21,7 @@ use subspace_networking::libp2p::identity;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tempfile::tempdir;

@@ -42,6 +41,7 @@ impl NodeClient for MockNodeClient {
Ok(FarmerAppInfo {
genesis_hash: [0; 32],
dsn_bootstrap_nodes: Vec::new(),
syncing: false,
farming_timeout: Duration::default(),
protocol_info: FarmerProtocolInfo {
history_size: HistorySize::from(SegmentIndex::from(
Expand Down Expand Up @@ -98,12 +98,6 @@ impl NodeClient for MockNodeClient {
Ok(Box::pin(stream))
}

async fn subscribe_node_sync_status_change(
&self,
) -> Result<Pin<Box<dyn Stream<Item = NodeSyncStatus> + Send + 'static>>, Error> {
unimplemented!()
}

async fn segment_headers(
&self,
_segment_indexes: Vec<SegmentIndex>,
84 changes: 1 addition & 83 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -586,14 +586,6 @@ where
new_segment_processing_delay,
);

let (sectors_to_plot_proxy_sender, sectors_to_plot_proxy_receiver) = mpsc::channel(0);

let pause_plotting_if_node_not_synced_fut = pause_plotting_if_node_not_synced(
&node_client,
sectors_to_plot_proxy_receiver,
sectors_to_plot_sender,
);

let send_plotting_notifications_fut = send_plotting_notifications(
public_key_hash,
sectors_indices_left_to_plot,
@@ -604,14 +596,11 @@
sectors_metadata,
&last_archived_segment,
archived_segments_receiver,
sectors_to_plot_proxy_sender,
sectors_to_plot_sender,
initial_plotting_finished,
);

select! {
result = pause_plotting_if_node_not_synced_fut.fuse() => {
result
}
result = read_archived_segments_notifications_fut.fuse() => {
result
}
@@ -663,77 +652,6 @@ where
Ok(())
}

async fn pause_plotting_if_node_not_synced<NC>(
node_client: &NC,
sectors_to_plot_proxy_receiver: mpsc::Receiver<SectorToPlot>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
{
let mut node_sync_status_change_notifications = node_client
.subscribe_node_sync_status_change()
.await
.map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;

let Some(mut node_sync_status) = node_sync_status_change_notifications.next().await else {
return Ok(());
};

let mut sectors_to_plot_proxy_receiver = sectors_to_plot_proxy_receiver.fuse();
let mut node_sync_status_change_notifications = node_sync_status_change_notifications.fuse();

'outer: loop {
// Pause proxying of sectors to plot until we get notification that node is synced
if !node_sync_status.is_synced() {
info!("Node is not synced yet, pausing plotting until sync status changes");

loop {
match node_sync_status_change_notifications.next().await {
Some(new_node_sync_status) => {
node_sync_status = new_node_sync_status;

if node_sync_status.is_synced() {
info!("Node is synced, resuming plotting");
continue 'outer;
}
}
None => {
// Subscription ended, nothing left to do
return Ok(());
}
}
}
}

select! {
maybe_sector_to_plot = sectors_to_plot_proxy_receiver.next() => {
let Some(sector_to_plot) = maybe_sector_to_plot else {
// Subscription ended, nothing left to do
return Ok(());
};

if let Err(_error) = sectors_to_plot_sender.send(sector_to_plot).await {
// Receiver disconnected, nothing left to do
return Ok(());
}
},

maybe_node_sync_status = node_sync_status_change_notifications.next() => {
match maybe_node_sync_status {
Some(new_node_sync_status) => {
node_sync_status = new_node_sync_status;
}
None => {
// Subscription ended, nothing left to do
return Ok(());
}
}
},
}
}
}

struct SectorToReplot {
sector_index: SectorIndex,
expires_at: SegmentIndex,
