Refactor snap-sync. #2916

Merged · 6 commits · Jul 23, 2024
198 changes: 140 additions & 58 deletions crates/subspace-service/src/sync_from_dsn/snap_sync.rs
@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::SegmentIndex;
use subspace_core_primitives::{BlockNumber, SegmentIndex};
use subspace_networking::Node;
use tokio::time::sleep;
use tracing::{debug, error};
@@ -69,6 +69,8 @@ pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>(
import_queue_service.as_mut(),
&network_request,
&sync_service,
None,
true,
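// The two trailing arguments are the parameters added to `sync` below:
// `None` leaves `target_block` unset (sync up to the latest segment), and
// `true` sets `import_blocks_from_downloaded_segment` so all blocks from the
// downloaded segment are queued for import, not just the first one.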
);

match snap_sync_fut.await {
@@ -95,55 +97,85 @@ pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>(
.await;
}

#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, B, NR>(
// Gets blocks from the last segment, or from the segment containing the target block.
// Returns the collection of encoded blocks and the segment index that was used.
pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
segment_headers_store: &SegmentHeadersStore<AS>,
node: &Node,
piece_getter: &PG,
fork_id: Option<&str>,
client: &Arc<Client>,
import_queue_service: &mut IQS,
network_request: &NR,
sync_service: &SyncingService<Block>,
) -> Result<(), Error>
target_block: Option<BlockNumber>,
) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
where
B: sc_client_api::Backend<Block>,
PG: DsnSyncPieceGetter,
AS: AuxStore,
Block: BlockT,
Client: HeaderBackend<Block>
+ ClientExt<Block, B>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ LockImportRun<Block, B>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized,
NR: NetworkRequest,
PG: DsnSyncPieceGetter,
{
debug!("Starting snap sync...");

sync_segment_headers(segment_headers_store, node)
.await
.map_err(|error| format!("Failed to sync segment headers: {}", error))?;

let last_segment_index = segment_headers_store
.max_segment_index()
.expect("Successfully synced above; qed");
let target_segment_index = {
let last_segment_index = segment_headers_store
.max_segment_index()
.expect("Successfully synced above; qed");

if let Some(target_block) = target_block {
let mut segment_header = segment_headers_store
.get_segment_header(last_segment_index)
.ok_or(format!(
"Can't get segment header from the store: {last_segment_index}"
))?;

if segment_header.last_archived_block().number < target_block {
return Err(format!(
"Target block is greater than the last archived block. \
Last segment index = {last_segment_index}, target block = {target_block}, \
last block from the segment = {}",
segment_header.last_archived_block().number
)
.into());
}

let mut current_segment_index = last_segment_index;

loop {
if segment_header.last_archived_block().number == target_block
[Review comment — Member]
I do not think this is a correct check. The fact that you have a specific last archived block number doesn't mean the block is contained fully in this segment. The segment_header.last_archived_block().number == target_block part should simply be removed; the actual correct check is done below with < target_block.

[Contributor Author]
The check would be incorrect if we have blocks spanning more than two segments. Otherwise, we will download and import an extra segment. However, your requested change is consistent with the previous discussion about the theoretical big blocks. Please let me know if you want to change it despite the extra segments.

[Member]
I do want to change it. It will make no difference now and will allow us to potentially avoid very tricky bugs in the future. We do not have any inherent guarantees about block size here. Also, it is not like we need to add code; we remove a condition, which makes things easier to understand, not harder (in this particular case).

|| current_segment_index <= SegmentIndex::ONE
{
break;
}

if segment_header.last_archived_block().number < target_block {
current_segment_index += SegmentIndex::ONE;
break;
}

current_segment_index -= SegmentIndex::ONE;

segment_header = segment_headers_store
.get_segment_header(current_segment_index)
.ok_or(format!(
"Can't get segment header from the store: {last_segment_index}"
))?;
}

current_segment_index
} else {
last_segment_index
}
};
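
For reference, a minimal runnable sketch of the backward search above, with the reviewer's suggestion applied (the == target_block early exit removed). It is illustrative only: plain integers stand in for SegmentIndex and the segment header store, and find_target_segment is a hypothetical helper, not code from this PR.

```rust
// Sketch of the target-segment search: `last_archived[i]` is the last
// archived block number of segment `i`.
fn find_target_segment(last_archived: &[u32], target_block: u32) -> Result<usize, String> {
    let last_segment_index = last_archived.len() - 1;
    let mut last_block = last_archived[last_segment_index];

    if last_block < target_block {
        return Err(format!(
            "Target block {target_block} is greater than the last archived block {last_block}"
        ));
    }

    let mut current_segment_index = last_segment_index;
    loop {
        // Reached the start of history; the caller treats an index <= 1 as
        // "skip snap sync and fall back to regular sync".
        if current_segment_index <= 1 {
            break;
        }
        // This segment ends before the target block, so the target lives in
        // the next segment up: step forward once and stop.
        if last_block < target_block {
            current_segment_index += 1;
            break;
        }
        current_segment_index -= 1;
        last_block = last_archived[current_segment_index];
    }

    Ok(current_segment_index)
}

fn main() {
    // Segments 0..=4 whose last archived block numbers are 9, 19, 29, 39, 49.
    let last_archived = [9, 19, 29, 39, 49];
    // Block 35 was archived in segment 3 (blocks 30..=39).
    assert_eq!(find_target_segment(&last_archived, 35), Ok(3));
    // A target near the start of history walks down to segment 1, which the
    // caller then treats as "too early, sync regularly".
    assert_eq!(find_target_segment(&last_archived, 15), Ok(1));
}
```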

// Skip the snap sync if there is just one segment header built on top of genesis;
// it is more efficient to sync it regularly
if last_segment_index <= SegmentIndex::ONE {
if target_segment_index <= SegmentIndex::ONE {
debug!("Snap sync was skipped due to too early chain history");

return Ok(());
return Ok(None);
}

// Identify all segment headers that would need to be reconstructed in order to get the
// first block of the target segment
let mut segments_to_reconstruct = VecDeque::from([last_segment_index]);
let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
{
let mut last_segment_first_block_number = None;

@@ -204,6 +236,54 @@ where
blocks = VecDeque::from(blocks_fut.await?);
}
}

Ok(Some((target_segment_index, blocks)))
}

#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, B, NR>(
segment_headers_store: &SegmentHeadersStore<AS>,
node: &Node,
piece_getter: &PG,
fork_id: Option<&str>,
client: &Arc<Client>,
import_queue_service: &mut IQS,
network_request: &NR,
sync_service: &SyncingService<Block>,
target_block: Option<BlockNumber>,
import_blocks_from_downloaded_segment: bool,
) -> Result<(), Error>
where
B: sc_client_api::Backend<Block>,
PG: DsnSyncPieceGetter,
AS: AuxStore,
Block: BlockT,
Client: HeaderBackend<Block>
+ ClientExt<Block, B>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ LockImportRun<Block, B>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized,
NR: NetworkRequest,
{
debug!("Starting snap sync...");

let Some((target_segment_index, mut blocks)) =
get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block)
.await?
else {
return Ok(()); // Snap-sync skipped
};

debug!(
"Segments data received. Target segment index: {:?}",
target_segment_index
);

let mut blocks_to_import = Vec::with_capacity(blocks.len());
let last_block_number;

@@ -221,10 +301,10 @@ where
});

debug!(
%last_segment_index,
%target_segment_index,
%first_block_number,
%last_block_number,
"Blocks from last segment downloaded"
"Blocks from target segment downloaded"
);

let signed_block = decode_block::<Block>(&first_block_bytes)
@@ -236,10 +316,10 @@ where
let state = download_state(&header, client, fork_id, network_request, sync_service)
.await
.map_err(|error| {
format!("Failed to download state for the first block of last segment: {error}")
format!("Failed to download state for the first block of target segment: {error}")
})?;

debug!("Downloaded state of the first block of the last segment");
debug!("Downloaded state of the first block of the target segment");

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
@@ -255,28 +335,30 @@ where
});
}

debug!(
blocks_count = %blocks.len(),
"Queuing importing remaining blocks from last segment"
);

for (_block_number, block_bytes) in blocks {
let signed_block = decode_block::<Block>(&block_bytes)
.map_err(|error| format!("Failed to decode archived block: {error}"))?;
let (header, extrinsics) = signed_block.block.deconstruct();
if import_blocks_from_downloaded_segment {
[Review comment — Member]
What is the use case of not doing this, though?

[Contributor Author]
Discussed offline.

[Member]
Can we name it something like import_a_single_block or similar? Right now the name suggests we do not import blocks at all, but we do; we just import one instead of all of them.

debug!(
blocks_count = %blocks.len(),
"Queuing importing remaining blocks from target segment"
);

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
header: Some(header),
body: Some(extrinsics),
indexed_body: None,
justifications: signed_block.justifications,
origin: None,
allow_missing_state: false,
import_existing: false,
skip_execution: false,
state: None,
});
for (_block_number, block_bytes) in blocks {
let signed_block = decode_block::<Block>(&block_bytes)
.map_err(|error| format!("Failed to decode archived block: {error}"))?;
let (header, extrinsics) = signed_block.block.deconstruct();

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
header: Some(header),
body: Some(extrinsics),
indexed_body: None,
justifications: signed_block.justifications,
origin: None,
allow_missing_state: false,
import_existing: false,
skip_execution: false,
state: None,
});
}
}
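
To make the new flag concrete: a standalone sketch (not the PR's code) of the queuing decision above, with strings standing in for IncomingBlock; blocks_to_queue is a hypothetical helper.

```rust
// The first block of the segment is always queued (it carries the downloaded
// state); the flag decides whether the remaining blocks are queued as well.
fn blocks_to_queue(blocks: &[&str], import_blocks_from_downloaded_segment: bool) -> Vec<String> {
    let mut queued = vec![format!("{} (with downloaded state)", blocks[0])];
    if import_blocks_from_downloaded_segment {
        queued.extend(blocks[1..].iter().map(|block| block.to_string()));
    }
    queued
}

fn main() {
    let segment_blocks = ["block 100", "block 101", "block 102"];
    // `true`, as passed by `snap_sync` above: queue the whole segment.
    assert_eq!(blocks_to_queue(&segment_blocks, true).len(), 3);
    // `false`: queue only the state-carrying first block.
    assert_eq!(blocks_to_queue(&segment_blocks, false).len(), 1);
}
```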

let maybe_last_block_to_import = blocks_to_import.pop();
@@ -289,8 +371,8 @@ where
if let Some(last_block_to_import) = maybe_last_block_to_import {
debug!(
%last_block_number,
%last_segment_index,
"Importing the last block from the last segment"
%target_segment_index,
"Importing the last block from the target segment"
);

import_queue_service