Skip to content

Commit

Permalink
test: stabilise tests more; use which to find irohad
Browse files Browse the repository at this point in the history
Signed-off-by: 0x009922 <[email protected]>
  • Loading branch information
0x009922 committed Sep 27, 2024
1 parent 70120b8 commit 657617d
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 100 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/iroha2-dev-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ jobs:
path: ${{ env.DOCKER_COMPOSE_PATH }}
- name: Run tests, with coverage
run: |
cargo build --bin irohad
export IROHAD_EXEC=$(realpath ./target/debug/irohad)
mold --run cargo install --path ./crates/irohad
mold --run cargo test --all-features --no-fail-fast --workspace
env:
RUSTFLAGS: "-C instrument-coverage"
Expand Down
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/iroha/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ impl Client {
Self::listen_for_tx_confirmation_loop(&mut event_iterator, hash),
)
.await
.wrap_err_with(|| {
eyre!(
"haven't got tx confirmation within {:?} (configured with `transaction_status_timeout`)",
self.transaction_status_timeout
)
})
.map_err(Into::into)
.and_then(std::convert::identity);
event_iterator.close().await;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{iter::once, time::Duration};
use std::iter::once;

use assert_matches::assert_matches;
use eyre::Result;
Expand All @@ -12,7 +12,6 @@ use iroha_test_network::*;
use rand::{prelude::IteratorRandom, seq::SliceRandom, thread_rng};
use tokio::{task::spawn_blocking, time::timeout};

