From 657617dbf8f6834ac7d66ac50112bb86c8e41d4e Mon Sep 17 00:00:00 2001 From: 0x009922 <43530070+0x009922@users.noreply.github.com> Date: Fri, 27 Sep 2024 13:39:17 +0900 Subject: [PATCH] test: stabilise tests more; use `which` to find `irohad` Signed-off-by: 0x009922 <43530070+0x009922@users.noreply.github.com> --- .github/workflows/iroha2-dev-pr.yml | 3 +- Cargo.lock | 19 ++ crates/iroha/src/client.rs | 6 + .../extra_functional/connected_peers.rs | 8 +- .../integration/extra_functional/genesis.rs | 7 +- .../multiple_blocks_created.rs | 203 +++++++++++++----- .../extra_functional/restart_peer.rs | 4 +- .../extra_functional/unregister_peer.rs | 7 +- crates/iroha/tests/integration/tx_history.rs | 4 +- crates/iroha_test_network/Cargo.toml | 1 + crates/iroha_test_network/src/lib.rs | 47 ++-- 11 files changed, 209 insertions(+), 100 deletions(-) diff --git a/.github/workflows/iroha2-dev-pr.yml b/.github/workflows/iroha2-dev-pr.yml index 65ff2cbd68f..d356f26aa6f 100644 --- a/.github/workflows/iroha2-dev-pr.yml +++ b/.github/workflows/iroha2-dev-pr.yml @@ -76,8 +76,7 @@ jobs: path: ${{ env.DOCKER_COMPOSE_PATH }} - name: Run tests, with coverage run: | - cargo build --bin irohad - export IROHAD_EXEC=$(realpath ./target/debug/irohad) + mold --run cargo install --path ./crates/irohad mold --run cargo test --all-features --no-fail-fast --workspace env: RUSTFLAGS: "-C instrument-coverage" diff --git a/Cargo.lock b/Cargo.lock index ce64fb5db29..f2a4861c117 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3700,6 +3700,7 @@ dependencies = [ "tokio", "toml", "unique_port", + "which", ] [[package]] @@ -7179,6 +7180,18 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "which" +version = "6.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f" +dependencies = [ + "either", + "home", + "rustix", + "winsafe", +] + [[package]] name = "winapi" version = "0.3.9" @@ -7432,6 +7445,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "winsafe" +version = "0.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" + [[package]] name = "wit-parser" version = "0.209.1" diff --git a/crates/iroha/src/client.rs b/crates/iroha/src/client.rs index e88ed38fd91..93a8e54d04f 100644 --- a/crates/iroha/src/client.rs +++ b/crates/iroha/src/client.rs @@ -357,6 +357,12 @@ impl Client { Self::listen_for_tx_confirmation_loop(&mut event_iterator, hash), ) .await + .wrap_err_with(|| { + eyre!( + "haven't got tx confirmation within {:?} (configured with `transaction_status_timeout`)", + self.transaction_status_timeout + ) + }) .map_err(Into::into) .and_then(std::convert::identity); event_iterator.close().await; diff --git a/crates/iroha/tests/integration/extra_functional/connected_peers.rs b/crates/iroha/tests/integration/extra_functional/connected_peers.rs index 303e78e546f..969affc32d5 100644 --- a/crates/iroha/tests/integration/extra_functional/connected_peers.rs +++ b/crates/iroha/tests/integration/extra_functional/connected_peers.rs @@ -1,4 +1,4 @@ -use std::{iter::once, time::Duration}; +use std::iter::once; use assert_matches::assert_matches; use eyre::Result; @@ -12,7 +12,6 @@ use iroha_test_network::*; use rand::{prelude::IteratorRandom, seq::SliceRandom, thread_rng}; use tokio::{task::spawn_blocking, time::timeout}; -// #[ignore = "ignore, more in #2851"] #[tokio::test] async fn connected_peers_with_f_2_1_2() -> Result<()> { connected_peers_with_f(2).await @@ -41,7 +40,7 @@ async fn register_new_peer() -> Result<()> { let client = network.client(); spawn_blocking(move || client.submit_blocking(register)).await??; - timeout(Duration::from_secs(2), peer.once_block(2)).await?; + timeout(network.sync_timeout(), peer.once_block(2)).await?; Ok(()) } @@ -65,7 +64,7 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> { let unregister_peer = Unregister::peer(removed_peer.id()); spawn_blocking(move || client.submit_blocking(unregister_peer)).await??; timeout( - network.pipeline_time() * 10, + network.sync_timeout(), randomized_peers .iter() .map(|peer| peer.once_block(2)) @@ -90,7 +89,6 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> { spawn_blocking(move || client.submit_blocking(register_peer)).await??; network.ensure_blocks(3).await?; - // timeout(Duration::from_secs(2), removed_peer.once_block_height(3)).await?; assert_peers_status( randomized_peers .iter() diff --git a/crates/iroha/tests/integration/extra_functional/genesis.rs b/crates/iroha/tests/integration/extra_functional/genesis.rs index 6a661150c7e..8d680759e94 100644 --- a/crates/iroha/tests/integration/extra_functional/genesis.rs +++ b/crates/iroha/tests/integration/extra_functional/genesis.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use eyre::Context; use futures_util::{stream::FuturesUnordered, StreamExt}; use iroha::data_model::{ @@ -27,7 +25,7 @@ async fn multiple_genesis_4_peers_2_genesis() -> eyre::Result<()> { async fn multiple_genesis_peers(n_peers: usize, n_genesis_peers: usize) -> eyre::Result<()> { let network = NetworkBuilder::new().with_peers(n_peers).build(); timeout( - Duration::from_secs(5), + network.peer_startup_timeout(), network .peers() .iter() @@ -43,8 +41,7 @@ async fn multiple_genesis_peers(n_peers: usize, n_genesis_peers: usize) -> eyre: .collect::>() .collect::>(), ) - .await - .expect("should start"); + .await?; let client = network.client(); let domain_id: DomainId = "foo".parse().expect("Valid"); diff --git a/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs b/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs index 0a2cb7182af..94ae57f71b3 100644 --- a/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs +++ b/crates/iroha/tests/integration/extra_functional/multiple_blocks_created.rs @@ -1,3 +1,5 @@ +use std::{num::NonZero, time::Duration}; + use eyre::Result; use iroha::{ client::{self}, @@ -6,40 +8,30 @@ use iroha::{ use iroha_data_model::parameter::BlockParameter; use iroha_test_network::*; use iroha_test_samples::gen_account_in; -use nonzero_ext::nonzero; -use tokio::task::spawn_blocking; +use rand::{prelude::IteratorRandom, thread_rng}; +use tokio::{ + sync::{mpsc, oneshot, watch}, + task::{spawn_blocking, JoinSet}, + time::{sleep, timeout}, +}; +/// Bombard random peers with random mints in multiple rounds, ensuring they all have +/// a consistent total amount in the end. #[tokio::test] async fn multiple_blocks_created() -> Result<()> { - const N_BLOCKS: u64 = 500; + const N_ROUNDS: u64 = 50; + const N_MAX_TXS_PER_BLOCK: u64 = 10; // Given let network = NetworkBuilder::new() .with_peers(4) .with_genesis_instruction(SetParameter(Parameter::Block( - BlockParameter::MaxTransactions(nonzero!(1u64)), + BlockParameter::MaxTransactions(NonZero::new(N_MAX_TXS_PER_BLOCK).expect("valid")), ))) + .with_pipeline_time(Duration::from_secs(1)) .start() .await?; - let mut peers = network.peers().iter(); - let peer_a = peers.next().unwrap(); - let peer_b = peers.next().unwrap(); - let client_a = peer_a.client(); - assert_ne!(peer_a, peer_b); - - // let mut events = peer_b.events(); - // tokio::spawn(async move { - // while let Ok(e) = events.recv().await { - // match e { - // PeerLifecycleEvent::LogBlockCommitted { height } => { - // println!("Last peer block committed: {height}") - // } - // _ => {} - // } - // } - // }); - let create_domain = Register::domain(Domain::new("domain".parse()?)); let (account_id, _account_keypair) = gen_account_in("domain"); let create_account = Register::account(Account::new(account_id.clone())); @@ -48,9 +40,9 @@ async fn multiple_blocks_created() -> Result<()> { Register::asset_definition(AssetDefinition::numeric(asset_definition_id.clone())); { - let client_one = client_a.clone(); + let client = network.client(); spawn_blocking(move || { - client_one.clone().submit_all::([ + client.clone().submit_all::([ create_domain.into(), create_account.into(), create_asset.into(), @@ -59,40 +51,141 @@ async fn multiple_blocks_created() -> Result<()> { .await??; } - const INIT_BLOCKS: u64 = 2; - network.ensure_blocks(INIT_BLOCKS).await?; + network.ensure_blocks(2).await?; + + let blocks = BlocksTracker::start(&network).await; // When - let total: u128 = (0..N_BLOCKS as u128).sum(); - let definition = asset_definition_id.clone(); - let account = account_id.clone(); - spawn_blocking(move || { - for i in 0..N_BLOCKS { - client_a - .submit(Mint::asset_numeric( - Numeric::new(i as u128, 0), - AssetId::new(definition.clone(), account.clone()), - )) - .expect("submit cannot fail"); - println!("Submitted {}/{N_BLOCKS} txs", i + 1); + let mut total: u128 = 0; + for _ in 1..=N_ROUNDS { + blocks.invalidate().await; + + let txs = (1..=N_MAX_TXS_PER_BLOCK) + .choose(&mut thread_rng()) + .expect("there is a room to choose from"); + println!("submitting {txs} transactions on random peers"); + for _ in 0..txs { + let value = (0..999_999) + .choose(&mut thread_rng()) + .expect("there is quite a room to choose from"); + total = total + value; + + let client = network.client(); + let tx = client.build_transaction( + [Mint::asset_numeric( + Numeric::new(value, 0), + AssetId::new(asset_definition_id.clone(), account_id.clone()), + )], + <_>::default(), + ); + spawn_blocking(move || client.submit_transaction(&tx)).await??; } - }) - .await?; - network.ensure_blocks(INIT_BLOCKS + N_BLOCKS).await?; - - // Then - let client_two = peer_b.client(); - let asset = spawn_blocking(move || { - client_two - .query(client::asset::all()) - .filter_with(|asset| asset.id.account.eq(account_id)) - .execute_all() - }) - .await?? - .into_iter() - .find(|asset| asset.id().definition() == &asset_definition_id) - .expect("should exist"); - assert_eq!(*asset.value(), AssetValue::Numeric(Numeric::new(total, 0))); + + timeout(network.sync_timeout(), blocks.sync()).await?; + } + + // ensuring all have the same total + sleep(Duration::from_secs(2)).await; + println!("all peers should have total={total}"); + let expected_value = AssetValue::Numeric(Numeric::new(total, 0)); + for peer in network.peers() { + let client = peer.client(); + let expected_value = expected_value.clone(); + let account_id = account_id.clone(); + let definition = asset_definition_id.clone(); + let assets = spawn_blocking(move || { + client + .query(client::asset::all()) + .filter_with(|asset| { + asset.id.account.eq(account_id) & asset.id.definition_id.eq(definition) + }) + .execute_all() + }) + .await??; + assert_eq!(assets.len(), 1); + let asset = assets.into_iter().next().unwrap(); + assert_eq!(*asset.value(), expected_value); + } Ok(()) } + +struct BlocksTracker { + invalidate_tx: mpsc::Sender>, + sync_tx: watch::Sender, + _children: JoinSet<()>, +} + +impl BlocksTracker { + async fn start(network: &Network) -> Self { + let mut children = JoinSet::new(); + + let (block_tx, mut block_rx) = mpsc::channel::<(u64, usize)>(10); + for (i, peer) in network.peers().iter().cloned().enumerate() { + let tx = block_tx.clone(); + children.spawn(async move { + let mut recv = peer.blocks(); + loop { + let value = *recv.borrow_and_update(); + if let Some(value) = value { + let _ = tx.send((value, i)).await; + } + if let Err(_) = recv.changed().await { + break; + } + } + }); + } + + let peers_count = network.peers().len(); + let (invalidate_tx, mut invalidate_rx) = mpsc::channel::>(1); + let (sync_tx, _sync_rx) = watch::channel(false); + let sync_clone = sync_tx.clone(); + children.spawn(async move { + let mut max = 0; + let mut invalidated = false; + let mut blocks = vec![0; peers_count]; + loop { + tokio::select! { + Some((height, i)) = block_rx.recv() => { + *blocks.get_mut(i).unwrap() = height; + if height > max { + invalidated = false; + max = height; + } + // println!("blocks sync state: {blocks:?}"); + let is_sync = !invalidated && blocks.iter().all(|x| *x == max); + sync_tx.send_modify(|flag| *flag = is_sync); + } + Some(tx) = invalidate_rx.recv() => { + invalidated = true; + sync_tx.send_modify(|flag| *flag = false); + let _ = tx.send(()).unwrap(); + } + } + } + }); + + Self { + invalidate_tx, + sync_tx: sync_clone, + _children: children, + } + } + + async fn invalidate(&self) { + let (tx, rx) = oneshot::channel(); + self.invalidate_tx.send(tx).await.unwrap(); + rx.await.unwrap(); + } + + async fn sync(&self) { + let mut recv = self.sync_tx.subscribe(); + loop { + if *recv.borrow_and_update() { + return; + } + recv.changed().await.unwrap() + } + } +} diff --git a/crates/iroha/tests/integration/extra_functional/restart_peer.rs b/crates/iroha/tests/integration/extra_functional/restart_peer.rs index 2ec0cd0a0d9..b26357582a1 100644 --- a/crates/iroha/tests/integration/extra_functional/restart_peer.rs +++ b/crates/iroha/tests/integration/extra_functional/restart_peer.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use eyre::Result; use iroha::{ client::{self}, @@ -57,7 +55,7 @@ async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> { let peer = network.peer(); peer.start(network.config().clone(), Some(network.genesis())) .await; - timeout(Duration::from_secs(1), peer.once_block(1)).await?; + timeout(network.peer_startup_timeout(), peer.once_block(1)).await?; let client = peer.client(); let assets = spawn_blocking(move || { diff --git a/crates/iroha/tests/integration/extra_functional/unregister_peer.rs b/crates/iroha/tests/integration/extra_functional/unregister_peer.rs index 23ad92ef82b..83297f8e6e4 100644 --- a/crates/iroha/tests/integration/extra_functional/unregister_peer.rs +++ b/crates/iroha/tests/integration/extra_functional/unregister_peer.rs @@ -11,7 +11,10 @@ use nonzero_ext::nonzero; #[test] fn network_stable_after_add_and_after_remove_peer() -> Result<()> { // Given a network - let (mut network, rt) = NetworkBuilder::new().with_peers(4).start_blocking()?; + let (mut network, rt) = NetworkBuilder::new() + .with_default_pipeline_time() + .with_peers(4) + .start_blocking()?; let main_client = network.client(); let (account_id, asset_definition_id) = init(&main_client)?; rt.block_on(async { network.ensure_blocks(2).await })?; @@ -60,7 +63,7 @@ fn network_stable_after_add_and_after_remove_peer() -> Result<()> { numeric!(300), )?; // But not on the unregistered peer's network. - std::thread::sleep(network.pipeline_time() * 4); + std::thread::sleep(network.pipeline_time()); check_assets( &new_peer_client, &account_id, diff --git a/crates/iroha/tests/integration/tx_history.rs b/crates/iroha/tests/integration/tx_history.rs index 259563ba71e..adcafebcf4d 100644 --- a/crates/iroha/tests/integration/tx_history.rs +++ b/crates/iroha/tests/integration/tx_history.rs @@ -8,7 +8,7 @@ use iroha_test_samples::ALICE_ID; use nonzero_ext::nonzero; #[test] -fn client_has_rejected_and_acepted_txs_should_return_tx_history() -> Result<()> { +fn client_has_rejected_and_accepted_txs_should_return_tx_history() -> Result<()> { let (network, _rt) = NetworkBuilder::new().start_blocking()?; let client = network.client(); @@ -38,7 +38,7 @@ fn client_has_rejected_and_acepted_txs_should_return_tx_history() -> Result<()> }; let instructions: Vec = vec![mint_asset.clone().into()]; let transaction = client.build_transaction(instructions, Metadata::default()); - client.submit_transaction(&transaction)?; + let _ = client.submit_transaction_blocking(&transaction); } let transactions = client diff --git a/crates/iroha_test_network/Cargo.toml b/crates/iroha_test_network/Cargo.toml index 2a84bdd0781..de196fb5a3e 100644 --- a/crates/iroha_test_network/Cargo.toml +++ b/crates/iroha_test_network/Cargo.toml @@ -37,3 +37,4 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] } fslock = "0.2.1" serde = { workspace = true, features = ["derive"] } derive_more = { workspace = true } +which = "6.0.3" diff --git a/crates/iroha_test_network/src/lib.rs b/crates/iroha_test_network/src/lib.rs index ae994d01512..4dbe44e0fb6 100644 --- a/crates/iroha_test_network/src/lib.rs +++ b/crates/iroha_test_network/src/lib.rs @@ -55,30 +55,12 @@ const PEER_KILL_TIMEOUT: Duration = Duration::from_secs(5); const SYNC_TIMEOUT: Duration = Duration::from_secs(30); fn iroha_bin() -> impl AsRef { - const DEFAULT: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../target/release/irohad"); - const ENV: &str = "IROHAD_EXEC"; - static PATH: OnceLock = OnceLock::new(); PATH.get_or_init(|| { - let path = std::env::var(ENV) - .ok() - .map(PathBuf::from) - .unwrap_or_else(|| PathBuf::from(DEFAULT)); - - if !std::fs::exists(&path).expect("should not fail") { - panic!( - " Cannot run `iroha_test_network` without `irohad` binary provided.\n \ - Looked at path: {}\n \ - Make sure you have pre-built `irohad` in `release` target\n \ - or provide a custom path via `{}` environment variable", - path.display(), - ENV - ); - } - - eprintln!("Using `irohad`: {}", path.display()); - path + which::which("irohad").expect( + "`irohad` binary should be available in $PATH before running `iroha_test_network`", + ) }) } @@ -151,7 +133,15 @@ impl Network { self.block_time + self.commit_time / 2 } - /// Alice's client for the first peer of the network + pub fn sync_timeout(&self) -> Duration { + SYNC_TIMEOUT + } + + pub fn peer_startup_timeout(&self) -> Duration { + PEER_START_TIMEOUT + } + + /// Get a client for a random peer in the network pub fn client(&self) -> Client { self.peer().client() } @@ -196,8 +186,7 @@ impl Network { /// If this doesn't happen within a timeout. pub async fn ensure_blocks(&self, height: u64) -> Result<&Self> { timeout( - // TODO: make pipeline time-dependent? - SYNC_TIMEOUT, + self.sync_timeout(), self.peers .iter() .filter(|x| x.is_running()) @@ -206,7 +195,9 @@ impl Network { .collect::>(), ) .await - .wrap_err_with(|| eyre!("Network hasn't reached the height of {height} block(s) within timeout ({SYNC_TIMEOUT:?})"))?; + .wrap_err_with(|| { + eyre!("Network hasn't reached the height of {height} block(s) within timeout") + })?; Ok(self) } @@ -661,7 +652,7 @@ impl NetworkPeer { // FIXME: should we wait for `Applied` event instead? if *block.status() == BlockStatus::Applied { let height = block.header().height().get(); - eprintln!("{log_prefix} block committed: {height}",); + eprintln!("{log_prefix} BlockStatus::Applied height={height}",); let _ = events_tx.send(PeerLifecycleEvent::BlockApplied { height }); block_height_tx.send_modify(|x| *x = Some(height)); } @@ -799,6 +790,10 @@ impl NetworkPeer { .await .expect("should not panic") } + + pub fn blocks(&self) -> watch::Receiver> { + self.block_height.subscribe() + } } /// Compare by ID