Skip to content

Commit

Permalink
Do not pause plotting when node isn't synced
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Jan 31, 2024
1 parent 0322435 commit cb4bad4
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 200 deletions.
68 changes: 2 additions & 66 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,15 @@ use subspace_core_primitives::{
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_networking::libp2p::Multiaddr;
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tracing::{debug, error, warn};

/// This is essentially equal to expected number of votes per block, one more is added implicitly by
/// the fact that channel sender exists
const SOLUTION_SENDER_CHANNEL_CAPACITY: usize = 9;
const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500);
const NODE_SYNC_STATUS_CHECK_INTERVAL: Duration = Duration::from_secs(1);

/// Provides rpc methods for interacting with Subspace.
#[rpc(client, server)]
Expand Down Expand Up @@ -112,14 +111,6 @@ pub trait SubspaceRpcApi {
)]
fn subscribe_archived_segment_header(&self);

/// Archived segment header subscription
#[subscription(
name = "subspace_subscribeNodeSyncStatusChange" => "subspace_node_sync_status_change",
unsubscribe = "subspace_unsubscribeNodeSyncStatusChange",
item = NodeSyncStatus,
)]
fn subscribe_node_sync_status_change(&self);

#[method(name = "subspace_segmentHeaders")]
async fn segment_headers(
&self,
Expand Down Expand Up @@ -650,61 +641,6 @@ where
Ok(())
}

fn subscribe_node_sync_status_change(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let sync_oracle = self.sync_oracle.clone();
let fut = async move {
let mut last_is_major_syncing = None;
loop {
let is_major_syncing = sync_oracle.is_major_syncing();

// Update subscriber if value has changed
if last_is_major_syncing != Some(is_major_syncing) {
// In case change is detected, wait for another interval to confirm.
// TODO: This is primarily because Substrate seems to lose peers for brief
// periods of time sometimes that needs to be investigated separately
futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await;

// If status returned back to what it was, ignore
if last_is_major_syncing == Some(sync_oracle.is_major_syncing()) {
futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await;
continue;
}

// Otherwise save new status
last_is_major_syncing.replace(is_major_syncing);

let node_sync_status = if is_major_syncing {
NodeSyncStatus::MajorSyncing
} else {
NodeSyncStatus::Synced
};
match sink.send(&node_sync_status) {
Ok(true) => {
// Success
}
Ok(false) => {
// Subscription closed
return;
}
Err(error) => {
error!("Failed to serialize node sync status: {}", error);
}
}
}

futures_timer::Delay::new(NODE_SYNC_STATUS_CHECK_INTERVAL).await;
}
};

self.subscription_executor.spawn(
"subspace-node-sync-status-change-subscription",
Some("rpc"),
fut.boxed(),
);

Ok(())
}

async fn acknowledge_archived_segment_header(
&self,
segment_index: SegmentIndex,
Expand Down
8 changes: 1 addition & 7 deletions crates/subspace-farmer/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::fmt;
use std::pin::Pin;
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};

/// To become error type agnostic
Expand Down Expand Up @@ -46,11 +45,6 @@ pub trait NodeClient: Clone + fmt::Debug + Send + Sync + 'static {
&self,
) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, Error>;

/// Subscribe to node sync status change
async fn subscribe_node_sync_status_change(
&self,
) -> Result<Pin<Box<dyn Stream<Item = NodeSyncStatus> + Send + 'static>>, Error>;

/// Get segment headers for the segments
async fn segment_headers(
&self,
Expand Down
20 changes: 1 addition & 19 deletions crates/subspace-farmer/src/node_client/node_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use std::pin::Pin;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;

Expand Down Expand Up @@ -132,23 +131,6 @@ impl NodeClient for NodeRpcClient {
)))
}

async fn subscribe_node_sync_status_change(
&self,
) -> Result<Pin<Box<dyn Stream<Item = NodeSyncStatus> + Send + 'static>>, RpcError> {
let subscription = self
.client
.subscribe(
"subspace_subscribeNodeSyncStatusChange",
rpc_params![],
"subspace_unsubscribeNodeSyncStatusChange",
)
.await?;

Ok(Box::pin(subscription.filter_map(
|node_sync_status_result| async move { node_sync_status_result.ok() },
)))
}

