diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 30f77b9809b..1c37bd3fa34 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -138,12 +138,12 @@ impl Sumeragi { }) { should_sleep = false; - if let Err(error) = view_change_proof_chain.merge( - msg.view_change_proofs, + if let Err(error) = view_change_proof_chain.insert_proof( + msg.view_change_proof, &self.topology, latest_block, ) { - trace!(%error, "Failed to add proofs into view change proof chain") + trace!(%error, "Failed to add proof into view change proof chain") } } else { break; @@ -1135,8 +1135,28 @@ pub(crate) fn run( .unwrap_or_else(|err| error!("{err}")); } - let msg = ControlFlowMessage::new(view_change_proof_chain.clone()); - sumeragi.broadcast_control_flow_packet(msg); + // If exist broadcast latest verified proof in case some peers missed it. + // Proof doesn't exist in case view_change_index == 0. + if let Some(latest_verified_proof) = + view_change_index + .checked_sub(1) + .and_then(|view_change_index| { + view_change_proof_chain.get_proof_for_view_change(view_change_index) + }) + { + let msg = ControlFlowMessage::new(latest_verified_proof); + sumeragi.broadcast_control_flow_packet(msg); + } + + // If exist broadcast proof for current view change index. + // Proof might not exist for example when view_change_time is up, + // but there is no transactions in the queue so there is nothing to complain about. + if let Some(proof_for_current_view_change_index) = + view_change_proof_chain.get_proof_for_view_change(view_change_index) + { + let msg = ControlFlowMessage::new(proof_for_current_view_change_index); + sumeragi.broadcast_control_flow_packet(msg); + } // NOTE: View change must be periodically suggested until it is accepted. // Must be initialized to pipeline time but can increase by chosen amount diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs index c3089a2d235..6e6cdf8195b 100644 --- a/core/src/sumeragi/message.rs +++ b/core/src/sumeragi/message.rs @@ -26,13 +26,13 @@ pub enum BlockMessage { pub struct ControlFlowMessage { /// Proof of view change. As part of this message handling, all /// peers which agree with view change should sign it. - pub view_change_proofs: view_change::ProofChain, + pub view_change_proof: view_change::SignedViewChangeProof, } impl ControlFlowMessage { /// Helper function to construct a `ControlFlowMessage` - pub fn new(view_change_proofs: view_change::ProofChain) -> ControlFlowMessage { - ControlFlowMessage { view_change_proofs } + pub fn new(view_change_proof: view_change::SignedViewChangeProof) -> ControlFlowMessage { + ControlFlowMessage { view_change_proof } } } diff --git a/core/src/sumeragi/view_change.rs b/core/src/sumeragi/view_change.rs index 6cd53a07aff..902988504ec 100644 --- a/core/src/sumeragi/view_change.rs +++ b/core/src/sumeragi/view_change.rs @@ -1,6 +1,8 @@ //! Structures related to proofs and reasons of view changes. //! Where view change is a process of changing topology due to some faulty network behavior. +use std::collections::{btree_map::Entry, BTreeMap}; + use eyre::Result; use indexmap::IndexSet; use iroha_crypto::{HashOf, PublicKey, SignatureOf}; @@ -13,13 +15,13 @@ use super::network_topology::Topology; type ViewChangeProofSignature = (PublicKey, SignatureOf); /// Error emerge during insertion of `Proof` into `ProofChain` -#[derive(Error, displaydoc::Display, Debug, Clone, Copy)] +#[derive(Error, displaydoc::Display, Debug, Clone, Copy, PartialEq, Eq)] #[allow(missing_docs)] pub enum Error { /// Block hash of proof doesn't match hash of proof chain BlockHashMismatch, - /// View change index is not present in proof chain - ViewChangeNotFound, + /// Peer already have verified view change proof with index larger than received + ViewChangeOutdated, } #[derive(Debug, Clone, Decode, Encode)] @@ -69,7 +71,14 @@ impl ProofBuilder { impl SignedViewChangeProof { /// Verify the signatures of `other` and add them to this proof. - fn merge_signatures(&mut self, other: Vec, topology: &Topology) { + /// Return number of new signatures added. + fn merge_signatures( + &mut self, + other: Vec, + topology: &Topology, + ) -> usize { + let len_before = self.signatures.len(); + let signatures = core::mem::take(&mut self.signatures) .into_iter() .collect::>(); @@ -85,6 +94,10 @@ impl SignedViewChangeProof { }) .into_iter() .collect(); + + let len_after = self.signatures.len(); + + len_after - len_before } /// Verify if the proof is valid, given the peers in `topology`. @@ -100,12 +113,14 @@ impl SignedViewChangeProof { } } -/// Structure representing sequence of view change proofs. -#[derive(Debug, Clone, Encode, Default)] -pub struct ProofChain(Vec); +/// Structure representing view change proofs collected by the peer. +/// All proofs are attributed to the same block. +#[derive(Debug, Clone, Default)] +pub struct ProofChain(BTreeMap); impl ProofChain { - /// Verify the view change proof chain. + /// Find next index to last verified view change proof. + /// Proof is verified if it has more or qual ot f + 1 valid signatures. pub fn verify_with_state( &self, topology: &Topology, @@ -113,36 +128,25 @@ impl ProofChain { ) -> usize { self.0 .iter() - .enumerate() - .take_while(|(i, proof)| { - let view_change_index = proof.payload.view_change_index as usize; - - proof.payload.latest_block == latest_block - && view_change_index == *i - && proof.verify(topology) + .rev() + .filter(|(_, proof)| proof.payload.latest_block == latest_block) + .find(|(_, proof)| proof.verify(topology)) + .map_or(0, |(view_change_index, _)| { + (*view_change_index as usize) + 1 }) - .count() } - /// Remove invalid proofs from the chain. + /// Prune proofs leave only proofs for specified latest block pub fn prune(&mut self, latest_block: HashOf) { - let valid_count = self - .0 - .iter() - .enumerate() - .take_while(|(i, proof)| { - let view_change_index = proof.payload.view_change_index as usize; - proof.payload.latest_block == latest_block && view_change_index == *i - }) - .count(); - self.0.truncate(valid_count); + self.0 + .retain(|_, proof| proof.payload.latest_block == latest_block) } /// Attempt to insert a view chain proof into this `ProofChain`. /// /// # Errors /// - If proof latest block hash doesn't match peer latest block hash - /// - If proof view change number differs from view change number + /// - If proof view change number lower than current verified view change pub fn insert_proof( &mut self, new_proof: SignedViewChangeProof, @@ -153,64 +157,37 @@ impl ProofChain { return Err(Error::BlockHashMismatch); } let next_unfinished_view_change = self.verify_with_state(topology, latest_block); - if new_proof.payload.view_change_index as usize != next_unfinished_view_change { - return Err(Error::ViewChangeNotFound); // We only care about the current view change that may or may not happen. - } - - let is_proof_chain_incomplete = next_unfinished_view_change < self.0.len(); - if is_proof_chain_incomplete { - self.0[next_unfinished_view_change].merge_signatures(new_proof.signatures, topology); - } else { - self.0.push(new_proof); + let new_proof_view_change_index = new_proof.payload.view_change_index as usize; + if new_proof_view_change_index + 1 < next_unfinished_view_change { + return Err(Error::ViewChangeOutdated); // We only care about current proof and proof which might happen in the future } - Ok(()) - } - - /// Add latest proof from other chain into current. - /// - /// # Errors - /// - If there is mismatch between `other` proof chain latest block hash and peer's latest block hash - /// - If `other` proof chain doesn't have proof for current view chain - pub fn merge( - &mut self, - mut other: Self, - topology: &Topology, - latest_block: HashOf, - ) -> Result<(), Error> { - other.prune(latest_block); - - if other.0.is_empty() { - return Err(Error::BlockHashMismatch); + if new_proof_view_change_index + 1 == next_unfinished_view_change { + return Ok(()); // Received a proof for already verified latest proof, not an error just nothing to do about } - let next_unfinished_view_change = self.verify_with_state(topology, latest_block); - let is_proof_chain_incomplete = next_unfinished_view_change < self.0.len(); - let other_contain_additional_proofs = next_unfinished_view_change < other.0.len(); - - match (is_proof_chain_incomplete, other_contain_additional_proofs) { - // Case 1: proof chain is incomplete and other have corresponding proof. - (true, true) => { - let new_proof = other.0.swap_remove(next_unfinished_view_change); - self.0[next_unfinished_view_change] + match self.0.entry(new_proof.payload.view_change_index) { + Entry::Occupied(mut occupied) => { + occupied + .get_mut() .merge_signatures(new_proof.signatures, topology); } - // Case 2: proof chain is complete, but other have additional proof. - (false, true) => { - let new_proof = other.0.swap_remove(next_unfinished_view_change); - self.0.push(new_proof); - } - // Case 3: proof chain is incomplete, but other doesn't contain corresponding proof. - // Usually this mean that sender peer is behind receiver peer. - (true, false) => { - return Err(Error::ViewChangeNotFound); + Entry::Vacant(vacant) => { + vacant.insert(new_proof); } - // Case 4: proof chain is complete, but other doesn't have any new peer. - // This considered normal course of action. - (false, false) => {} } Ok(()) } + + /// Get proof for requested view change index + pub fn get_proof_for_view_change( + &self, + view_change_index: usize, + ) -> Option { + #[allow(clippy::cast_possible_truncation)] + // Was created from u32 so should be able to cast back + self.0.get(&(view_change_index as u32)).cloned() + } } mod candidate { @@ -270,15 +247,221 @@ mod candidate { .map_err(Into::into) } } - impl Decode for ProofChain { - fn decode(input: &mut I) -> Result { - let proofs = Vec::::decode(input)?; +} - if proofs.is_empty() { - return Err("Empty proof chain".into()); - } +#[cfg(test)] +mod tests { + use iroha_crypto::{Hash, HashOf, KeyPair}; + use iroha_data_model::peer::PeerId; - Ok(ProofChain(proofs)) + use super::*; + + fn key_pairs() -> [KeyPair; N] { + [(); N].map(|()| KeyPair::random()) + } + + fn prepare_data() -> ([KeyPair; N], Topology, HashOf) { + let key_pairs = key_pairs::(); + let peer_ids = key_pairs.clone().map(|key_pair| { + let (public_key, _) = key_pair.into_parts(); + PeerId::new(([127, 0, 0, 1], 8080).into(), public_key) + }); + let topology = Topology::new(peer_ids); + let latest_block = HashOf::from_untyped_unchecked(Hash::prehashed([0; 32])); + + (key_pairs, topology, latest_block) + } + + fn create_signed_payload( + payload: ViewChangeProofPayload, + signatories: &[KeyPair], + ) -> SignedViewChangeProof { + let signatures = signatories + .iter() + .map(|key_pair| { + ( + key_pair.public_key().clone(), + SignatureOf::new(key_pair.private_key(), &payload), + ) + }) + .collect(); + SignedViewChangeProof { + signatures, + payload, } } + + #[test] + fn verify_with_state_on_empty() { + let (_key_pairs, topology, latest_block) = prepare_data::<10>(); + let chain = ProofChain::default(); + + assert_eq!(chain.verify_with_state(&topology, latest_block), 0); + } + + #[test] + fn verify_with_state() { + let (key_pairs, topology, latest_block) = prepare_data::<10>(); + + let len = 10; + + let mut view_change_payloads = (0..).map(|view_change_index| ViewChangeProofPayload { + latest_block, + view_change_index, + }); + + let complete_proofs = (&mut view_change_payloads) + .take(len) + .map(|payload| create_signed_payload(payload, &key_pairs[..=topology.max_faults()])) + .collect::>(); + + let incomplete_proofs = (&mut view_change_payloads) + .take(len) + .map(|payload| create_signed_payload(payload, &key_pairs[..1])) + .collect::>(); + + let proofs = { + let mut proofs = complete_proofs; + proofs.extend(incomplete_proofs); + proofs + }; + let chain = ProofChain( + proofs + .clone() + .into_iter() + .map(|proof| (proof.payload.view_change_index, proof)) + .collect(), + ); + + // verify_with_state equal to view_change_index of last verified proof plus 1 + assert_eq!(chain.verify_with_state(&topology, latest_block), len); + + // Add complete proofs on top to check that verified view change is updated as well + let complete_proofs = (&mut view_change_payloads) + .take(len) + .map(|payload| create_signed_payload(payload, &key_pairs[..=topology.max_faults()])) + .collect::>(); + + let proofs = { + let mut proofs = proofs; + proofs.extend(complete_proofs); + proofs + }; + let chain = ProofChain( + proofs + .clone() + .into_iter() + .map(|proof| (proof.payload.view_change_index, proof)) + .collect(), + ); + assert_eq!(chain.verify_with_state(&topology, latest_block), 3 * len); + } + + #[test] + fn proof_for_invalid_block_is_rejected() { + let (key_pairs, topology, latest_block) = prepare_data::<10>(); + + let wrong_latest_block = HashOf::from_untyped_unchecked(Hash::prehashed([1; 32])); + + let mut me = ProofChain::default(); + let other = ProofBuilder::new(wrong_latest_block, 0).sign(&key_pairs[1]); + + assert_eq!( + me.insert_proof(other, &topology, latest_block), + Err(Error::BlockHashMismatch) + ); + } + + #[test] + fn proof_from_the_past_is_rejected() { + let (key_pairs, topology, latest_block) = prepare_data::<10>(); + + let mut chain = ProofChain::default(); + + let proof_future = create_signed_payload( + ViewChangeProofPayload { + latest_block, + view_change_index: 10, + }, + &key_pairs, + ); + + assert_eq!( + Ok(()), + chain.insert_proof(proof_future, &topology, latest_block) + ); + assert_eq!(chain.verify_with_state(&topology, latest_block), 11); + + let proof = create_signed_payload( + ViewChangeProofPayload { + latest_block, + view_change_index: 1, + }, + &key_pairs, + ); + + assert_eq!( + Err(Error::ViewChangeOutdated), + chain.insert_proof(proof, &topology, latest_block) + ); + assert_eq!(chain.verify_with_state(&topology, latest_block), 11); + } + + #[test] + fn proofs_are_merged() { + let (key_pairs, topology, latest_block) = prepare_data::<10>(); + + let mut chain = ProofChain::default(); + + let (from, to) = (topology.max_faults() / 2, topology.max_faults() + 1); + let payload = ViewChangeProofPayload { + latest_block, + view_change_index: 0, + }; + + let proof_0_part_1 = create_signed_payload(payload.clone(), &key_pairs[..from]); + + assert_eq!( + Ok(()), + chain.insert_proof(proof_0_part_1, &topology, latest_block) + ); + assert_eq!(chain.verify_with_state(&topology, latest_block), 0); + + let proof_0_part_2 = create_signed_payload(payload, &key_pairs[from..to]); + + assert_eq!( + Ok(()), + chain.insert_proof(proof_0_part_2, &topology, latest_block) + ); + assert_eq!(chain.verify_with_state(&topology, latest_block), 1); + } + + #[test] + fn proofs_are_appended() { + let (key_pairs, topology, latest_block) = prepare_data::<10>(); + + let mut chain = ProofChain::default(); + + let proof_0 = create_signed_payload( + ViewChangeProofPayload { + latest_block, + view_change_index: 0, + }, + &key_pairs, + ); + + assert_eq!(Ok(()), chain.insert_proof(proof_0, &topology, latest_block)); + assert_eq!(chain.verify_with_state(&topology, latest_block), 1); + + let proof_1 = create_signed_payload( + ViewChangeProofPayload { + latest_block, + view_change_index: 1, + }, + &key_pairs, + ); + + assert_eq!(Ok(()), chain.insert_proof(proof_1, &topology, latest_block)); + assert_eq!(chain.verify_with_state(&topology, latest_block), 2); + } }