diff --git a/core/src/block.rs b/core/src/block.rs index 57cb6721f49..3bbf9098e7d 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -158,6 +158,7 @@ mod pending { view_change_index: usize, transactions: &[CommittedTransaction], consensus_estimation: Duration, + now_time_ms: u64, ) -> BlockHeader { let prev_block_time = prev_block.map_or(Duration::ZERO, |block| block.header().creation_time()); @@ -168,9 +169,7 @@ mod pending { .max() .expect("INTERNAL BUG: Block empty"); - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); + let now = Duration::from_millis(now_time_ms); // NOTE: Lower time bound must always be upheld for a valid block // If the clock has drifted too far this block will be rejected @@ -246,6 +245,24 @@ mod pending { self, view_change_index: usize, state: &mut StateBlock<'_>, + ) -> BlockBuilder { + self.chain_with_creation_time( + view_change_index, + state, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("INTERNAL BUG: Failed to get the current system time") + .as_millis() + .try_into() + .expect("Time should fit into u64"), + ) + } + /// Chain the block with existing blockchain. + pub fn chain_with_creation_time( + self, + view_change_index: usize, + state: &mut StateBlock<'_>, + creation_time_ms: u64, ) -> BlockBuilder { let transactions = Self::categorize_transactions(self.0.transactions, state); @@ -255,6 +272,7 @@ mod pending { view_change_index, &transactions, state.world.parameters().sumeragi.consensus_estimation(), + creation_time_ms, ), transactions, })) @@ -270,9 +288,21 @@ mod chained { pub struct Chained(pub(super) BlockPayload); impl BlockBuilder { - /// Sign this block and get [`SignedBlock`]. + /// Sign this block as Leader and get [`SignedBlock`]. pub fn sign(self, private_key: &PrivateKey) -> WithEvents { - WithEvents::new(ValidBlock(self.0 .0.sign(private_key))) + self.sign_with_peer_topology_index(private_key, 0) + } + /// Sign this block and get [`SignedBlock`]. + pub fn sign_with_peer_topology_index( + self, + private_key: &PrivateKey, + peer_topology_index: u64, + ) -> WithEvents { + WithEvents::new(ValidBlock( + self.0 + .0 + .sign_with_peer_topology_index(private_key, peer_topology_index), + )) } } } @@ -299,18 +329,16 @@ mod valid { topology: &Topology, ) -> Result<(), SignatureVerificationError> { let leader_index = topology.leader_index(); - let mut block_signatures = block.signatures(); + let mut leader_signatures = + topology.filter_signatures_by_roles(&[Role::Leader], block.signatures()); - let leader_signature = match block_signatures.next() { + let leader_signature = match leader_signatures.next() { Some(BlockSignature(signatory, signature)) if usize::try_from(*signatory) .map_err(|_err| SignatureVerificationError::LeaderMissing)? == leader_index => { - let mut additional_leader_signatures = - topology.filter_signatures_by_roles(&[Role::Leader], block_signatures); - - if additional_leader_signatures.next().is_some() { + if leader_signatures.next().is_some() { return Err(SignatureVerificationError::DuplicateSignatures { signatory: leader_index, }); @@ -390,9 +418,11 @@ mod valid { topology: &Topology, ) -> Result<(), SignatureVerificationError> { let proxy_tail_index = topology.proxy_tail_index(); - let mut signatures = block.signatures().rev(); + let mut signatures = block.signatures(); - let proxy_tail_signature = match signatures.next() { + let proxy_tail_signature = match signatures + .find(|BlockSignature(signatory, _signature)| *signatory == proxy_tail_index as u64) + { Some(BlockSignature(signatory, signature)) if usize::try_from(*signatory) .map_err(|_err| SignatureVerificationError::ProxyTailMissing)? @@ -645,7 +675,6 @@ mod valid { .expect("INTERNAL BUG: Number of peers exceeds usize::MAX"); let signatory = &topology.as_ref()[signatory_idx]; - assert_ne!(Role::Leader, topology.role(signatory)); if topology.view_change_index() == 0 { assert_ne!(Role::ObservingPeer, topology.role(signatory),); } diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 30f77b9809b..1835bc59608 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -1,5 +1,5 @@ //! The main event loop that powers sumeragi. -use std::{collections::BTreeSet, ops::Deref, sync::mpsc}; +use std::{collections::BTreeSet, ops::Deref, sync::mpsc, time::SystemTime}; use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId}; @@ -162,7 +162,7 @@ impl Sumeragi { latest_block: HashOf, view_change_proof_chain: &ProofChain, ) -> Option { - let current_view_change_index = + let _current_view_change_index = view_change_proof_chain.verify_with_state(&self.topology, latest_block); loop { @@ -177,22 +177,7 @@ impl Sumeragi { }) .ok()?; - let block_vc_index = match &block_msg { - BlockMessage::BlockCreated(bc) => { - Some(bc.block.header().view_change_index as usize) - } - // Signed and Committed contain no block. - // Block sync updates are exempt from early pruning. - BlockMessage::BlockSigned(_) - | BlockMessage::BlockCommitted(_) - | BlockMessage::BlockSyncUpdate(_) => None, - }; - if let Some(block_vc_index) = block_vc_index { - if block_vc_index < current_view_change_index { - // ignore block_message - continue; - } - } + // TODO: Add pruning maybe. return Some(block_msg); } } @@ -219,8 +204,7 @@ impl Sumeragi { match self.message_receiver.try_recv() { Ok(message) => { let block = match message { - BlockMessage::BlockCreated(BlockCreated { block }) - | BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block, + BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block, msg => { trace!(?msg, "Not handling the message, waiting for genesis..."); continue; @@ -311,13 +295,11 @@ impl Sumeragi { // NOTE: By this time genesis block is executed and list of trusted peers is updated self.topology = Topology::new(state_block.world.trusted_peers_ids.clone()); - let msg = BlockCreated::from(&genesis); let genesis = genesis .commit(&self.topology) .unpack(|e| self.send_event(e)) .expect("Genesis invalid"); - self.broadcast_packet(msg); self.commit_block(genesis, state_block); } @@ -385,40 +367,34 @@ impl Sumeragi { &self, state: &'state State, topology: &Topology, - genesis_account: &AccountId, - BlockCreated { block }: BlockCreated, - existing_voting_block: &mut Option, - ) -> Option> { - if state.view().height() == 1 && block.header().height.get() == 1 { - // Consider our peer has genesis, - // and some other peer has genesis and broadcast it to our peer, - // then we can ignore such genesis block because we already has genesis. - // Note: `ValidBlock::validate` also checks it, - // but we don't want warning to be printed since this is correct behaviour. - return None; - } + BlockCreated { + transactions, + creation_time_ms, + }: BlockCreated, + ) -> VotingBlock<'state> { + let transactions = transactions + .into_iter() + .map(|stx| AcceptedTransaction(stx)) + .collect(); + let mut state_block = state.block(); + let vb = VotingBlock::new( + BlockBuilder::new(transactions) + .chain_with_creation_time( + topology.view_change_index(), + &mut state_block, + creation_time_ms, + ) + .sign_with_peer_topology_index( + self.key_pair.private_key(), + topology.position(self.key_pair.public_key()).unwrap_or(0) as u64, + ) + .unpack(|e| self.send_event(e)), + state_block, + ); - ValidBlock::validate_keep_voting_block( - block, - topology, - &self.chain_id, - genesis_account, - state, - existing_voting_block, - false, - ) - .unpack(|e| self.send_event(e)) - .map(|(block, state_block)| VotingBlock::new(block, state_block)) - .map_err(|(block, error)| { - warn!( - peer_id=%self.peer_id, - role=%self.role(), - block=%block.hash(), - ?error, - "Block validation failed" - ); - }) - .ok() + debug!(role = %self.role(), block_hash = %vb.block.as_ref().hash(), "Validation of block"); + + vb } fn prune_view_change_proofs_and_calculate_current_index( @@ -542,20 +518,12 @@ impl Sumeragi { .is_consensus_required() .expect("INTERNAL BUG: Consensus required for validating peer"); - if let Some(mut v_block) = self.validate_block( - state, - topology, - genesis_account, - block_created, - voting_block, - ) { - v_block.block.sign(&self.key_pair, topology); + let v_block = self.validate_block(state, topology, block_created); - let msg = BlockSigned::from(&v_block.block); - self.broadcast_packet_to(msg, [topology.proxy_tail()]); + let msg = BlockSigned::from(&v_block.block); + self.broadcast_packet_to(msg, [topology.proxy_tail()]); - *voting_block = Some(v_block); - } + *voting_block = Some(v_block); } (BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => { let topology = &self @@ -563,58 +531,33 @@ impl Sumeragi { .is_consensus_required() .expect("INTERNAL BUG: Consensus required for observing peer"); - if let Some(mut v_block) = self.validate_block( - state, - topology, - genesis_account, - block_created, - voting_block, - ) { - if view_change_index >= 1 { - v_block.block.sign(&self.key_pair, topology); - - let msg = BlockSigned::from(&v_block.block); - self.broadcast_packet_to(msg, [topology.proxy_tail()]); + if view_change_index >= 1 { + let v_block = self.validate_block(state, topology, block_created); - info!( - peer_id=%self.peer_id, - role=%self.role(), - block=%v_block.block.as_ref().hash(), - "Block signed and forwarded" - ); - } + let msg = BlockSigned::from(&v_block.block); + self.broadcast_packet_to(msg, [topology.proxy_tail()]); *voting_block = Some(v_block); } } (BlockMessage::BlockCreated(block_created), Role::ProxyTail) => { + let mut valid_block = self.validate_block(state, &self.topology, block_created); info!( peer_id=%self.peer_id, role=%self.role(), - block=%block_created.block.hash(), + block=%valid_block.block.as_ref().hash(), "Block received" ); + // NOTE: Up until this point it was unknown which block is expected to be received, + // therefore all the signatures (of any hash) were collected and will now be pruned - if let Some(mut valid_block) = self.validate_block( - state, - &self.topology, - genesis_account, - block_created, - voting_block, - ) { - // NOTE: Up until this point it was unknown which block is expected to be received, - // therefore all the signatures (of any hash) were collected and will now be pruned - - for signature in core::mem::take(voting_signatures) { - if let Err(error) = - valid_block.block.add_signature(signature, &self.topology) - { - debug!(?error, "Signature not valid"); - } + for signature in core::mem::take(voting_signatures) { + if let Err(error) = valid_block.block.add_signature(signature, &self.topology) { + debug!(?error, "Signature not valid"); } - - *voting_block = self.try_commit_block(valid_block, is_genesis_peer); } + + *voting_block = self.try_commit_block(valid_block, is_genesis_peer); } (BlockMessage::BlockSigned(BlockSigned { hash, signature }), Role::ProxyTail) => { info!( @@ -627,11 +570,6 @@ impl Sumeragi { let signatory = &self.topology.as_ref()[signatory_idx]; match self.topology.role(signatory) { - Role::Leader => error!( - peer_id=%self.peer_id, - role=%self.role(), - "Signatory is leader" - ), Role::Undefined => error!( peer_id=%self.peer_id, role=%self.role(), @@ -778,15 +716,13 @@ impl Sumeragi { /// Commits block if there are enough votes fn try_commit_block<'state>( &mut self, - mut voting_block: VotingBlock<'state>, + voting_block: VotingBlock<'state>, #[cfg_attr(not(debug_assertions), allow(unused_variables))] is_genesis_peer: bool, ) -> Option> { assert_eq!(self.role(), Role::ProxyTail); let votes_count = voting_block.block.as_ref().signatures().len(); - if votes_count + 1 >= self.topology.min_votes_for_commit() { - voting_block.block.sign(&self.key_pair, &self.topology); - + if votes_count >= self.topology.min_votes_for_commit() { let committed_block = voting_block .block .commit(&self.topology) @@ -847,50 +783,49 @@ impl Sumeragi { let transactions = self .transaction_cache .iter() - .map(|tx| tx.deref().clone()) + .map(|tx| tx.deref().clone().into()) .collect::>(); - let mut state_block = state.block(); - let create_block_start_time = Instant::now(); - let new_block = BlockBuilder::new(transactions) - .chain(self.topology.view_change_index(), &mut state_block) - .sign(self.key_pair.private_key()) - .unpack(|e| self.send_event(e)); - - let created_in = create_block_start_time.elapsed(); - let pipeline_time = state.world.view().parameters().sumeragi.pipeline_time(); - if created_in > pipeline_time / 2 { - warn!( - role=%self.role(), - peer_id=%self.peer_id, - "Creating block takes too much time. \ - This might prevent consensus from operating. \ - Consider increasing `commit_time` or decreasing `max_transactions_in_block`" - ); + let block_created_msg = BlockCreated { + transactions, + creation_time_ms: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("INTERNAL BUG: Failed to get the current system time") + .as_millis() + .try_into() + .expect("Time should fit into u64"), + }; + if self.topology.is_consensus_required().is_some() { + self.broadcast_packet(block_created_msg.clone()); } + let begin_validate = Instant::now(); + let new_voting_block = self.validate_block(state, &self.topology, block_created_msg); + + info!( + peer_id=%self.peer_id, + block=%new_voting_block.block.as_ref().hash(), + view_change_index=%self.topology.view_change_index(), + txns=%new_voting_block.block.as_ref().transactions().len(), + validated_in_ms=%begin_validate.elapsed().as_millis(), + "Block created" + ); + if self.topology.is_consensus_required().is_some() { - info!( - peer_id=%self.peer_id, - block=%new_block.as_ref().hash(), - view_change_index=%self.topology.view_change_index(), - txns=%new_block.as_ref().transactions().len(), - created_in_ms=%created_in.as_millis(), - "Block created" - ); + let msg = BlockSigned::from(&new_voting_block.block); + self.broadcast_packet_to(msg, [self.topology.proxy_tail()]); - let msg = BlockCreated::from(&new_block); - *voting_block = Some(VotingBlock::new(new_block, state_block)); - self.broadcast_packet(msg); + *voting_block = Some(new_voting_block); } else { - let committed_block = new_block + let committed_block = new_voting_block + .block .commit(&self.topology) .unpack(|e| self.send_event(e)) .expect("INTERNAL BUG: Leader failed to commit created block"); let msg = BlockCommitted::from(&committed_block); self.broadcast_packet(msg); - self.commit_block(committed_block, state_block); + self.commit_block(committed_block, new_voting_block.state_block); } } } @@ -1021,7 +956,6 @@ pub(crate) fn run( let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter(); let state_view = state.view(); - sumeragi .transaction_cache // Checking if transactions are in the blockchain is costly diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs index c3089a2d235..cc9e6be3cb4 100644 --- a/core/src/sumeragi/message.rs +++ b/core/src/sumeragi/message.rs @@ -1,6 +1,9 @@ //! Contains message structures for p2p communication during consensus. use iroha_crypto::HashOf; -use iroha_data_model::block::{BlockHeader, BlockSignature, SignedBlock}; +use iroha_data_model::{ + block::{BlockHeader, BlockSignature, SignedBlock}, + transaction::SignedTransaction, +}; use iroha_macro::*; use parity_scale_codec::{Decode, Encode}; @@ -40,16 +43,9 @@ impl ControlFlowMessage { #[derive(Debug, Clone, Decode, Encode)] pub struct BlockCreated { /// The corresponding block. - pub block: SignedBlock, -} - -impl From<&ValidBlock> for BlockCreated { - fn from(block: &ValidBlock) -> Self { - Self { - // TODO: Redundant clone - block: block.clone().into(), - } - } + pub transactions: Vec, + /// The time that the block was created. + pub creation_time_ms: u64, } /// `BlockSigned` message structure. diff --git a/data_model/src/block.rs b/data_model/src/block.rs index e24faa5c7d0..809975db48a 100644 --- a/data_model/src/block.rs +++ b/data_model/src/block.rs @@ -153,8 +153,19 @@ impl BlockPayload { /// Create new signed block, using `key_pair` to sign `payload` #[cfg(feature = "transparent_api")] pub fn sign(self, private_key: &iroha_crypto::PrivateKey) -> SignedBlock { + self.sign_with_peer_topology_index(private_key, 0) + } + /// Create new signed block, using `key_pair` to sign `payload` + /// + /// Explicitly specify the peer topology index. + #[cfg(feature = "transparent_api")] + pub fn sign_with_peer_topology_index( + self, + private_key: &iroha_crypto::PrivateKey, + peer_topology_index: u64, + ) -> SignedBlock { let signatures = vec![BlockSignature( - 0, + peer_topology_index, SignatureOf::new(private_key, &self.header), )];