Skip to content

Commit

Permalink
[fix] #4140: Fix registration of new peer
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Dec 13, 2023
1 parent 94b9466 commit c57ccf0
Show file tree
Hide file tree
Showing 19 changed files with 198 additions and 109 deletions.
4 changes: 3 additions & 1 deletion client/tests/integration/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub use iroha_config::base::proxy::Builder as _;
use iroha_config::iroha::ConfigurationProxy;
use test_network::*;

use super::{Builder, Configuration, ConfigurationProxy};
use super::Configuration;

#[test]
fn get_config() {
Expand Down
119 changes: 86 additions & 33 deletions client/tests/integration/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::thread;
use eyre::{Context, Result};
use iroha_client::client::Client;
use iroha_data_model::{
parameter::{default::MAX_TRANSACTIONS_IN_BLOCK, ParametersBuilder},
isi::{RegisterExpr, UnregisterExpr},
peer::Peer as DataModelPeer,
prelude::*,
IdBox,
};
use iroha_primitives::unique_vec;
use rand::{seq::SliceRandom, thread_rng, Rng};
use test_network::*;
use tokio::runtime::Runtime;

use super::Configuration;

Expand All @@ -22,11 +25,50 @@ fn connected_peers_with_f_1_0_1() -> Result<()> {
connected_peers_with_f(1, Some(11_000))
}

#[test]
fn register_new_peer() -> Result<()> {
let (_rt, network, _) = <Network>::start_test_with_runtime(4, Some(11_180));
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Configuration::pipeline_time();

let mut peer_clients: Vec<_> = Network::peers(&network)
.zip(Network::clients(&network))
.collect();

check_status(&peer_clients, 1);

// Start new peer
let mut configuration = Configuration::test();
configuration.sumeragi.trusted_peers.peers =
unique_vec![peer_clients.choose(&mut thread_rng()).unwrap().0.id.clone()];
let rt = Runtime::test();
let new_peer = rt.block_on(
PeerBuilder::new()
.with_configuration(configuration)
.with_into_genesis(WithGenesis::None)
.with_port(11_200)
.start(),
);

let register_peer = RegisterExpr::new(DataModelPeer::new(new_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(register_peer)?;
peer_clients.push((&new_peer, Client::test(&new_peer.api_address)));
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect

check_status(&peer_clients, 2);

Ok(())
}

/// Test the number of connected peers, changing the number of faults tolerated down and up
fn connected_peers_with_f(faults: u64, start_port: Option<u16>) -> Result<()> {
let n_peers = 3 * faults + 1;

let (_rt, network, client) = <Network>::start_test_with_runtime(
let (_rt, network, _) = <Network>::start_test_with_runtime(
(n_peers)
.try_into()
.wrap_err("`faults` argument `u64` value too high, cannot convert to `u32`")?,
Expand All @@ -35,40 +77,51 @@ fn connected_peers_with_f(faults: u64, start_port: Option<u16>) -> Result<()> {
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Configuration::pipeline_time();

client.submit_blocking(
ParametersBuilder::new()
.add_parameter(MAX_TRANSACTIONS_IN_BLOCK, 1u32)?
.into_set_parameters(),
)?;
let mut peer_clients: Vec<_> = Network::peers(&network)
.zip(Network::clients(&network))
.collect();

// Confirm all peers connected
let mut status = client.get_status()?;
assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, 2);
check_status(&peer_clients, 1);

// Unregister a peer: committed with f = `faults`
// then `status.peers` decrements
let peer = network.peers.values().last().unwrap();
let peer_client = Client::test(&peer.api_address);
let unregister_peer = UnregisterExpr::new(IdBox::PeerId(peer.id.clone()));
client.submit_blocking(unregister_peer)?;
// Unregister a peer: committed with f = `faults` then `status.peers` decrements
let removed_peer_idx = rand::thread_rng().gen_range(0..peer_clients.len());
let (removed_peer, _) = &peer_clients[removed_peer_idx];
let unregister_peer = UnregisterExpr::new(IdBox::PeerId(removed_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(unregister_peer)?;
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect
status = client.get_status()?;
assert_eq!(status.peers, n_peers - 2);
assert_eq!(status.blocks, 3);
status = peer_client.get_status()?;
let (removed_peer, removed_peer_client) = peer_clients.remove(removed_peer_idx);

check_status(&peer_clients, 2);
let status = removed_peer_client.get_status()?;
assert_eq!(status.blocks, 2);
assert_eq!(status.peers, 0);

// Re-register the peer: committed with f = `faults` - 1 then
// `status.peers` increments
let register_peer = RegisterExpr::new(DataModelPeer::new(peer.id.clone()));
client.submit_blocking(register_peer)?;
thread::sleep(pipeline_time * 4); // Wait for some time to allow peers to connect
status = client.get_status()?;
assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, 4);
status = peer_client.get_status()?;
assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, 4);
// Re-register the peer: committed with f = `faults` - 1 then `status.peers` increments
let register_peer = RegisterExpr::new(DataModelPeer::new(removed_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(register_peer)?;
peer_clients.insert(removed_peer_idx, (removed_peer, removed_peer_client));
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect

check_status(&peer_clients, 3);

Ok(())
}

fn check_status(peer_clients: &[(&Peer, Client)], expected_blocks: u64) {
let n_peers = peer_clients.len() as u64;

for (_, peer_client) in peer_clients {
let status = peer_client.get_status().unwrap();

assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, expected_blocks);
}
}
5 changes: 1 addition & 4 deletions client/tests/integration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
pub use iroha_config::{
base::proxy::Builder,
iroha::{Configuration, ConfigurationProxy},
};
pub use iroha_config::iroha::Configuration;

mod add_account;
mod add_domain;
Expand Down
57 changes: 49 additions & 8 deletions client/tests/integration/offline_peers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use eyre::Result;
use iroha_client::client::{self, QueryResult};
use iroha_client::client::{self, Client, QueryResult};
use iroha_crypto::KeyPair;
use iroha_data_model::{
parameter::{default::MAX_TRANSACTIONS_IN_BLOCK, ParametersBuilder},
peer::{Peer as DataModelPeer, PeerId},
prelude::*,
};
use test_network::*;
use tokio::runtime::Runtime;

use super::Configuration;

#[test]
fn genesis_block_is_committed_with_some_offline_peers() -> Result<()> {
// Given
Expand All @@ -19,12 +22,6 @@ fn genesis_block_is_committed_with_some_offline_peers() -> Result<()> {
));
wait_for_genesis_committed(&network.clients(), 1);

client.submit_blocking(
ParametersBuilder::new()
.add_parameter(MAX_TRANSACTIONS_IN_BLOCK, 1u32)?
.into_set_parameters(),
)?;

//When
let alice_id: AccountId = "alice@wonderland".parse()?;
let roses = "rose#wonderland".parse()?;
Expand All @@ -41,3 +38,47 @@ fn genesis_block_is_committed_with_some_offline_peers() -> Result<()> {
assert_eq!(AssetValue::Quantity(alice_has_roses), *asset.value());
Ok(())
}

#[test]
fn register_offline_peer() -> Result<()> {
let n_peers = 4;

let (_rt, network, client) = <Network>::start_test_with_runtime(n_peers, Some(11_160));
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Configuration::pipeline_time();

let peer_clients: Vec<_> = network
.peers
.values()
.chain(core::iter::once(&network.genesis))
.map(|peer| Client::test(&peer.api_address))
.collect();

check_status(&peer_clients, 1);

let address = "128.0.0.2:8085".parse()?;
let key_pair = KeyPair::generate().unwrap();
let public_key = key_pair.public_key().clone();
let peer_id = PeerId::new(&address, &public_key);
let register_peer = RegisterExpr::new(DataModelPeer::new(peer_id));

// Wait for some time to allow peers to connect
client.submit_blocking(register_peer)?;
std::thread::sleep(pipeline_time * 2);

// Make sure status hasn't change
check_status(&peer_clients, 2);

Ok(())
}

fn check_status(peer_clients: &[Client], expected_blocks: u64) {
let n_peers = peer_clients.len() as u64;

for peer_client in peer_clients {
let status = peer_client.get_status().unwrap();

assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, expected_blocks);
}
}
2 changes: 1 addition & 1 deletion client/tests/integration/restart_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
let temp_dir = Arc::new(TempDir::new()?);

let mut configuration = Configuration::test();
let mut peer = <PeerBuilder>::new().with_port(10_000).build()?;
let mut peer = PeerBuilder::new().with_port(10_000).build()?;
configuration.sumeragi.trusted_peers.peers = unique_vec![peer.id.clone()];

let account_id = AccountId::from_str("alice@wonderland").unwrap();
Expand Down
7 changes: 2 additions & 5 deletions config/src/iroha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,8 @@ impl ConfigurationProxy {
message: "Torii config should have at least `p2p_addr` provided for sumeragi finalisation",
});
}
// Finally, if trusted peers were not supplied, we can fall back to inserting itself as
// the only trusted one
if sumeragi_proxy.trusted_peers.is_none() {
sumeragi_proxy.insert_self_as_trusted_peers()
}

sumeragi_proxy.insert_self_as_trusted_peers()
}

Ok(())
Expand Down
13 changes: 9 additions & 4 deletions config/src/sumeragi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,16 @@ impl ConfigurationProxy {
pub fn insert_self_as_trusted_peers(&mut self) {
let peer_id = self
.peer_id
.clone()
.as_ref()
.expect("Insertion of `self` as `trusted_peers` implies that `peer_id` field should be initialized");
self.trusted_peers = Some(TrustedPeers {
peers: unique_vec![peer_id],
});
self.trusted_peers = if let Some(mut trusted_peers) = self.trusted_peers.take() {
trusted_peers.peers.push(peer_id.clone());
Some(trusted_peers)
} else {
Some(TrustedPeers {
peers: unique_vec![peer_id.clone()],
})
};
}
}

Expand Down
37 changes: 19 additions & 18 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,27 +267,28 @@ mod valid {
topology: &Topology,
wsv: &mut WorldStateView,
) -> Result<ValidBlock, (SignedBlock, BlockValidationError)> {
let actual_commit_topology = &block.payload().commit_topology;
let expected_commit_topology = &topology.ordered_peers;

if actual_commit_topology != expected_commit_topology {
let actual_commit_topology = actual_commit_topology.clone();

return Err((
block,
BlockValidationError::TopologyMismatch {
expected: expected_commit_topology.clone(),
actual: actual_commit_topology,
},
));
}
if !block.payload().header.is_genesis() {
let actual_commit_topology = &block.payload().commit_topology;
let expected_commit_topology = &topology.ordered_peers;

if actual_commit_topology != expected_commit_topology {
let actual_commit_topology = actual_commit_topology.clone();

return Err((
block,
BlockValidationError::TopologyMismatch {
expected: expected_commit_topology.clone(),
actual: actual_commit_topology,
},
));
}

if !block.payload().header.is_genesis()
&& topology
if topology
.filter_signatures_by_roles(&[Role::Leader], block.signatures())
.is_empty()
{
return Err((block, SignatureVerificationError::LeaderMissing.into()));
{
return Err((block, SignatureVerificationError::LeaderMissing.into()));
}
}

let expected_block_height = wsv.height() + 1;
Expand Down
1 change: 0 additions & 1 deletion core/src/gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ impl TransactionGossiper {
.n_random_transactions(self.gossip_batch_size, &self.wsv);

if txs.is_empty() {
iroha_logger::debug!("Nothing to gossip");
return;
}

Expand Down
Loading

0 comments on commit c57ccf0

Please sign in to comment.