From ab5105fba2da475aa36eb798894678cba749a0b8 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 12 Jul 2024 18:39:19 +0400 Subject: [PATCH 1/5] Refactor snap-sync. - add optional target block - add conditional block import --- .../src/sync_from_dsn/snap_sync.rs | 198 +++++++++++++----- 1 file changed, 140 insertions(+), 58 deletions(-) diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index a9cfc85738..838263d63b 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use subspace_archiving::reconstructor::Reconstructor; -use subspace_core_primitives::SegmentIndex; +use subspace_core_primitives::{BlockNumber, SegmentIndex}; use subspace_networking::Node; use tokio::time::sleep; use tracing::{debug, error}; @@ -69,6 +69,8 @@ pub(crate) async fn snap_sync( import_queue_service.as_mut(), &network_request, &sync_service, + None, + true, ); match snap_sync_fut.await { @@ -95,55 +97,85 @@ pub(crate) async fn snap_sync( .await; } -#[allow(clippy::too_many_arguments)] -async fn sync( +// Get blocks from the last segment or from the segment containing the target block. +// Returns encoded blocks collection and used segment index. +pub(crate) async fn get_blocks_from_target_segment( segment_headers_store: &SegmentHeadersStore, node: &Node, piece_getter: &PG, - fork_id: Option<&str>, - client: &Arc, - import_queue_service: &mut IQS, - network_request: &NR, - sync_service: &SyncingService, -) -> Result<(), Error> + target_block: Option, +) -> Result)>)>, Error> where - B: sc_client_api::Backend, - PG: DsnSyncPieceGetter, AS: AuxStore, - Block: BlockT, - Client: HeaderBackend - + ClientExt - + ProvideRuntimeApi - + ProofProvider - + LockImportRun - + Send - + Sync - + 'static, - Client::Api: SubspaceApi + ObjectsApi, - IQS: ImportQueueService + ?Sized, - NR: NetworkRequest, + PG: DsnSyncPieceGetter, { - debug!("Starting snap sync..."); - sync_segment_headers(segment_headers_store, node) .await .map_err(|error| format!("Failed to sync segment headers: {}", error))?; - let last_segment_index = segment_headers_store - .max_segment_index() - .expect("Successfully synced above; qed"); + let target_segment_index = { + let last_segment_index = segment_headers_store + .max_segment_index() + .expect("Successfully synced above; qed"); + + if let Some(target_block) = target_block { + let mut segment_header = segment_headers_store + .get_segment_header(last_segment_index) + .ok_or(format!( + "Can't get segment header from the store: {last_segment_index}" + ))?; + + if segment_header.last_archived_block().number < target_block { + return Err(format!( + "Target block is greater than the last archived block. \ + Last segment index = {last_segment_index}, target block = {target_block}, \ + last block from the segment = {} + ", + segment_header.last_archived_block().number + ) + .into()); + } + + let mut current_segment_index = last_segment_index; + + loop { + if segment_header.last_archived_block().number == target_block + || current_segment_index <= SegmentIndex::ONE + { + break; + } + + if segment_header.last_archived_block().number < target_block { + current_segment_index += SegmentIndex::ONE; + break; + } + + current_segment_index -= SegmentIndex::ONE; + + segment_header = segment_headers_store + .get_segment_header(current_segment_index) + .ok_or(format!( + "Can't get segment header from the store: {last_segment_index}" + ))?; + } + + current_segment_index + } else { + last_segment_index + } + }; // Skip the snap sync if there is just one segment header built on top of genesis, it is // more efficient to sync it regularly - if last_segment_index <= SegmentIndex::ONE { + if target_segment_index <= SegmentIndex::ONE { debug!("Snap sync was skipped due to too early chain history"); - return Ok(()); + return Ok(None); } // Identify all segment headers that would need to be reconstructed in order to get first // block of last segment header - let mut segments_to_reconstruct = VecDeque::from([last_segment_index]); + let mut segments_to_reconstruct = VecDeque::from([target_segment_index]); { let mut last_segment_first_block_number = None; @@ -204,6 +236,54 @@ where blocks = VecDeque::from(blocks_fut.await?); } } + + Ok(Some((target_segment_index, blocks))) +} + +#[allow(clippy::too_many_arguments)] +async fn sync( + segment_headers_store: &SegmentHeadersStore, + node: &Node, + piece_getter: &PG, + fork_id: Option<&str>, + client: &Arc, + import_queue_service: &mut IQS, + network_request: &NR, + sync_service: &SyncingService, + target_block: Option, + import_blocks_from_downloaded_segment: bool, +) -> Result<(), Error> +where + B: sc_client_api::Backend, + PG: DsnSyncPieceGetter, + AS: AuxStore, + Block: BlockT, + Client: HeaderBackend + + ClientExt + + ProvideRuntimeApi + + ProofProvider + + LockImportRun + + Send + + Sync + + 'static, + Client::Api: SubspaceApi + ObjectsApi, + IQS: ImportQueueService + ?Sized, + NR: NetworkRequest, +{ + debug!("Starting snap sync..."); + + let Some((target_segment_index, mut blocks)) = + get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block) + .await? + else { + return Ok(()); // Snap-sync skipped + }; + + debug!( + "Segments data received. Target segment index: {:?}", + target_segment_index + ); + let mut blocks_to_import = Vec::with_capacity(blocks.len()); let last_block_number; @@ -221,10 +301,10 @@ where }); debug!( - %last_segment_index, + %target_segment_index, %first_block_number, %last_block_number, - "Blocks from last segment downloaded" + "Blocks from target segment downloaded" ); let signed_block = decode_block::(&first_block_bytes) @@ -236,10 +316,10 @@ where let state = download_state(&header, client, fork_id, network_request, sync_service) .await .map_err(|error| { - format!("Failed to download state for the first block of last segment: {error}") + format!("Failed to download state for the first block of target segment: {error}") })?; - debug!("Downloaded state of the first block of the last segment"); + debug!("Downloaded state of the first block of the target segment"); blocks_to_import.push(IncomingBlock { hash: header.hash(), @@ -255,28 +335,30 @@ where }); } - debug!( - blocks_count = %blocks.len(), - "Queuing importing remaining blocks from last segment" - ); - - for (_block_number, block_bytes) in blocks { - let signed_block = decode_block::(&block_bytes) - .map_err(|error| format!("Failed to decode archived block: {error}"))?; - let (header, extrinsics) = signed_block.block.deconstruct(); + if import_blocks_from_downloaded_segment { + debug!( + blocks_count = %blocks.len(), + "Queuing importing remaining blocks from target segment" + ); - blocks_to_import.push(IncomingBlock { - hash: header.hash(), - header: Some(header), - body: Some(extrinsics), - indexed_body: None, - justifications: signed_block.justifications, - origin: None, - allow_missing_state: false, - import_existing: false, - skip_execution: false, - state: None, - }); + for (_block_number, block_bytes) in blocks { + let signed_block = decode_block::(&block_bytes) + .map_err(|error| format!("Failed to decode archived block: {error}"))?; + let (header, extrinsics) = signed_block.block.deconstruct(); + + blocks_to_import.push(IncomingBlock { + hash: header.hash(), + header: Some(header), + body: Some(extrinsics), + indexed_body: None, + justifications: signed_block.justifications, + origin: None, + allow_missing_state: false, + import_existing: false, + skip_execution: false, + state: None, + }); + } } let maybe_last_block_to_import = blocks_to_import.pop(); @@ -289,8 +371,8 @@ where if let Some(last_block_to_import) = maybe_last_block_to_import { debug!( %last_block_number, - %last_segment_index, - "Importing the last block from the last segment" + %target_segment_index, + "Importing the last block from the target segment" ); import_queue_service From 0dd42d07544c634cd529311799d753a1097df58a Mon Sep 17 00:00:00 2001 From: shamil-gadelshin Date: Mon, 15 Jul 2024 14:22:00 +0400 Subject: [PATCH 2/5] Update crates/subspace-service/src/sync_from_dsn/snap_sync.rs Co-authored-by: Nazar Mokrynskyi --- crates/subspace-service/src/sync_from_dsn/snap_sync.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index 838263d63b..66907fe7c5 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -276,7 +276,8 @@ where get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block) .await? else { - return Ok(()); // Snap-sync skipped + // Snap-sync skipped + return Ok(()); }; debug!( From 00d4f9108c9a207330b638ef314de1cae7624939 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 17 Jul 2024 17:58:01 +0400 Subject: [PATCH 3/5] Change condition for target_block (snap-sync). --- crates/subspace-service/src/sync_from_dsn/snap_sync.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index 66907fe7c5..c6df548e2a 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -125,7 +125,7 @@ where "Can't get segment header from the store: {last_segment_index}" ))?; - if segment_header.last_archived_block().number < target_block { + if target_block > segment_header.last_archived_block().number { return Err(format!( "Target block is greater than the last archived block. \ Last segment index = {last_segment_index}, target block = {target_block}, \ @@ -139,13 +139,13 @@ where let mut current_segment_index = last_segment_index; loop { - if segment_header.last_archived_block().number == target_block + if target_block == segment_header.last_archived_block().number || current_segment_index <= SegmentIndex::ONE { break; } - if segment_header.last_archived_block().number < target_block { + if target_block > segment_header.last_archived_block().number { current_segment_index += SegmentIndex::ONE; break; } From 43d991e2fa4931279b33a87d4258c62da95fe648 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Thu, 18 Jul 2024 15:28:48 +0400 Subject: [PATCH 4/5] Refactor snap-sync. --- .../src/sync_from_dsn/snap_sync.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index c6df548e2a..691be24ec4 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -70,7 +70,7 @@ pub(crate) async fn snap_sync( &network_request, &sync_service, None, - true, + false, ); match snap_sync_fut.await { @@ -139,9 +139,7 @@ where let mut current_segment_index = last_segment_index; loop { - if target_block == segment_header.last_archived_block().number - || current_segment_index <= SegmentIndex::ONE - { + if current_segment_index <= SegmentIndex::ONE { break; } @@ -241,6 +239,12 @@ where } #[allow(clippy::too_many_arguments)] +/// Synchronize the blockchain to the target_block (approximate value based on the containing +/// segment) or to the last archived block. +/// +/// Note: setting import_state_block_only will import only the block related to the state (a block +/// number equal or less than target_block or the first block of the last archived segment) and +/// disable importing the remaining blocks of the downloaded segment. async fn sync( segment_headers_store: &SegmentHeadersStore, node: &Node, @@ -251,7 +255,7 @@ async fn sync( network_request: &NR, sync_service: &SyncingService, target_block: Option, - import_blocks_from_downloaded_segment: bool, + import_state_block_only: bool, ) -> Result<(), Error> where B: sc_client_api::Backend, @@ -336,7 +340,7 @@ where }); } - if import_blocks_from_downloaded_segment { + if !import_state_block_only { debug!( blocks_count = %blocks.len(), "Queuing importing remaining blocks from target segment" From 57c7ef6007efd0a97241a0598d36f939e682dd38 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Tue, 23 Jul 2024 14:03:59 +0400 Subject: [PATCH 5/5] Remove import_state_block_only variable --- .../src/sync_from_dsn/snap_sync.rs | 50 ++++++++----------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs index 691be24ec4..2644e3124a 100644 --- a/crates/subspace-service/src/sync_from_dsn/snap_sync.rs +++ b/crates/subspace-service/src/sync_from_dsn/snap_sync.rs @@ -70,7 +70,6 @@ pub(crate) async fn snap_sync( &network_request, &sync_service, None, - false, ); match snap_sync_fut.await { @@ -241,10 +240,6 @@ where #[allow(clippy::too_many_arguments)] /// Synchronize the blockchain to the target_block (approximate value based on the containing /// segment) or to the last archived block. -/// -/// Note: setting import_state_block_only will import only the block related to the state (a block -/// number equal or less than target_block or the first block of the last archived segment) and -/// disable importing the remaining blocks of the downloaded segment. async fn sync( segment_headers_store: &SegmentHeadersStore, node: &Node, @@ -255,7 +250,6 @@ async fn sync( network_request: &NR, sync_service: &SyncingService, target_block: Option, - import_state_block_only: bool, ) -> Result<(), Error> where B: sc_client_api::Backend, @@ -340,30 +334,28 @@ where }); } - if !import_state_block_only { - debug!( - blocks_count = %blocks.len(), - "Queuing importing remaining blocks from target segment" - ); + debug!( + blocks_count = %blocks.len(), + "Queuing importing remaining blocks from target segment" + ); - for (_block_number, block_bytes) in blocks { - let signed_block = decode_block::(&block_bytes) - .map_err(|error| format!("Failed to decode archived block: {error}"))?; - let (header, extrinsics) = signed_block.block.deconstruct(); - - blocks_to_import.push(IncomingBlock { - hash: header.hash(), - header: Some(header), - body: Some(extrinsics), - indexed_body: None, - justifications: signed_block.justifications, - origin: None, - allow_missing_state: false, - import_existing: false, - skip_execution: false, - state: None, - }); - } + for (_block_number, block_bytes) in blocks { + let signed_block = decode_block::(&block_bytes) + .map_err(|error| format!("Failed to decode archived block: {error}"))?; + let (header, extrinsics) = signed_block.block.deconstruct(); + + blocks_to_import.push(IncomingBlock { + hash: header.hash(), + header: Some(header), + body: Some(extrinsics), + indexed_body: None, + justifications: signed_block.justifications, + origin: None, + allow_missing_state: false, + import_existing: false, + skip_execution: false, + state: None, + }); } let maybe_last_block_to_import = blocks_to_import.pop();