async fn segment_headers(
&self,
segment_indexes: Vec<SegmentIndex>,
Expand Down
9 changes: 1 addition & 8 deletions crates/subspace-farmer/src/piece_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use subspace_networking::libp2p::identity;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse,
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tempfile::tempdir;

Expand Down Expand Up @@ -99,12 +98,6 @@ impl NodeClient for MockNodeClient {
Ok(Box::pin(stream))
}

async fn subscribe_node_sync_status_change(
&self,
) -> Result<Pin<Box<dyn Stream<Item = NodeSyncStatus> + Send + 'static>>, Error> {
unimplemented!()
}

async fn segment_headers(
&self,
_segment_indexes: Vec<SegmentIndex>,
Expand Down
84 changes: 1 addition & 83 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,6 @@ where
new_segment_processing_delay,
);

let (sectors_to_plot_proxy_sender, sectors_to_plot_proxy_receiver) = mpsc::channel(0);

let pause_plotting_if_node_not_synced_fut = pause_plotting_if_node_not_synced(
&node_client,
sectors_to_plot_proxy_receiver,
sectors_to_plot_sender,
);

let send_plotting_notifications_fut = send_plotting_notifications(
public_key_hash,
sectors_indices_left_to_plot,
Expand All @@ -604,14 +596,11 @@ where
sectors_metadata,
&last_archived_segment,
archived_segments_receiver,
sectors_to_plot_proxy_sender,
sectors_to_plot_sender,
initial_plotting_finished,
);

select! {
result = pause_plotting_if_node_not_synced_fut.fuse() => {
result
}
result = read_archived_segments_notifications_fut.fuse() => {
result
}
Expand Down Expand Up @@ -663,77 +652,6 @@ where
Ok(())
}

async fn pause_plotting_if_node_not_synced<NC>(
node_client: &NC,
sectors_to_plot_proxy_receiver: mpsc::Receiver<SectorToPlot>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
{
let mut node_sync_status_change_notifications = node_client
.subscribe_node_sync_status_change()
.await
.map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;

let Some(mut node_sync_status) = node_sync_status_change_notifications.next().await else {
return Ok(());
};

let mut sectors_to_plot_proxy_receiver = sectors_to_plot_proxy_receiver.fuse();
let mut node_sync_status_change_notifications = node_sync_status_change_notifications.fuse();

'outer: loop {
// Pause proxying of sectors to plot until we get notification that node is synced
if !node_sync_status.is_synced() {
info!("Node is not synced yet, pausing plotting until sync status changes");

loop {
match node_sync_status_change_notifications.next().await {
Some(new_node_sync_status) => {
node_sync_status = new_node_sync_status;

if node_sync_status.is_synced() {
info!("Node is synced, resuming plotting");
continue 'outer;
}
}
None => {
// Subscription ended, nothing left to do
return Ok(());
}
}
}
}

select! {
maybe_sector_to_plot = sectors_to_plot_proxy_receiver.next() => {
let Some(sector_to_plot) = maybe_sector_to_plot else {
// Subscription ended, nothing left to do
return Ok(());
};

if let Err(_error) = sectors_to_plot_sender.send(sector_to_plot).await {
// Receiver disconnected, nothing left to do
return Ok(());
}
},

maybe_node_sync_status = node_sync_status_change_notifications.next() => {
match maybe_node_sync_status {
Some(new_node_sync_status) => {
node_sync_status = new_node_sync_status;
}
None => {
// Subscription ended, nothing left to do
return Ok(());
}
}
},
}
}
}

struct SectorToReplot {
sector_index: SectorIndex,
expires_at: SegmentIndex,
Expand Down
17 changes: 0 additions & 17 deletions crates/subspace-rpc-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,3 @@ pub struct RewardSignatureResponse {
/// Pre-header or vote hash signature.
pub signature: Option<RewardSignature>,
}

/// Information about new slot that just arrived
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum NodeSyncStatus {
/// Node is fully synced
Synced,
/// Node is major syncing
MajorSyncing,
}

impl NodeSyncStatus {
/// Whether node is synced
pub fn is_synced(&self) -> bool {
matches!(self, Self::Synced)
}
}

0 comments on commit cb4bad4

Please sign in to comment.