diff --git a/Cargo.lock b/Cargo.lock index bd767492c67..e570b2566c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,6 +305,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "ascii_table" +version = "4.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed8a80a95ab122e7cc43bfde1d51949c89ff67e0c76eb795dc045003418473e2" + [[package]] name = "assert_matches" version = "1.5.0" @@ -2904,6 +2910,7 @@ checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" name = "iroha" version = "2.0.0-rc.1.0" dependencies = [ + "ascii_table", "assert_matches", "assertables", "attohttpc", diff --git a/crates/iroha/Cargo.toml b/crates/iroha/Cargo.toml index 8b029bb9ff2..f56e0f8de98 100644 --- a/crates/iroha/Cargo.toml +++ b/crates/iroha/Cargo.toml @@ -95,3 +95,4 @@ hex = { workspace = true } assertables = { workspace = true } trybuild = { workspace = true } assert_matches = "1.5.0" +ascii_table = "4.0.4" diff --git a/crates/iroha/tests/extra_functional/connected_peers.rs b/crates/iroha/tests/extra_functional/connected_peers.rs index 915614c3fd4..bbd171fd1f3 100644 --- a/crates/iroha/tests/extra_functional/connected_peers.rs +++ b/crates/iroha/tests/extra_functional/connected_peers.rs @@ -59,7 +59,7 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> { // Unregister a peer: committed with f = `faults` then `status.peers` decrements let client = randomized_peers.choose(&mut thread_rng()).unwrap().client(); - let unregister_peer = Unregister::peer(removed_peer.peer_id()); + let unregister_peer = Unregister::peer(removed_peer.id()); spawn_blocking(move || client.submit_blocking(unregister_peer)).await??; timeout( network.sync_timeout(), @@ -78,7 +78,7 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> { assert_eq!(status.peers, 0); // Re-register the peer: committed with f = `faults` - 1 then `status.peers` increments - let register_peer = Register::peer(removed_peer.peer_id()); + let register_peer = Register::peer(removed_peer.id()); let client = randomized_peers .iter() .choose(&mut thread_rng()) @@ -109,13 +109,13 @@ async fn assert_peers_status( status.peers, expected_peers, "unexpected peers for {}", - peer.peer_id() + peer.id() ); assert_eq!( status.blocks, expected_blocks, "expected blocks for {}", - peer.peer_id() + peer.id() ); }) .collect::>() diff --git a/crates/iroha/tests/faulty_peers.rs b/crates/iroha/tests/faulty_peers.rs new file mode 100644 index 00000000000..8973b7e3415 --- /dev/null +++ b/crates/iroha/tests/faulty_peers.rs @@ -0,0 +1,649 @@ +use std::{borrow::Cow, time::Duration}; + +use eyre::Result; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use iroha_config_base::toml::WriteExt; +use iroha_data_model::{ + asset::AssetDefinition, isi::Register, parameter::BlockParameter, prelude::*, Level, +}; +use iroha_primitives::addr::socket_addr; +use iroha_test_network::{genesis_factory, once_blocks_sync, Network, NetworkBuilder}; +use iroha_test_samples::ALICE_ID; +use nonzero_ext::nonzero; +use rand::{ + prelude::{IteratorRandom, SliceRandom}, + thread_rng, +}; +use relay::P2pRelay; +use tokio::{self, task::spawn_blocking, time::timeout}; +use toml::Table; + +mod relay { + use std::{ + collections::HashMap, + iter::once, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }; + + use futures_util::{stream::FuturesUnordered, StreamExt}; + use iroha_data_model::{peer::PeerId, prelude::Peer, Identifiable}; + use iroha_primitives::{ + addr::{socket_addr, SocketAddr}, + unique_vec::UniqueVec, + }; + use iroha_test_network::{fslock_ports::AllocatedPort, Network}; + use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + select, + sync::Notify, + task::JoinSet, + }; + + #[derive(Debug)] + pub struct P2pRelay { + peers: HashMap, + tasks: JoinSet<()>, + } + + #[derive(Debug)] + struct RelayPeer { + real_addr: SocketAddr, + /// Map of proxied destinations. + /// Key is a peer id of + mock_outgoing: HashMap, + suspend: Suspend, + } + + impl P2pRelay { + pub fn new(real_topology: &UniqueVec) -> Self { + let peers: HashMap<_, _> = real_topology + .iter() + .map(|peer| { + let id = peer.id(); + let real_addr = peer.address().clone(); + let mock_outgoing = real_topology + .iter() + .map(|peer| peer.id()) + .filter(|x| *x != id) + .map(|other_id| { + let mock_port = AllocatedPort::new(); + let mock_addr = socket_addr!(127.0.0.1:*mock_port); + (other_id.clone(), (mock_addr, mock_port)) + }) + .collect(); + let relay_peer = RelayPeer { + real_addr, + mock_outgoing, + suspend: Suspend::new(id.clone()), + }; + (id.clone(), relay_peer) + }) + .collect(); + + let mut table = ascii_table::AsciiTable::default(); + table.set_max_width(30 * (1 + real_topology.len())); + table.column(0).set_header("From"); + for (i, id) in real_topology.iter().enumerate() { + table + .column(i + 1) + .set_header(format!("To {}", id.address())); + } + table.print(real_topology.iter().map(|peer| { + once(format!("{}", peer.address())) + .chain(real_topology.iter().map(|other_peer| { + if other_peer.id() == peer.id() { + "".to_string() + } else { + let (mock_addr, _) = peers + .get(peer.id()) + .unwrap() + .mock_outgoing + .get(other_peer.id()) + .unwrap(); + format!("{mock_addr}") + } + })) + .collect::>() + })); + + Self { + peers, + tasks: <_>::default(), + } + } + + pub fn for_network(network: &Network) -> Self { + Self::new( + &network + .peers() + .iter() + .map(|peer| Peer::new(peer.p2p_address(), peer.id())) + .collect(), + ) + } + + pub fn trusted_peers_for(&self, peer: &PeerId) -> UniqueVec { + let peer_info = self + .peers + .get(peer) + .expect("existing peer must be supplied"); + peer_info + .mock_outgoing + .iter() + .map(|(other, (addr, _port))| Peer::new(addr.clone(), other.clone())) + .chain(Some(Peer::new(peer_info.real_addr.clone(), peer.clone()))) + .collect() + } + + pub fn start(&mut self) { + for (_peer_id, peer) in self.peers.iter() { + for (other_id, (other_mock_addr, _)) in peer.mock_outgoing.iter() { + let other_peer = self.peers.get(other_id).expect("must be present"); + let suspend = + SuspendIfAny(vec![peer.suspend.clone(), other_peer.suspend.clone()]); + + P2pRelay::run_proxy( + &mut self.tasks, + other_mock_addr.clone(), + other_peer.real_addr.clone(), + suspend, + ); + } + } + } + + fn run_proxy( + tasks: &mut JoinSet<()>, + from: SocketAddr, + to: SocketAddr, + suspend: SuspendIfAny, + ) { + // eprintln!("proxy: {from} → {to}"); + let mut proxy = Proxy::new(from, to, suspend); + + tasks.spawn(async move { + if let Err(err) = proxy.run().await { + eprintln!("proxy at {} exited with an error: {err}", proxy.from); + } else { + eprintln!("proxy exited normally"); + } + }); + } + + pub fn suspend(&self, peer: &PeerId) -> Suspend { + self.peers + .get(peer) + .expect("must be present") + .suspend + .clone() + } + } + + #[derive(Clone, Debug)] + pub struct Suspend { + peer_id: PeerId, + active: Arc, + notify: Arc, + } + + impl Suspend { + fn new(peer_id: PeerId) -> Self { + Self { + peer_id, + active: <_>::default(), + notify: <_>::default(), + } + } + + pub fn activate(&self) { + self.active.store(true, Ordering::Release); + eprintln!("suspended {}", self.peer_id) + } + + pub fn deactivate(&self) { + self.active.store(false, Ordering::Release); + self.notify.notify_waiters(); + eprintln!("unsuspended {}", self.peer_id) + } + } + + #[derive(Clone, Debug)] + struct SuspendIfAny(Vec); + + impl SuspendIfAny { + async fn is_not_active(&self) { + loop { + let waited_for = self + .0 + .iter() + .filter_map(|x| { + x.active + .load(Ordering::Acquire) + .then_some(x.notify.notified()) + }) + .collect::>() + .collect::>() + .await + .len(); + if waited_for == 0 { + break; + } + } + } + } + + struct Proxy { + from: SocketAddr, + to: SocketAddr, + suspend: SuspendIfAny, + } + + impl Proxy { + fn new(from: SocketAddr, to: SocketAddr, suspend: SuspendIfAny) -> Self { + Self { from, to, suspend } + } + + async fn run(&mut self) -> eyre::Result<()> { + let listener = TcpListener::bind(self.from.to_string()).await?; + loop { + let (client, _) = listener.accept().await?; + let server = TcpStream::connect(self.to.to_string()).await?; + + let (mut eread, mut ewrite) = client.into_split(); + let (mut oread, mut owrite) = server.into_split(); + + let suspend = self.suspend.clone(); + let e2o = + tokio::spawn( + async move { Proxy::copy(&suspend, &mut eread, &mut owrite).await }, + ); + let suspend = self.suspend.clone(); + let o2e = + tokio::spawn( + async move { Proxy::copy(&suspend, &mut oread, &mut ewrite).await }, + ); + + select! { + _ = e2o => { + // eprintln!("{} → {}: client-to-server closed ×", self.from, self.to); + }, + _ = o2e => { + // eprintln!("{} → {}: server-to-client closed ×", self.from, self.to); + }, + } + } + } + + async fn copy( + suspend: &SuspendIfAny, + mut reader: R, + mut writer: W, + ) -> std::io::Result<()> + where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, + { + // NOTE: stack overflow happens without the box + let mut buf = Box::new([0u8; 2usize.pow(20)]); + + loop { + suspend.is_not_active().await; + + let n = reader.read(&mut *buf).await?; + if n == 0 { + break; + } + + writer.write_all(&buf[..n]).await?; + } + + Ok(()) + } + } +} + +#[derive(Default)] +enum GenesisPeer { + #[default] + Whichever, + Nth(usize), +} + +async fn start_network_under_relay( + network: &Network, + relay: &P2pRelay, + genesis_peer: GenesisPeer, +) -> Result<()> { + timeout( + network.peer_startup_timeout(), + network + .peers() + .iter() + .enumerate() + .map(|(i, peer)| { + let config = network.config_layers().chain(Some(Cow::Owned( + Table::new() + .write( + ["sumeragi", "trusted_peers"], + relay.trusted_peers_for(&peer.id()), + ) + // We don't want peers to gossip any actual addresses, because each peer has + // its own set of incoming and outgoing proxies with every other peer. + // Thus, we are giving this addr which should always reject connections and + // peers should rely on what they got in the `sumeragi.trusted_peers`. + .write( + ["network", "public_address"], + // This IP is in the range of IPs reserved for "documentation and examples" + // https://en.wikipedia.org/wiki/Reserved_IP_addresses#:~:text=192.0.2.0/24 + socket_addr!(192.0.2.133:1337), + ), + ))); + + let genesis = match genesis_peer { + GenesisPeer::Whichever => i == 0, + GenesisPeer::Nth(n) => i == n, + } + .then(|| { + Cow::Owned(genesis_factory( + network.genesis_isi().clone(), + network.peers().iter().map(|x| x.id()).collect(), + )) + }); + + async move { + peer.start_checked(config, genesis) + .await + .expect("must start"); + } + }) + .collect::>() + .collect::>(), + ) + .await?; + + Ok(()) +} + +#[tokio::test] +async fn network_starts_with_relay() -> Result<()> { + let network = NetworkBuilder::new().with_peers(4).build(); + let mut relay = P2pRelay::for_network(&network); + + relay.start(); + start_network_under_relay(&network, &relay, GenesisPeer::Whichever).await?; + network.ensure_blocks(1).await?; + + Ok(()) +} + +#[tokio::test] +async fn network_doesnt_start_without_relay_being_started() -> Result<()> { + let network = NetworkBuilder::new().with_peers(4).build(); + let relay = P2pRelay::for_network(&network); + + start_network_under_relay(&network, &relay, GenesisPeer::Whichever).await?; + + let Err(_) = timeout( + Duration::from_secs(3), + once_blocks_sync(network.peers().iter(), 1), + ) + .await + else { + panic!("network must not start!") + }; + + Ok(()) +} + +#[tokio::test] +async fn suspending_works() -> Result<()> { + const SYNC: Duration = Duration::from_secs(3); + const N_PEERS: usize = 4; + const { assert!(N_PEERS > 0) }; + + let network = NetworkBuilder::new().with_peers(N_PEERS).build(); + let mut relay = P2pRelay::for_network(&network); + start_network_under_relay(&network, &relay, GenesisPeer::Whichever).await?; + // we will plug/unplug the last peer who doesn't have the genesis + let last_peer = network + .peers() + .last() + .expect("there are more than 0 of them"); + let suspend = relay.suspend(&last_peer.id()); + + suspend.activate(); + relay.start(); + + // all peers except the last one should get the genesis + timeout( + SYNC, + once_blocks_sync(network.peers().iter().take(N_PEERS - 1), 1), + ) + .await??; + let Err(_) = timeout(SYNC, last_peer.once_block(1)).await else { + panic!("should not get block within timeout!") + }; + + // unsuspend, the last peer should get the block too + suspend.deactivate(); + timeout(SYNC, last_peer.once_block(1)).await?; + + Ok(()) +} + +#[tokio::test] +async fn block_after_genesis_is_synced() -> Result<()> { + // A consensus anomaly occurred deterministically depending on the peer set + // (how public keys of different peers are sorted to each other, which determines consensus + // roles) and which peer submits the genesis. The values below are an experimentally found + // case. + const SEED: &str = "we want a determined order of peers"; + const GENESIS_PEER_INDEX: usize = 3; + + let network = NetworkBuilder::new() + .with_base_seed(SEED) + .with_peers(5) + .with_pipeline_time(Duration::from_secs(6)) + .build(); + let mut relay = P2pRelay::for_network(&network); + + relay.start(); + start_network_under_relay(&network, &relay, GenesisPeer::Nth(GENESIS_PEER_INDEX)).await?; + network.ensure_blocks(1).await?; + + for peer in network.peers() { + relay.suspend(&peer.id()).activate(); + } + let client = network.client(); + spawn_blocking(move || client.submit(Log::new(Level::INFO, "tick".to_owned()))).await??; + let Err(_) = timeout( + Duration::from_secs(3), + once_blocks_sync(network.peers().iter(), 2), + ) + .await + else { + panic!("should not sync with relay being suspended") + }; + for peer in network.peers() { + relay.suspend(&peer.id()).deactivate(); + } + timeout( + Duration::from_secs(2), + once_blocks_sync(network.peers().iter(), 2), + ) + .await??; + + Ok(()) +} + +// ======= ACTUAL TESTS BEGIN HERE ======= + +struct UnstableNetwork { + n_peers: usize, + n_faulty_peers: usize, + n_transactions: usize, + force_soft_fork: bool, +} + +impl UnstableNetwork { + async fn run(self) -> Result<()> { + assert!(self.n_peers > self.n_faulty_peers); + + let account_id = ALICE_ID.clone(); + let asset_definition_id: AssetDefinitionId = "camomile#wonderland".parse().expect("Valid"); + + let network = NetworkBuilder::new() + .with_peers(self.n_peers) + .with_config_layer(|cfg| { + if self.force_soft_fork { + cfg.write(["sumeragi", "debug", "force_soft_fork"], true); + } + }) + .with_genesis_instruction(SetParameter::new(Parameter::Block( + BlockParameter::MaxTransactions(nonzero!(1u64)), + ))) + .build(); + let mut relay = P2pRelay::for_network(&network); + start_network_under_relay(&network, &relay, GenesisPeer::Whichever).await?; + + relay.start(); + { + let client = network.client(); + let isi = + Register::asset_definition(AssetDefinition::numeric(asset_definition_id.clone())); + spawn_blocking(move || client.submit_blocking(isi)).await??; + } + let init_blocks = 2; + network.ensure_blocks(init_blocks).await?; + + for i in 0..self.n_transactions { + eprintln!("round {} of {}", i + 1, self.n_transactions); + // Make random peers faulty. + let faulty: Vec<_> = network + .peers() + .choose_multiple(&mut thread_rng(), self.n_faulty_peers) + .map(|peer| peer.id()) + .collect(); + for peer in &faulty { + relay.suspend(peer).activate(); + } + + // When minted + let quantity = Numeric::ONE; + let mint_asset = Mint::asset_numeric( + quantity, + AssetId::new(asset_definition_id.clone(), account_id.clone()), + ); + let some_peer = network + .peers() + .iter() + .filter(|x| !faulty.contains(&x.id())) + .choose(&mut thread_rng()) + .expect("there should be some working peers"); + eprintln!("submitting via peer {}", some_peer.id()); + let client = some_peer.client(); + spawn_blocking(move || client.submit(mint_asset)).await??; + + // Then all non-faulty peers get the new block + timeout( + network.sync_timeout(), + once_blocks_sync( + network.peers().iter().filter(|x| !faulty.contains(&x.id())), + init_blocks + (i as u64) + 1, + ), + ) + .await??; + + // Return all peers to normal function. + for peer in &faulty { + relay.suspend(peer).deactivate(); + } + + // await for sync so that we can start the next round + network.ensure_blocks(init_blocks + (i as u64) + 1).await?; + } + + // Then there are N assets minted + let client = network.client(); + let asset = spawn_blocking(move || { + client + .query(FindAssets) + .filter_with(|asset| asset.id.definition_id.eq(asset_definition_id)) + .execute_all() + }) + .await?? + .into_iter() + .next() + .expect("there should be 1 result"); + assert_eq!( + *asset.value(), + AssetValue::Numeric(Numeric::new(self.n_transactions as u128, 0)) + ); + + Ok(()) + } +} + +#[tokio::test] +async fn unstable_network_5_peers_1_fault() -> Result<()> { + UnstableNetwork { + n_peers: 5, + n_faulty_peers: 1, + n_transactions: 20, + force_soft_fork: false, + } + .run() + .await +} + +#[tokio::test] +async fn soft_fork() -> Result<()> { + UnstableNetwork { + n_peers: 4, + n_faulty_peers: 0, + n_transactions: 20, + force_soft_fork: true, + } + .run() + .await +} + +#[tokio::test] +async fn unstable_network_8_peers_1_fault() -> Result<()> { + UnstableNetwork { + n_peers: 8, + n_faulty_peers: 1, + n_transactions: 20, + force_soft_fork: false, + } + .run() + .await +} + +#[tokio::test] +async fn unstable_network_9_peers_2_faults() -> Result<()> { + UnstableNetwork { + n_peers: 9, + n_faulty_peers: 2, + n_transactions: 5, + force_soft_fork: false, + } + .run() + .await +} + +#[tokio::test] +async fn unstable_network_9_peers_4_faults() -> Result<()> { + UnstableNetwork { + n_peers: 9, + n_faulty_peers: 4, + n_transactions: 5, + force_soft_fork: false, + } + .run() + .await +} diff --git a/crates/iroha/tests/permissions.rs b/crates/iroha/tests/permissions.rs index 13559c9c5fd..23f03f4b6f1 100644 --- a/crates/iroha/tests/permissions.rs +++ b/crates/iroha/tests/permissions.rs @@ -27,12 +27,13 @@ async fn genesis_transactions_are_validated_by_executor() { let network = NetworkBuilder::new() .with_genesis_instruction(invalid_instruction) .build(); + let genesis = genesis_factory(network.genesis_isi().clone(), network.topology()); let peer = network.peer(); timeout(Duration::from_secs(3), async { join!( // Peer should start... - peer.start(network.config(), Some(network.genesis())), + peer.start(network.config(), Some(&genesis)), peer.once(|event| matches!(event, PeerLifecycleEvent::ServerStarted)), // ...but it should shortly exit with an error peer.once(|event| match event { diff --git a/crates/iroha_core/src/sumeragi/main_loop.rs b/crates/iroha_core/src/sumeragi/main_loop.rs index efdfd30db89..13e06a64574 100644 --- a/crates/iroha_core/src/sumeragi/main_loop.rs +++ b/crates/iroha_core/src/sumeragi/main_loop.rs @@ -85,6 +85,7 @@ impl Sumeragi { data: NetworkMessage::SumeragiBlock(Box::new(packet)), peer_id: peer.clone(), }; + trace!(?post, "Posting a packet"); self.network.post(post); } @@ -105,6 +106,7 @@ impl Sumeragi { let broadcast = iroha_p2p::Broadcast { data: NetworkMessage::SumeragiBlock(Box::new(msg.into())), }; + trace!(?broadcast, "Broadcasting a packet"); self.network.broadcast(broadcast); } @@ -136,23 +138,19 @@ impl Sumeragi { let mut should_sleep = true; for _ in 0..MAX_CONTROL_MSG_IN_A_ROW { match self.control_message_receiver.try_recv() { - Ok(msg) => { + Ok(ControlFlowMessage::ViewChangeProof(proof)) => { should_sleep = false; - if let Err(error) = view_change_proof_chain.insert_proof( - msg.view_change_proof, - &self.topology, - latest_block, - ) { + trace!("Inserting a proof into the view change proof chain"); + if let Err(error) = + view_change_proof_chain.insert_proof(proof, &self.topology, latest_block) + { trace!(%error, "Failed to add proof into view change proof chain") } } Err(mpsc::TryRecvError::Disconnected) => { return Err(ReceiveNetworkPacketError::ChannelDisconnected) } - Err(err) => { - trace!(%err, "Failed to receive control message"); - break; - } + Err(mpsc::TryRecvError::Empty) => break, } } @@ -177,8 +175,7 @@ impl Sumeragi { Err(mpsc::TryRecvError::Disconnected) => { return Err(ReceiveNetworkPacketError::ChannelDisconnected) } - Err(err) => { - trace!(%err, "Failed to receive message"); + Err(mpsc::TryRecvError::Empty) => { return Ok(None); } }; @@ -1152,6 +1149,7 @@ pub(crate) fn run( peer_id=%sumeragi.peer, role=%sumeragi.role(), block=%block.as_ref().hash(), + %view_change_index, "Block not committed in due time, requesting view change..." ); } else { @@ -1161,6 +1159,7 @@ pub(crate) fn run( warn!( peer_id=%sumeragi.peer, role=%sumeragi.role(), + %view_change_index, "No block produced in due time, requesting view change..." ); } @@ -1185,7 +1184,7 @@ pub(crate) fn run( view_change_proof_chain.get_proof_for_view_change(view_change_index) }) { - let msg = ControlFlowMessage::new(latest_verified_proof); + let msg = ControlFlowMessage::ViewChangeProof(latest_verified_proof); sumeragi.broadcast_control_flow_packet(msg); } @@ -1195,7 +1194,7 @@ pub(crate) fn run( if let Some(proof_for_current_view_change_index) = view_change_proof_chain.get_proof_for_view_change(view_change_index) { - let msg = ControlFlowMessage::new(proof_for_current_view_change_index); + let msg = ControlFlowMessage::ViewChangeProof(proof_for_current_view_change_index); sumeragi.broadcast_control_flow_packet(msg); } diff --git a/crates/iroha_core/src/sumeragi/message.rs b/crates/iroha_core/src/sumeragi/message.rs index 94ee7503f91..d2c7c33ae31 100644 --- a/crates/iroha_core/src/sumeragi/message.rs +++ b/crates/iroha_core/src/sumeragi/message.rs @@ -21,19 +21,12 @@ pub enum BlockMessage { BlockSyncUpdate(BlockSyncUpdate), } -/// Specialization of `MessagePacket` +/// Specialization of [`BlockMessage`] #[derive(Debug, Clone, Decode, Encode)] -pub struct ControlFlowMessage { +pub enum ControlFlowMessage { /// Proof of view change. As part of this message handling, all /// peers which agree with view change should sign it. - pub view_change_proof: view_change::SignedViewChangeProof, -} - -impl ControlFlowMessage { - /// Helper function to construct a `ControlFlowMessage` - pub fn new(view_change_proof: view_change::SignedViewChangeProof) -> ControlFlowMessage { - ControlFlowMessage { view_change_proof } - } + ViewChangeProof(view_change::SignedViewChangeProof), } /// `BlockCreated` message structure. diff --git a/crates/iroha_core/src/sumeragi/mod.rs b/crates/iroha_core/src/sumeragi/mod.rs index c256b4eddc3..cfdbd2b4aeb 100644 --- a/crates/iroha_core/src/sumeragi/mod.rs +++ b/crates/iroha_core/src/sumeragi/mod.rs @@ -48,7 +48,7 @@ pub struct SumeragiHandle { impl SumeragiHandle { /// Deposit a sumeragi control flow network message. pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) { - trace!(ty = "ViewChangeProofChain", "Incoming message"); + trace!(?msg, "Incoming control flow message"); if let Err(error) = self.control_message_sender.try_send(msg) { #[cfg(feature = "telemetry")] self.dropped_messages_metric.inc(); @@ -65,16 +65,7 @@ impl SumeragiHandle { /// Deposit a sumeragi network message. pub fn incoming_block_message(&self, msg: impl Into) { let msg = msg.into(); - let (ty, block) = match &msg { - BlockMessage::BlockCommitted(BlockCommitted { hash, .. }) => ("BlockCommitted", *hash), - BlockMessage::BlockCreated(BlockCreated { block }) => ("BlockCreated", block.hash()), - BlockMessage::BlockSigned(BlockSigned { hash, .. }) => ("BlockSigned", *hash), - BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => { - trace!(ty="BlockSyncUpdate", block=%block.hash(), "Incoming message"); - ("BlockSyncUpdate", block.hash()) - } - }; - trace!(ty, %block, "Incoming message"); + trace!(?msg, "Incoming block message"); if let Err(error) = self.message_sender.try_send(msg) { #[cfg(feature = "telemetry")] diff --git a/crates/iroha_core/src/sumeragi/network_topology.rs b/crates/iroha_core/src/sumeragi/network_topology.rs index cea5eab73e6..e6f8f417986 100644 --- a/crates/iroha_core/src/sumeragi/network_topology.rs +++ b/crates/iroha_core/src/sumeragi/network_topology.rs @@ -9,6 +9,7 @@ use iroha_data_model::{block::BlockSignature, prelude::PeerId}; /// The ordering of the peers which defines their roles in the current round of consensus. /// +/// ```txt /// A | | |>| |->| /// B | | | | | V /// C | A Set | ^ V Rotate A Set ^ | @@ -16,6 +17,7 @@ use iroha_data_model::{block::BlockSignature, prelude::PeerId}; /// E | | |<| ^ | /// F | B Set | | V /// G | f | |<-| +/// ``` /// /// Above is an illustration of how the various operations work for a f = 2 topology. #[derive(Debug, Clone, PartialEq, Eq)] @@ -74,8 +76,8 @@ impl Topology { self.0.iter() } - /// True, if the topology contains at least one peer and thus requires consensus - pub fn is_non_empty(&self) -> Option { + /// Some, if the topology contains at least one peer and thus requires consensus + pub fn as_non_empty(&self) -> Option { (!self.0.is_empty()).then_some(NonEmptyTopology { topology: self }) } @@ -134,7 +136,7 @@ impl Topology { let mut filtered = IndexSet::new(); for role in roles { - match (role, self.is_non_empty(), self.is_consensus_required()) { + match (role, self.as_non_empty(), self.is_consensus_required()) { (Role::Leader, Some(topology), _) => { filtered.insert(topology.leader_index()); } @@ -538,7 +540,7 @@ mod tests { assert_eq!( topology - .is_non_empty() + .as_non_empty() .as_ref() .map(NonEmptyTopology::leader), Some(&peers[0]) @@ -552,7 +554,7 @@ mod tests { assert_eq!( topology - .is_non_empty() + .as_non_empty() .as_ref() .map(NonEmptyTopology::leader), Some(&peers[0]) @@ -566,7 +568,7 @@ mod tests { assert_eq!( topology - .is_non_empty() + .as_non_empty() .as_ref() .map(NonEmptyTopology::leader), Some(&peers[0]) @@ -580,7 +582,7 @@ mod tests { assert_eq!( topology - .is_non_empty() + .as_non_empty() .as_ref() .map(NonEmptyTopology::leader), Some(&peers[0]) diff --git a/crates/iroha_core/src/sumeragi/view_change.rs b/crates/iroha_core/src/sumeragi/view_change.rs index ce7c7f932e2..43609c41028 100644 --- a/crates/iroha_core/src/sumeragi/view_change.rs +++ b/crates/iroha_core/src/sumeragi/view_change.rs @@ -120,7 +120,7 @@ pub struct ProofChain(BTreeMap); impl ProofChain { /// Find next index to last verified view change proof. - /// Proof is verified if it has more or qual ot f + 1 valid signatures. + /// Proof is verified if it has more or equal to f + 1 valid signatures. pub fn verify_with_state( &self, topology: &Topology, diff --git a/crates/iroha_data_model/src/peer.rs b/crates/iroha_data_model/src/peer.rs index ef398bf0ecb..a7e6a13427b 100644 --- a/crates/iroha_data_model/src/peer.rs +++ b/crates/iroha_data_model/src/peer.rs @@ -23,7 +23,6 @@ mod model { /// Peer's identification. /// - /// Equality is tested by `public_key` field only. /// Each peer should have a unique public key. #[derive( DebugCustom, diff --git a/crates/iroha_p2p/src/network.rs b/crates/iroha_p2p/src/network.rs index 27163113146..8f6bd8ceab2 100644 --- a/crates/iroha_p2p/src/network.rs +++ b/crates/iroha_p2p/src/network.rs @@ -247,7 +247,6 @@ struct NetworkBase { impl NetworkBase { /// [`Self`] task. - #[log(skip(self, shutdown_signal), fields(listen_addr=%self.listen_addr, public_key=%self.key_pair.public_key()))] async fn run(mut self, shutdown_signal: ShutdownSignal) { // TODO: probably should be configuration parameter let mut update_topology_interval = tokio::time::interval(Duration::from_millis(1000)); diff --git a/crates/iroha_test_network/src/config.rs b/crates/iroha_test_network/src/config.rs index eef0ca09c1e..835449e982d 100644 --- a/crates/iroha_test_network/src/config.rs +++ b/crates/iroha_test_network/src/config.rs @@ -33,7 +33,8 @@ pub fn base_iroha_config() -> Table { .write(["snapshot", "mode"], "disabled") .write(["kura", "store_dir"], "./storage") .write(["network", "block_gossip_size"], 1) - .write(["logger", "level"], "DEBUG") + .write(["logger", "level"], "TRACE") + .write(["logger", "format"], "json") } pub fn genesis( diff --git a/crates/iroha_test_network/src/lib.rs b/crates/iroha_test_network/src/lib.rs index 187ca60eb0c..940c7509c02 100644 --- a/crates/iroha_test_network/src/lib.rs +++ b/crates/iroha_test_network/src/lib.rs @@ -1,10 +1,12 @@ //! Puppeteer for `irohad`, to create test networks mod config; -mod fslock_ports; +pub mod fslock_ports; use core::{fmt::Debug, time::Duration}; use std::{ + borrow::Cow, + iter, ops::Deref, path::{Path, PathBuf}, process::{ExitStatus, Stdio}, @@ -23,7 +25,7 @@ use iroha_config::base::{ read::ConfigReader, toml::{TomlSource, WriteExt as _, Writer as TomlWriter}, }; -use iroha_crypto::{ExposedPrivateKey, KeyPair, PrivateKey}; +use iroha_crypto::{Algorithm, ExposedPrivateKey, KeyPair, PrivateKey}; use iroha_data_model::{ events::pipeline::BlockEventFilter, isi::InstructionBox, @@ -31,12 +33,14 @@ use iroha_data_model::{ ChainId, }; use iroha_genesis::GenesisBlock; -use iroha_primitives::{addr::socket_addr, unique_vec::UniqueVec}; +use iroha_primitives::{ + addr::{socket_addr, SocketAddr}, + unique_vec::UniqueVec, +}; use iroha_telemetry::metrics::Status; use iroha_test_samples::{ALICE_ID, ALICE_KEYPAIR, PEER_KEYPAIR, SAMPLE_GENESIS_ACCOUNT_KEYPAIR}; use parity_scale_codec::Encode; use rand::{prelude::IteratorRandom, thread_rng}; -use tempfile::TempDir; use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -48,11 +52,13 @@ use tokio::{ }; use toml::Table; +pub use crate::config::genesis as genesis_factory; + const INSTANT_PIPELINE_TIME: Duration = Duration::from_millis(10); const DEFAULT_BLOCK_SYNC: Duration = Duration::from_millis(150); const PEER_START_TIMEOUT: Duration = Duration::from_secs(30); const PEER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); -const SYNC_TIMEOUT: Duration = Duration::from_secs(30); +const SYNC_TIMEOUT: Duration = Duration::from_secs(5); fn iroha_bin() -> impl AsRef { static PATH: OnceLock = OnceLock::new(); @@ -82,15 +88,27 @@ fn tempdir_in() -> Option> { .as_ref() } +fn generate_and_keep_temp_dir() -> PathBuf { + let mut builder = tempfile::Builder::new(); + builder.keep(true).prefix(TEMPDIR_PREFIX); + match tempdir_in() { + Some(create_within) => builder.tempdir_in(create_within), + None => builder.tempdir(), + } + .expect("tempdir creation should work") + .path() + .to_path_buf() +} + /// Network of peers pub struct Network { peers: Vec, - genesis: GenesisBlock, + genesis_isi: Vec, block_time: Duration, commit_time: Duration, - config: Table, + config_layers: Vec, } impl Network { @@ -129,22 +147,13 @@ impl Network { .iter() .enumerate() .map(|(i, peer)| async move { - let failure = async move { - peer.once(|e| matches!(e, PeerLifecycleEvent::Terminated { .. })) - .await; - panic!("a peer exited unexpectedly"); - }; - - let start = async move { - peer.start(self.config(), (i == 0).then_some(&self.genesis)) - .await; - peer.once_block(1).await; - }; - - tokio::select! { - _ = failure => {}, - _ = start => {}, - } + peer.start_checked( + self.config_layers(), + (i == 0).then_some(Cow::Owned(self.genesis())), + ) + .await + .expect("peer failed to start"); + peer.once_block(1).await; }) .collect::>() .collect::>(), @@ -183,15 +192,29 @@ impl Network { /// Base configuration of all peers. /// /// Includes `sumeragi.trusted_peers` parameter, containing all currently present peers. - pub fn config(&self) -> Table { - self.config - .clone() - .write(["sumeragi", "trusted_peers"], self.topology()) + pub fn config_layers(&self) -> impl Iterator> { + self.config_layers + .iter() + .map(Cow::Borrowed) + .chain(Some(Cow::Owned( + Table::new().write(["sumeragi", "trusted_peers"], self.trusted_peers()), + ))) } /// Network genesis block. - pub fn genesis(&self) -> &GenesisBlock { - &self.genesis + /// + /// It uses the basic [`genesis_factory`] with [`Self::genesis_isi`] + + /// topology of the network peers. + pub fn genesis(&self) -> GenesisBlock { + genesis_factory( + self.genesis_isi.clone(), + self.peers.iter().map(NetworkPeer::id).collect(), + ) + } + + /// Base network instructions included in the genesis block. + pub fn genesis_isi(&self) -> &Vec { + &self.genesis_isi } /// Shutdown running peers @@ -206,8 +229,11 @@ impl Network { self } - fn topology(&self) -> UniqueVec { - self.peers.iter().map(|x| x.id.clone()).collect() + fn trusted_peers(&self) -> UniqueVec { + self.peers + .iter() + .map(|x| Peer::new(x.p2p_address(), x.id())) + .collect() } /// Resolves when all _running_ peers have at least N blocks @@ -216,17 +242,12 @@ impl Network { pub async fn ensure_blocks(&self, height: u64) -> Result<&Self> { timeout( self.sync_timeout(), - self.peers - .iter() - .filter(|x| x.is_running()) - .map(|x| x.once_block(height)) - .collect::>() - .collect::>(), + once_blocks_sync(self.peers.iter().filter(|x| x.is_running()), height), ) .await .wrap_err_with(|| { eyre!("Network hasn't reached the height of {height} block(s) within timeout") - })?; + })??; eprintln!("network reached height={height}"); @@ -237,9 +258,10 @@ impl Network { /// Builder of [`Network`] pub struct NetworkBuilder { n_peers: usize, - config: Table, + config_layers: Vec
, pipeline_time: Option, - extra_isi: Vec, + genesis_isi: Vec, + seed: Option, } impl Default for NetworkBuilder { @@ -254,9 +276,10 @@ impl NetworkBuilder { pub fn new() -> Self { Self { n_peers: 1, - config: config::base_iroha_config(), + config_layers: vec![], pipeline_time: Some(INSTANT_PIPELINE_TIME), - extra_isi: vec![], + genesis_isi: vec![], + seed: None, } } @@ -293,75 +316,83 @@ impl NetworkBuilder { /// ``` /// use iroha_test_network::NetworkBuilder; /// - /// NetworkBuilder::new().with_config(|t| { + /// NetworkBuilder::new().with_config_layer(|t| { /// t.write(["logger", "level"], "DEBUG"); /// }); /// ``` - pub fn with_config(mut self, f: F) -> Self + pub fn with_config_layer(mut self, f: F) -> Self where for<'a> F: FnOnce(&'a mut TomlWriter<'a>), { - let mut writer = TomlWriter::new(&mut self.config); + let mut table = Table::new(); + let mut writer = TomlWriter::new(&mut table); f(&mut writer); + self.config_layers.push(table); self } /// Append an instruction to genesis. pub fn with_genesis_instruction(mut self, isi: impl Into) -> Self { - self.extra_isi.push(isi.into()); + self.genesis_isi.push(isi.into()); + self + } + + pub fn with_base_seed(mut self, seed: impl ToString) -> Self { + self.seed = Some(seed.to_string()); self } /// Build the [`Network`]. Doesn't start it. pub fn build(self) -> Network { - let peers: Vec<_> = (0..self.n_peers).map(|_| NetworkPeer::generate()).collect(); - - let topology: UniqueVec<_> = peers.iter().map(|peer| peer.peer_id()).collect(); + let network_dir = generate_and_keep_temp_dir(); + let peers: Vec<_> = (0..self.n_peers) + .map(|i| { + let peer_dir = network_dir.join(format!("peer{i}")); + std::fs::create_dir_all(&peer_dir).unwrap(); + let seed = self.seed.as_ref().map(|x| format!("{x}-peer-{i}")); + NetworkPeerBuilder::new() + .with_dir(Some(peer_dir)) + .with_seed(seed.as_ref().map(|x| x.as_bytes())) + .build() + }) + .collect(); let block_sync_gossip_period = DEFAULT_BLOCK_SYNC; - let mut extra_isi = vec![]; let block_time; let commit_time; if let Some(duration) = self.pipeline_time { block_time = duration / 3; commit_time = duration / 2; - extra_isi.extend([ - InstructionBox::SetParameter(SetParameter::new(Parameter::Sumeragi( - SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), - ))), - InstructionBox::SetParameter(SetParameter::new(Parameter::Sumeragi( - SumeragiParameter::CommitTimeMs(commit_time.as_millis() as u64), - ))), - ]); } else { block_time = SumeragiParameters::default().block_time(); commit_time = SumeragiParameters::default().commit_time(); } - let genesis = config::genesis( - [ - InstructionBox::SetParameter(SetParameter::new(Parameter::Sumeragi( - SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), - ))), - InstructionBox::SetParameter(SetParameter::new(Parameter::Sumeragi( - SumeragiParameter::CommitTimeMs(commit_time.as_millis() as u64), - ))), - ] - .into_iter() - .chain(self.extra_isi), - topology, - ); + let genesis_isi = [ + InstructionBox::SetParameter(SetParameter::new(Parameter::Sumeragi( + SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), + ))), + InstructionBox::SetParameter(SetParameter::new(Parameter::Sumeragi( + SumeragiParameter::CommitTimeMs(commit_time.as_millis() as u64), + ))), + ] + .into_iter() + .chain(self.genesis_isi) + .collect(); Network { peers, - genesis, + genesis_isi, block_time, commit_time, - config: self.config.write( + config_layers: Some(config::base_iroha_config().write( ["network", "block_gossip_period_ms"], block_sync_gossip_period.as_millis() as u64, - ), + )) + .into_iter() + .chain(self.config_layers.into_iter()) + .collect(), } } @@ -457,9 +488,8 @@ pub enum PeerLifecycleEvent { /// When dropped, aborts the child process (if it is running). #[derive(Clone, Debug)] pub struct NetworkPeer { - id: Peer, key_pair: KeyPair, - dir: Arc, + dir: PathBuf, run: Arc>>, runs_count: Arc, is_running: Arc, @@ -471,50 +501,6 @@ pub struct NetworkPeer { } impl NetworkPeer { - /// Generate a random peer - pub fn generate() -> Self { - let key_pair = KeyPair::random(); - let port_p2p = AllocatedPort::new(); - let port_api = AllocatedPort::new(); - let id = Peer::new( - socket_addr!(127.0.0.1:*port_p2p), - key_pair.public_key().clone(), - ); - let temp_dir = Arc::new({ - let mut builder = tempfile::Builder::new(); - builder.keep(true).prefix(TEMPDIR_PREFIX); - match tempdir_in() { - Some(path) => builder.tempdir_in(path), - None => builder.tempdir(), - } - .expect("temp dirs must be available in the system") - }); - - let (events, _rx) = broadcast::channel(32); - let (block_height, _rx) = watch::channel(None); - - let result = Self { - id, - key_pair, - dir: temp_dir, - run: Default::default(), - runs_count: Default::default(), - is_running: Default::default(), - events, - block_height, - port_p2p: Arc::new(port_p2p), - port_api: Arc::new(port_api), - }; - - eprintln!( - "{} generated peer, dir: {}", - result.log_prefix(), - result.dir.path().display() - ); - - result - } - fn log_prefix(&self) -> String { format!("[PEER p2p: {}, api: {}]", self.port_p2p, self.port_api) } @@ -529,52 +515,22 @@ impl NetworkPeer { /// /// # Panics /// If peer was not started. - pub async fn start(&self, config: Table, genesis: Option<&GenesisBlock>) { + pub async fn start>( + &self, + config_layers: impl Iterator, + genesis: Option>, + ) { let mut run_guard = self.run.lock().await; assert!(run_guard.is_none(), "already running"); let run_num = self.runs_count.fetch_add(1, Ordering::Relaxed) + 1; - let log_prefix = self.log_prefix(); eprintln!("{log_prefix} starting (run #{run_num})"); - let mut config = config - .clone() - .write("public_key", self.key_pair.public_key()) - .write( - "private_key", - ExposedPrivateKey(self.key_pair.private_key().clone()), - ) - .write( - ["network", "address"], - format!("127.0.0.1:{}", self.port_p2p), - ) - .write( - ["network", "public_address"], - format!("127.0.0.1:{}", self.port_p2p), - ) - .write(["torii", "address"], format!("127.0.0.1:{}", self.port_api)) - .write(["logger", "format"], "json"); - - let config_path = self.dir.path().join(format!("run-{run_num}-config.toml")); - let genesis_path = self.dir.path().join(format!("run-{run_num}-genesis.scale")); - - if genesis.is_some() { - config = config.write(["genesis", "file"], &genesis_path); - } - - tokio::fs::write( - &config_path, - toml::to_string(&config).expect("TOML config is valid"), - ) - .await - .expect("temp directory exists and there was no config file before"); - - if let Some(genesis) = genesis { - tokio::fs::write(genesis_path, genesis.0.encode()) - .await - .expect("tmp dir is available and genesis was not written before"); - } + let config_path = self + .write_config_and_genesis(config_layers, genesis, run_num) + .await + .expect("fatal failure"); let mut cmd = tokio::process::Command::new(iroha_bin().as_ref()); cmd.stdout(Stdio::piped()) @@ -582,7 +538,7 @@ impl NetworkPeer { .kill_on_drop(true) .arg("--config") .arg(config_path); - cmd.current_dir(self.dir.path()); + cmd.current_dir(&self.dir); let mut child = cmd.spawn().expect("spawn failure is abnormal"); self.is_running.store(true, Ordering::Relaxed); let _ = self.events.send(PeerLifecycleEvent::Spawned); @@ -591,7 +547,7 @@ impl NetworkPeer { { let output = child.stdout.take().unwrap(); - let mut file = File::create(self.dir.path().join(format!("run-{run_num}-stdout.log"))) + let mut file = File::create(self.dir.join(format!("run-{run_num}-stdout.log"))) .await .unwrap(); tasks.spawn(async move { @@ -600,6 +556,9 @@ impl NetworkPeer { file.write_all(line.as_bytes()) .await .expect("writing logs to file shouldn't fail"); + file.write_all("\n".as_bytes()) + .await + .expect("shouldn't fail either"); file.flush() .await .expect("writing logs to file shouldn't fail"); @@ -609,7 +568,7 @@ impl NetworkPeer { { let log_prefix = log_prefix.clone(); let output = child.stderr.take().unwrap(); - let path = self.dir.path().join(format!("run-{run_num}-stderr.log")); + let path = self.dir.join(format!("run-{run_num}-stderr.log")); tasks.spawn(async move { let mut in_memory = PeerStderrBuffer { log_prefix, @@ -718,6 +677,38 @@ impl NetworkPeer { } } + pub async fn start_checked>( + &self, + config_layers: impl Iterator, + genesis: Option>, + ) -> Result<()> { + let failure = async move { + self.once(|e| matches!(e, PeerLifecycleEvent::Terminated { .. })) + .await; + panic!("a peer exited unexpectedly"); + }; + let start = async move { self.start(config_layers, genesis).await }; + let server_started = async move { + self.once(|e| matches!(e, PeerLifecycleEvent::ServerStarted)) + .await + }; + + let success = async move { + tokio::join!(start, server_started); + }; + + // TODO: wait for server started? + + tokio::select! { + _ = failure => { + Err(eyre!("Peer exited unexpectedly")) + }, + _ = success => { + Ok(()) + }, + } + } + /// Subscribe on peer lifecycle events. pub fn events(&self) -> broadcast::Receiver { self.events.subscribe() @@ -734,7 +725,7 @@ impl NetworkPeer { /// let peer = network.peer(); /// /// tokio::join!( - /// peer.start(network.config(), None), + /// peer.start(network.config_layers(), None), /// peer.once(|event| matches!(event, PeerLifecycleEvent::ServerStarted)) /// ); /// } @@ -776,14 +767,13 @@ impl NetworkPeer { } } - /// Generated [`Peer`] - pub fn peer(&self) -> Peer { - self.id.clone() + /// Generated [`PeerId`] + pub fn id(&self) -> PeerId { + PeerId::new(self.key_pair.public_key().clone()) } - /// Generated [`PeerId`] - pub fn peer_id(&self) -> PeerId { - self.id.id.clone() + pub fn p2p_address(&self) -> SocketAddr { + socket_addr!(127.0.0.1:**self.port_p2p) } /// Check whether the peer is running @@ -803,6 +793,7 @@ impl NetworkPeer { ["account", "private_key"], ExposedPrivateKey(account_private_key.clone()), ) + .write(["transaction", "status_timeout_ms"], 5_000) .write("torii_url", format!("http://127.0.0.1:{}", self.port_api)), )) .read_and_complete::() @@ -828,12 +819,125 @@ impl NetworkPeer { pub fn blocks(&self) -> watch::Receiver> { self.block_height.subscribe() } + + fn base_config(&self) -> Table { + Table::new() + .write("public_key", self.key_pair.public_key()) + .write( + "private_key", + ExposedPrivateKey(self.key_pair.private_key().clone()), + ) + .write(["network", "address"], self.p2p_address()) + .write(["network", "public_address"], self.p2p_address()) + .write( + ["torii", "address"], + socket_addr!(127.0.0.1:**self.port_api), + ) + } + + async fn write_config_and_genesis>( + &self, + extra_layers: impl Iterator, + genesis: Option>, + + run: usize, + ) -> Result { + let extra_layers: Vec<_> = extra_layers + .enumerate() + .map(|(i, table)| (format!("run-{run}-config.layer-{i}.toml"), table)) + .collect(); + + for (path, table) in &extra_layers { + tokio::fs::write(self.dir.join(path), toml::to_string(table.as_ref())?).await?; + } + + let mut final_config = Table::new().write( + "extends", + // should be written on peers initialisation + iter::once("config.base.toml".to_string()) + .chain(extra_layers.into_iter().map(|(path, _)| path)) + .collect::>(), + ); + if let Some(block) = genesis { + let path = self.dir.join(format!("run-{run}-genesis.scale")); + final_config = final_config.write(["genesis", "file"], &path); + tokio::fs::write(path, block.as_ref().0.encode()).await?; + } + let path = self.dir.join(format!("run-{run}-config.toml")); + tokio::fs::write(&path, toml::to_string(&final_config)?).await?; + + Ok(path) + } } /// Compare by ID impl PartialEq for NetworkPeer { fn eq(&self, other: &Self) -> bool { - self.id.eq(&other.id) + self.key_pair.eq(&other.key_pair) + } +} + +#[derive(Default)] +pub struct NetworkPeerBuilder { + dir: Option, + seed: Option>, +} + +impl NetworkPeerBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn with_dir(mut self, dir: Option>) -> Self { + self.dir = dir.map(Into::into); + self + } + + pub fn with_seed(mut self, seed: Option>>) -> Self { + self.seed = seed.map(Into::into); + self + } + + pub fn build(self) -> NetworkPeer { + let key_pair = self + .seed + .map(|seed| KeyPair::from_seed(seed, Algorithm::Ed25519)) + .unwrap_or_else(KeyPair::random); + let port_p2p = AllocatedPort::new(); + let port_api = AllocatedPort::new(); + + let dir = self.dir.unwrap_or_else(generate_and_keep_temp_dir); + + let (events, _rx) = broadcast::channel(32); + let (block_height, _rx) = watch::channel(None); + + let peer = NetworkPeer { + key_pair, + dir, + run: Default::default(), + runs_count: Default::default(), + is_running: Default::default(), + events, + block_height, + port_p2p: Arc::new(port_p2p), + port_api: Arc::new(port_api), + }; + + // FIXME: move code + std::fs::write( + peer.dir.join("config.base.toml"), + toml::to_string(&peer.base_config()).unwrap(), + ) + .unwrap(); + + eprintln!( + "{} generated peer\n dir: {}\n public key: {}", + peer.log_prefix(), + peer.dir.display(), + peer.key_pair.public_key(), + ); + + peer } } @@ -908,6 +1012,33 @@ impl PeerExit { } } +/// Wait until [`NetworkPeer::once_block`] resolves for all the peers. +pub async fn once_blocks_sync( + peers: impl Iterator, + height: u64, +) -> Result<()> { + let mut futures = peers + .map(|x| async move { + tokio::select! { + () = x.once_block(height) => { + Ok(()) + }, + () = x.once(|e| matches!(e, PeerLifecycleEvent::Terminated { .. })) => { + Err(eyre!("Peer terminated")) + } + } + }) + .collect::>(); + + loop { + match futures.next().await { + Some(Ok(())) => {} + Some(Err(e)) => return Err(e), + None => return Ok(()), + } + } +} + #[cfg(test)] mod tests { use super::*;