// #[ignore = "ignore, more in #2851"]
#[tokio::test]
async fn connected_peers_with_f_2_1_2() -> Result<()> {
connected_peers_with_f(2).await
Expand Down Expand Up @@ -41,7 +40,7 @@ async fn register_new_peer() -> Result<()> {
let client = network.client();
spawn_blocking(move || client.submit_blocking(register)).await??;

timeout(Duration::from_secs(2), peer.once_block(2)).await?;
timeout(network.sync_timeout(), peer.once_block(2)).await?;

Ok(())
}
Expand All @@ -65,7 +64,7 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> {
let unregister_peer = Unregister::peer(removed_peer.id());
spawn_blocking(move || client.submit_blocking(unregister_peer)).await??;
timeout(
network.pipeline_time() * 10,
network.sync_timeout(),
randomized_peers
.iter()
.map(|peer| peer.once_block(2))
Expand All @@ -90,7 +89,6 @@ async fn connected_peers_with_f(faults: usize) -> Result<()> {
spawn_blocking(move || client.submit_blocking(register_peer)).await??;
network.ensure_blocks(3).await?;

// timeout(Duration::from_secs(2), removed_peer.once_block_height(3)).await?;
assert_peers_status(
randomized_peers
.iter()
Expand Down
7 changes: 2 additions & 5 deletions crates/iroha/tests/integration/extra_functional/genesis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use eyre::Context;
use futures_util::{stream::FuturesUnordered, StreamExt};
use iroha::data_model::{
Expand Down Expand Up @@ -27,7 +25,7 @@ async fn multiple_genesis_4_peers_2_genesis() -> eyre::Result<()> {
async fn multiple_genesis_peers(n_peers: usize, n_genesis_peers: usize) -> eyre::Result<()> {
let network = NetworkBuilder::new().with_peers(n_peers).build();
timeout(
Duration::from_secs(5),
network.peer_startup_timeout(),
network
.peers()
.iter()
Expand All @@ -43,8 +41,7 @@ async fn multiple_genesis_peers(n_peers: usize, n_genesis_peers: usize) -> eyre:
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>(),
)
.await
.expect("should start");
.await?;

let client = network.client();
let domain_id: DomainId = "foo".parse().expect("Valid");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{num::NonZero, time::Duration};

use eyre::Result;
use iroha::{
client::{self},
Expand All @@ -6,40 +8,30 @@ use iroha::{
use iroha_data_model::parameter::BlockParameter;
use iroha_test_network::*;
use iroha_test_samples::gen_account_in;
use nonzero_ext::nonzero;
use tokio::task::spawn_blocking;
use rand::{prelude::IteratorRandom, thread_rng};
use tokio::{
sync::{mpsc, oneshot, watch},
task::{spawn_blocking, JoinSet},
time::{sleep, timeout},
};

/// Bombard random peers with random mints in multiple rounds, ensuring they all have
/// a consistent total amount in the end.
#[tokio::test]
async fn multiple_blocks_created() -> Result<()> {
const N_BLOCKS: u64 = 500;
const N_ROUNDS: u64 = 50;
const N_MAX_TXS_PER_BLOCK: u64 = 10;

// Given
let network = NetworkBuilder::new()
.with_peers(4)
.with_genesis_instruction(SetParameter(Parameter::Block(
BlockParameter::MaxTransactions(nonzero!(1u64)),
BlockParameter::MaxTransactions(NonZero::new(N_MAX_TXS_PER_BLOCK).expect("valid")),
)))
.with_pipeline_time(Duration::from_secs(1))
.start()
.await?;

let mut peers = network.peers().iter();
let peer_a = peers.next().unwrap();
let peer_b = peers.next().unwrap();
let client_a = peer_a.client();
assert_ne!(peer_a, peer_b);

// let mut events = peer_b.events();
// tokio::spawn(async move {
// while let Ok(e) = events.recv().await {
// match e {
// PeerLifecycleEvent::LogBlockCommitted { height } => {
// println!("Last peer block committed: {height}")
// }
// _ => {}
// }
// }
// });

let create_domain = Register::domain(Domain::new("domain".parse()?));
let (account_id, _account_keypair) = gen_account_in("domain");
let create_account = Register::account(Account::new(account_id.clone()));
Expand All @@ -48,9 +40,9 @@ async fn multiple_blocks_created() -> Result<()> {
Register::asset_definition(AssetDefinition::numeric(asset_definition_id.clone()));

{
let client_one = client_a.clone();
let client = network.client();
spawn_blocking(move || {
client_one.clone().submit_all::<InstructionBox>([
client.clone().submit_all::<InstructionBox>([
create_domain.into(),
create_account.into(),
create_asset.into(),
Expand All @@ -59,40 +51,141 @@ async fn multiple_blocks_created() -> Result<()> {
.await??;
}

const INIT_BLOCKS: u64 = 2;
network.ensure_blocks(INIT_BLOCKS).await?;
network.ensure_blocks(2).await?;

let blocks = BlocksTracker::start(&network).await;

// When
let total: u128 = (0..N_BLOCKS as u128).sum();
let definition = asset_definition_id.clone();
let account = account_id.clone();
spawn_blocking(move || {
for i in 0..N_BLOCKS {
client_a
.submit(Mint::asset_numeric(
Numeric::new(i as u128, 0),
AssetId::new(definition.clone(), account.clone()),
))
.expect("submit cannot fail");
println!("Submitted {}/{N_BLOCKS} txs", i + 1);
let mut total: u128 = 0;
for _ in 1..=N_ROUNDS {
blocks.invalidate().await;

let txs = (1..=N_MAX_TXS_PER_BLOCK)
.choose(&mut thread_rng())
.expect("there is a room to choose from");
println!("submitting {txs} transactions on random peers");
for _ in 0..txs {
let value = (0..999_999)
.choose(&mut thread_rng())
.expect("there is quite a room to choose from");
total = total + value;

let client = network.client();
let tx = client.build_transaction(
[Mint::asset_numeric(
Numeric::new(value, 0),
AssetId::new(asset_definition_id.clone(), account_id.clone()),
)],
<_>::default(),
);
spawn_blocking(move || client.submit_transaction(&tx)).await??;
}
})
.await?;
network.ensure_blocks(INIT_BLOCKS + N_BLOCKS).await?;

// Then
let client_two = peer_b.client();
let asset = spawn_blocking(move || {
client_two
.query(client::asset::all())
.filter_with(|asset| asset.id.account.eq(account_id))
.execute_all()
})
.await??
.into_iter()
.find(|asset| asset.id().definition() == &asset_definition_id)
.expect("should exist");
assert_eq!(*asset.value(), AssetValue::Numeric(Numeric::new(total, 0)));

timeout(network.sync_timeout(), blocks.sync()).await?;
}

// ensuring all have the same total
sleep(Duration::from_secs(2)).await;
println!("all peers should have total={total}");
let expected_value = AssetValue::Numeric(Numeric::new(total, 0));
for peer in network.peers() {
let client = peer.client();
let expected_value = expected_value.clone();
let account_id = account_id.clone();
let definition = asset_definition_id.clone();
let assets = spawn_blocking(move || {
client
.query(client::asset::all())
.filter_with(|asset| {
asset.id.account.eq(account_id) & asset.id.definition_id.eq(definition)
})
.execute_all()
})
.await??;
assert_eq!(assets.len(), 1);
let asset = assets.into_iter().next().unwrap();
assert_eq!(*asset.value(), expected_value);
}

Ok(())
}

struct BlocksTracker {
invalidate_tx: mpsc::Sender<oneshot::Sender<()>>,
sync_tx: watch::Sender<bool>,
_children: JoinSet<()>,
}

impl BlocksTracker {
async fn start(network: &Network) -> Self {
let mut children = JoinSet::new();

let (block_tx, mut block_rx) = mpsc::channel::<(u64, usize)>(10);
for (i, peer) in network.peers().iter().cloned().enumerate() {
let tx = block_tx.clone();
children.spawn(async move {
let mut recv = peer.blocks();
loop {
let value = *recv.borrow_and_update();
if let Some(value) = value {
let _ = tx.send((value, i)).await;
}
if let Err(_) = recv.changed().await {
break;
}
}
});
}

let peers_count = network.peers().len();
let (invalidate_tx, mut invalidate_rx) = mpsc::channel::<oneshot::Sender<()>>(1);
let (sync_tx, _sync_rx) = watch::channel(false);
let sync_clone = sync_tx.clone();
children.spawn(async move {
let mut max = 0;
let mut invalidated = false;
let mut blocks = vec![0; peers_count];
loop {
tokio::select! {
Some((height, i)) = block_rx.recv() => {
*blocks.get_mut(i).unwrap() = height;
if height > max {
invalidated = false;
max = height;
}
// println!("blocks sync state: {blocks:?}");
let is_sync = !invalidated && blocks.iter().all(|x| *x == max);
sync_tx.send_modify(|flag| *flag = is_sync);
}
Some(tx) = invalidate_rx.recv() => {
invalidated = true;
sync_tx.send_modify(|flag| *flag = false);
let _ = tx.send(()).unwrap();
}
}
}
});

Self {
invalidate_tx,
sync_tx: sync_clone,
_children: children,
}
}

async fn invalidate(&self) {
let (tx, rx) = oneshot::channel();
self.invalidate_tx.send(tx).await.unwrap();
rx.await.unwrap();
}

async fn sync(&self) {
let mut recv = self.sync_tx.subscribe();
loop {
if *recv.borrow_and_update() {
return;
}
recv.changed().await.unwrap()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use eyre::Result;
use iroha::{
client::{self},
Expand Down Expand Up @@ -57,7 +55,7 @@ async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
let peer = network.peer();
peer.start(network.config().clone(), Some(network.genesis()))
.await;
timeout(Duration::from_secs(1), peer.once_block(1)).await?;
timeout(network.peer_startup_timeout(), peer.once_block(1)).await?;

let client = peer.client();
let assets = spawn_blocking(move || {
Expand Down
Loading

0 comments on commit 657617d

Please sign in to comment.