diff --git a/crates/iroha_core/src/block.rs b/crates/iroha_core/src/block.rs index 98ff226687b..3c76671ab90 100644 --- a/crates/iroha_core/src/block.rs +++ b/crates/iroha_core/src/block.rs @@ -153,7 +153,8 @@ mod pending { Self(Pending { transactions }) } - fn make_header( + /// Create a block header + pub fn make_header( prev_block: Option<&SignedBlock>, view_change_index: usize, transactions: &[CommittedTransaction], @@ -270,7 +271,7 @@ mod chained { pub struct Chained(pub(super) BlockPayload); impl BlockBuilder { - /// Sign this block and get [`SignedBlock`]. + /// Sign this block as Leader and get [`SignedBlock`]. pub fn sign(self, private_key: &PrivateKey) -> WithEvents { WithEvents::new(ValidBlock(self.0 .0.sign(private_key))) } diff --git a/crates/iroha_core/src/sumeragi/main_loop.rs b/crates/iroha_core/src/sumeragi/main_loop.rs index 5183728ef81..6e685c0bc10 100644 --- a/crates/iroha_core/src/sumeragi/main_loop.rs +++ b/crates/iroha_core/src/sumeragi/main_loop.rs @@ -132,10 +132,9 @@ impl Sumeragi { .control_message_receiver .try_recv() .map_err(|recv_error| { - assert!( - recv_error != mpsc::TryRecvError::Disconnected, - "INTERNAL ERROR: Sumeragi control message pump disconnected" - ) + if recv_error == mpsc::TryRecvError::Disconnected { + error!("INTERNAL ERROR: Sumeragi control message pump disconnected"); + } }) { should_sleep = false; @@ -171,10 +170,9 @@ impl Sumeragi { .message_receiver .try_recv() .map_err(|recv_error| { - assert!( - recv_error != mpsc::TryRecvError::Disconnected, - "INTERNAL ERROR: Sumeragi message pump disconnected" - ) + if recv_error == mpsc::TryRecvError::Disconnected { + error!("INTERNAL ERROR: Sumeragi message pump disconnected"); + } }) .ok()?; @@ -220,8 +218,7 @@ impl Sumeragi { match self.message_receiver.try_recv() { Ok(message) => { let block = match message { - BlockMessage::BlockCreated(BlockCreated { block }) - | BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block, + BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block, msg => { trace!(?msg, "Not handling the message, waiting for genesis..."); continue; @@ -312,14 +309,15 @@ impl Sumeragi { // NOTE: By this time genesis block is executed and list of trusted peers is updated self.topology = Topology::new(state_block.world.trusted_peers_ids.clone()); - let msg = BlockCreated::from(&genesis); let genesis = genesis .commit(&self.topology) .unpack(|e| self.send_event(e)) .expect("Genesis invalid"); - self.broadcast_packet(msg); - self.commit_block(genesis, state_block); + self.commit_block(genesis.clone(), state_block); + self.broadcast_packet(BlockSyncUpdate { + block: genesis.into(), + }); } fn commit_block(&mut self, block: CommittedBlock, state_block: StateBlock<'_>) { @@ -595,7 +593,6 @@ impl Sumeragi { block=%block_created.block.hash(), "Block received" ); - if let Some(mut valid_block) = self.validate_block( state, &self.topology, @@ -605,7 +602,6 @@ impl Sumeragi { ) { // NOTE: Up until this point it was unknown which block is expected to be received, // therefore all the signatures (of any hash) were collected and will now be pruned - for signature in core::mem::take(voting_signatures) { if let Err(error) = valid_block.block.add_signature(signature, &self.topology) @@ -827,6 +823,7 @@ impl Sumeragi { fn try_create_block<'state>( &mut self, state: &'state State, + genesis_account: &AccountId, voting_block: &mut Option>, ) { assert_eq!(self.role(), Role::Leader); @@ -841,57 +838,74 @@ impl Sumeragi { .expect("INTERNAL BUG: transactions in block exceed usize::MAX"); let block_time = state.world.view().parameters.sumeragi.block_time(); let tx_cache_full = self.transaction_cache.len() >= max_transactions.get(); - let deadline_reached = self.round_start_time.elapsed() > block_time; + let deadline_reached = + self.topology.view_change_index() > 0 || self.round_start_time.elapsed() > block_time; let tx_cache_non_empty = !self.transaction_cache.is_empty(); if tx_cache_full || (deadline_reached && tx_cache_non_empty) { let transactions = self .transaction_cache .iter() - .map(|tx| tx.deref().clone()) + .map(|tx| CommittedTransaction { + value: tx.deref().clone().into(), + error: None, + }) .collect::>(); - let mut state_block = state.block(); - let create_block_start_time = Instant::now(); - let new_block = BlockBuilder::new(transactions) - .chain(self.topology.view_change_index(), &mut state_block) - .sign(self.key_pair.private_key()) - .unpack(|e| self.send_event(e)); - - let created_in = create_block_start_time.elapsed(); - let pipeline_time = state.world.view().parameters().sumeragi.pipeline_time(); - if created_in > pipeline_time / 2 { - warn!( - role=%self.role(), - peer_id=%self.peer_id, - "Creating block takes too much time. \ - This might prevent consensus from operating. \ - Consider increasing `commit_time` or decreasing `max_transactions_in_block`" - ); + let pre_signed_block = BlockPayload { + header: BlockBuilder::make_header( + state.view().latest_block().as_deref(), + self.topology.view_change_index(), + &transactions, + state + .view() + .world + .parameters() + .sumeragi + .consensus_estimation(), + ), + transactions, } + .sign(self.key_pair.private_key()); + let block_created_msg = BlockCreated { + block: pre_signed_block, + }; if self.topology.is_consensus_required().is_some() { - info!( - peer_id=%self.peer_id, - block=%new_block.as_ref().hash(), - view_change_index=%self.topology.view_change_index(), - txns=%new_block.as_ref().transactions().len(), - created_in_ms=%created_in.as_millis(), - "Block created" - ); + self.broadcast_packet(block_created_msg.clone()); + } - let msg = BlockCreated::from(&new_block); - *voting_block = Some(VotingBlock::new(new_block, state_block)); - self.broadcast_packet(msg); + info!( + peer_id=%self.peer_id, + view_change_index=%self.topology.view_change_index(), + block_hash=%block_created_msg.block.hash(), + txns=%block_created_msg.block.transactions().len(), + "Block created" + ); + + let new_voting_block = self + .validate_block( + state, + &self.topology, + genesis_account, + block_created_msg, + voting_block, + ) + .expect("We just created this block ourselves, it has to be valid."); + + if self.topology.is_consensus_required().is_some() { + *voting_block = Some(new_voting_block); } else { - let committed_block = new_block + let committed_block = new_voting_block + .block .commit(&self.topology) .unpack(|e| self.send_event(e)) .expect("INTERNAL BUG: Leader failed to commit created block"); let msg = BlockCommitted::from(&committed_block); self.broadcast_packet(msg); - self.commit_block(committed_block, state_block); + self.commit_block(committed_block, new_voting_block.state_block); + *voting_block = None; } } } @@ -1009,7 +1023,6 @@ pub(crate) fn run( let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter(); let state_view = state.view(); - sumeragi .transaction_cache // Checking if transactions are in the blockchain is costly @@ -1167,7 +1180,7 @@ pub(crate) fn run( .set(sumeragi.topology.view_change_index() as u64); if sumeragi.role() == Role::Leader && voting_block.is_none() { - sumeragi.try_create_block(&state, &mut voting_block); + sumeragi.try_create_block(&state, &genesis_account, &mut voting_block); } } } diff --git a/crates/iroha_core/src/sumeragi/message.rs b/crates/iroha_core/src/sumeragi/message.rs index 6e6cdf8195b..714f3b23bc1 100644 --- a/crates/iroha_core/src/sumeragi/message.rs +++ b/crates/iroha_core/src/sumeragi/message.rs @@ -43,15 +43,6 @@ pub struct BlockCreated { pub block: SignedBlock, } -impl From<&ValidBlock> for BlockCreated { - fn from(block: &ValidBlock) -> Self { - Self { - // TODO: Redundant clone - block: block.clone().into(), - } - } -} - /// `BlockSigned` message structure. #[derive(Debug, Clone, Decode, Encode)] pub struct BlockSigned {