Skip to content

Commit

Permalink
Fix MMR position naming.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed Jul 17, 2024
1 parent b6fe2b6 commit f73e546
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
25 changes: 13 additions & 12 deletions crates/subspace-service/src/mmr/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::BlockNumber;
use tracing::{debug, error, trace};

const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
Expand Down Expand Up @@ -90,32 +89,32 @@ pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
#[derive(Eq, PartialEq, Clone)]
struct SeenRequestsKey {
peer: PeerId,
block_number: BlockNumber,
starting_position: u32,
}

#[allow(clippy::derived_hash_with_manual_eq)]
impl Hash for SeenRequestsKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.peer.hash(state);
self.block_number.hash(state);
self.starting_position.hash(state);
}
}

/// Request MMR data from a peer.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, Encode, Decode, Debug)]
pub struct MmrRequest {
/// Starting block for MMR
pub starting_block: BlockNumber,
/// Max returned data items
pub limit: BlockNumber,
/// Starting position for MMR node.
pub starting_position: u32,
/// Max returned nodes.
pub limit: u32,
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, Encode, Decode, Debug)]
pub struct MmrResponse {
/// MMR-leafs related to block number
pub mmr_data: BTreeMap<BlockNumber, Vec<u8>>,
/// MMR-nodes related to node position
pub mmr_data: BTreeMap<u32, Vec<u8>>,
}

/// The value of [`StateRequestHandler::seen_requests`].
Expand Down Expand Up @@ -213,7 +212,7 @@ where

let key = SeenRequestsKey {
peer: *peer,
block_number: request.starting_block,
starting_position: request.starting_position,
};

let mut reputation_changes = Vec::new();
Expand Down Expand Up @@ -244,7 +243,9 @@ where
Err(())
} else {
let mut mmr_data = BTreeMap::new();
for block_number in request.starting_block..(request.starting_block + request.limit) {
for block_number in
request.starting_position..(request.starting_position + request.limit)
{
let canon_key = get_offchain_key(block_number.into());
let storage_value = self
.offchain_db
Expand Down Expand Up @@ -282,7 +283,7 @@ where

#[derive(Debug, thiserror::Error)]
enum HandleRequestError {
#[error("Invalid request: max MMR items limit exceeded.")]
#[error("Invalid request: max MMR nodes limit exceeded.")]
MaxItemsLimitExceeded,

#[error(transparent)]
Expand Down
65 changes: 37 additions & 28 deletions crates/subspace-service/src/mmr/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use sc_network_sync::SyncingService;
use sp_blockchain::HeaderBackend;
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_mmr_primitives::utils::NodesUtils;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::BlockNumber;
use tokio::time::sleep;
use tracing::{debug, error, trace};

Expand All @@ -37,29 +37,29 @@ pub async fn mmr_sync<Block, Client, NR, OS>(

let mut offchain_db = OffchainDb::new(offchain_storage);

// Look for existing local MMR-entries
let mut starting_block = {
let mut starting_block: Option<BlockNumber> = None;
for block_number in 0..=BlockNumber::MAX {
let canon_key = get_offchain_key(block_number.into());
// Look for existing local MMR-nodes
let mut starting_position = {
let mut starting_position: Option<u32> = None;
for position in 0..=u32::MAX {
let canon_key = get_offchain_key(position.into());
if offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key)
.is_none()
{
starting_block = Some(block_number);
starting_position = Some(position);
break;
}
}

match starting_block {
match starting_position {
None => {
error!("Can't get starting MMR block - MMR storage is corrupted.");
error!("Can't get starting MMR position - MMR storage is corrupted.");
return;
}
Some(last_processed_block) => {
debug!("MMR-sync last processed block: {last_processed_block}");
Some(last_processed_position) => {
debug!("MMR-sync last processed position: {last_processed_position}");

last_processed_block
last_processed_position
}
}
};
Expand Down Expand Up @@ -87,14 +87,25 @@ pub async fn mmr_sync<Block, Client, NR, OS>(

// Request MMR until target block reached.
loop {
let target_block_number = {
let target_position = {
let best_block = sync_service.best_seen_block().await;

match best_block {
Ok(Some(block)) => {
debug!("MMR-sync. Best seen block={block}");
let block_number: u32 = block
.try_into()
.map_err(|_| "Can't convert block number to u32")
.expect("We convert BlockNumber which is defined as u32.");
let nodes = NodesUtils::new(block_number.into());

block
let target_position = nodes.size().saturating_sub(1);

debug!(
"MMR-sync. Best seen block={}, Node target position={}",
block_number, target_position
);

target_position
}
Ok(None) => {
debug!("Can't obtain best sync block for MMR-sync.");
Expand All @@ -108,7 +119,7 @@ pub async fn mmr_sync<Block, Client, NR, OS>(
};

let request = MmrRequest {
starting_block,
starting_position,
limit: MAX_MMR_ITEMS,
};
let response =
Expand All @@ -124,22 +135,20 @@ pub async fn mmr_sync<Block, Client, NR, OS>(
break;
}

// Save the MMR-items from response to the local storage
'data: for (block_number, data) in response.mmr_data.iter() {
// Save the MMR-nodes from response to the local storage
'data: for (position, data) in response.mmr_data.iter() {
// Ensure continuous sync
if *block_number == starting_block {
let canon_key = get_offchain_key((*block_number).into());
if *position == starting_position {
let canon_key = get_offchain_key((*position).into());
offchain_db.local_storage_set(
StorageKind::PERSISTENT,
&canon_key,
data,
);

starting_block += 1;
starting_position += 1;
} else {
debug!(
"MMR-sync gap detected={peer_id}, block_number={block_number}",
);
debug!("MMR-sync gap detected={peer_id}, position={position}",);
break 'data; // We don't support gaps in MMR data
}
}
Expand All @@ -151,15 +160,15 @@ pub async fn mmr_sync<Block, Client, NR, OS>(
}
}

// Actual MMR-items may exceed this number, however, we will catch up with the rest
// Actual MMR-nodes may exceed this number, however, we will catch up with the rest
// when we sync the remaining data (consensus and domain chains).
if target_block_number <= starting_block.into() {
debug!("Target block number reached: {target_block_number}");
if target_position <= starting_position.into() {
debug!("Target position reached: {target_position}");
break 'outer;
}
}
}
debug!("No synced peers to handle the MMR-sync. Pausing...",);
debug!(%starting_position, "No synced peers to handle the MMR-sync. Pausing...",);
sleep(SYNC_PAUSE).await;
}

Expand Down

0 comments on commit f73e546

Please sign in to comment.