diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index 53ee9fe3306..92a345dce62 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -177,8 +177,11 @@ impl NetworkRelay {
         }
         match msg {
-            SumeragiPacket(data) => {
-                self.sumeragi.incoming_message(*data);
+            SumeragiBlock(data) => {
+                self.sumeragi.incoming_block_message(*data);
+            }
+            SumeragiControlFlow(data) => {
+                self.sumeragi.incoming_control_flow_message(*data);
             }
             BlockSync(data) => self.block_sync.message(*data).await,
             TransactionGossiper(data) => self.gossiper.gossip(*data).await,
diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs
index bf74dfcbbfd..2db1ae33857 100644
--- a/core/src/block_sync.rs
+++ b/core/src/block_sync.rs
@@ -129,7 +129,6 @@ impl BlockSynchronizer {
 pub mod message {
     //! Module containing messages for [`BlockSynchronizer`](super::BlockSynchronizer).
     use super::*;
-    use crate::sumeragi::view_change::ProofChain;
 
     /// Get blocks after some block
     #[derive(Debug, Clone, Decode, Encode)]
@@ -234,12 +233,11 @@ pub mod message {
                     }
                 }
                 Message::ShareBlocks(ShareBlocks { blocks, .. }) => {
-                    use crate::sumeragi::message::{Message, MessagePacket};
+                    use crate::sumeragi::message::BlockMessage;
                     for block in blocks.clone() {
-                        block_sync.sumeragi.incoming_message(MessagePacket::new(
-                            ProofChain::default(),
-                            Some(Message::BlockSyncUpdate(block.into())),
-                        ));
+                        block_sync
+                            .sumeragi
+                            .incoming_block_message(BlockMessage::BlockSyncUpdate(block.into()));
                     }
                 }
             }
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 3d17d7a16e7..ab15b570eb5 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -26,8 +26,9 @@ use parity_scale_codec::{Decode, Encode};
 use tokio::sync::broadcast;
 
 use crate::{
-    block_sync::message::Message as BlockSyncMessage, prelude::*,
-    sumeragi::message::MessagePacket as SumeragiPacket,
+    block_sync::message::Message as BlockSyncMessage,
+    prelude::*,
+    sumeragi::message::{BlockMessage, ControlFlowMessage},
 };
 
 /// The interval at which sumeragi checks if there are tx in the `queue`.
@@ -60,8 +61,10 @@ pub type EventsSender = broadcast::Sender<Event>;
 /// The network message
 #[derive(Clone, Debug, Encode, Decode)]
 pub enum NetworkMessage {
-    /// Blockchain message
-    SumeragiPacket(Box<SumeragiPacket>),
+    /// Blockchain consensus data message
+    SumeragiBlock(Box<BlockMessage>),
+    /// Blockchain consensus control flow message
+    SumeragiControlFlow(Box<ControlFlowMessage>),
     /// Block sync message
     BlockSync(Box<BlockSyncMessage>),
     /// Transaction gossiper message
diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs
index 33e5e41515f..c5328008646 100644
--- a/core/src/sumeragi/main_loop.rs
+++ b/core/src/sumeragi/main_loop.rs
@@ -41,7 +41,7 @@ pub struct Sumeragi {
     /// Receiver channel, for control flow messages.
     pub control_message_receiver: mpsc::Receiver<ControlFlowMessage>,
     /// Receiver channel.
-    pub message_receiver: mpsc::Receiver<MessagePacket>,
+    pub message_receiver: mpsc::Receiver<BlockMessage>,
     /// Only used in testing. Causes the genesis peer to withhold blocks when it
     /// is the proxy tail.
     pub debug_force_soft_fork: bool,
@@ -81,12 +81,13 @@ impl Sumeragi {
     /// # Errors
     /// Fails if network sending fails
     #[instrument(skip(self, packet))]
-    fn post_packet_to(&self, packet: MessagePacket, peer: &PeerId) {
+    fn post_packet_to(&self, packet: BlockMessage, peer: &PeerId) {
         if peer == &self.peer_id {
             return;
         }
+
         let post = iroha_p2p::Post {
-            data: NetworkMessage::SumeragiPacket(Box::new(packet)),
+            data: NetworkMessage::SumeragiBlock(Box::new(packet)),
             peer_id: peer.clone(),
         };
         self.network.post(post);
@@ -95,7 +96,7 @@ impl Sumeragi {
     #[allow(clippy::needless_pass_by_value, single_use_lifetimes)] // TODO: uncomment when anonymous lifetimes are stable
     fn broadcast_packet_to<'peer_id>(
         &self,
-        msg: MessagePacket,
+        msg: BlockMessage,
         ids: impl IntoIterator<Item = &'peer_id PeerId> + Send,
     ) {
         for peer_id in ids {
@@ -103,9 +104,16 @@ impl Sumeragi {
         }
     }
 
-    fn broadcast_packet(&self, msg: MessagePacket) {
+    fn broadcast_packet(&self, msg: BlockMessage) {
+        let broadcast = iroha_p2p::Broadcast {
+            data: NetworkMessage::SumeragiBlock(Box::new(msg)),
+        };
+        self.network.broadcast(broadcast);
+    }
+
+    fn broadcast_control_flow_packet(&self, msg: ControlFlowMessage) {
         let broadcast = iroha_p2p::Broadcast {
-            data: NetworkMessage::SumeragiPacket(Box::new(msg)),
+            data: NetworkMessage::SumeragiControlFlow(Box::new(msg)),
         };
         self.network.broadcast(broadcast);
     }
@@ -138,48 +146,45 @@ impl Sumeragi {
     fn receive_network_packet(
         &self,
         view_change_proof_chain: &mut ProofChain,
-        control_message_in_a_row_counter: &mut usize,
-    ) -> Option<MessagePacket> {
+    ) -> (Option<BlockMessage>, bool) {
         const MAX_CONTROL_MSG_IN_A_ROW: usize = 25;
 
-        if *control_message_in_a_row_counter < MAX_CONTROL_MSG_IN_A_ROW {
-            *control_message_in_a_row_counter += 1;
-            self.control_message_receiver
+        let mut should_sleep = true;
+        for _ in 0..MAX_CONTROL_MSG_IN_A_ROW {
+            if let Ok(msg) = self.control_message_receiver
                 .try_recv()
                 .map_err(|recv_error| {
                     assert!(
                         recv_error != mpsc::TryRecvError::Disconnected,
                         "Sumeragi control message pump disconnected. This is not a recoverable error."
                     )
-                })
-                .ok()
-                .map(std::convert::Into::into)
-        } else {
-            None
-        }.or_else(|| {
-            *control_message_in_a_row_counter = 0;
-            self
-            .message_receiver
-            .try_recv()
-            .map_err(|recv_error| {
-                assert!(
-                    recv_error != mpsc::TryRecvError::Disconnected,
-                    "Sumeragi message pump disconnected. This is not a recoverable error."
-                )
-            })
-            .ok()
-        })
-        .and_then(|packet : MessagePacket| {
-            if let Err(error) = view_change_proof_chain.merge(
-                packet.view_change_proofs,
-                &self.current_topology.ordered_peers,
-                self.current_topology.max_faults(),
-                self.wsv.latest_block_hash(),
-            ) {
-                trace!(%error, "Failed to add proofs into view change proof chain")
+                }) {
+                should_sleep = false;
+                if let Err(error) = view_change_proof_chain.merge(
+                    msg.view_change_proofs,
+                    &self.current_topology.ordered_peers,
+                    self.current_topology.max_faults(),
+                    self.wsv.latest_block_hash(),
+                ) {
+                    trace!(%error, "Failed to add proofs into view change proof chain")
+                }
+            } else {
+                break;
             }
-            packet.message
-        })
+        }
+
+        let msg = self
+            .message_receiver
+            .try_recv()
+            .map_err(|recv_error| {
+                assert!(
+                    recv_error != mpsc::TryRecvError::Disconnected,
+                    "Sumeragi message pump disconnected. This is not a recoverable error."
+ ) + }) + .ok(); + should_sleep &= msg.is_none(); + (msg, should_sleep) } fn init_listen_for_genesis( @@ -196,38 +201,36 @@ impl Sumeragi { })?; match self.message_receiver.try_recv() { - Ok(packet) => { - if let Some(message) = packet.message { - let mut new_wsv = self.wsv.clone(); - - let block = match message { - Message::BlockCreated(BlockCreated { block }) - | Message::BlockSyncUpdate(BlockSyncUpdate { block }) => block, - msg => { - trace!(?msg, "Not handling the message, waiting for genesis..."); + Ok(message) => { + let mut new_wsv = self.wsv.clone(); + + let block = match message { + BlockMessage::BlockCreated(BlockCreated { block }) + | BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block, + msg => { + trace!(?msg, "Not handling the message, waiting for genesis..."); + continue; + } + }; + + let block = + match ValidBlock::validate(block, &self.current_topology, &mut new_wsv) + .and_then(|block| { + block + .commit(&self.current_topology) + .map_err(|(block, error)| (block.into(), error)) + }) { + Ok(block) => block, + Err((_, error)) => { + error!(?error, "Received invalid genesis block"); continue; } }; - let block = - match ValidBlock::validate(block, &self.current_topology, &mut new_wsv) - .and_then(|block| { - block - .commit(&self.current_topology) - .map_err(|(block, error)| (block.into(), error)) - }) { - Ok(block) => block, - Err((_, error)) => { - error!(?error, "Received invalid genesis block"); - continue; - } - }; - - new_wsv.world_mut().trusted_peers_ids = - block.payload().commit_topology.clone(); - self.commit_block(block, new_wsv); - return Err(EarlyReturn::GenesisBlockReceivedAndCommitted); - } + new_wsv.world_mut().trusted_peers_ids = + block.payload().commit_topology.clone(); + self.commit_block(block, new_wsv); + return Err(EarlyReturn::GenesisBlockReceivedAndCommitted); } Err(mpsc::TryRecvError::Disconnected) => return Err(EarlyReturn::Disconnected), _ => (), @@ -253,10 +256,7 @@ impl Sumeragi { .sign(self.key_pair.clone()) .expect("Genesis signing failed"); - let genesis_msg = MessagePacket::new( - ProofChain::default(), - Some(BlockCreated::from(genesis.clone()).into()), - ); + let genesis_msg = BlockCreated::from(genesis.clone()).into(); let genesis = genesis .commit(&self.current_topology) @@ -366,29 +366,6 @@ impl Sumeragi { } } -fn suggest_view_change( - sumeragi: &Sumeragi, - view_change_proof_chain: &mut ProofChain, - current_view_change_index: u64, -) { - let suspect_proof = - ProofBuilder::new(sumeragi.wsv.latest_block_hash(), current_view_change_index) - .sign(sumeragi.key_pair.clone()) - .expect("Proof signing failed"); - - view_change_proof_chain - .insert_proof( - &sumeragi.current_topology.ordered_peers, - sumeragi.current_topology.max_faults(), - sumeragi.wsv.latest_block_hash(), - suspect_proof, - ) - .unwrap_or_else(|err| error!("{err}")); - - let msg = MessagePacket::new(view_change_proof_chain.clone(), None); - sumeragi.broadcast_packet(msg); -} - fn prune_view_change_proofs_and_calculate_current_index( sumeragi: &Sumeragi, view_change_proof_chain: &mut ProofChain, @@ -403,11 +380,10 @@ fn prune_view_change_proofs_and_calculate_current_index( #[allow(clippy::too_many_lines)] fn handle_message( - message: Message, + message: BlockMessage, sumeragi: &mut Sumeragi, voting_block: &mut Option, current_view_change_index: u64, - view_change_proof_chain: &mut ProofChain, voting_signatures: &mut Vec>, ) { let current_topology = &sumeragi.current_topology; @@ -416,7 +392,7 @@ fn handle_message( 
     #[allow(clippy::suspicious_operation_groupings)]
     match (message, role) {
-        (Message::BlockSyncUpdate(BlockSyncUpdate { block }), _) => {
+        (BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }), _) => {
             let block_hash = block.hash();
             info!(%addr, %role, hash=%block_hash, "Block sync update received");
 
@@ -469,7 +445,7 @@
             }
         }
         (
-            Message::BlockCommitted(BlockCommitted { hash, signatures }),
+            BlockMessage::BlockCommitted(BlockCommitted { hash, signatures }),
             Role::Leader | Role::ValidatingPeer | Role::ProxyTail | Role::ObservingPeer,
         ) => {
             let is_consensus_required = current_topology.is_consensus_required().is_some();
@@ -504,7 +480,7 @@
                 error!(%addr, %role, %hash, "Peer missing voting block")
             }
         }
-        (Message::BlockCreated(block_created), Role::ValidatingPeer) => {
+        (BlockMessage::BlockCreated(block_created), Role::ValidatingPeer) => {
            let current_topology = current_topology
                .is_consensus_required()
                .expect("Peer has `ValidatingPeer` role, which mean that current topology require consensus");
@@ -512,10 +488,7 @@
            if let Some(v_block) = vote_for_block(sumeragi, &current_topology, block_created) {
                let block_hash = v_block.block.payload().hash();

-                let msg = MessagePacket::new(
-                    view_change_proof_chain.clone(),
-                    Some(BlockSigned::from(v_block.block.clone()).into()),
-                );
+                let msg = BlockSigned::from(v_block.block.clone()).into();

                sumeragi.broadcast_packet_to(msg, [current_topology.proxy_tail()]);
                info!(%addr, %block_hash, "Block validated, signed and forwarded");
@@ -523,7 +496,7 @@
                *voting_block = Some(v_block);
            }
        }
-        (Message::BlockCreated(block_created), Role::ObservingPeer) => {
+        (BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => {
            let current_topology = current_topology.is_consensus_required().expect(
                "Peer has `ObservingPeer` role, which mean that current topology require consensus",
            );
@@ -532,10 +505,7 @@
            if let Some(v_block) = vote_for_block(sumeragi, current_topology, block_created) {
                if current_view_change_index >= 1 {
                    let block_hash = v_block.block.payload().hash();

-                    let msg = MessagePacket::new(
-                        view_change_proof_chain.clone(),
-                        Some(BlockSigned::from(v_block.block.clone()).into()),
-                    );
+                    let msg = BlockSigned::from(v_block.block.clone()).into();

                    sumeragi.broadcast_packet_to(msg, [current_topology.proxy_tail()]);
                    info!(%addr, %block_hash, "Block validated, signed and forwarded");
@@ -545,7 +515,7 @@
                }
            }
        }
-        (Message::BlockCreated(block_created), Role::ProxyTail) => {
+        (BlockMessage::BlockCreated(block_created), Role::ProxyTail) => {
            if let Some(mut new_block) = vote_for_block(sumeragi, current_topology, block_created) {
                // NOTE: Up until this point it was unknown which block is expected to be received,
                // therefore all the signatures (of any hash) were collected and will now be pruned
@@ -553,7 +523,7 @@
                *voting_block = Some(new_block);
            }
        }
-        (Message::BlockSigned(BlockSigned { hash, signatures }), Role::ProxyTail) => {
+        (BlockMessage::BlockSigned(BlockSigned { hash, signatures }), Role::ProxyTail) => {
            trace!(block_hash=%hash, "Received block signatures");

            let roles: &[Role] = if current_view_change_index >= 1 {
@@ -588,7 +558,6 @@ fn process_message_independent(
     sumeragi: &mut Sumeragi,
     voting_block: &mut Option<VotingBlock>,
     current_view_change_index: u64,
-    view_change_proof_chain: &mut ProofChain,
     round_start_time: &Instant,
     #[cfg_attr(not(debug_assertions), allow(unused_variables))] is_genesis_peer: bool,
 ) {
@@ -629,10 +598,7 @@ fn process_message_independent(
                info!(%addr,
block_payload_hash=%new_block.payload().hash(), "Block created"); *voting_block = Some(VotingBlock::new(new_block.clone(), new_wsv)); - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockCreated::from(new_block).into()), - ); + let msg = BlockCreated::from(new_block).into(); if current_view_change_index >= 1 { sumeragi.broadcast_packet(msg); } else { @@ -641,10 +607,7 @@ fn process_message_independent( } else { match new_block.commit(current_topology) { Ok(committed_block) => { - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockCommitted::from(committed_block.clone()).into()), - ); + let msg = BlockCommitted::from(committed_block.clone()).into(); sumeragi.broadcast_packet(msg); sumeragi.commit_block(committed_block, new_wsv); @@ -664,10 +627,7 @@ fn process_message_independent( Ok(committed_block) => { info!(voting_block_hash = %committed_block.hash(), "Block reached required number of votes"); - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockCommitted::from(committed_block.clone()).into()), - ); + let msg = BlockCommitted::from(committed_block.clone()).into(); let current_topology = current_topology .is_consensus_required() @@ -824,15 +784,11 @@ pub(crate) fn run( // Instant when the previous view change or round happened. let mut last_view_change_time = Instant::now(); - // Internal variable used to pick receiver channel. Initialize to zero. - let mut control_message_in_a_row_counter = 0; - while !should_terminate(&mut shutdown_receiver) { if should_sleep { let span = span!(Level::TRACE, "main_thread_sleep"); let _enter = span.enter(); std::thread::sleep(std::time::Duration::from_millis(5)); - should_sleep = false; } let span_for_sumeragi_cycle = span!(Level::TRACE, "main_thread_cycle"); let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter(); @@ -880,57 +836,66 @@ pub(crate) fn run( &mut view_change_time, ); + if let Some(message) = { + let (msg, sleep) = sumeragi.receive_network_packet(&mut view_change_proof_chain); + should_sleep = sleep; + msg + } { + handle_message( + message, + &mut sumeragi, + &mut voting_block, + current_view_change_index, + &mut voting_signatures, + ); + } + + // State could be changed after handling message so it is necessary to reset state before handling message independent step + let current_view_change_index = prune_view_change_proofs_and_calculate_current_index( + &sumeragi, + &mut view_change_proof_chain, + ); + + // We broadcast our view change suggestion after having processed the latest from others inside `receive_network_packet` let node_expects_block = !sumeragi.transaction_cache.is_empty(); - if node_expects_block && last_view_change_time.elapsed() > view_change_time { + if (node_expects_block || current_view_change_index > 0) + && last_view_change_time.elapsed() > view_change_time + { let role = sumeragi.current_topology.role(&sumeragi.peer_id); - if let Some(VotingBlock { block, .. 
}) = voting_block.as_ref() { - // NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader - warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, block=%block.payload().hash(), "Block not committed in due time, requesting view change..."); - } else { - // NOTE: Suspecting the leader node because it hasn't produced a block - // If the current node has a transaction, the leader should have as well - warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, "No block produced in due time, requesting view change..."); + if node_expects_block { + if let Some(VotingBlock { block, .. }) = voting_block.as_ref() { + // NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader + warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, block=%block.payload().hash(), "Block not committed in due time, requesting view change..."); + } else { + // NOTE: Suspecting the leader node because it hasn't produced a block + // If the current node has a transaction, the leader should have as well + warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, "No block produced in due time, requesting view change..."); + } + + let suspect_proof = + ProofBuilder::new(sumeragi.wsv.latest_block_hash(), current_view_change_index) + .sign(sumeragi.key_pair.clone()) + .expect("Proof signing failed"); + + view_change_proof_chain + .insert_proof( + &sumeragi.current_topology.ordered_peers, + sumeragi.current_topology.max_faults(), + sumeragi.wsv.latest_block_hash(), + suspect_proof, + ) + .unwrap_or_else(|err| error!("{err}")); } - suggest_view_change( - &sumeragi, - &mut view_change_proof_chain, - current_view_change_index, - ); + let msg = ControlFlowMessage::new(view_change_proof_chain.clone()); + sumeragi.broadcast_control_flow_packet(msg); // NOTE: View change must be periodically suggested until it is accepted. // Must be initialized to pipeline time but can increase by chosen amount view_change_time += sumeragi.pipeline_time(); } - sumeragi - .receive_network_packet( - &mut view_change_proof_chain, - &mut control_message_in_a_row_counter, - ) - .map_or_else( - || { - should_sleep = true; - }, - |message| { - handle_message( - message, - &mut sumeragi, - &mut voting_block, - current_view_change_index, - &mut view_change_proof_chain, - &mut voting_signatures, - ); - }, - ); - - // State could be changed after handling message so it is necessary to reset state before handling message independent step - let current_view_change_index = prune_view_change_proofs_and_calculate_current_index( - &sumeragi, - &mut view_change_proof_chain, - ); - reset_state( &sumeragi.peer_id, sumeragi.pipeline_time(), @@ -953,7 +918,6 @@ pub(crate) fn run( &mut sumeragi, &mut voting_block, current_view_change_index, - &mut view_change_proof_chain, &round_start_time, is_genesis_peer, ); diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs index 5aa47890cdd..329f9ba135e 100644 --- a/core/src/sumeragi/message.rs +++ b/core/src/sumeragi/message.rs @@ -7,30 +7,10 @@ use parity_scale_codec::{Decode, Encode}; use super::view_change; use crate::block::{CommittedBlock, ValidBlock}; -/// Helper structure, wrapping messages and view change proofs. -#[derive(Debug, Clone, Decode, Encode)] -pub struct MessagePacket { - /// Proof of view change. As part of this message handling, all - /// peers which agree with view change should sign it. - pub view_change_proofs: view_change::ProofChain, - /// Actual Sumeragi message in this packet. 
-    pub message: Option<Message>,
-}
-
-impl MessagePacket {
-    /// Construct [`Self`]
-    pub fn new(view_change_proofs: view_change::ProofChain, message: Option<Message>) -> Self {
-        Self {
-            view_change_proofs,
-            message,
-        }
-    }
-}
-
 #[allow(clippy::enum_variant_names)]
 /// Message's variants that are used by peers to communicate in the process of consensus.
 #[derive(Debug, Clone, Decode, Encode, FromVariant)]
-pub enum Message {
+pub enum BlockMessage {
     /// This message is sent by leader to all validating peers, when a new block is created.
     BlockCreated(BlockCreated),
     /// This message is sent by validating peers to proxy tail and observing peers when they have signed this block.
@@ -42,18 +22,17 @@
 }
 
 /// Specialization of `MessagePacket`
+#[derive(Debug, Clone, Decode, Encode)]
 pub struct ControlFlowMessage {
     /// Proof of view change. As part of this message handling, all
     /// peers which agree with view change should sign it.
     pub view_change_proofs: view_change::ProofChain,
 }
 
-impl From<ControlFlowMessage> for MessagePacket {
-    fn from(m: ControlFlowMessage) -> MessagePacket {
-        MessagePacket {
-            view_change_proofs: m.view_change_proofs,
-            message: None,
-        }
+impl ControlFlowMessage {
+    /// Helper function to construct a `ControlFlowMessage`
+    pub fn new(view_change_proofs: view_change::ProofChain) -> ControlFlowMessage {
+        ControlFlowMessage { view_change_proofs }
     }
 }
diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs
index 8c82663ee6c..5d56d9a52d7 100644
--- a/core/src/sumeragi/mod.rs
+++ b/core/src/sumeragi/mod.rs
@@ -26,10 +26,7 @@ pub mod view_change;
 
 use parking_lot::Mutex;
 
-use self::{
-    message::{Message, *},
-    view_change::ProofChain,
-};
+use self::{message::*, view_change::ProofChain};
 use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage};
 
 /*
@@ -55,7 +52,7 @@ pub struct SumeragiHandle {
     _thread_handle: Arc<ThreadHandler>, // Should be dropped after `_thread_handle` to prevent sumeragi thread from panicking
     control_message_sender: mpsc::SyncSender<ControlFlowMessage>,
-    message_sender: mpsc::SyncSender<MessagePacket>,
+    message_sender: mpsc::SyncSender<BlockMessage>,
 }
 
 impl SumeragiHandle {
@@ -203,20 +200,21 @@ impl SumeragiHandle {
         &self.metrics
     }
 
+    /// Deposit a sumeragi control flow network message.
+    pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) {
+        if let Err(error) = self.control_message_sender.try_send(msg) {
+            self.metrics.dropped_messages.inc();
+            error!(
+                ?error,
+                "This peer is faulty. \
+                Incoming control messages have to be dropped due to low processing speed."
+            );
+        }
+    }
+
     /// Deposit a sumeragi network message.
-    pub fn incoming_message(&self, msg: MessagePacket) {
-        if msg.message.is_none() {
-            if let Err(error) = self.control_message_sender.try_send(ControlFlowMessage {
-                view_change_proofs: msg.view_change_proofs,
-            }) {
-                self.metrics.dropped_messages.inc();
-                error!(
-                    ?error,
-                    "This peer is faulty. \
-                    Incoming control messages have to be dropped due to low processing speed."
-                );
-            }
-        } else if let Err(error) = self.message_sender.try_send(msg) {
+    pub fn incoming_block_message(&self, msg: BlockMessage) {
+        if let Err(error) = self.message_sender.try_send(msg) {
             self.metrics.dropped_messages.inc();
             error!(
                 ?error,