diff --git a/client/benches/tps/utils.rs b/client/benches/tps/utils.rs
index cca409724ae..2fc498c0067 100644
--- a/client/benches/tps/utils.rs
+++ b/client/benches/tps/utils.rs
@@ -1,4 +1,4 @@
-use std::{fmt, fs::File, io::BufReader, path::Path, sync::mpsc, thread, time};
+use std::{fmt, fs::File, io::BufReader, num::NonZeroUsize, path::Path, sync::mpsc, thread, time};
 
 use eyre::{Result, WrapErr};
 use iroha::{
@@ -108,7 +108,8 @@ impl Config {
             .expect("Must be some")
             .state()
             .view();
-        let mut blocks = state_view.all_blocks().skip(blocks_out_of_measure as usize);
+        let mut blocks =
+            state_view.all_blocks(NonZeroUsize::new(blocks_out_of_measure as usize + 1).unwrap());
         let (txs_accepted, txs_rejected) = (0..self.blocks)
             .map(|_| {
                 let block = blocks
diff --git a/client/tests/integration/roles.rs b/client/tests/integration/roles.rs
index 6b16ce7aa64..dc43608fcc6 100644
--- a/client/tests/integration/roles.rs
+++ b/client/tests/integration/roles.rs
@@ -174,7 +174,6 @@ fn role_permissions_are_deduplicated() {
         .add_permission(allow_alice_to_transfer_rose_1)
         .add_permission(allow_alice_to_transfer_rose_2);
 
-    println!("KITA: {role:?}");
     test_client
         .submit_blocking(Register::role(role))
         .expect("failed to register role");
diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs
index 860c8c97e10..a0ae5c35a32 100644
--- a/core/src/block_sync.rs
+++ b/core/src/block_sync.rs
@@ -2,7 +2,7 @@ use std::{
     collections::BTreeSet,
     fmt::Debug,
-    num::{NonZeroU32, NonZeroU64},
+    num::{NonZeroU32, NonZeroUsize},
     sync::Arc,
     time::Duration,
 };
 
@@ -50,7 +50,7 @@ pub struct BlockSynchronizer {
     gossip_size: NonZeroU32,
     network: IrohaNetwork,
     state: Arc<State>,
-    seen_blocks: BTreeSet<(NonZeroU64, HashOf<SignedBlock>)>,
+    seen_blocks: BTreeSet<(NonZeroUsize, HashOf<SignedBlock>)>,
     latest_height: usize,
 }
 
@@ -91,7 +91,7 @@ impl BlockSynchronizer {
         self.latest_height = now_height;
 
         self.seen_blocks
-            .retain(|(height, _hash)| height.get() >= now_height as u64);
+            .retain(|(height, _hash)| height.get() >= now_height);
 
         if let Some(random_peer) = self.network.online_peers(Self::random_peer) {
             self.request_latest_blocks_from_peer(random_peer.id().clone())
@@ -101,7 +101,7 @@ impl BlockSynchronizer {
 
     /// Get a random online peer.
     #[allow(clippy::disallowed_types)]
-    pub fn random_peer(peers: &std::collections::HashSet<PeerId>) -> Option<PeerId> {
+    fn random_peer(peers: &std::collections::HashSet<PeerId>) -> Option<PeerId> {
         use rand::{seq::IteratorRandom, SeedableRng};
 
         let rng = &mut rand::rngs::StdRng::from_entropy();
@@ -115,13 +115,13 @@ impl BlockSynchronizer {
             (state_view.prev_block_hash(), state_view.latest_block_hash())
         };
         message::Message::GetBlocksAfter(message::GetBlocksAfter::new(
-            latest_hash,
+            self.peer_id.clone(),
             prev_hash,
+            latest_hash,
             self.seen_blocks
                 .iter()
-                .map(|(_height, hash)| hash.clone())
+                .map(|(_height, hash)| *hash)
                 .collect(),
-            self.peer_id.clone(),
         ))
         .send_to(&self.network, peer_id)
         .await;
@@ -152,53 +152,52 @@ pub mod message {
     //! Module containing messages for [`BlockSynchronizer`](super::BlockSynchronizer).
-    use std::num::NonZeroUsize;
 
     use super::*;
 
     /// Get blocks after some block
-    #[derive(Debug, Clone, Decode, Encode)]
+    #[derive(Debug, Clone, Encode)]
     pub struct GetBlocksAfter {
-        /// Hash of latest available block
-        pub latest_hash: Option<HashOf<SignedBlock>>,
+        /// Peer id
+        pub peer_id: PeerId,
         /// Hash of second to latest block
         pub prev_hash: Option<HashOf<SignedBlock>>,
+        /// Hash of latest available block
+        pub latest_hash: Option<HashOf<SignedBlock>>,
         /// The block hashes already seen
         pub seen_blocks: BTreeSet<HashOf<SignedBlock>>,
-        /// Peer id
-        pub peer_id: PeerId,
     }
 
     impl GetBlocksAfter {
         /// Construct [`GetBlocksAfter`].
         pub const fn new(
-            latest_hash: Option<HashOf<SignedBlock>>,
+            peer_id: PeerId,
             prev_hash: Option<HashOf<SignedBlock>>,
+            latest_hash: Option<HashOf<SignedBlock>>,
             seen_blocks: BTreeSet<HashOf<SignedBlock>>,
-            peer_id: PeerId,
         ) -> Self {
             Self {
-                latest_hash,
+                peer_id,
                 prev_hash,
+                latest_hash,
                 seen_blocks,
-                peer_id,
             }
         }
     }
 
     /// Message variant to share blocks to peer
-    #[derive(Debug, Clone, Decode, Encode)]
+    #[derive(Debug, Clone, Encode)]
     pub struct ShareBlocks {
-        /// Blocks
-        pub blocks: Vec<SignedBlock>,
         /// Peer id
         pub peer_id: PeerId,
+        /// Blocks
+        pub blocks: Vec<SignedBlock>,
     }
 
     impl ShareBlocks {
         /// Construct [`ShareBlocks`].
         pub const fn new(blocks: Vec<SignedBlock>, peer_id: PeerId) -> Self {
-            Self { blocks, peer_id }
+            Self { peer_id, blocks }
         }
     }
@@ -214,13 +213,13 @@ pub mod message {
     impl Message {
         /// Handles the incoming message.
         #[iroha_futures::telemetry_future]
-        pub async fn handle_message(&self, block_sync: &mut BlockSynchronizer) {
+        pub(super) async fn handle_message(&self, block_sync: &mut BlockSynchronizer) {
             match self {
                 Message::GetBlocksAfter(GetBlocksAfter {
-                    latest_hash,
+                    peer_id,
                     prev_hash,
+                    latest_hash,
                     seen_blocks,
-                    peer_id,
                 }) => {
                     let local_latest_block_hash = block_sync.state.view().latest_block_hash();
 
@@ -230,42 +229,37 @@ pub mod message {
                         return;
                     }
 
-                    let start_height = match prev_hash {
-                        Some(hash) => match block_sync.kura.get_block_height_by_hash(hash) {
-                            None => {
-                                error!(
-                                    peer_id=%block_sync.peer_id,
-                                    block=%hash,
-                                    "Block hash not found"
-                                );
-                                return;
-                            }
-                            // It's get blocks *after*, so we add 1.
-                            Some(height) => height
-                                .checked_add(1)
-                                .expect("INTERNAL BUG: Block height exceeds usize::MAX"),
-                        },
-                        None => nonzero_ext::nonzero!(1_usize),
+                    let start_height = if let Some(hash) = *prev_hash {
+                        let Some(height) = block_sync.kura.get_block_height_by_hash(hash) else {
+                            error!(
+                                peer=%block_sync.peer_id,
+                                block=%hash,
+                                "Block hash not found"
+                            );
+
+                            return;
+                        };
+
+                        height
+                            .checked_add(1)
+                            .expect("INTERNAL BUG: Blockchain height overflow")
+                    } else {
+                        nonzero_ext::nonzero!(1_usize)
                     };
 
-                    let blocks = (start_height.get()..)
-                        .take(block_sync.gossip_size.get() as usize + 1)
-                        .map_while(|height| {
-                            NonZeroUsize::new(height)
-                                .and_then(|height| block_sync.kura.get_block_by_height(height))
-                        })
+                    let blocks = block_sync
+                        .state
+                        .view()
+                        .all_blocks(start_height)
                         .skip_while(|block| Some(block.hash()) == *latest_hash)
-                        .filter(|block| !seen_blocks.contains(&block.hash()))
+                        .skip_while(|block| seen_blocks.contains(&block.hash()))
+                        .take(block_sync.gossip_size.get() as usize)
                         .map(|block| (*block).clone())
                         .collect::<Vec<_>>();
 
-                    if blocks.is_empty() {
-                        // The only case where the blocks array could be empty is if we got queried for blocks
-                        // after the latest hash. There is a check earlier in the function that returns early
-                        // so it should not be possible for us to get here.
-                        error!(hash=?prev_hash, "Blocks array is empty but shouldn't be.");
-                    } else {
+                    if !blocks.is_empty() {
                         trace!(hash=?prev_hash, "Sharing blocks after hash");
+
                         Message::ShareBlocks(ShareBlocks::new(blocks, block_sync.peer_id.clone()))
                             .send_to(&block_sync.network, peer_id.clone())
                             .await;
@@ -275,9 +269,13 @@
                     use crate::sumeragi::message::BlockSyncUpdate;
 
                     for block in blocks.clone() {
-                        block_sync
-                            .seen_blocks
-                            .insert((block.header().height(), block.hash()));
+                        let height = block
+                            .header()
+                            .height()
+                            .try_into()
+                            .expect("INTERNAL BUG: block height exceeds usize::MAX");
+
+                        block_sync.seen_blocks.insert((height, block.hash()));
                         let msg = BlockSyncUpdate::from(&block);
                         block_sync.sumeragi.incoming_block_message(msg);
                     }
@@ -288,7 +286,7 @@
         /// Send this message over the network to the specified `peer`.
         #[iroha_futures::telemetry_future]
         #[log("TRACE")]
-        pub async fn send_to(self, network: &IrohaNetwork, peer: PeerId) {
+        pub(super) async fn send_to(self, network: &IrohaNetwork, peer: PeerId) {
             let data = NetworkMessage::BlockSync(Box::new(self));
             let message = Post {
                 data,
@@ -297,4 +295,79 @@
             network.post(message);
         }
     }
+
+    mod candidate {
+        use parity_scale_codec::Input;
+
+        use super::*;
+
+        #[derive(Decode)]
+        struct GetBlocksAfterCandidate {
+            peer: PeerId,
+            prev_hash: Option<HashOf<SignedBlock>>,
+            latest_hash: Option<HashOf<SignedBlock>>,
+            seen_blocks: BTreeSet<HashOf<SignedBlock>>,
+        }
+
+        #[derive(Decode)]
+        struct ShareBlocksCandidate {
+            peer: PeerId,
+            blocks: Vec<SignedBlock>,
+        }
+
+        impl GetBlocksAfterCandidate {
+            fn validate(self) -> Result<GetBlocksAfter, parity_scale_codec::Error> {
+                if self.prev_hash.is_some() && self.latest_hash.is_none() {
+                    return Err(parity_scale_codec::Error::from(
+                        "Latest hash must be defined if previous hash is",
+                    ));
+                }
+
+                Ok(GetBlocksAfter {
+                    peer_id: self.peer,
+                    prev_hash: self.prev_hash,
+                    latest_hash: self.latest_hash,
+                    seen_blocks: self.seen_blocks,
+                })
+            }
+        }
+
+        impl ShareBlocksCandidate {
+            fn validate(self) -> Result<ShareBlocks, parity_scale_codec::Error> {
+                if self.blocks.is_empty() {
+                    return Err(parity_scale_codec::Error::from("Blocks are empty"));
+                }
+
+                if !self.blocks.windows(2).all(|wnd| {
+                    wnd[1].header().height.get() == wnd[0].header().height.get() + 1
+                        && wnd[1].header().prev_block_hash == Some(wnd[0].hash())
+                }) {
+                    return Err(parity_scale_codec::Error::from(
+                        "Blocks are not ordered correctly",
+                    ));
+                }
+
+                Ok(ShareBlocks {
+                    peer_id: self.peer,
+                    blocks: self.blocks,
+                })
+            }
+        }
+
+        impl Decode for ShareBlocks {
+            fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
+                ShareBlocksCandidate::decode(input)?
+                    .validate()
+                    .map_err(Into::into)
+            }
+        }
+
+        impl Decode for GetBlocksAfter {
+            fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
+                GetBlocksAfterCandidate::decode(input)?
+                    .validate()
+                    .map_err(Into::into)
+            }
+        }
+    }
 }
diff --git a/core/src/kura.rs b/core/src/kura.rs
index bc094a4c5ab..d89ab30c4b4 100644
--- a/core/src/kura.rs
+++ b/core/src/kura.rs
@@ -284,11 +284,11 @@ impl Kura {
     }
 
     /// Search through blocks for the height of the block with the given hash.
-    pub fn get_block_height_by_hash(&self, hash: &HashOf<SignedBlock>) -> Option<NonZeroUsize> {
+    pub fn get_block_height_by_hash(&self, hash: HashOf<SignedBlock>) -> Option<NonZeroUsize> {
         self.block_data
             .lock()
             .iter()
-            .position(|(block_hash, _block_arc)| block_hash == hash)
+            .position(|(block_hash, _block_arc)| *block_hash == hash)
             .and_then(|idx| idx.checked_add(1))
             .and_then(NonZeroUsize::new)
     }
diff --git a/core/src/smartcontracts/isi/block.rs b/core/src/smartcontracts/isi/block.rs
index 5b255c28545..a8b67df2e7d 100644
--- a/core/src/smartcontracts/isi/block.rs
+++ b/core/src/smartcontracts/isi/block.rs
@@ -12,6 +12,7 @@ use iroha_data_model::{
     },
 };
 use iroha_telemetry::metrics;
+use nonzero_ext::nonzero;
 
 use super::*;
 use crate::{smartcontracts::ValidQuery, state::StateReadOnly};
@@ -24,7 +25,7 @@ impl ValidQuery for FindBlocks {
         state_ro: &'state impl StateReadOnly,
     ) -> Result<impl Iterator<Item = SignedBlock> + 'state, QueryExecutionFail> {
         Ok(state_ro
-            .all_blocks()
+            .all_blocks(nonzero!(1_usize))
             .rev()
             .filter(move |block| filter.applies(block))
             .map(|block| (*block).clone()))
@@ -39,7 +40,7 @@ impl ValidQuery for FindBlockHeaders {
         state_ro: &'state impl StateReadOnly,
     ) -> Result<impl Iterator<Item = BlockHeader> + 'state, QueryExecutionFail> {
         Ok(state_ro
-            .all_blocks()
+            .all_blocks(nonzero!(1_usize))
             .rev()
             .filter(move |block| filter.applies(block.header()))
             .map(|block| block.header().clone()))
@@ -52,8 +53,9 @@ impl ValidSingularQuery for FindBlockHeaderByHash {
         let hash = self.hash;
 
         let block = state_ro
-            .all_blocks()
-            .find(|block| block.hash() == hash)
+            .kura()
+            .get_block_height_by_hash(hash)
+            .and_then(|height| state_ro.kura().get_block_by_height(height))
             .ok_or_else(|| QueryExecutionFail::Find(FindError::Block(hash)))?;
 
         Ok(block.header().clone())
diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs
index 683ae519fff..172e831206a 100644
--- a/core/src/smartcontracts/isi/query.rs
+++ b/core/src/smartcontracts/isi/query.rs
@@ -551,7 +551,10 @@ async fn find_block_header_by_hash() -> Result<()> {
         let state = state_with_test_blocks_and_transactions(1, 1, 1)?;
         let state_view = state.view();
-        let block = state_view.all_blocks().last().expect("state is empty");
+        let block = state_view
+            .all_blocks(nonzero!(1_usize))
+            .last()
+            .expect("state is empty");
 
         assert_eq!(
             FindBlockHeaderByHash::new(block.hash()).execute(&state_view)?,
diff --git a/core/src/smartcontracts/isi/tx.rs b/core/src/smartcontracts/isi/tx.rs
index f2e2154101e..4872c61ba00 100644
--- a/core/src/smartcontracts/isi/tx.rs
+++ b/core/src/smartcontracts/isi/tx.rs
@@ -17,6 +17,7 @@ use iroha_data_model::{
     transaction::CommittedTransaction,
 };
 use iroha_telemetry::metrics;
+use nonzero_ext::nonzero;
 
 use super::*;
 use crate::smartcontracts::ValidQuery;
@@ -75,7 +76,7 @@ impl ValidQuery for FindTransactions {
         state_ro: &'state impl StateReadOnly,
     ) -> Result<impl Iterator<Item = TransactionQueryOutput> + 'state, QueryExecutionFail> {
         Ok(state_ro
-            .all_blocks()
+            .all_blocks(nonzero!(1_usize))
             .flat_map(BlockTransactionIter::new)
             .map(|tx| TransactionQueryOutput {
                 block_hash: tx.block_hash(),
@@ -95,7 +96,7 @@ impl ValidQuery for FindTransactionsByAccountId {
         let account_id = self.account.clone();
 
         Ok(state_ro
-            .all_blocks()
+            .all_blocks(nonzero!(1_usize))
             .flat_map(BlockTransactionIter::new)
             .filter(move |tx| *tx.authority() == account_id)
             .map(|tx| TransactionQueryOutput {
diff --git a/core/src/state.rs b/core/src/state.rs
index 01f24376355..b2c7008de9c 100644
--- a/core/src/state.rs
+++ b/core/src/state.rs
@@ -24,6 +24,7 @@ use iroha_data_model::{
     },
 };
 use iroha_logger::prelude::*;
 use iroha_primitives::{must_use::MustUse, numeric::Numeric, small::SmallVec};
+use nonzero_ext::nonzero;
 use parking_lot::Mutex;
 use range_bounds::*;
 use serde::{
@@ -1218,8 +1219,11 @@ pub trait StateReadOnly {
     }
 
     /// Load all blocks in the block chain from disc
-    fn all_blocks(&self) -> impl DoubleEndedIterator<Item = Arc<SignedBlock>> + '_ {
-        (1..=self.height()).map(|height| {
+    fn all_blocks(
+        &self,
+        start: NonZeroUsize,
+    ) -> impl DoubleEndedIterator<Item = Arc<SignedBlock>> + '_ {
+        (start.get()..=self.height()).map(|height| {
             NonZeroUsize::new(height)
                 .and_then(|height| self.kura().get_block_by_height(height))
                 .expect("INTERNAL BUG: Failed to load block")
@@ -1275,7 +1279,7 @@
         } else {
             let opt = self
                 .kura()
-                .get_block_by_height(nonzero_ext::nonzero!(1_usize))
+                .get_block_by_height(nonzero!(1_usize))
                 .map(|genesis_block| genesis_block.header().creation_time());
 
             if opt.is_none() {
@@ -2235,8 +2239,7 @@
         assert_eq!(
             &state_block
-                .all_blocks()
-                .skip(7)
+                .all_blocks(nonzero!(8_usize))
                 .map(|block| block.header().height().get())
                 .collect::<Vec<_>>(),
             &[8, 9, 10]