diff --git a/Cargo.lock b/Cargo.lock index f1f328f401..332403286d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12749,6 +12749,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.12.2", "prometheus-client 0.22.2", + "prost 0.12.4", "sc-basic-authorship", "sc-chain-spec", "sc-client-api", diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index ed9f7594bf..293c46cd6e 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -865,6 +865,10 @@ where // Special sync mode where verified blocks were inserted into blockchain // directly, archiving of this block will naturally happen later continue; + } else if best_archived_block_number.is_zero() { + // We may have imported some block using a special sync mode right after genesis, + // in which case the archiver would be stuck at the genesis block + continue; } else { let error = format!( "There was a gap in blockchain history and the last contiguous series of \ @@ -1007,15 +1011,15 @@ where .filter(|block_number| *block_number > client.info().finalized_number); if let Some(block_number_to_finalize) = maybe_block_number_to_finalize { - let block_hash_to_finalize = client - .hash(block_number_to_finalize)? - .expect("Block about to be finalized must always exist"); - finalize_block( - client.as_ref(), - telemetry.clone(), - block_hash_to_finalize, - block_number_to_finalize, - ); + // Block is not guaranteed to be present this deep if we have only synced recent blocks + if let Some(block_hash_to_finalize) = client.hash(block_number_to_finalize)? { + finalize_block( + client.as_ref(), + telemetry.clone(), + block_hash_to_finalize, + block_number_to_finalize, + ); + } } } diff --git a/crates/sc-consensus-subspace/src/block_import.rs b/crates/sc-consensus-subspace/src/block_import.rs index 4b102c1f10..362beb8b2f 100644 --- a/crates/sc-consensus-subspace/src/block_import.rs +++ b/crates/sc-consensus-subspace/src/block_import.rs @@ -37,6 +37,7 @@ use sc_client_api::BlockBackend; use sc_consensus::block_import::{ BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, }; +use sc_consensus::StateAction; use sc_proof_of_time::verifier::PotVerifier; use sp_api::{ApiError, ApiExt, ProvideRuntimeApi}; use sp_block_builder::BlockBuilder as BlockBuilderApi; @@ -170,9 +171,6 @@ pub enum Error { /// Farmer in block list #[error("Farmer {0} is in block list")] FarmerInBlockList(FarmerPublicKey), - /// No block weight for parent header - #[error("No block weight for parent header {0}")] - NoBlockWeight(Header::Hash), /// Segment commitment not found #[error("Segment commitment for segment index {0} not found")] SegmentCommitmentNotFound(SegmentIndex), @@ -337,7 +335,6 @@ where } } - #[allow(clippy::too_many_arguments)] async fn block_import_verification( &self, block_hash: Block::Hash, @@ -350,7 +347,6 @@ FarmerSignature, >, justifications: &Option<Justifications>, - skip_runtime_access: bool, ) -> Result<(), Error<Block::Header>> { let block_number = *header.number(); let parent_hash = *header.parent_hash(); @@ -367,14 +363,7 @@ if self .client .runtime_api() - .is_in_block_list(parent_hash, &pre_digest.solution().public_key) - .or_else(|error| { - if skip_runtime_access { - Ok(false) - } else { - Err(Error::RuntimeApi(error)) - } - })? + .is_in_block_list(parent_hash, &pre_digest.solution().public_key)?
{ warn!( public_key = %pre_digest.solution().public_key, @@ -492,9 +481,6 @@ where pre_digest.solution().sector_index, ); - // TODO: Below `skip_runtime_access` has no impact on this, but ideally it - // should (though we don't support fast sync yet, so doesn't matter in - // practice) let max_pieces_in_sector = self .client .runtime_api() @@ -542,9 +528,6 @@ where recent_segments: chain_constants.recent_segments(), recent_history_fraction: chain_constants.recent_history_fraction(), min_sector_lifetime: chain_constants.min_sector_lifetime(), - // TODO: Below `skip_runtime_access` has no impact on this, but ideally it - // should (though we don't support fast sync yet, so doesn't matter in - // practice) current_history_size: self.client.runtime_api().history_size(parent_hash)?, sector_expiration_check_segment_commitment, }), @@ -553,37 +536,35 @@ where ) .map_err(|error| VerificationError::VerificationError(pre_digest.slot(), error))?; - if !skip_runtime_access { - // If the body is passed through, we need to use the runtime to check that the - // internally-set timestamp in the inherents actually matches the slot set in the seal - // and segment headers in the inherents are set correctly. - if let Some(extrinsics) = extrinsics { - let create_inherent_data_providers = self - .create_inherent_data_providers - .create_inherent_data_providers(parent_hash, ()) - .await - .map_err(|error| Error::Client(sp_blockchain::Error::from(error)))?; - - let inherent_data = create_inherent_data_providers - .create_inherent_data() - .await - .map_err(Error::CreateInherents)?; - - let inherent_res = self.client.runtime_api().check_inherents( - parent_hash, - Block::new(header, extrinsics), - inherent_data, - )?; - - if !inherent_res.ok() { - for (i, e) in inherent_res.into_errors() { - match create_inherent_data_providers - .try_handle_error(&i, &e) - .await - { - Some(res) => res.map_err(Error::CheckInherents)?, - None => return Err(Error::CheckInherentsUnhandled(i)), - } + // If the body is passed through, we need to use the runtime to check that the + // internally-set timestamp in the inherents actually matches the slot set in the seal + // and segment headers in the inherents are set correctly. 
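+ // Note: `check_inherents` is a runtime API call executed against the parent block's + // state, so this check is only possible when that state is available locally; block + // imports that apply downloaded state skip import verification entirely (see below).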
+ if let Some(extrinsics) = extrinsics { + let create_inherent_data_providers = self + .create_inherent_data_providers + .create_inherent_data_providers(parent_hash, ()) + .await + .map_err(|error| Error::Client(sp_blockchain::Error::from(error)))?; + + let inherent_data = create_inherent_data_providers + .create_inherent_data() + .await + .map_err(Error::CreateInherents)?; + + let inherent_res = self.client.runtime_api().check_inherents( + parent_hash, + Block::new(header, extrinsics), + inherent_data, + )?; + + if !inherent_res.ok() { + for (i, e) in inherent_res.into_errors() { + match create_inherent_data_providers + .try_handle_error(&i, &e) + .await + { + Some(res) => res.map_err(Error::CheckInherents)?, + None => return Err(Error::CheckInherentsUnhandled(i)), + } } } } @@ -634,27 +615,33 @@ where } let subspace_digest_items = extract_subspace_digest_items(&block.header)?; - let skip_execution_checks = block.state_action.skip_execution_checks(); - let root_plot_public_key = self - .client - .runtime_api() - .root_plot_public_key(*block.header.parent_hash())?; - - self.block_import_verification( - block_hash, - block.header.clone(), - block.body.clone(), - &root_plot_public_key, - &subspace_digest_items, - &block.justifications, - skip_execution_checks, - ) - .await?; + // Only do import verification if we do not have the state already. If state needs to be + // applied, verification and execution already happened before and don't need to be done + // again here (often they can't be, because the parent block would be missing in special + // sync modes). + if !matches!(block.state_action, StateAction::ApplyChanges(_)) { + let root_plot_public_key = self + .client + .runtime_api() + .root_plot_public_key(*block.header.parent_hash())?; + + self.block_import_verification( + block_hash, + block.header.clone(), + block.body.clone(), + &root_plot_public_key, + &subspace_digest_items, + &block.justifications, + ) + .await?; + } let parent_weight = if block_number.is_one() { 0 } else { + // Parent block weight might be missing in special sync modes where the block is + // imported directly into the middle of the blockchain history aux_schema::load_block_weight(self.client.as_ref(), block.header.parent_hash())? .unwrap_or_default() }; @@ -695,8 +682,9 @@ where // need to cover again here parent_weight } else { - aux_schema::load_block_weight(&*self.client, info.best_hash)? - .ok_or_else(|| Error::NoBlockWeight(info.best_hash))?
+ // Best block weight might be missing in special sync modes where block is imported + // in the middle of the blockchain history right after genesis + aux_schema::load_block_weight(&*self.client, info.best_hash)?.unwrap_or_default() }; ForkChoiceStrategy::Custom(total_weight > last_best_weight) diff --git a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs index 3b118b7c99..63021f1cfa 100644 --- a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs +++ b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs @@ -207,7 +207,7 @@ fn main() -> Result<(), Error> { force_new_slot_notifications: true, subspace_networking: SubspaceNetworking::Create { config: dsn_config }, dsn_piece_getter: None, - sync_from_dsn: true, + sync: Default::default(), is_timekeeper: false, timekeeper_cpu_cores: Default::default(), }; diff --git a/crates/subspace-node/src/commands/run/consensus.rs b/crates/subspace-node/src/commands/run/consensus.rs index 4ca7b358d9..2014b939e6 100644 --- a/crates/subspace-node/src/commands/run/consensus.rs +++ b/crates/subspace-node/src/commands/run/consensus.rs @@ -8,7 +8,7 @@ use sc_cli::{ TransactionPoolParams, RPC_DEFAULT_PORT, }; use sc_informant::OutputFormat; -use sc_network::config::{MultiaddrWithPeerId, NonReservedPeerMode, SetConfig}; +use sc_network::config::{MultiaddrWithPeerId, NonReservedPeerMode, SetConfig, SyncMode}; use sc_service::{BlocksPruning, Configuration, PruningMode}; use sc_storage_monitor::StorageMonitorParams; use sc_telemetry::TelemetryEndpoints; @@ -20,7 +20,7 @@ use std::str::FromStr; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::libp2p::Multiaddr; use subspace_service::config::{ - SubspaceConfiguration, SubspaceNetworking, SubstrateConfiguration, + ChainSyncMode, SubspaceConfiguration, SubspaceNetworking, SubstrateConfiguration, SubstrateNetworkConfiguration, SubstrateRpcConfiguration, }; use subspace_service::dsn::DsnConfig; @@ -162,6 +162,8 @@ enum StatePruningMode { Archive, /// Keep only the data of finalized blocks. ArchiveCanonical, + /// Keep the data of the last number of finalized blocks. + Number(u32), } impl FromStr for StatePruningMode { @@ -171,17 +173,21 @@ impl FromStr for StatePruningMode { match input { "archive" => Ok(Self::Archive), "archive-canonical" => Ok(Self::ArchiveCanonical), - _ => Err("Invalid state pruning mode specified".to_string()), + n => n + .parse() + .map_err(|_| "Invalid state pruning mode specified".to_string()) + .map(Self::Number), } } } impl fmt::Display for StatePruningMode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(match self { - Self::Archive => "archive", - Self::ArchiveCanonical => "archive-canonical", - }) + match self { + Self::Archive => f.write_str("archive"), + Self::ArchiveCanonical => f.write_str("archive-canonical"), + Self::Number(n) => f.write_str(n.to_string().as_str()), + } } } @@ -256,6 +262,7 @@ impl PruningOptions { match self.state_pruning { StatePruningMode::Archive => PruningMode::ArchiveAll, StatePruningMode::ArchiveCanonical => PruningMode::ArchiveCanonical, + StatePruningMode::Number(num) => PruningMode::blocks_pruning(num), } } @@ -387,16 +394,16 @@ pub(super) struct ConsensusChainOptions { #[clap(flatten)] dsn_options: DsnOptions, - /// Enables DSN-sync on startup. 
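Since the state pruning flag now also accepts a plain block count, the FromStr/Display pair above must round-trip for clap to parse values and render defaults consistently; an illustrative test of the intended behaviour (hypothetical, not part of this diff, and assuming the impls above are in scope):

    #[test]
    fn state_pruning_mode_round_trip() {
        // Named modes parse exactly as before
        assert!(matches!("archive".parse(), Ok(StatePruningMode::Archive)));
        assert!(matches!(
            "archive-canonical".parse(),
            Ok(StatePruningMode::ArchiveCanonical)
        ));
        // Any u32 now selects the new Number variant...
        assert!(matches!("256".parse(), Ok(StatePruningMode::Number(256))));
        // ...and Display is its exact inverse, which clap needs for defaults
        assert_eq!(StatePruningMode::Number(256).to_string(), "256");
        // Everything else is still rejected
        assert!("bogus".parse::<StatePruningMode>().is_err());
    }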
- #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] - sync_from_dsn: bool, - /// Parameters used to create the storage monitor. #[clap(flatten)] storage_monitor: StorageMonitorParams, #[clap(flatten)] timekeeper_options: TimekeeperOptions, + + /// Sync mode + #[arg(long, default_value_t = ChainSyncMode::Full)] + sync: ChainSyncMode, } pub(super) struct PrometheusConfiguration { @@ -437,9 +444,9 @@ pub(super) fn create_consensus_chain_configuration( mut force_authoring, pot_external_entropy, dsn_options, - sync_from_dsn, storage_monitor, mut timekeeper_options, + sync, } = consensus_node_options; let transaction_pool; @@ -584,7 +591,14 @@ pub(super) fn create_consensus_chain_configuration( chain_spec: Box::new(chain_spec), informant_output_format: OutputFormat { enable_color }, }; - let consensus_chain_config = Configuration::from(consensus_chain_config); + let mut consensus_chain_config = Configuration::from(consensus_chain_config); + // TODO: revisit SyncMode change after https://github.com/paritytech/polkadot-sdk/issues/4407 + if sync == ChainSyncMode::Snap { + consensus_chain_config.network.sync_mode = SyncMode::LightState { + skip_proofs: true, + storage_chain_mode: false, + }; + } let pot_external_entropy = derive_pot_external_entropy(&consensus_chain_config, pot_external_entropy)?; @@ -650,7 +664,7 @@ pub(super) fn create_consensus_chain_configuration( force_new_slot_notifications: domains_enabled, subspace_networking: SubspaceNetworking::Create { config: dsn_config }, dsn_piece_getter: None, - sync_from_dsn, + sync, is_timekeeper: timekeeper_options.timekeeper, timekeeper_cpu_cores: timekeeper_options.timekeeper_cpu_cores, }, diff --git a/crates/subspace-service/Cargo.toml b/crates/subspace-service/Cargo.toml index 3ab01b3c61..0b80eda764 100644 --- a/crates/subspace-service/Cargo.toml +++ b/crates/subspace-service/Cargo.toml @@ -29,6 +29,7 @@ pallet-transaction-payment-rpc = { git = "https://github.com/subspace/polkadot-s parity-scale-codec = "3.6.9" parking_lot = "0.12.2" prometheus-client = "0.22.2" +prost = "0.12" sc-basic-authorship = { git = "https://github.com/subspace/polkadot-sdk", rev = "6da3c45e1d5b3c1f09b5e54152b8848149f9d5e6" } sc-chain-spec = { git = "https://github.com/subspace/polkadot-sdk", rev = "6da3c45e1d5b3c1f09b5e54152b8848149f9d5e6" } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "6da3c45e1d5b3c1f09b5e54152b8848149f9d5e6" } diff --git a/crates/subspace-service/src/config.rs b/crates/subspace-service/src/config.rs index 96d39789d2..7949e68a0c 100644 --- a/crates/subspace-service/src/config.rs +++ b/crates/subspace-service/src/config.rs @@ -1,5 +1,5 @@ use crate::dsn::DsnConfig; -use crate::sync_from_dsn::DsnSyncPieceGetter; +use crate::sync_from_dsn::import_blocks::DsnSyncPieceGetter; use sc_chain_spec::ChainSpec; use sc_network::config::{ MultiaddrWithPeerId, NetworkConfiguration, NodeKeyConfig, SetConfig, SyncMode, TransportConfig, @@ -14,10 +14,12 @@ use sc_service::{ }; use sc_telemetry::TelemetryEndpoints; use std::collections::HashSet; +use std::fmt; use std::net::SocketAddr; use std::num::{NonZeroU32, NonZeroUsize}; use std::ops::Deref; use std::path::PathBuf; +use std::str::FromStr; use std::sync::atomic::AtomicBool; use std::sync::Arc; use subspace_networking::libp2p::Multiaddr; @@ -263,12 +265,48 @@ pub struct SubspaceConfiguration { pub subspace_networking: SubspaceNetworking, /// DSN piece getter pub dsn_piece_getter: Option>, - /// Enables DSN-sync on startup. 
- pub sync_from_dsn: bool, /// Is this node a Timekeeper pub is_timekeeper: bool, /// CPU cores that timekeeper can use pub timekeeper_cpu_cores: HashSet<usize>, + /// Defines blockchain sync mode + pub sync: ChainSyncMode, +} + +/// Syncing mode. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ChainSyncMode { + /// Full sync. Download and verify all blocks from DSN. + Full, + /// Download latest state and related blocks only. Run full DSN-sync afterwards. + Snap, +} + +impl FromStr for ChainSyncMode { + type Err = String; + + fn from_str(input: &str) -> Result<Self, Self::Err> { + match input { + "full" => Ok(Self::Full), + "snap" => Ok(Self::Snap), + _ => Err("Unsupported sync type".to_string()), + } + } +} + +impl fmt::Display for ChainSyncMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full => f.write_str("full"), + Self::Snap => f.write_str("snap"), + } + } +} + +impl Default for ChainSyncMode { + fn default() -> Self { + Self::Full + } } impl Deref for SubspaceConfiguration { diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index eeb827ef9c..1c0ce4e314 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -32,10 +32,11 @@ pub mod rpc; pub mod sync_from_dsn; pub mod transaction_pool; -use crate::config::{SubspaceConfiguration, SubspaceNetworking}; +use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking}; use crate::dsn::{create_dsn_instance, DsnConfigurationError}; use crate::metrics::NodeMetrics; use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator; +use crate::sync_from_dsn::snap_sync::snap_sync; use crate::transaction_pool::FullPool; use core::sync::atomic::{AtomicU32, Ordering}; use cross_domain_message_gossip::xdm_gossip_peers_set_config; @@ -280,12 +281,23 @@ where let parent_header = match client.header(parent_hash) { Ok(Some(parent_header)) => parent_header, Ok(None) => { - error!( - %parent_hash, - "Header not found during proof of time verification" - ); - - return false; + if quick_verification { + error!( + %parent_hash, + "Header not found during proof of time verification" + ); + + return false; + } else { + debug!( + %parent_hash, + "Header not found during proof of time verification" + ); + + // This can only happen during special sync modes; there are no other + // cases where the parent header may not be available, hence allow it + return true; + } } Err(error) => { error!( @@ -297,6 +309,7 @@ where return false; } }; + let parent_pre_digest = match extract_pre_digest(&parent_header) { Ok(parent_pre_digest) => parent_pre_digest, Err(error) => { @@ -804,7 +817,8 @@ where } }; - let import_queue_service = import_queue.service(); + let import_queue_service1 = import_queue.service(); + let import_queue_service2 = import_queue.service(); let network_wrapper = Arc::new(NetworkWrapper::default()); let block_relay = Some( build_consensus_relay( @@ -888,48 +902,66 @@ where ); network_wrapper.set(network_service.clone()); - if config.sync_from_dsn { - let dsn_sync_piece_getter = config.dsn_piece_getter.unwrap_or_else(|| { - Arc::new(PieceProvider::new( + + let dsn_sync_piece_getter = config.dsn_piece_getter.unwrap_or_else(|| { + Arc::new(PieceProvider::new( + node.clone(), + Some(SegmentCommitmentPieceValidator::new( node.clone(), - Some(SegmentCommitmentPieceValidator::new( - node.clone(), - subspace_link.kzg().clone(), - segment_headers_store.clone(), - )), - )) + subspace_link.kzg().clone(), + segment_headers_store.clone(), + )), + ))
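The same round-trip contract applies to `ChainSyncMode` further up: clap renders the `default_value_t` via Display and parses `--sync` back via FromStr, so the two impls must stay exact inverses; a small illustrative test (hypothetical, not part of this diff):

    #[test]
    fn chain_sync_mode_round_trip() {
        use std::str::FromStr;
        // Display output parses back to the same variant
        for mode in [ChainSyncMode::Full, ChainSyncMode::Snap] {
            assert_eq!(ChainSyncMode::from_str(&mode.to_string()), Ok(mode));
        }
        // The default used for `--sync` is full sync
        assert_eq!(ChainSyncMode::default(), ChainSyncMode::Full);
        // Unknown values are rejected
        assert!(ChainSyncMode::from_str("fast").is_err());
    }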
+ }); - if !config.base.network.force_synced { - // Start with DSN sync in this case - pause_sync.store(true, Ordering::Release); - } + if !config.base.network.force_synced { + // Start with DSN sync in this case + pause_sync.store(true, Ordering::Release); + } - let (observer, worker) = sync_from_dsn::create_observer_and_worker( - segment_headers_store.clone(), - Arc::clone(&network_service), - node.clone(), - Arc::clone(&client), - import_queue_service, - sync_target_block_number, - pause_sync, - dsn_sync_piece_getter, + let fork_id = config.base.chain_spec.fork_id().map(String::from); + + let snap_sync_task = snap_sync( + segment_headers_store.clone(), + node.clone(), + fork_id, + Arc::clone(&client), + import_queue_service1, + pause_sync.clone(), + dsn_sync_piece_getter.clone(), + Arc::clone(&network_service), + sync_service.clone(), + ); + + let (observer, worker) = sync_from_dsn::create_observer_and_worker( + segment_headers_store.clone(), + Arc::clone(&network_service), + node.clone(), + Arc::clone(&client), + import_queue_service2, + sync_target_block_number, + pause_sync, + dsn_sync_piece_getter, + ); + task_manager + .spawn_handle() + .spawn("observer", Some("sync-from-dsn"), observer); + task_manager + .spawn_essential_handle() + .spawn_essential_blocking( + "worker", + Some("sync-from-dsn"), + Box::pin(async move { + // Run snap-sync before DSN-sync. + if config.sync == ChainSyncMode::Snap { + snap_sync_task.await; + } + + if let Err(error) = worker.await { + error!(%error, "Sync from DSN exited with an error"); + } + }), ); - task_manager - .spawn_handle() - .spawn("observer", Some("sync-from-dsn"), observer); - task_manager - .spawn_essential_handle() - .spawn_essential_blocking( - "worker", - Some("sync-from-dsn"), - Box::pin(async move { - if let Err(error) = worker.await { - error!(%error, "Sync from DSN exited with an error"); - } - }), - ); - } if let Some(registry) = config.base.prometheus_registry() { match NodeMetrics::new( diff --git a/crates/subspace-service/src/sync_from_dsn.rs b/crates/subspace-service/src/sync_from_dsn.rs index 9243ddafc3..7f4f591361 100644 --- a/crates/subspace-service/src/sync_from_dsn.rs +++ b/crates/subspace-service/src/sync_from_dsn.rs @@ -1,9 +1,10 @@ -mod import_blocks; -pub(super) mod piece_validator; -mod segment_header_downloader; +pub(crate) mod import_blocks; +pub(crate) mod piece_validator; +pub(crate) mod segment_header_downloader; +pub(crate) mod snap_sync; +pub(crate) mod snap_sync_engine; -use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn; -pub use crate::sync_from_dsn::import_blocks::DsnSyncPieceGetter; +use crate::sync_from_dsn::import_blocks::{import_blocks_from_dsn, DsnSyncPieceGetter}; use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader; use futures::channel::mpsc; use futures::{select, FutureExt, StreamExt}; @@ -22,7 +23,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use subspace_core_primitives::SegmentIndex; use subspace_networking::Node; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; /// How much time to wait for new block to be imported before timing out and starting sync from DSN const NO_IMPORTED_BLOCKS_TIMEOUT: Duration = Duration::from_secs(10 * 60); @@ -294,6 +295,8 @@ where } } + debug!("Finished DSN sync"); + pause_sync.store(false, Ordering::Release); while notifications.try_next().is_ok() { diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs index 
2fe6057293..8fcdf3743b 100644 --- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs +++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs @@ -279,7 +279,7 @@ where Ok(downloaded_blocks) } -async fn download_and_reconstruct_blocks<PG>( +pub(super) async fn download_and_reconstruct_blocks<PG>( segment_index: SegmentIndex, piece_getter: &PG, reconstructor: &mut Reconstructor, diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs new file mode 100644 index 0000000000..7a274e71b8 --- /dev/null +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -0,0 +1,455 @@ +use crate::sync_from_dsn::import_blocks::download_and_reconstruct_blocks; +use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader; +use crate::sync_from_dsn::snap_sync_engine::SnapSyncingEngine; +use crate::sync_from_dsn::DsnSyncPieceGetter; +use sc_client_api::{AuxStore, LockImportRun, ProofProvider}; +use sc_consensus::import_queue::ImportQueueService; +use sc_consensus::{ImportedState, IncomingBlock}; +use sc_consensus_subspace::archiver::{decode_block, SegmentHeadersStore}; +use sc_network::{NetworkRequest, PeerId}; +use sc_network_sync::service::syncing_service::SyncRestartArgs; +use sc_network_sync::SyncingService; +use sc_service::config::SyncMode; +use sc_service::{ClientExt, Error}; +use sp_api::ProvideRuntimeApi; +use sp_blockchain::HeaderBackend; +use sp_consensus::BlockOrigin; +use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi}; +use sp_objects::ObjectsApi; +use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; +use std::collections::{HashSet, VecDeque}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use subspace_archiving::reconstructor::Reconstructor; +use subspace_core_primitives::SegmentIndex; +use subspace_networking::Node; +use tokio::time::sleep; +use tracing::{debug, error}; + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>( + segment_headers_store: SegmentHeadersStore<AS>, + node: Node, + fork_id: Option<String>, + client: Arc<Client>, + mut import_queue_service: Box<dyn ImportQueueService<Block>>, + pause_sync: Arc<AtomicBool>, + piece_getter: PG, + network_request: NR, + sync_service: Arc<SyncingService<Block>>, +) where + Backend: sc_client_api::Backend<Block>, + Block: BlockT, + AS: AuxStore, + Client: HeaderBackend<Block> + + ClientExt + + ProvideRuntimeApi<Block> + + ProofProvider<Block> + + LockImportRun<Block, Backend> + + Send + + Sync + + 'static, + Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>, + PG: DsnSyncPieceGetter, + NR: NetworkRequest, +{ + let info = client.info(); + // Only attempt snap sync with genesis state + // TODO: Support snap sync from any state + if info.best_hash == info.genesis_hash { + pause_sync.store(true, Ordering::Release); + + let snap_sync_fut = sync( + &segment_headers_store, + &node, + &piece_getter, + fork_id.as_deref(), + &client, + import_queue_service.as_mut(), + &network_request, + &sync_service, + ); + + match snap_sync_fut.await { + Ok(()) => { + debug!("Snap sync finished successfully"); + } + Err(error) => { + error!(%error, "Snap sync failed"); + } + } + + pause_sync.store(false, Ordering::Release); + } else { + debug!("Snap sync can only work with genesis state, skipping"); + } + + // Switch back to full sync mode + let info = client.info(); + sync_service + .restart(SyncRestartArgs { + sync_mode: SyncMode::Full, + new_best_block: Some(info.best_number), + }) + .await; +} + +#[allow(clippy::too_many_arguments)] +async fn sync<B, PG, AS, Block, Client, IQS, NR>( + segment_headers_store: &SegmentHeadersStore<AS>, + node: &Node, +
piece_getter: &PG, + fork_id: Option<&str>, + client: &Arc<Client>, + import_queue_service: &mut IQS, + network_request: &NR, + sync_service: &SyncingService<Block>, +) -> Result<(), Error> +where + B: sc_client_api::Backend<Block>, + PG: DsnSyncPieceGetter, + AS: AuxStore, + Block: BlockT, + Client: HeaderBackend<Block> + + ClientExt + + ProvideRuntimeApi<Block> + + ProofProvider<Block> + + LockImportRun<Block, B> + + Send + + Sync + + 'static, + Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>, + IQS: ImportQueueService<Block> + ?Sized, + NR: NetworkRequest, +{ + debug!("Starting snap sync..."); + + sync_segment_headers(segment_headers_store, node) + .await + .map_err(|error| format!("Failed to sync segment headers: {}", error))?; + + let last_segment_index = segment_headers_store + .max_segment_index() + .expect("Successfully synced above; qed"); + + // Skip the snap sync if there is just one segment header built on top of genesis; it is + // more efficient to sync it regularly + if last_segment_index <= SegmentIndex::ONE { + debug!("Snap sync skipped: chain history is still too short"); + + return Ok(()); + } + + // Identify all segment headers that would need to be reconstructed in order to get the + // first block of the last segment + let mut segments_to_reconstruct = VecDeque::from([last_segment_index]); + { + let mut last_segment_first_block_number = None; + + loop { + let oldest_segment_index = *segments_to_reconstruct.front().expect("Not empty; qed"); + let segment_index = oldest_segment_index + .checked_sub(SegmentIndex::ONE) + .ok_or_else(|| { + format!( + "Attempted to get segment index before {oldest_segment_index} during \ + snap sync" + ) + })?; + let segment_header = segment_headers_store + .get_segment_header(segment_index) + .ok_or_else(|| { + format!("Failed to get segment header {segment_index} during snap sync") + })?; + let last_archived_block = segment_header.last_archived_block(); + + // If the older segment header ends with a fully archived block then no additional + // information is necessary + if last_archived_block.partial_archived().is_none() { + break; + } + + match last_segment_first_block_number { + Some(block_number) => { + if block_number == last_archived_block.number { + // If the older segment ends with the same block number as the first + // block in the last segment then add it to the list of segments that + // need to be reconstructed + segments_to_reconstruct.push_front(segment_index); + } else { + // Otherwise we're done here + break; + } + } + None => { + last_segment_first_block_number.replace(last_archived_block.number); + // This segment will definitely be needed to reconstruct the first block + // of the last segment + segments_to_reconstruct.push_front(segment_index); + } + } + } + } + + // Reconstruct blocks of the last segment + let mut blocks = VecDeque::new(); + { + let mut reconstructor = Reconstructor::new().map_err(|error| error.to_string())?; + + for segment_index in segments_to_reconstruct { + let blocks_fut = + download_and_reconstruct_blocks(segment_index, piece_getter, &mut reconstructor); + + blocks = VecDeque::from(blocks_fut.await?); + } + } + let mut blocks_to_import = Vec::with_capacity(blocks.len()); + let last_block_number; + + // First block is special because we need to download state for it + { + let (first_block_number, first_block_bytes) = blocks + .pop_front() + .expect("List of blocks is not empty according to logic above; qed"); + + // Sometimes first block is the only block + last_block_number = blocks + .back() + .map_or(first_block_number, |(block_number, _block_bytes)| { + *block_number + }); +
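The walk-back above is the subtle part of snap sync: an archived block can span several segments, so reconstructing the first block of the last segment may require older segments too. A self-contained toy model of the same back-tracking (illustrative only; the real code works on SegmentHeader/LastArchivedBlock rather than plain tuples of (last archived block number, ends with a partial block?)):

    // Returns indices of segments needed to reconstruct the first block of the
    // last segment: keep prepending older segments while they end with a
    // partial copy of that same block.
    fn segments_to_reconstruct(segments: &[(u32, bool)]) -> Vec<usize> {
        let last = segments.len() - 1;
        let mut needed = vec![last];
        let mut first_block = None;
        for index in (0..last).rev() {
            let (last_block, partial) = segments[index];
            if !partial {
                break; // older segment ends on a block boundary, nothing more needed
            }
            match first_block {
                None => first_block = Some(last_block),
                Some(n) if n != last_block => break, // partial tail is an older block
                Some(_) => {}
            }
            needed.insert(0, index);
        }
        needed
    }

    // Example: block 15 spills from segment 1 into segment 2, so reconstructing
    // segment 2's first block needs segment 1 as well:
    // segments_to_reconstruct(&[(10, false), (15, true), (20, false)]) == vec![1, 2]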
+ debug!( + %last_segment_index, + %first_block_number, + %last_block_number, + "Blocks from last segment downloaded" + ); + + let signed_block = decode_block::<Block>(&first_block_bytes) + .map_err(|error| format!("Failed to decode archived block: {error}"))?; + drop(first_block_bytes); + let (header, extrinsics) = signed_block.block.deconstruct(); + + // Download state for the first block, so it can be imported even without doing execution + let state = download_state(&header, client, fork_id, network_request, sync_service) + .await + .map_err(|error| { + format!("Failed to download state for the first block of last segment: {error}") + })?; + + debug!("Downloaded state of the first block of the last segment"); + + blocks_to_import.push(IncomingBlock { + hash: header.hash(), + header: Some(header), + body: Some(extrinsics), + indexed_body: None, + justifications: signed_block.justifications, + origin: None, + allow_missing_state: true, + import_existing: true, + skip_execution: true, + state: Some(state), + }); + } + + debug!( + blocks_count = %blocks.len(), + "Queuing import of remaining blocks from last segment" + ); + + for (_block_number, block_bytes) in blocks { + let signed_block = decode_block::<Block>(&block_bytes) + .map_err(|error| format!("Failed to decode archived block: {error}"))?; + let (header, extrinsics) = signed_block.block.deconstruct(); + + blocks_to_import.push(IncomingBlock { + hash: header.hash(), + header: Some(header), + body: Some(extrinsics), + indexed_body: None, + justifications: signed_block.justifications, + origin: None, + allow_missing_state: false, + import_existing: false, + skip_execution: false, + state: None, + }); + } + + let maybe_last_block_to_import = blocks_to_import.pop(); + + if !blocks_to_import.is_empty() { + import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import); + } + + // Import last block (if there was more than one) and notify Substrate sync about it + if let Some(last_block_to_import) = maybe_last_block_to_import { + debug!( + %last_block_number, + %last_segment_index, + "Importing the last block from the last segment" + ); + + import_queue_service + .import_blocks(BlockOrigin::NetworkBroadcast, vec![last_block_to_import]); + } + + // Wait for blocks to be imported + // TODO: Replace this hack with actual watching of block import + wait_for_block_import(client.as_ref(), last_block_number.into()).await; + + // Clear the block gap that arises from importing the first block with a much higher + // number than the previous best block + // TODO: This is a hack and better solution is needed: https://github.com/paritytech/polkadot-sdk/issues/4407 + client.clear_block_gap()?; + + debug!(info = ?client.info(), "Snap sync finished successfully"); + + Ok(()) +} + +async fn wait_for_block_import<Block, Client>( + client: &Client, + waiting_block_number: NumberFor<Block>, +) where + Block: BlockT, + Client: HeaderBackend<Block>, +{ + const WAIT_DURATION: Duration = Duration::from_secs(5); + const MAX_NO_NEW_IMPORT_ITERATIONS: u32 = 10; + let mut current_iteration = 0; + let mut last_best_block_number = client.info().best_number; + loop { + let info = client.info(); + debug!(%current_iteration, %waiting_block_number, "Waiting client info: {:?}", info); + + tokio::time::sleep(WAIT_DURATION).await; + + if info.best_number >= waiting_block_number { + break; + } + + if last_best_block_number == info.best_number { + current_iteration += 1; + } else { + current_iteration = 0; + } + + if current_iteration >= MAX_NO_NEW_IMPORT_ITERATIONS { +
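+ // No new block has been imported for MAX_NO_NEW_IMPORT_ITERATIONS consecutive polls + // (50 seconds with the constants above), so give up waiting instead of hanging forever.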
debug!(%current_iteration, %waiting_block_number, "Max idle period reached. {:?}", info); + break; + } + + last_best_block_number = info.best_number; + } +} + +async fn sync_segment_headers<AS>( + segment_headers_store: &SegmentHeadersStore<AS>, + node: &Node, +) -> Result<(), Error> +where + AS: AuxStore, +{ + let max_segment_index = segment_headers_store.max_segment_index().ok_or_else(|| { + Error::Other( + "Archiver needs to be initialized before syncing from DSN to populate the very first \ + segment" + .to_string(), + ) + })?; + let new_segment_headers = SegmentHeaderDownloader::new(node) + .get_segment_headers(max_segment_index) + .await + .map_err(|error| error.to_string())?; + + debug!("Found {} new segment headers", new_segment_headers.len()); + + if !new_segment_headers.is_empty() { + segment_headers_store.add_segment_headers(&new_segment_headers)?; + } + + Ok(()) +} + +/// Download and return state for specified block +async fn download_state<Block, Client, NR>( + header: &Block::Header, + client: &Arc<Client>, + fork_id: Option<&str>, + network_request: &NR, + sync_service: &SyncingService<Block>, +) -> Result<ImportedState<Block>, Error> +where + Block: BlockT, + Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static, + NR: NetworkRequest, +{ + let block_number = *header.number(); + + const STATE_SYNC_RETRIES: u32 = 5; + const LOOP_PAUSE: Duration = Duration::from_secs(20); + + for attempt in 1..=STATE_SYNC_RETRIES { + debug!(%attempt, "Starting state sync..."); + + debug!("Gathering peers for state sync."); + let mut tried_peers = HashSet::<PeerId>::new(); + + // TODO: add loop timeout + let current_peer_id = loop { + let connected_full_peers = sync_service + .peers_info() + .await + .expect("Network service must be available.") + .iter() + .filter_map(|(peer_id, info)| { + (info.roles.is_full() && info.best_number > block_number).then_some(*peer_id) + }) + .collect::<Vec<_>>(); + + debug!(?tried_peers, "Sync peers: {}", connected_full_peers.len()); + + let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter()); + + if let Some(peer_id) = active_peers_set.difference(&tried_peers).next().cloned() { + break peer_id; + } + + sleep(LOOP_PAUSE).await; + }; + + tried_peers.insert(current_peer_id); + + let sync_engine = SnapSyncingEngine::<Block, NR>::new( + client.clone(), + fork_id, + header.clone(), + false, + (current_peer_id, block_number), + network_request, + ) + .map_err(Error::Client)?; + + let last_block_from_sync_result = sync_engine.download_state().await; + + match last_block_from_sync_result { + Ok(block_to_import) => { + debug!("Sync worker handle result: {:?}", block_to_import); + + return block_to_import.state.ok_or_else(|| { + Error::Other("Imported state was missing in synced block".into()) + }); + } + Err(error) => { + error!(%error, "State sync error"); + continue; + } + } + } + + Err(Error::Other("All snap sync retries failed".into())) +} diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync_engine.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync_engine.rs new file mode 100644 index 0000000000..13fb7f81aa --- /dev/null +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync_engine.rs @@ -0,0 +1,221 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! `SyncingEngine` is the actor responsible for syncing the Substrate chain +//! to the tip and keeping the blockchain up to date with network updates. + +use futures::channel::oneshot; +use futures::{FutureExt, StreamExt}; +use prost::Message; +use sc_client_api::ProofProvider; +use sc_consensus::IncomingBlock; +use sc_network::request_responses::IfDisconnected; +use sc_network::types::ProtocolName; +use sc_network::{NetworkRequest, PeerId}; +use sc_network_sync::pending_responses::{PendingResponses, ResponseEvent}; +use sc_network_sync::schema::v1::{StateRequest, StateResponse}; +use sc_network_sync::state_request_handler::generate_protocol_name; +use sc_network_sync::strategy::state::{StateStrategy, StateStrategyAction}; +use sc_network_sync::strategy::StrategyKey; +use sc_network_sync::types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, PeerRequest}; +use sp_blockchain::{Error as ClientError, HeaderBackend}; +use sp_runtime::traits::{Block as BlockT, NumberFor}; +use std::sync::Arc; +use tracing::{debug, error, trace, warn}; + +pub struct SnapSyncingEngine<'a, Block, NR> +where + Block: BlockT, +{ + /// Syncing strategy + strategy: StateStrategy<Block>, + /// Network request handle + network_request: &'a NR, + /// Pending responses + pending_responses: PendingResponses<Block>, + /// Protocol name used to send out state requests + state_request_protocol_name: ProtocolName, +} + +impl<'a, Block, NR> SnapSyncingEngine<'a, Block, NR> +where + Block: BlockT, + NR: NetworkRequest, +{ + pub fn new<Client>( + client: Arc<Client>, + fork_id: Option<&str>, + target_header: Block::Header, + skip_proof: bool, + current_sync_peer: (PeerId, NumberFor<Block>), + network_request: &'a NR, + ) -> Result<Self, ClientError> + where + Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static, + { + let state_request_protocol_name = + generate_protocol_name(client.info().genesis_hash, fork_id).into(); + + // Initialize syncing strategy. + let strategy = StateStrategy::new( + client, + target_header, + // We only care about the state; this value is just forwarded back into the block + // to import that is thrown away below + None, + // We only care about the state; this value is just forwarded back into the block + // to import that is thrown away below + None, + skip_proof, + vec![current_sync_peer].into_iter(), + ); + + Ok(Self { + strategy, + network_request, + pending_responses: PendingResponses::new(), + state_request_protocol_name, + }) + } + + // Downloads state and returns incoming block with state pre-populated and ready for importing + pub async fn download_state(mut self) -> Result<IncomingBlock<Block>, ClientError> { + debug!("Starting state downloading"); + + loop { + // Process actions requested by a syncing strategy.
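+ // The strategy is driven manually: each iteration drains the actions it queued, turns + // state requests into plain network requests, and then waits on the next response + // event; an empty action list means the strategy has stalled, which is surfaced as an + // error instead of looping forever.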
+ let mut actions = self.strategy.actions().peekable(); + if actions.peek().is_none() { + return Err(ClientError::Backend( + "Sync state download failed: no further actions".into(), + )); + } + + for action in actions { + match action { + StateStrategyAction::SendStateRequest { peer_id, request } => { + self.send_state_request(peer_id, StrategyKey::State, request); + } + StateStrategyAction::DropPeer(BadPeer(peer_id, rep)) => { + self.pending_responses.remove(peer_id, StrategyKey::State); + + trace!(%peer_id, "Peer dropped: {rep:?}"); + } + StateStrategyAction::ImportBlocks { blocks, .. } => { + return blocks.into_iter().next().ok_or_else(|| { + ClientError::Application( + "StateStrategyAction::ImportBlocks didn't contain any blocks to import" + .into(), + ) + }); + } + StateStrategyAction::Finished => { + return Err(ClientError::Backend( + "Sync state finished without blocks to import".into(), + )); + } + } + } + + let response_event = self.pending_responses.select_next_some().await; + self.process_response_event(response_event); + } + } + + fn send_state_request( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: OpaqueStateRequest, + ) { + let (tx, rx) = oneshot::channel(); + + self.pending_responses + .insert(peer_id, key, PeerRequest::State, rx.boxed()); + + match Self::encode_state_request(&request) { + Ok(data) => { + self.network_request.start_request( + peer_id, + self.state_request_protocol_name.clone(), + data, + None, + tx, + IfDisconnected::ImmediateError, + ); + } + Err(err) => { + warn!("Failed to encode state request {request:?}: {err:?}",); + } + } + } + + fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> { + let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque state request during encoding; this is an implementation \ + bug" + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> { + let response = StateResponse::decode(response) + .map_err(|error| format!("Failed to decode state response: {error}"))?; + + Ok(OpaqueStateResponse(Box::new(response))) + } + + fn process_response_event(&mut self, response_event: ResponseEvent<Block>) { + let ResponseEvent { + peer_id, + request, + response, + .. + } = response_event; + + match response { + Ok(Ok((resp, _))) => match request { + PeerRequest::Block(req) => { + error!("Unexpected PeerRequest::Block - {:?}", req); + } + PeerRequest::State => { + let response = match Self::decode_state_response(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!("Failed to decode state response from peer {peer_id:?}: {e:?}.",); + return; + } + }; + + self.strategy.on_state_response(peer_id, response); + } + PeerRequest::WarpProof => { + error!("Unexpected PeerRequest::WarpProof",); + } + }, + Ok(Err(e)) => { + debug!("Request to peer {peer_id:?} failed: {e:?}."); + } + Err(oneshot::Canceled) => { + trace!("Request to peer {peer_id:?} failed due to oneshot being canceled.",); + } + } + } +}
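The peer-selection loop inside `download_state` in snap_sync.rs boils down to the following shape; a self-contained sketch (illustrative only, with `Id` standing in for `sc_network::PeerId` and plain tuples for the entries returned by `sync_service.peers_info()`):

    use std::collections::HashSet;
    use std::hash::Hash;

    // Pick the first connected full peer that is ahead of the target block and
    // has not been tried yet; `None` means "sleep and poll peers_info() again".
    fn pick_state_sync_peer<Id: Copy + Eq + Hash>(
        peers: &[(Id, bool /* is full */, u32 /* best block */)],
        target_block: u32,
        tried: &HashSet<Id>,
    ) -> Option<Id> {
        peers
            .iter()
            .filter(|(_, is_full, best)| *is_full && *best > target_block)
            .map(|(id, _, _)| *id)
            .find(|id| !tried.contains(id))
    }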
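On the wire, the engine's state requests and responses are prost-encoded protobuf messages, which is what the new `prost` dependency in subspace-service is for. A minimal round-trip sketch, using a hypothetical message type standing in for the prost-generated `sc_network_sync::schema::v1::StateRequest`:

    use prost::Message;

    // Hypothetical stand-in for the generated schema::v1::StateRequest type.
    #[derive(Clone, PartialEq, Message)]
    struct StateRequestDemo {
        #[prost(bytes = "vec", tag = "1")]
        block: Vec<u8>,
        #[prost(bool, tag = "2")]
        no_proof: bool,
    }

    fn main() {
        let request = StateRequestDemo {
            block: vec![0xde, 0xad, 0xbe, 0xef],
            no_proof: true,
        };
        // encode_to_vec/decode are the same prost calls used by
        // encode_state_request/decode_state_response above
        let bytes = request.encode_to_vec();
        let decoded = StateRequestDemo::decode(bytes.as_slice()).expect("just encoded; qed");
        assert_eq!(request, decoded);
    }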