diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index ed5866e214..375259192f 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use subspace_archiving::reconstructor::Reconstructor; -use subspace_core_primitives::SegmentIndex; +use subspace_core_primitives::{BlockNumber, SegmentIndex}; use subspace_networking::Node; use tokio::time::sleep; use tracing::{debug, error}; @@ -69,6 +69,7 @@ pub(crate) async fn snap_sync( import_queue_service.as_mut(), &network_request, &sync_service, + None, ); match snap_sync_fut.await { @@ -95,55 +96,83 @@ pub(crate) async fn snap_sync( .await; } -#[allow(clippy::too_many_arguments)] -async fn sync( +// Get blocks from the last segment or from the segment containing the target block. +// Returns encoded blocks collection and used segment index. +pub(crate) async fn get_blocks_from_target_segment( segment_headers_store: &SegmentHeadersStore, node: &Node, piece_getter: &PG, - fork_id: Option<&str>, - client: &Arc, - import_queue_service: &mut IQS, - network_request: &NR, - sync_service: &SyncingService, -) -> Result<(), Error> + target_block: Option, +) -> Result)>)>, Error> where - B: sc_client_api::Backend, - PG: DsnSyncPieceGetter, AS: AuxStore, - Block: BlockT, - Client: HeaderBackend - + ClientExt - + ProvideRuntimeApi - + ProofProvider - + LockImportRun - + Send - + Sync - + 'static, - Client::Api: SubspaceApi + ObjectsApi, - IQS: ImportQueueService + ?Sized, - NR: NetworkRequest, + PG: DsnSyncPieceGetter, { - debug!("Starting snap sync..."); - sync_segment_headers(segment_headers_store, node) .await .map_err(|error| format!("Failed to sync segment headers: {}", error))?; - let last_segment_index = segment_headers_store - .max_segment_index() - .expect("Successfully synced above; qed"); + let target_segment_index = { + let last_segment_index = segment_headers_store + .max_segment_index() + .expect("Successfully synced above; qed"); + + if let Some(target_block) = target_block { + let mut segment_header = segment_headers_store + .get_segment_header(last_segment_index) + .ok_or(format!( + "Can't get segment header from the store: {last_segment_index}" + ))?; + + if target_block > segment_header.last_archived_block().number { + return Err(format!( + "Target block is greater than the last archived block. \ + Last segment index = {last_segment_index}, target block = {target_block}, \ + last block from the segment = {} + ", + segment_header.last_archived_block().number + ) + .into()); + } + + let mut current_segment_index = last_segment_index; + + loop { + if current_segment_index <= SegmentIndex::ONE { + break; + } + + if target_block > segment_header.last_archived_block().number { + current_segment_index += SegmentIndex::ONE; + break; + } + + current_segment_index -= SegmentIndex::ONE; + + segment_header = segment_headers_store + .get_segment_header(current_segment_index) + .ok_or(format!( + "Can't get segment header from the store: {last_segment_index}" + ))?; + } + + current_segment_index + } else { + last_segment_index + } + }; // Skip the snap sync if there is just one segment header built on top of genesis, it is // more efficient to sync it regularly - if last_segment_index <= SegmentIndex::ONE { + if target_segment_index <= SegmentIndex::ONE { debug!("Snap sync was skipped due to too early chain history"); - return Ok(()); + return Ok(None); } // Identify all segment headers that would need to be reconstructed in order to get first // block of last segment header - let mut segments_to_reconstruct = VecDeque::from([last_segment_index]); + let mut segments_to_reconstruct = VecDeque::from([target_segment_index]); { let mut last_segment_first_block_number = None; @@ -204,6 +233,56 @@ where blocks = VecDeque::from(blocks_fut.await?); } } + + Ok(Some((target_segment_index, blocks))) +} + +#[allow(clippy::too_many_arguments)] +/// Synchronize the blockchain to the target_block (approximate value based on the containing +/// segment) or to the last archived block. +async fn sync( + segment_headers_store: &SegmentHeadersStore, + node: &Node, + piece_getter: &PG, + fork_id: Option<&str>, + client: &Arc, + import_queue_service: &mut IQS, + network_request: &NR, + sync_service: &SyncingService, + target_block: Option, +) -> Result<(), Error> +where + B: sc_client_api::Backend, + PG: DsnSyncPieceGetter, + AS: AuxStore, + Block: BlockT, + Client: HeaderBackend + + ClientExt + + ProvideRuntimeApi + + ProofProvider + + LockImportRun + + Send + + Sync + + 'static, + Client::Api: SubspaceApi + ObjectsApi, + IQS: ImportQueueService + ?Sized, + NR: NetworkRequest, +{ + debug!("Starting snap sync..."); + + let Some((target_segment_index, mut blocks)) = + get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block) + .await? + else { + // Snap-sync skipped + return Ok(()); + }; + + debug!( + "Segments data received. Target segment index: {:?}", + target_segment_index + ); + let mut blocks_to_import = Vec::with_capacity(blocks.len()); let last_block_number; @@ -221,10 +300,10 @@ where }); debug!( - %last_segment_index, + %target_segment_index, %first_block_number, %last_block_number, - "Blocks from last segment downloaded" + "Blocks from target segment downloaded" ); let signed_block = decode_block::(&first_block_bytes) @@ -236,10 +315,10 @@ where let state = download_state(&header, client, fork_id, network_request, sync_service) .await .map_err(|error| { - format!("Failed to download state for the first block of last segment: {error}") + format!("Failed to download state for the first block of target segment: {error}") })?; - debug!("Downloaded state of the first block of the last segment"); + debug!("Downloaded state of the first block of the target segment"); blocks_to_import.push(IncomingBlock { hash: header.hash(), @@ -257,7 +336,7 @@ where debug!( blocks_count = %blocks.len(), - "Queuing importing remaining blocks from last segment" + "Queuing importing remaining blocks from target segment" ); for (_block_number, block_bytes) in blocks { @@ -289,8 +368,8 @@ where if let Some(last_block_to_import) = maybe_last_block_to_import { debug!( %last_block_number, - %last_segment_index, - "Importing the last block from the last segment" + %target_segment_index, + "Importing the last block from the target segment" ); import_queue_service