From 5d57d454b56d1c7e67237fec4deb927e0c938219 Mon Sep 17 00:00:00 2001
From: 0x009922 <43530070+0x009922@users.noreply.github.com>
Date: Fri, 20 Sep 2024 11:19:43 +0900
Subject: [PATCH] refactor: tidy up test network code & docs

Signed-off-by: 0x009922 <43530070+0x009922@users.noreply.github.com>
---
 .../integration/extra_functional/genesis.rs   |  42 +-
 .../multiple_blocks_created.rs                |  28 +-
 .../extra_functional/offline_peers.rs         |   4 +-
 .../extra_functional/restart_peer.rs          |   8 +-
 .../extra_functional/unregister_peer.rs       |   4 +-
 crates/iroha/tests/integration/permissions.rs |  16 +-
 .../tests/integration/transfer_domain.rs      |  15 +-
 crates/iroha_config_base/src/toml.rs          |   2 +
 crates/iroha_test_network/src/config.rs       |  98 ++++
 crates/iroha_test_network/src/fslock_ports.rs |  96 ++++
 crates/iroha_test_network/src/lib.rs          | 482 ++++++------
 11 files changed, 428 insertions(+), 367 deletions(-)
 create mode 100644 crates/iroha_test_network/src/config.rs
 create mode 100644 crates/iroha_test_network/src/fslock_ports.rs

diff --git a/crates/iroha/tests/integration/extra_functional/genesis.rs b/crates/iroha/tests/integration/extra_functional/genesis.rs
index 9c60aa6b47e..7bb4c5f214a 100644
--- a/crates/iroha/tests/integration/extra_functional/genesis.rs
+++ b/crates/iroha/tests/integration/extra_functional/genesis.rs
@@ -1,11 +1,13 @@
+use std::time::Duration;
+
 use eyre::Context;
 use futures_util::{stream::FuturesUnordered, StreamExt};
 use iroha::data_model::{
     domain::{Domain, DomainId},
     isi::Register,
 };
-use iroha_test_network::NetworkBuilder;
-use tokio::task::spawn_blocking;
+use iroha_test_network::{NetworkBuilder, PeerLifecycleEvent};
+use tokio::{task::spawn_blocking, time::timeout};
 
 #[tokio::test]
 async fn all_peers_submit_genesis() -> eyre::Result<()> {
@@ -24,19 +26,29 @@ async fn multiple_genesis_4_peers_2_genesis() -> eyre::Result<()> {
 
 async fn multiple_genesis_peers(n_peers: usize, n_genesis_peers: usize) -> eyre::Result<()> {
     let network = NetworkBuilder::new().with_peers(n_peers).build();
-    network
-        .peers()
-        .enumerate()
-        .map(|(i, peer)| {
-            peer.start(
-                network.config(),
-                (i < n_genesis_peers).then_some(network.genesis()),
-            )
-        })
-        .collect::<FuturesUnordered<_>>()
-        .collect::<Vec<_>>()
-        .await;
-    network.ensure_synchronised_blocks_height(1).await?;
+    timeout(
+        Duration::from_secs(5),
+        network
+            .peers()
+            .enumerate()
+            .map(|(i, peer)| {
+                let cfg = network.config();
+                let genesis = (i < n_genesis_peers).then_some(network.genesis());
+                async move {
+                    tokio::join!(
+                        peer.spawn(cfg, genesis),
+                        peer.once(|e| matches!(
+                            e,
+                            PeerLifecycleEvent::LogBlockCommitted { height: 1 }
+                        ))
+                    );
+                }
+            })
+            .collect::<FuturesUnordered<_>>()
+            .collect::<Vec<_>>(),
+    )
+    .await
+    .expect("should start");
 
     let client = network.client();
     let domain_id: DomainId = "foo".parse().expect("Valid");
diff --git a/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs b/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs
index a8dc6fb34ac..08ee16d4c44 100644
--- a/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs
+++ b/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs
@@ -1,7 +1,4 @@
-use std::{num::NonZero, time::Duration};
-
 use eyre::Result;
-use futures_util::StreamExt;
 use iroha::{
     client::{self},
     data_model::prelude::*,
 };
@@ -10,7 +7,7 @@
 use iroha_data_model::parameter::BlockParameter;
 use iroha_test_network::*;
 use iroha_test_samples::gen_account_in;
 use nonzero_ext::nonzero;
-use tokio::{task::spawn_blocking, time::sleep};
+use 
tokio::task::spawn_blocking; #[tokio::test] async fn multiple_blocks_created() -> Result<()> { @@ -19,13 +16,27 @@ async fn multiple_blocks_created() -> Result<()> { // Given let network = NetworkBuilder::new() .with_peers(4) - .push_isi(SetParameter(Parameter::Block( + .append_instruction(SetParameter(Parameter::Block( BlockParameter::MaxTransactions(nonzero!(1u64)), ))) .start() .await?; - let client_one = network.peers().take(1).next().unwrap().client_for_alice(); - let client_two = network.peers().last().unwrap().client_for_alice(); + let peer_one = network.peer(); + let client_one = peer_one.alice_client(); + let last_peer = network.peers().last().unwrap(); + assert_ne!(peer_one.id(), last_peer.id()); + + let mut events = last_peer.events(); + tokio::spawn(async move { + while let Ok(e) = events.recv().await { + match e { + PeerLifecycleEvent::LogBlockCommitted { height } => { + println!("Last peer block committed: {height}") + } + _ => {} + } + } + }); let create_domain = Register::domain(Domain::new("domain".parse()?)); let (account_id, _account_keypair) = gen_account_in("domain"); @@ -63,7 +74,7 @@ async fn multiple_blocks_created() -> Result<()> { AssetId::new(definition.clone(), account.clone()), )) .expect("submit cannot fail"); - println!("{i}/{N_BLOCKS}"); + println!("Submitted {}/{N_BLOCKS} txs", i + 1); } }) .await?; @@ -72,6 +83,7 @@ async fn multiple_blocks_created() -> Result<()> { .await?; // Then + let client_two = last_peer.alice_client(); let asset = spawn_blocking(move || { client_two .query(client::asset::all()) diff --git a/crates/iroha/tests/integration/extra_functional/offline_peers.rs b/crates/iroha/tests/integration/extra_functional/offline_peers.rs index 6b39b0e1b18..55b4b544b97 100644 --- a/crates/iroha/tests/integration/extra_functional/offline_peers.rs +++ b/crates/iroha/tests/integration/extra_functional/offline_peers.rs @@ -29,7 +29,7 @@ async fn genesis_block_is_committed_with_some_offline_peers() -> Result<()> { // only 2 out of 4 .take(2) .enumerate() - .map(|(i, peer)| peer.start(cfg.clone(), (i == 0).then_some(genesis))) + .map(|(i, peer)| peer.spawn(cfg.clone(), (i == 0).then_some(genesis))) .collect::>() .collect::>() .await; @@ -80,7 +80,7 @@ async fn register_offline_peer() -> Result<()> { async fn check_status(network: &Network, expected_peers: u64) { for peer in network.peers() { - let client = peer.client_for_alice(); + let client = peer.alice_client(); let status = spawn_blocking(move || client.get_status()) .await .expect("no panic") diff --git a/crates/iroha/tests/integration/extra_functional/restart_peer.rs b/crates/iroha/tests/integration/extra_functional/restart_peer.rs index ad949d58053..7fc6a2f0d91 100644 --- a/crates/iroha/tests/integration/extra_functional/restart_peer.rs +++ b/crates/iroha/tests/integration/extra_functional/restart_peer.rs @@ -14,7 +14,7 @@ async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> { let network = NetworkBuilder::new().with_peers(4).start().await?; - let random_client = network.random_peer().client_for_alice(); + let random_client = network.random_peer().alice_client(); let asset_definition_clone = asset_definition_id.clone(); spawn_blocking(move || { random_client @@ -34,7 +34,7 @@ async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> { // Wait for observing peer to get the block sleep(network.pipeline_time()).await; - let random_client = network.random_peer().client_for_alice(); + let random_client = network.random_peer().alice_client(); let assets = 
spawn_blocking(move || { random_client .query(client::asset::all()) @@ -55,13 +55,13 @@ async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> { // restart one let peer = network.random_peer(); - peer.start(network.config().clone(), Some(network.genesis())) + peer.spawn(network.config().clone(), Some(network.genesis())) .await; network.ensure_synchronised_blocks_height(1).await?; iroha_logger::info!("peer restart"); - let client = peer.client_for_alice(); + let client = peer.alice_client(); let assets = spawn_blocking(move || { client .query(client::asset::all()) diff --git a/crates/iroha/tests/integration/extra_functional/unregister_peer.rs b/crates/iroha/tests/integration/extra_functional/unregister_peer.rs index 9c6ccc39e14..a3bd8012a7e 100644 --- a/crates/iroha/tests/integration/extra_functional/unregister_peer.rs +++ b/crates/iroha/tests/integration/extra_functional/unregister_peer.rs @@ -25,8 +25,8 @@ fn unstable_network_stable_after_add_and_after_remove_peer() -> Result<()> { // and a new peer is registered let peer = network.add_peer(); let peer_id = peer.id(); - let peer_client = peer.client_for_alice(); - rt.block_on(async { peer.start(network.config(), None).await }); + let peer_client = peer.alice_client(); + rt.block_on(async { peer.spawn(network.config(), None).await }); client.submit_blocking(Register::peer(Peer::new(peer_id.clone())))?; // Then the new peer should already have the mint result. network.poll_sync(|| { diff --git a/crates/iroha/tests/integration/permissions.rs b/crates/iroha/tests/integration/permissions.rs index 48d1c0a590e..90c3db20ce3 100644 --- a/crates/iroha/tests/integration/permissions.rs +++ b/crates/iroha/tests/integration/permissions.rs @@ -1,7 +1,6 @@ -use std::{thread, time::Duration}; +use std::time::Duration; use eyre::Result; -use futures_util::{future::join_all, stream::FuturesUnordered}; use iroha::{ client::{self, Client}, crypto::KeyPair, @@ -14,10 +13,9 @@ use iroha_executor_data_model::permission::{ asset::{CanSetKeyValueInUserAsset, CanTransferUserAsset}, domain::CanSetKeyValueInDomain, }; -use iroha_genesis::GenesisBlock; use iroha_test_network::*; use iroha_test_samples::{gen_account_in, ALICE_ID, BOB_ID}; -use tokio::{join, task::JoinSet, time::timeout}; +use tokio::{join, time::timeout}; // FIXME #[tokio::test] @@ -28,18 +26,20 @@ async fn genesis_transactions_are_validated_by_executor() { let asset_definition_id = "xor#wonderland".parse().expect("Valid"); let invalid_instruction = Register::asset_definition(AssetDefinition::numeric(asset_definition_id)); - let network = NetworkBuilder::new().push_isi(invalid_instruction).build(); + let network = NetworkBuilder::new() + .append_instruction(invalid_instruction) + .build(); let peer = network.peer(); timeout(Duration::from_secs(3), async { join!( // Peer should start... 
- peer.start(network.config(), Some(network.genesis())), - peer.once(|event| matches!(event, PeerExecutionEvent::ServerStarted)), + peer.spawn(network.config(), Some(network.genesis())), + peer.once(|event| matches!(event, PeerLifecycleEvent::ServerStarted)), // ...but it should shortly exit with an error peer.once(|event| match event { // TODO: handle "Invalid genesis" more granular - PeerExecutionEvent::Terminated { status } => !status.success(), + PeerLifecycleEvent::Terminated { status } => !status.success(), _ => false, }) ) diff --git a/crates/iroha/tests/integration/transfer_domain.rs b/crates/iroha/tests/integration/transfer_domain.rs index b633e72663c..b5c085df066 100644 --- a/crates/iroha/tests/integration/transfer_domain.rs +++ b/crates/iroha/tests/integration/transfer_domain.rs @@ -1,7 +1,6 @@ use eyre::Result; use iroha::{ client, - client::Client, crypto::KeyPair, data_model::{prelude::*, transaction::error::TransactionRejectionReason}, }; @@ -12,11 +11,9 @@ use iroha_executor_data_model::permission::{ domain::{CanRegisterAssetDefinitionInDomain, CanUnregisterDomain}, trigger::CanUnregisterUserTrigger, }; -use iroha_genesis::GenesisBlock; use iroha_primitives::json::JsonString; use iroha_test_network::*; use iroha_test_samples::{gen_account_in, ALICE_ID, BOB_ID, SAMPLE_GENESIS_ACCOUNT_ID}; -use tokio::runtime::Runtime; #[test] fn domain_owner_domain_permissions() -> Result<()> { @@ -367,16 +364,16 @@ fn not_allowed_to_transfer_other_user_domain() -> Result<()> { let genesis_account = SAMPLE_GENESIS_ACCOUNT_ID.clone(); let (network, _rt) = NetworkBuilder::new() - .push_isi(Register::domain(Domain::new(users_domain.clone()))) - .push_isi(Register::account(Account::new(user1.clone()))) - .push_isi(Register::account(Account::new(user2.clone()))) - .push_isi(Register::domain(Domain::new(foo_domain.clone()))) - .push_isi(Transfer::domain( + .append_instruction(Register::domain(Domain::new(users_domain.clone()))) + .append_instruction(Register::account(Account::new(user1.clone()))) + .append_instruction(Register::account(Account::new(user2.clone()))) + .append_instruction(Register::domain(Domain::new(foo_domain.clone()))) + .append_instruction(Transfer::domain( genesis_account.clone(), foo_domain.clone(), user1.clone(), )) - .push_isi(Transfer::domain( + .append_instruction(Transfer::domain( genesis_account.clone(), users_domain.clone(), user1.clone(), diff --git a/crates/iroha_config_base/src/toml.rs b/crates/iroha_config_base/src/toml.rs index 29d74943a8f..2b7ab4adb1a 100644 --- a/crates/iroha_config_base/src/toml.rs +++ b/crates/iroha_config_base/src/toml.rs @@ -286,7 +286,9 @@ impl<'a> From<&'a mut Table> for Writer<'a> { } } +/// Extension trait to implement writing with [`Writer`] directly into [`Table`] in a chained manner. pub trait WriteExt: Sized { + /// See [`Writer::write`]. fn write(self, path: P, value: T) -> Self; } diff --git a/crates/iroha_test_network/src/config.rs b/crates/iroha_test_network/src/config.rs new file mode 100644 index 00000000000..8819caae40c --- /dev/null +++ b/crates/iroha_test_network/src/config.rs @@ -0,0 +1,98 @@ +//! 
Sample configuration builders
+
+use std::path::Path;
+
+use iroha_config::base::toml::WriteExt;
+use iroha_data_model::{
+    asset::AssetDefinitionId,
+    isi::{Grant, Instruction},
+    peer::PeerId,
+    ChainId,
+};
+use iroha_executor_data_model::permission::{
+    asset::{CanBurnAssetWithDefinition, CanMintAssetWithDefinition},
+    domain::CanUnregisterDomain,
+    executor::CanUpgradeExecutor,
+    peer::CanUnregisterAnyPeer,
+    role::CanUnregisterAnyRole,
+};
+use iroha_genesis::{GenesisBlock, RawGenesisTransaction};
+use iroha_primitives::unique_vec::UniqueVec;
+use iroha_test_samples::{ALICE_ID, SAMPLE_GENESIS_ACCOUNT_KEYPAIR};
+use toml::Table;
+
+pub fn chain_id() -> ChainId {
+    ChainId::from("00000000-0000-0000-0000-000000000000")
+}
+
+pub fn base_iroha_config() -> Table {
+    Table::new()
+        .write("chain", chain_id())
+        .write(
+            ["genesis", "public_key"],
+            SAMPLE_GENESIS_ACCOUNT_KEYPAIR.public_key(),
+        )
+        // There is no need for persistence in tests.
+        .write(["snapshot", "mode"], "disabled")
+        .write(["kura", "store_dir"], "./storage")
+        .write(["network", "block_gossip_size"], 1)
+        .write(["logger", "level"], "DEBUG")
+}
+
+pub fn genesis(
+    extra_isi: impl IntoIterator<Item = impl Instruction>,
+    topology: UniqueVec<PeerId>,
+) -> GenesisBlock {
+    // TODO: Fix this somehow. Probably we need to make `kagami` a library (#3253).
+    let mut genesis = RawGenesisTransaction::from_path(
+        Path::new(env!("CARGO_MANIFEST_DIR")).join("../../defaults/genesis.json"),
+    )
+    .expect("Failed to deserialize genesis block from file");
+
+    let rose_definition_id = "rose#wonderland".parse::<AssetDefinitionId>().unwrap();
+
+    let grant_mint_rose_permission = Grant::account_permission(
+        CanMintAssetWithDefinition {
+            asset_definition: rose_definition_id.clone(),
+        },
+        ALICE_ID.clone(),
+    );
+    let grant_burn_rose_permission = Grant::account_permission(
+        CanBurnAssetWithDefinition {
+            asset_definition: rose_definition_id,
+        },
+        ALICE_ID.clone(),
+    );
+    let grant_unregister_any_peer_permission =
+        Grant::account_permission(CanUnregisterAnyPeer, ALICE_ID.clone());
+    let grant_unregister_any_role_permission =
+        Grant::account_permission(CanUnregisterAnyRole, ALICE_ID.clone());
+    let grant_unregister_wonderland_domain = Grant::account_permission(
+        CanUnregisterDomain {
+            domain: "wonderland".parse().unwrap(),
+        },
+        ALICE_ID.clone(),
+    );
+    let grant_upgrade_executor_permission =
+        Grant::account_permission(CanUpgradeExecutor, ALICE_ID.clone());
+    for isi in [
+        grant_mint_rose_permission,
+        grant_burn_rose_permission,
+        grant_unregister_any_peer_permission,
+        grant_unregister_any_role_permission,
+        grant_unregister_wonderland_domain,
+        grant_upgrade_executor_permission,
+    ] {
+        genesis.append_instruction(isi);
+    }
+
+    for isi in extra_isi.into_iter() {
+        genesis.append_instruction(isi);
+    }
+
+    let genesis_key_pair = SAMPLE_GENESIS_ACCOUNT_KEYPAIR.clone();
+    genesis
+        .with_topology(topology.into())
+        .build_and_sign(&genesis_key_pair)
+        .expect("genesis should load fine")
+}
diff --git a/crates/iroha_test_network/src/fslock_ports.rs b/crates/iroha_test_network/src/fslock_ports.rs
new file mode 100644
index 00000000000..78e9a6fc518
--- /dev/null
+++ b/crates/iroha_test_network/src/fslock_ports.rs
@@ -0,0 +1,96 @@
+//! [`fslock`]-based socket port locking for test network peers,
+//! supporting inter-process and intra-process test execution scenarios.
+
+use std::{
+    collections::BTreeSet,
+    fs::OpenOptions,
+    io::{Read, Write},
+};
+
+use color_eyre::Result;
+use derive_more::{Deref, Display};
+use serde::{Deserialize, Serialize};
+
+const LOCK_FILE: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/.test_network_ports.lock");
+const DATA_FILE: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/.test_network_ports.json");
+
+#[derive(Serialize, Deserialize, Default)]
+struct LockContent {
+    ports_in_use: BTreeSet<u16>,
+}
+
+impl LockContent {
+    fn read() -> Result<Self> {
+        let value = if std::fs::exists(DATA_FILE)? {
+            let mut content = String::new();
+            let mut file = OpenOptions::new().read(true).open(DATA_FILE)?;
+            file.read_to_string(&mut content)?;
+            serde_json::from_str(&content)?
+        } else {
+            Default::default()
+        };
+        Ok(value)
+    }
+
+    fn write(&self) -> Result<()> {
+        if std::fs::exists(DATA_FILE)? {
+            std::fs::remove_file(DATA_FILE)?;
+        }
+        if self.ports_in_use.is_empty() {
+            return Ok(());
+        };
+        let mut file = OpenOptions::new()
+            .create(true)
+            .write(true)
+            .open(DATA_FILE)?;
+        file.write_all(serde_json::to_string(&self).unwrap().as_bytes())?;
+        Ok(())
+    }
+}
+
+/// Releases the port on [`Drop`].
+#[derive(Debug, Deref, Display)]
+pub struct AllocatedPort(u16);
+
+impl AllocatedPort {
+    pub fn new() -> Self {
+        let mut lock = fslock::LockFile::open(LOCK_FILE).expect("path is valid");
+        lock.lock().expect("this handle doesn't own the file yet");
+
+        let mut value = LockContent::read().expect("should be able to read the data");
+
+        let mut i = 0;
+        let port = loop {
+            let port = unique_port::get_unique_free_port().unwrap();
+            if !value.ports_in_use.contains(&port) {
+                break port;
+            }
+            i += 1;
+            if i == 1000 {
+                panic!("cannot find a free port")
+            }
+        };
+
+        value.ports_in_use.insert(port);
+
+        value.write().expect("should be able to write the data");
+        lock.unlock().expect("this handle still holds the lock");
+
+        eprintln!("[unique port] allocated {port}");
+
+        Self(port)
+    }
+}
+
+impl Drop for AllocatedPort {
+    fn drop(&mut self) {
+        let mut lock = fslock::LockFile::open(LOCK_FILE).expect("path is valid");
+        lock.lock().expect("doesn't hold it yet");
+        let mut value = LockContent::read().expect("should read fine");
+        value.ports_in_use.remove(&self.0);
+        value.write().expect("should save the result fine");
+        lock.unlock().expect("still holds it");
+
+        eprintln!("[unique port] released {}", self.0);
+    }
+}
diff --git a/crates/iroha_test_network/src/lib.rs b/crates/iroha_test_network/src/lib.rs
index 420ed5034e6..74aa785fdcd 100644
--- a/crates/iroha_test_network/src/lib.rs
+++ b/crates/iroha_test_network/src/lib.rs
@@ -1,11 +1,12 @@
-//! Module for starting peers and networks. Used only for tests
+//! 
Puppeteer for `irohad`, to create test networks + +mod config; +mod fslock_ports; + use core::{fmt::Debug, time::Duration}; use std::{ - collections::BTreeSet, - fs::OpenOptions, - io::Write, ops::Deref, - path::{Path, PathBuf}, + path::Path, process::{ExitStatus, Stdio}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, @@ -15,53 +16,37 @@ use std::{ use backoff::{exponential::ExponentialBackoff, ExponentialBackoffBuilder, SystemClock}; use color_eyre::eyre::{eyre, Result}; -use derive_more::{Deref, Display}; -use futures::{prelude::*, stream::FuturesUnordered, task::SpawnExt}; -use iroha::{ - client::Client, - data_model::{isi::Instruction, prelude::*}, -}; +use derive_more::Display; +use fslock_ports::AllocatedPort; +use futures::{prelude::*, stream::FuturesUnordered}; +use iroha::{client::Client, data_model::prelude::*}; use iroha_config::base::{ read::ConfigReader, toml::{TomlSource, WriteExt as _, Writer as TomlWriter}, }; pub use iroha_core::state::StateReadOnly; use iroha_crypto::{ExposedPrivateKey, KeyPair, PrivateKey}; -use iroha_data_model::{ - asset::AssetDefinitionId, isi::InstructionBox, parameter::SumeragiParameter, ChainId, -}; -use iroha_executor_data_model::permission::{ - asset::{CanBurnAssetWithDefinition, CanMintAssetWithDefinition}, - domain::CanUnregisterDomain, - executor::CanUpgradeExecutor, - peer::CanUnregisterAnyPeer, - role::CanUnregisterAnyRole, -}; -use iroha_genesis::{GenesisBlock, RawGenesisTransaction}; +use iroha_data_model::{isi::InstructionBox, parameter::SumeragiParameter, ChainId}; +use iroha_genesis::GenesisBlock; use iroha_primitives::{addr::socket_addr, unique_vec::UniqueVec}; use iroha_test_samples::{ALICE_ID, ALICE_KEYPAIR, PEER_KEYPAIR, SAMPLE_GENESIS_ACCOUNT_KEYPAIR}; use parity_scale_codec::Encode; -use rand::{ - prelude::{IteratorRandom, SliceRandom}, - thread_rng, -}; -use serde::{Deserialize, Serialize}; +use rand::{prelude::IteratorRandom, thread_rng}; use tempfile::TempDir; use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, join, runtime::{self, Runtime}, - sync::{broadcast, mpsc, oneshot, Mutex}, + sync::{broadcast, oneshot, Mutex}, task::{spawn_blocking, JoinSet}, time::timeout, }; use toml::Table; -use unique_port; const DEFAULT_PIPELINE_TIME: Duration = Duration::from_millis(100); const DEFAULT_BLOCK_SYNC: Duration = Duration::from_millis(25); -const PEER_START_TIMEOUT: Duration = Duration::from_secs(3); +// const PEER_START_TIMEOUT: Duration = Duration::from_secs(3); const POLLING_TIMEOUT: Duration = Duration::from_secs(10); const PEER_KILL_TIMEOUT: Duration = Duration::from_secs(2); @@ -69,7 +54,7 @@ const IROHA_BIN: &'static str = "/Users/qua/dev/iroha/target/release/irohad"; /// Network of peers pub struct Network { - peers: Vec, + peers: Vec, genesis: GenesisBlock, block_time: Duration, @@ -84,22 +69,22 @@ impl Network { /// /// The peer is not started automatically, but is included in the topology for future /// calls of [`Self::config`]. - pub fn add_peer(&mut self) -> PeerHandle { - let peer = PeerHandle::generate(); + pub fn add_peer(&mut self) -> NetworkPeer { + let peer = NetworkPeer::generate(); self.peers.push(peer); self.peers.last().expect("just added one").clone() } /// Returns all peers. 
- pub fn peers(&self) -> impl Iterator + '_ { + pub fn peers(&self) -> impl Iterator + '_ { self.peers.iter() } - pub fn peer(&self) -> &PeerHandle { + pub fn peer(&self) -> &NetworkPeer { self.peers.first().expect("there is always at least one") } - pub fn random_peer(&self) -> &PeerHandle { + pub fn random_peer(&self) -> &NetworkPeer { self.peers() .choose(&mut thread_rng()) .expect("there is at least one peer") @@ -116,9 +101,9 @@ impl Network { for (i, peer) in self.peers().enumerate() { futures.push(async move { join!( - peer.start(self.config(), (i == 0).then_some(&self.genesis)), - peer.once(|e| matches!(e, PeerExecutionEvent::ServerStarted)), - peer.once(|e| matches!(e, PeerExecutionEvent::LogBlockCommitted { height: 1 })), + peer.spawn(self.config(), (i == 0).then_some(&self.genesis)), + peer.once(|e| matches!(e, PeerLifecycleEvent::ServerStarted)), + peer.once(|e| matches!(e, PeerLifecycleEvent::LogBlockCommitted { height: 1 })), ); }) } @@ -143,7 +128,7 @@ impl Network { } pub fn chain_id(&self) -> ChainId { - default_chain_id() + config::chain_id() } pub fn config(&self) -> Table { @@ -159,7 +144,7 @@ impl Network { pub async fn shutdown(&self) -> &Self { for peer in self.peers() { if peer.is_running() { - peer.shutdown().await; + peer.kill().await; } } self @@ -205,7 +190,7 @@ impl Network { if self .peers() .filter(|x| x.is_running()) - .map(|x| x.client_for_alice()) + .map(|x| x.alice_client()) .map(|client| async { self.poll(move || { let client = client.clone(); @@ -249,16 +234,20 @@ pub struct NetworkBuilder { extra_isi: Vec, } +/// Test network builder impl NetworkBuilder { pub fn new() -> Self { Self { n_peers: 1, - config: base_config(), + config: config::base_iroha_config(), pipeline_time: DEFAULT_PIPELINE_TIME, extra_isi: vec![], } } + /// Set the number of peers in the network. + /// + /// One by default. pub fn with_peers(mut self, n_peers: usize) -> Self { let n_peers = n_peers.into(); assert_ne!(n_peers, 0); @@ -266,29 +255,46 @@ impl NetworkBuilder { self } + /// Set the pipeline time. + /// + /// Translates into setting of the [`SumeragiParameter::BlockTimeMs`] (1/3) and + /// [`SumeragiParameter::CommitTimeMs`] (2/3) in the genesis block. + /// + /// Reflected in [`Network::pipeline_time`]. pub fn with_pipeline_time(mut self, duration: Duration) -> Self { self.pipeline_time = duration; self } - pub fn with_config(self, f: F) -> Self + /// Add a layer of TOML configuration via [`TomlWriter`]. + /// + /// # Example + /// + /// ``` + /// use iroha_test_network::NetworkBuilder; + /// + /// NetworkBuilder::new().with_config(|t| { + /// t.write(["logger", "level"], "DEBUG"); + /// }); + /// ``` + pub fn with_config(mut self, f: F) -> Self where for<'a> F: FnOnce(&'a mut TomlWriter<'a>), { - let mut table = Table::new(); - let mut writer = TomlWriter::new(&mut table); + let mut writer = TomlWriter::new(&mut self.config); f(&mut writer); self } - pub fn push_isi(mut self, isi: impl Into) -> Self { + /// Append an instruction to genesis. + pub fn append_instruction(mut self, isi: impl Into) -> Self { self.extra_isi.push(isi.into()); self } - /// Creates new network with options provided. + /// Build the [`Network`]. Doesn't start it. 
pub fn build(self) -> Network { - let peers: Vec<_> = (0..self.n_peers).map(|_| PeerHandle::generate()).collect(); + let peers: Vec<_> = (0..self.n_peers).map(|_| NetworkPeer::generate()).collect(); let topology: UniqueVec<_> = peers.iter().map(|peer| peer.id.clone()).collect(); @@ -296,7 +302,7 @@ impl NetworkBuilder { let commit_time = self.pipeline_time / 2; let block_sync_gossip_period = DEFAULT_BLOCK_SYNC; - let genesis = test_genesis( + let genesis = config::genesis( [ InstructionBox::SetParameter(SetParameter(Parameter::Sumeragi( SumeragiParameter::BlockTimeMs(block_time.as_millis() as u64), @@ -323,6 +329,9 @@ impl NetworkBuilder { } } + /// Same as [`Self::build`], but also creates a [`Runtime`]. + /// + /// This method exists for convenience and to preserve compatibility with non-async tests. pub fn build_blocking(self) -> (Network, Runtime) { let rt = runtime::Builder::new_multi_thread() .thread_stack_size(32 * 1024 * 1024) @@ -333,12 +342,17 @@ impl NetworkBuilder { (network, rt) } + /// Build and start the network. + /// + /// Resolves when all peers are running and have committed genesis block. + /// See [`Network::start_all`]. pub async fn start(self) -> Result { let network = self.build(); network.start_all().await; Ok(network) } + /// Combination of [`Self::build_blocking`] and [`Self::start`]. pub fn start_blocking(self) -> Result<(Network, Runtime)> { let (network, rt) = self.build_blocking(); rt.block_on(async { network.start_all().await }); @@ -346,7 +360,15 @@ impl NetworkBuilder { } } -/// A common signatory in the test network +/// A common signatory in the test network. +/// +/// # Example +/// +/// ``` +/// use iroha_test_network::Signatory; +/// +/// let _alice_kp = Signatory::Alice.key_pair(); +/// ``` pub enum Signatory { Peer, Genesis, @@ -354,6 +376,7 @@ pub enum Signatory { } impl Signatory { + /// Get the associated key pair pub fn key_pair(&self) -> &KeyPair { match self { Signatory::Peer => &PEER_KEYPAIR, @@ -367,11 +390,9 @@ impl Signatory { /// Running Iroha peer. /// /// Aborts peer forcefully when dropped -pub struct PeerRun { - id: RunId, +struct PeerRun { tasks: JoinSet<()>, kill: oneshot::Sender<()>, - is_running: Arc, } #[derive(Display, Clone)] @@ -383,53 +404,54 @@ struct RunId { run: usize, } -struct RunFailure { - exit_status: ExitStatus, - stderr: String, -} - +/// Lifecycle events of a peer #[derive(Copy, Clone, Debug)] -pub enum PeerExecutionEvent { +pub enum PeerLifecycleEvent { + /// Process spawned Spawned, + /// Server started to respond ServerStarted, + /// Process terminated Terminated { status: ExitStatus }, + /// Process was killed Killed, - LogWelcome, - LogGenesisInvalid, + /// Caught a "Block committed" log message LogBlockCommitted { height: u64 }, - LogExitedNormally, - LogExitedBadly, + // LogWelcome, + // LogGenesisInvalid, + // LogExitedNormally, + // LogExitedBadly, } -/// Information about a test peer +/// Controls execution of `irohad` child process. +/// +/// While exists, allocates socket ports and a temporary directory (not cleared automatically). +/// +/// It can be started and shut down repeatedly. +/// It stores configuration and logs for each run separately. +/// +/// When dropped, aborts the child process (if it is running). 
#[derive(Clone)] -pub struct PeerHandle { +pub struct NetworkPeer { id: PeerId, key_pair: KeyPair, dir: Arc, run: Arc>>, runs_count: Arc, is_running: Arc, - events: broadcast::Sender, + events: broadcast::Sender, // dropping these the last - port_p2p: Arc, - port_api: Arc, + port_p2p: Arc, + port_api: Arc, } -impl PeerHandle { - /// Creates peer - /// - /// # Errors - /// * If can't get a unique port for - /// - `p2p_address` - /// - `api_address` - /// * If keypair generation fails - pub fn generate() -> Self { +impl NetworkPeer { + fn generate() -> Self { let key_pair = KeyPair::random(); - let port_p2p = LockedPort::allocate(); - let port_api = LockedPort::allocate(); + let port_p2p = AllocatedPort::new(); + let port_api = AllocatedPort::new(); let id = PeerId::new( - socket_addr!(127.0.0.1:port_p2p.0), + socket_addr!(127.0.0.1:*port_p2p), key_pair.public_key().clone(), ); let temp_dir = Arc::new( @@ -438,7 +460,7 @@ impl PeerHandle { .tempdir() .expect("temp dirs are available in the system"), ); - let (tx, mut rx) = broadcast::channel(32); + let (tx, _rx) = broadcast::channel(32); // tokio::spawn(async move { // while let Ok(event) = rx.recv().await { @@ -459,14 +481,24 @@ impl PeerHandle { } } - pub async fn start(&self, config: Table, genesis: Option<&GenesisBlock>) { + /// Spawn the child process. + /// + /// Passed configuration must contain network topology in the `sumeragi.trusted_peers` parameter. + /// + /// This function doesn't wait for peer server to start working, or for it to commit genesis block. + /// Iroha could as well terminate immediately with an error, and it is not tracked by this function. + /// Use [`Self::events`]/[`Self::once`] to monitor peer's lifecycle. + /// + /// # Panics + /// If peer was not started. + pub async fn spawn(&self, config: Table, genesis: Option<&GenesisBlock>) { let mut run_guard = self.run.lock().await; assert!(run_guard.is_none(), "already running"); let run_num = self.runs_count.fetch_add(1, Ordering::Relaxed) + 1; let run_id = RunId { - p2p: self.port_p2p.0, - api: self.port_api.0, + p2p: **self.port_p2p, + api: **self.port_api, dir: format!("{}", self.dir.path().display()), run: run_num, }; @@ -484,7 +516,6 @@ impl PeerHandle { format!("127.0.0.1:{}", self.port_p2p), ) .write(["torii", "address"], format!("127.0.0.1:{}", self.port_api)) - .write(["logger", "level"], "DEBUG") .write(["logger", "format"], "json"); let config_path = self.dir.path().join(format!("run-{run_num}-config.toml")); @@ -516,6 +547,7 @@ impl PeerHandle { cmd.current_dir(self.dir.path()); let mut child = cmd.spawn().expect("spawn failure is abnormal"); self.is_running.store(true, Ordering::Relaxed); + let _ = self.events.send(PeerLifecycleEvent::Spawned); let mut tasks = JoinSet::<()>::new(); @@ -564,20 +596,20 @@ impl PeerHandle { result = child.wait() => { let status = result.expect("peer process waiting should not fail"); eprintln!("{run_id} process terminated: {status}"); - let _ = events_tx.send(PeerExecutionEvent::Terminated { status }); + let _ = events_tx.send(PeerLifecycleEvent::Terminated { status }); } _ = kill_rx => { eprintln!("{run_id} killing process"); child.kill().await.expect("kill failed"); // TODO: child.wait? 
- let _ = events_tx.send(PeerExecutionEvent::Killed); + let _ = events_tx.send(PeerLifecycleEvent::Killed); } }; is_running.store(false, Ordering::Relaxed); }); } - let client = self.client_for_alice(); + let client = self.alice_client(); let events_tx = self.events.clone(); tasks.spawn(async move { backoff::future::retry( @@ -596,23 +628,28 @@ impl PeerHandle { ) .await .expect("retrying can only succeed since there is no max timeout"); - let _ = events_tx.send(PeerExecutionEvent::ServerStarted); + let _ = events_tx.send(PeerLifecycleEvent::ServerStarted); }); *run_guard = Some(PeerRun { - id: run_id, + // id: run_id, tasks, kill: kill_tx, - is_running: self.is_running.clone(), + // is_running: self.is_running.clone(), }); } - pub async fn shutdown(&self) { + /// Forcefully kills the running peer + /// + /// # Panics + /// If peer was not started. + pub async fn kill(&self) { let mut guard = self.run.lock().await; let Some(run) = (*guard).take() else { panic!("peer is not running, nothing to shut down"); }; if self.is_running() { + // TODO: send graceful shutdown signal? let _ = run.kill.send(()); timeout(PEER_KILL_TIMEOUT, run.tasks.join_all()) .await @@ -621,13 +658,32 @@ impl PeerHandle { } } - pub fn events(&self) -> broadcast::Receiver { + /// Subscribe on peer lifecycle events. + pub fn events(&self) -> broadcast::Receiver { self.events.subscribe() } + /// Wait _once_ an event matches a predicate. + /// + /// ``` + /// use iroha_test_network::{Network, NetworkBuilder, PeerLifecycleEvent}; + /// + /// #[tokio::main] + /// async fn mail() { + /// let network = NetworkBuilder::new().build(); + /// let peer = network.peer(); + /// + /// tokio::join!( + /// peer.spawn(network.config(), None), + /// peer.once(|event| matches!(event, PeerLifecycleEvent::ServerStarted)) + /// ); + /// } + /// ``` + /// + /// It is a narrowed version of [`Self::events`]. 
pub async fn once(&self, f: F) where - F: Fn(PeerExecutionEvent) -> bool, + F: Fn(PeerLifecycleEvent) -> bool, { let mut rx = self.events(); loop { @@ -639,19 +695,22 @@ impl PeerHandle { } } + /// Generated [`PeerId`] pub fn id(&self) -> PeerId { self.id.clone() } + /// Check whether the peer is running pub fn is_running(&self) -> bool { self.is_running.load(Ordering::Relaxed) } + /// Create a client to interact with this peer pub fn client(&self, account_id: &AccountId, account_private_key: PrivateKey) -> Client { let config = ConfigReader::new() .with_toml_source(TomlSource::inline( Table::new() - .write("chain", default_chain_id()) + .write("chain", config::chain_id()) .write(["account", "domain"], account_id.domain()) .write(["account", "public_key"], account_id.signatory()) .write(["account", "private_key"], account_private_key.expose()) @@ -665,12 +724,13 @@ impl PeerHandle { Client::new(config) } - pub fn client_for_alice(&self) -> Client { + /// Alice's [`Self::client`] + pub fn alice_client(&self) -> Client { self.client(&ALICE_ID, ALICE_KEYPAIR.private_key().clone()) } } -fn handle_peer_log_message(log: &serde_json::Value, tx: &broadcast::Sender) { +fn handle_peer_log_message(log: &serde_json::Value, tx: &broadcast::Sender) { let is_info = log .get("level") .map(|level| level.as_str().map(|value| value == "INFO")) @@ -695,129 +755,17 @@ fn handle_peer_log_message(log: &serde_json::Value, tx: &broadcast::Sender for Box { - fn from(val: PeerHandle) -> Self { - Box::new(iroha_data_model::peer::Peer::new(val.id.clone())) - } -} - -/// Client mocking trait -pub trait ClientExt: Sized { - /// Loop for events with filter and handler function - fn for_each_event(self, event_filter: impl Into, f: impl Fn(Result)); -} - -impl ClientExt for Client { - fn for_each_event(self, event_filter: impl Into, f: impl Fn(Result)) { - for event_result in self - .listen_for_events([event_filter]) - .expect("Failed to create event iterator.") - { - f(event_result) - } - } -} - -#[derive(Debug, Deref, Display)] -struct LockedPort(u16); - -#[derive(Serialize, Deserialize, Default)] -struct PortsLockfile { - ports: BTreeSet, -} - -impl PortsLockfile { - fn read() -> Self { - use std::io::Read; - - if std::fs::exists(ports_file()).unwrap() { - let mut content = String::new(); - let mut file = OpenOptions::new().read(true).open(ports_file()).unwrap(); - file.read_to_string(&mut content).unwrap(); - serde_json::from_str(&content).unwrap() - } else { - Default::default() - } - } - - fn save(&self) { - if std::fs::exists(crate::ports_file()).unwrap() { - std::fs::remove_file(ports_file()).unwrap(); - } - if self.ports.is_empty() { - return; - }; - let mut file = OpenOptions::new() - .create(true) - .write(true) - .open(ports_file()) - .unwrap(); - file.write_all(serde_json::to_string(&self).unwrap().as_bytes()) - .unwrap(); - } -} - -fn ports_file() -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(".test_network_ports.json") -} - -impl LockedPort { - fn open_lock() -> fslock::LockFile { - fslock::LockFile::open( - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join(".test_network_ports.lock") - .as_os_str(), - ) - .expect("cannot lock") - } - - fn allocate() -> Self { - use std::io::Read; - - let mut lock = Self::open_lock(); - lock.lock().expect("should be able to lock"); - let mut value = PortsLockfile::read(); - - let mut i = 0; - let port = loop { - let port = unique_port::get_unique_free_port().unwrap(); - if !value.ports.contains(&port) { - break port; - } - i = i + 1; - if i == 1000 { - 
panic!("cannot find a free port") - } - }; - - value.ports.insert(port); - - value.save(); - lock.unlock().unwrap(); - - eprintln!("[unique port] allocated {port}"); - - Self(port) - } -} - -impl Drop for LockedPort { - fn drop(&mut self) { - let mut lock = Self::open_lock(); - lock.lock().unwrap(); - let mut value = PortsLockfile::read(); - value.ports.remove(&self.0); - value.save(); - lock.unlock().unwrap(); - - eprintln!("[unique port] released {}", self.0); +impl From for Box { + fn from(val: NetworkPeer) -> Self { + Box::new(Peer::new(val.id.clone())) } } +// TODO: move to test utils, doesn't belong to test network? /// Construct executor from path. /// /// `relative_path` should be relative to `CARGO_MANIFEST_DIR`. @@ -838,107 +786,3 @@ pub fn construct_executor(relative_path: impl AsRef) -> Executor { Executor::new(WasmSmartContract::from_compiled(wasm_blob)) } - -fn test_genesis( - extra_isi: impl IntoIterator, - topology: UniqueVec, -) -> GenesisBlock { - // TODO: Fix this somehow. Probably we need to make `kagami` a library (#3253). - let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR")); - let mut genesis = - RawGenesisTransaction::from_path(manifest_dir.join("../../defaults/genesis.json")) - .expect("Failed to deserialize genesis block from file"); - - let rose_definition_id = "rose#wonderland".parse::().unwrap(); - - let grant_mint_rose_permission = Grant::account_permission( - CanMintAssetWithDefinition { - asset_definition: rose_definition_id.clone(), - }, - ALICE_ID.clone(), - ); - let grant_burn_rose_permission = Grant::account_permission( - CanBurnAssetWithDefinition { - asset_definition: rose_definition_id, - }, - ALICE_ID.clone(), - ); - let grant_unregister_any_peer_permission = - Grant::account_permission(CanUnregisterAnyPeer, ALICE_ID.clone()); - let grant_unregister_any_role_permission = - Grant::account_permission(CanUnregisterAnyRole, ALICE_ID.clone()); - let grant_unregister_wonderland_domain = Grant::account_permission( - CanUnregisterDomain { - domain: "wonderland".parse().unwrap(), - }, - ALICE_ID.clone(), - ); - let grant_upgrade_executor_permission = - Grant::account_permission(CanUpgradeExecutor, ALICE_ID.clone()); - for isi in [ - grant_mint_rose_permission, - grant_burn_rose_permission, - grant_unregister_any_peer_permission, - grant_unregister_any_role_permission, - grant_unregister_wonderland_domain, - grant_upgrade_executor_permission, - ] { - genesis.append_instruction(isi); - } - - for isi in extra_isi.into_iter() { - genesis.append_instruction(isi); - } - - let genesis_key_pair = SAMPLE_GENESIS_ACCOUNT_KEYPAIR.clone(); - genesis - .with_topology(topology.into()) - .build_and_sign(&genesis_key_pair) - .expect("genesis should load fine") -} - -fn default_chain_id() -> ChainId { - ChainId::from("00000000-0000-0000-0000-000000000000") -} - -fn base_config() -> Table { - Table::new() - .write("chain", default_chain_id()) - .write( - ["genesis", "public_key"], - SAMPLE_GENESIS_ACCOUNT_KEYPAIR.public_key(), - ) - // There is no need in persistence in tests. 
- // If required to should be set explicitly not to overlap with other existing tests - .write(["snapshot", "mode"], "disabled") - .write(["kura", "store_dir"], "./storage") - .write(["network", "block_gossip_size"], 1) -} - -#[cfg(test)] - -mod tests { - use super::*; - - #[test] - fn ports_lock() { - let _a = LockedPort::allocate(); - let _b = LockedPort::allocate(); - let _c = LockedPort::allocate(); - let _d = LockedPort::allocate(); - // thread::sleep(Duration::from_secs(120)); - } - - #[tokio::test] - async fn just_start() { - let network = timeout( - Duration::from_secs(1), - NetworkBuilder::new().with_peers(4).start(), - ) - .await - .unwrap() - .unwrap(); - // println!("started, go observe, you have 5 mins"); - // sleep(Duration::from_secs(300)).await; - } -}
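
For reference, a minimal sketch of an integration test written against the reworked `iroha_test_network` API introduced by this patch (`NetworkBuilder`, `Network::client`, `Network::ensure_synchronised_blocks_height`). The test name, the `playground` domain, and the expected block height are illustrative assumptions, not part of the change set:

```rust
use eyre::Result;
use iroha::data_model::prelude::*;
use iroha_test_network::NetworkBuilder;
use tokio::task::spawn_blocking;

#[tokio::test]
async fn start_network_and_register_domain() -> Result<()> {
    // Build and start a 4-peer network; `start()` resolves once every peer
    // is up and has committed the genesis block (see `Network::start_all`).
    let network = NetworkBuilder::new().with_peers(4).start().await?;

    // A client pre-configured for Alice against one of the peers.
    let client = network.client();
    let domain_id: DomainId = "playground".parse().expect("valid domain name");
    spawn_blocking(move || {
        client
            .submit_blocking(Register::domain(Domain::new(domain_id)))
            .expect("domain registration should succeed")
    })
    .await?;

    // Genesis occupies block 1, so the transaction above should land in block 2;
    // wait until every running peer reports that height.
    network.ensure_synchronised_blocks_height(2).await?;
    Ok(())
}
```

For non-async tests, `NetworkBuilder::start_blocking` provides the same flow together with a bundled `Runtime`.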