From 7f879eb95fa0add0a404f76081cab9d145703881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Ver=C5=A1i=C4=87?= Date: Tue, 8 Oct 2024 18:16:31 +0200 Subject: [PATCH] refactor: move sumeragi view change request into a separate function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marin Veršić --- crates/iroha_core/src/sumeragi/main_loop.rs | 308 ++++++++++---------- crates/iroha_core/src/sumeragi/mod.rs | 1 + 2 files changed, 155 insertions(+), 154 deletions(-) diff --git a/crates/iroha_core/src/sumeragi/main_loop.rs b/crates/iroha_core/src/sumeragi/main_loop.rs index a51fb9738fb..853b7bb5d06 100644 --- a/crates/iroha_core/src/sumeragi/main_loop.rs +++ b/crates/iroha_core/src/sumeragi/main_loop.rs @@ -42,6 +42,8 @@ pub struct Sumeragi { pub transaction_cache: Vec, /// Metrics for reporting number of view changes in current round pub view_changes_metric: iroha_telemetry::metrics::ViewChangesGauge, + /// View change proof chain + pub view_change_proof_chain: ProofChain, /// Was there a commit in previous round? pub was_commit: bool, @@ -120,9 +122,8 @@ impl Sumeragi { } fn receive_network_packet( - &self, + &mut self, latest_block: HashOf, - view_change_proof_chain: &mut ProofChain, ) -> (Option, bool) { const MAX_CONTROL_MSG_IN_A_ROW: usize = 25; @@ -139,7 +140,7 @@ impl Sumeragi { }) { should_sleep = false; - if let Err(error) = view_change_proof_chain.insert_proof( + if let Err(error) = self.view_change_proof_chain.insert_proof( msg.view_change_proof, &self.topology, latest_block, @@ -151,8 +152,7 @@ impl Sumeragi { } } - let block_msg = - self.receive_block_message_network_packet(latest_block, view_change_proof_chain); + let block_msg = self.receive_block_message_network_packet(latest_block); should_sleep &= block_msg.is_none(); (block_msg, should_sleep) @@ -161,10 +161,10 @@ impl Sumeragi { fn receive_block_message_network_packet( &self, latest_block: HashOf, - view_change_proof_chain: &ProofChain, ) -> Option { - let current_view_change_index = - view_change_proof_chain.verify_with_state(&self.topology, latest_block); + let current_view_change_index = self + .view_change_proof_chain + .verify_with_state(&self.topology, latest_block); loop { let block_msg = self @@ -200,6 +200,141 @@ impl Sumeragi { } } + fn consider_view_change( + &mut self, + state: &State, + voting_block: Option<&VotingBlock>, + last_view_change_time: Instant, + view_change_time: &mut Duration, + ) { + let state_view = state.view(); + let view_change_index = self.prune_view_change_proofs_and_calculate_current_index( + state_view + .latest_block_hash() + .expect("INTERNAL BUG: No latest block"), + ); + + // We broadcast our view change suggestion after having processed the latest from others inside `receive_network_packet` + let block_expected = !self.transaction_cache.is_empty(); + if (block_expected || view_change_index > 0) + && last_view_change_time.elapsed() > *view_change_time + { + if block_expected { + if let Some(VotingBlock { block, .. }) = voting_block { + // NOTE: Suspecting the tail node because it hasn't committed the block yet + + warn!( + peer_id=%self.peer_id, + role=%self.role(), + block=%block.as_ref().hash(), + "Block not committed in due time, requesting view change..." + ); + } else { + // NOTE: Suspecting the leader node because it hasn't produced a block + // If the current node has a transaction, leader should have as well + + warn!( + peer_id=%self.peer_id, + role=%self.role(), + "No block produced in due time, requesting view change..." + ); + } + + let latest_block = state_view + .latest_block_hash() + .expect("INTERNAL BUG: No latest block"); + let suspect_proof = + ProofBuilder::new(latest_block, view_change_index).sign(&self.key_pair); + + self.view_change_proof_chain + .insert_proof(suspect_proof, &self.topology, latest_block) + .unwrap_or_else(|err| error!("{err}")); + } + + // If exist broadcast latest verified proof in case some peers missed it. + // Proof doesn't exist in case view_change_index == 0. + if let Some(latest_verified_proof) = + view_change_index + .checked_sub(1) + .and_then(|view_change_index| { + self.view_change_proof_chain + .get_proof_for_view_change(view_change_index) + }) + { + let msg = ControlFlowMessage::new(latest_verified_proof); + self.broadcast_control_flow_packet(msg); + } + + // If exist broadcast proof for current view change index. + // Proof might not exist for example when view_change_time is up, + // but there is no transactions in the queue so there is nothing to complain about. + if let Some(proof_for_current_view_change_index) = self + .view_change_proof_chain + .get_proof_for_view_change(view_change_index) + { + let msg = ControlFlowMessage::new(proof_for_current_view_change_index); + self.broadcast_control_flow_packet(msg); + } + + // NOTE: View change must be periodically suggested until it is accepted. + // Must be initialized to pipeline time but can increase by chosen amount + *view_change_time += state + .world + .view() + .parameters() + .sumeragi + .pipeline_time(view_change_index, self.topology.max_faults() + 1); + } + } + + fn reset_state( + &mut self, + pipeline_time: Duration, + view_change_index: usize, + voting_block: &mut Option, + voting_signatures: &mut BTreeSet, + last_view_change_time: &mut Instant, + view_change_time: &mut Duration, + ) { + let mut was_commit_or_view_change = self.was_commit; + + let prev_role = self.topology.role(&self.peer_id); + if self.topology.view_change_index() < view_change_index { + let new_rotations = self.topology.nth_rotation(view_change_index); + + error!( + peer=%self.peer_id, + next_role=%self.topology.role(&self.peer_id), + %prev_role, + n=%new_rotations, + %view_change_index, + "Topology rotated n times" + ); + #[cfg(debug_assertions)] + iroha_logger::info!( + peer_id=%self.peer_id, + role=%self.topology.role(&self.peer_id), + topology=?self.topology, + "Topology after rotation" + ); + + was_commit_or_view_change = true; + } + + // Reset state for the next round. + if was_commit_or_view_change { + *voting_block = None; + voting_signatures.clear(); + *last_view_change_time = Instant::now(); + *view_change_time = pipeline_time; + + self.was_commit = false; + } + + self.view_changes_metric + .set(self.topology.view_change_index() as u64); + } + fn init_listen_for_genesis( &mut self, genesis_account: &AccountId, @@ -419,12 +554,12 @@ impl Sumeragi { } fn prune_view_change_proofs_and_calculate_current_index( - &self, + &mut self, latest_block: HashOf, - view_change_proof_chain: &mut ProofChain, ) -> usize { - view_change_proof_chain.prune(latest_block); - view_change_proof_chain.verify_with_state(&self.topology, latest_block) + self.view_change_proof_chain.prune(latest_block); + self.view_change_proof_chain + .verify_with_state(&self.topology, latest_block) } #[allow(clippy::too_many_lines)] @@ -908,54 +1043,6 @@ impl Sumeragi { } } -#[allow(clippy::too_many_arguments)] -fn reset_state( - peer_id: &PeerId, - pipeline_time: Duration, - view_change_index: usize, - was_commit: &mut bool, - topology: &mut Topology, - voting_block: &mut Option, - voting_signatures: &mut BTreeSet, - last_view_change_time: &mut Instant, - view_change_time: &mut Duration, -) { - let mut was_commit_or_view_change = *was_commit; - - let prev_role = topology.role(peer_id); - if topology.view_change_index() < view_change_index { - let new_rotations = topology.nth_rotation(view_change_index); - - error!( - %peer_id, - %prev_role, - next_role=%topology.role(peer_id), - n=%new_rotations, - %view_change_index, - "Topology rotated n times" - ); - #[cfg(debug_assertions)] - iroha_logger::info!( - %peer_id, - role=%topology.role(peer_id), - topology=?topology, - "Topology after rotation" - ); - - was_commit_or_view_change = true; - } - - // Reset state for the next round. - if was_commit_or_view_change { - *voting_block = None; - voting_signatures.clear(); - *last_view_change_time = Instant::now(); - *view_change_time = pipeline_time; - - *was_commit = false; - } -} - #[iroha_logger::log(name = "consensus", skip_all)] /// Execute the main loop of [`Sumeragi`] pub(crate) fn run( @@ -1002,7 +1089,6 @@ pub(crate) fn run( // Proxy tail collection of voting block signatures let mut voting_signatures = BTreeSet::new(); let mut should_sleep = false; - let mut view_change_proof_chain = ProofChain::default(); // Duration after which a view change is suggested let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time( sumeragi.topology.view_change_index(), @@ -1051,11 +1137,9 @@ pub(crate) fn run( state_view .latest_block_hash() .expect("INTERNAL BUG: No latest block"), - &mut view_change_proof_chain, ); - reset_state( - &sumeragi.peer_id, + sumeragi.reset_state( state .world .view() @@ -1063,23 +1147,17 @@ pub(crate) fn run( .sumeragi .pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1), view_change_index, - &mut sumeragi.was_commit, - &mut sumeragi.topology, &mut voting_block, &mut voting_signatures, &mut last_view_change_time, &mut view_change_time, ); - sumeragi - .view_changes_metric - .set(sumeragi.topology.view_change_index() as u64); if let Some(message) = { let (msg, sleep) = sumeragi.receive_network_packet( state_view .latest_block_hash() .expect("INTERNAL BUG: No latest block"), - &mut view_change_proof_chain, ); should_sleep = sleep; msg @@ -1095,87 +1173,14 @@ pub(crate) fn run( ); } - // State could be changed after handling message so it is necessary to reset state before handling message independent step - let state_view = state.view(); - let view_change_index = sumeragi.prune_view_change_proofs_and_calculate_current_index( - state_view - .latest_block_hash() - .expect("INTERNAL BUG: No latest block"), - &mut view_change_proof_chain, + sumeragi.consider_view_change( + &state, + voting_block.as_ref(), + last_view_change_time, + &mut view_change_time, ); - // We broadcast our view change suggestion after having processed the latest from others inside `receive_network_packet` - let block_expected = !sumeragi.transaction_cache.is_empty(); - if (block_expected || view_change_index > 0) - && last_view_change_time.elapsed() > view_change_time - { - if block_expected { - if let Some(VotingBlock { block, .. }) = voting_block.as_ref() { - // NOTE: Suspecting the tail node because it hasn't committed the block yet - - warn!( - peer_id=%sumeragi.peer_id, - role=%sumeragi.role(), - block=%block.as_ref().hash(), - "Block not committed in due time, requesting view change..." - ); - } else { - // NOTE: Suspecting the leader node because it hasn't produced a block - // If the current node has a transaction, leader should have as well - - warn!( - peer_id=%sumeragi.peer_id, - role=%sumeragi.role(), - "No block produced in due time, requesting view change..." - ); - } - - let latest_block = state_view - .latest_block_hash() - .expect("INTERNAL BUG: No latest block"); - let suspect_proof = - ProofBuilder::new(latest_block, view_change_index).sign(&sumeragi.key_pair); - - view_change_proof_chain - .insert_proof(suspect_proof, &sumeragi.topology, latest_block) - .unwrap_or_else(|err| error!("{err}")); - } - - // If exist broadcast latest verified proof in case some peers missed it. - // Proof doesn't exist in case view_change_index == 0. - if let Some(latest_verified_proof) = - view_change_index - .checked_sub(1) - .and_then(|view_change_index| { - view_change_proof_chain.get_proof_for_view_change(view_change_index) - }) - { - let msg = ControlFlowMessage::new(latest_verified_proof); - sumeragi.broadcast_control_flow_packet(msg); - } - - // If exist broadcast proof for current view change index. - // Proof might not exist for example when view_change_time is up, - // but there is no transactions in the queue so there is nothing to complain about. - if let Some(proof_for_current_view_change_index) = - view_change_proof_chain.get_proof_for_view_change(view_change_index) - { - let msg = ControlFlowMessage::new(proof_for_current_view_change_index); - sumeragi.broadcast_control_flow_packet(msg); - } - - // NOTE: View change must be periodically suggested until it is accepted. - // Must be initialized to pipeline time but can increase by chosen amount - view_change_time += state - .world - .view() - .parameters() - .sumeragi - .pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1); - } - - reset_state( - &sumeragi.peer_id, + sumeragi.reset_state( state .world .view() @@ -1183,16 +1188,11 @@ pub(crate) fn run( .sumeragi .pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1), view_change_index, - &mut sumeragi.was_commit, - &mut sumeragi.topology, &mut voting_block, &mut voting_signatures, &mut last_view_change_time, &mut view_change_time, ); - sumeragi - .view_changes_metric - .set(sumeragi.topology.view_change_index() as u64); if sumeragi.role() == Role::Leader && voting_block.is_none() { sumeragi.try_create_block(&state, &mut voting_block); diff --git a/crates/iroha_core/src/sumeragi/mod.rs b/crates/iroha_core/src/sumeragi/mod.rs index 568ba67da4b..d28c1cd7fc0 100644 --- a/crates/iroha_core/src/sumeragi/mod.rs +++ b/crates/iroha_core/src/sumeragi/mod.rs @@ -223,6 +223,7 @@ impl SumeragiStartArgs { topology, transaction_cache: Vec::new(), view_changes_metric: view_changes, + view_change_proof_chain: ProofChain::default(), was_commit: false, round_start_time: Instant::now(), };