Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor snap-sync. #2916

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 141 additions & 58 deletions crates/subspace-service/src/sync_from_dsn/snap_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::SegmentIndex;
use subspace_core_primitives::{BlockNumber, SegmentIndex};
use subspace_networking::Node;
use tokio::time::sleep;
use tracing::{debug, error};
Expand Down Expand Up @@ -69,6 +69,8 @@ pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>(
import_queue_service.as_mut(),
&network_request,
&sync_service,
None,
true,
);

match snap_sync_fut.await {
Expand All @@ -95,55 +97,85 @@ pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>(
.await;
}

#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, B, NR>(
// Get blocks from the last segment or from the segment containing the target block.
// Returns encoded blocks collection and used segment index.
pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
segment_headers_store: &SegmentHeadersStore<AS>,
node: &Node,
piece_getter: &PG,
fork_id: Option<&str>,
client: &Arc<Client>,
import_queue_service: &mut IQS,
network_request: &NR,
sync_service: &SyncingService<Block>,
) -> Result<(), Error>
target_block: Option<BlockNumber>,
) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
where
B: sc_client_api::Backend<Block>,
PG: DsnSyncPieceGetter,
AS: AuxStore,
Block: BlockT,
Client: HeaderBackend<Block>
+ ClientExt<Block, B>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ LockImportRun<Block, B>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized,
NR: NetworkRequest,
PG: DsnSyncPieceGetter,
{
debug!("Starting snap sync...");

sync_segment_headers(segment_headers_store, node)
.await
.map_err(|error| format!("Failed to sync segment headers: {}", error))?;

let last_segment_index = segment_headers_store
.max_segment_index()
.expect("Successfully synced above; qed");
let target_segment_index = {
let last_segment_index = segment_headers_store
.max_segment_index()
.expect("Successfully synced above; qed");

if let Some(target_block) = target_block {
let mut segment_header = segment_headers_store
.get_segment_header(last_segment_index)
.ok_or(format!(
"Can't get segment header from the store: {last_segment_index}"
))?;

if target_block > segment_header.last_archived_block().number {
return Err(format!(
"Target block is greater than the last archived block. \
Last segment index = {last_segment_index}, target block = {target_block}, \
last block from the segment = {}
",
segment_header.last_archived_block().number
)
.into());
}

let mut current_segment_index = last_segment_index;

loop {
if target_block == segment_header.last_archived_block().number
|| current_segment_index <= SegmentIndex::ONE
{
break;
}

if target_block > segment_header.last_archived_block().number {
current_segment_index += SegmentIndex::ONE;
break;
}

current_segment_index -= SegmentIndex::ONE;
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved

segment_header = segment_headers_store
.get_segment_header(current_segment_index)
.ok_or(format!(
"Can't get segment header from the store: {last_segment_index}"
))?;
}

current_segment_index
} else {
last_segment_index
}
};

// Skip the snap sync if there is just one segment header built on top of genesis, it is
// more efficient to sync it regularly
if last_segment_index <= SegmentIndex::ONE {
if target_segment_index <= SegmentIndex::ONE {
debug!("Snap sync was skipped due to too early chain history");

return Ok(());
return Ok(None);
}

// Identify all segment headers that would need to be reconstructed in order to get first
// block of last segment header
let mut segments_to_reconstruct = VecDeque::from([last_segment_index]);
let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
{
let mut last_segment_first_block_number = None;

Expand Down Expand Up @@ -204,6 +236,55 @@ where
blocks = VecDeque::from(blocks_fut.await?);
}
}

Ok(Some((target_segment_index, blocks)))
}

#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, B, NR>(
segment_headers_store: &SegmentHeadersStore<AS>,
node: &Node,
piece_getter: &PG,
fork_id: Option<&str>,
client: &Arc<Client>,
import_queue_service: &mut IQS,
network_request: &NR,
sync_service: &SyncingService<Block>,
target_block: Option<BlockNumber>,
import_blocks_from_downloaded_segment: bool,
) -> Result<(), Error>
where
B: sc_client_api::Backend<Block>,
PG: DsnSyncPieceGetter,
AS: AuxStore,
Block: BlockT,
Client: HeaderBackend<Block>
+ ClientExt<Block, B>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ LockImportRun<Block, B>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized,
NR: NetworkRequest,
{
debug!("Starting snap sync...");

let Some((target_segment_index, mut blocks)) =
get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block)
.await?
else {
// Snap-sync skipped
return Ok(());
};

debug!(
"Segments data received. Target segment index: {:?}",
target_segment_index
);

let mut blocks_to_import = Vec::with_capacity(blocks.len());
let last_block_number;

Expand All @@ -221,10 +302,10 @@ where
});

debug!(
%last_segment_index,
%target_segment_index,
%first_block_number,
%last_block_number,
"Blocks from last segment downloaded"
"Blocks from target segment downloaded"
);

let signed_block = decode_block::<Block>(&first_block_bytes)
Expand All @@ -236,10 +317,10 @@ where
let state = download_state(&header, client, fork_id, network_request, sync_service)
.await
.map_err(|error| {
format!("Failed to download state for the first block of last segment: {error}")
format!("Failed to download state for the first block of target segment: {error}")
})?;

debug!("Downloaded state of the first block of the last segment");
debug!("Downloaded state of the first block of the target segment");

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
Expand All @@ -255,28 +336,30 @@ where
});
}

debug!(
blocks_count = %blocks.len(),
"Queuing importing remaining blocks from last segment"
);

for (_block_number, block_bytes) in blocks {
let signed_block = decode_block::<Block>(&block_bytes)
.map_err(|error| format!("Failed to decode archived block: {error}"))?;
let (header, extrinsics) = signed_block.block.deconstruct();
if import_blocks_from_downloaded_segment {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case of not doing this though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name it something like import_a_single_block or similar? Right now ti indicates we do not import blocks, but we do, we just import one instead of all.

debug!(
blocks_count = %blocks.len(),
"Queuing importing remaining blocks from target segment"
);

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
header: Some(header),
body: Some(extrinsics),
indexed_body: None,
justifications: signed_block.justifications,
origin: None,
allow_missing_state: false,
import_existing: false,
skip_execution: false,
state: None,
});
for (_block_number, block_bytes) in blocks {
let signed_block = decode_block::<Block>(&block_bytes)
.map_err(|error| format!("Failed to decode archived block: {error}"))?;
let (header, extrinsics) = signed_block.block.deconstruct();

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
header: Some(header),
body: Some(extrinsics),
indexed_body: None,
justifications: signed_block.justifications,
origin: None,
allow_missing_state: false,
import_existing: false,
skip_execution: false,
state: None,
});
}
}

let maybe_last_block_to_import = blocks_to_import.pop();
Expand All @@ -289,8 +372,8 @@ where
if let Some(last_block_to_import) = maybe_last_block_to_import {
debug!(
%last_block_number,
%last_segment_index,
"Importing the last block from the last segment"
%target_segment_index,
"Importing the last block from the target segment"
);

import_queue_service
Expand Down
Loading