diff --git a/Cargo.lock b/Cargo.lock
index f2a4861c117..adf871a66ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3691,6 +3691,7 @@ dependencies = [
  "iroha_telemetry",
  "iroha_test_samples",
  "iroha_wasm_builder",
+ "nix 0.29.0",
  "parity-scale-codec",
  "rand",
  "serde",
@@ -4339,6 +4340,18 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "nix"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
+dependencies = [
+ "bitflags 2.6.0",
+ "cfg-if",
+ "cfg_aliases",
+ "libc",
+]
+
 [[package]]
 name = "nom"
 version = "7.1.3"
@@ -4774,7 +4787,7 @@ dependencies = [
  "findshlibs",
  "libc",
  "log",
- "nix",
+ "nix 0.26.4",
  "once_cell",
  "parking_lot",
  "protobuf",
diff --git a/crates/iroha/tests/integration/extra_functional/restart_peer.rs b/crates/iroha/tests/integration/extra_functional/restart_peer.rs
index e91dddb60c3..b6681c4b645 100644
--- a/crates/iroha/tests/integration/extra_functional/restart_peer.rs
+++ b/crates/iroha/tests/integration/extra_functional/restart_peer.rs
@@ -20,24 +20,24 @@ async fn restarted_peer_should_restore_its_state() -> Result<()> {
 
     let client = peer_a.client();
     let asset_definition_clone = asset_definition_id.clone();
     spawn_blocking(move || {
-        let tx = client.build_transaction(
-            [
-                InstructionBox::from(Register::asset_definition(AssetDefinition::numeric(
+        client
+            .submit_all_blocking::<InstructionBox>([
+                Register::asset_definition(AssetDefinition::numeric(
                     asset_definition_clone.clone(),
-                ))),
+                ))
+                .into(),
                 Mint::asset_numeric(
                     quantity,
                     AssetId::new(asset_definition_clone, ALICE_ID.clone()),
                 )
                 .into(),
-            ],
-            <_>::default(),
-        );
-        client.submit_transaction_blocking(&tx).unwrap();
+            ])
+            .unwrap();
     })
     .await?;
-    network.ensure_blocks(2).await?;
+
+    // shutdown all
     network.shutdown().await;
 
     // restart another one, **without a genesis** even
diff --git a/crates/iroha/tests/integration/extra_functional/unregister_peer.rs b/crates/iroha/tests/integration/extra_functional/unregister_peer.rs
index 83297f8e6e4..7e345562653 100644
--- a/crates/iroha/tests/integration/extra_functional/unregister_peer.rs
+++ b/crates/iroha/tests/integration/extra_functional/unregister_peer.rs
@@ -1,4 +1,7 @@
-use eyre::{eyre, Result};
+use std::time::Duration;
+
+use assert_matches::assert_matches;
+use eyre::Result;
 use iroha::{
     client,
     client::Client,
@@ -7,96 +10,107 @@ use iroha::{
 use iroha_test_network::{NetworkBuilder, NetworkPeer};
 use iroha_test_samples::gen_account_in;
 use nonzero_ext::nonzero;
+use tokio::{task::spawn_blocking, time::sleep};
+
+#[tokio::test]
+async fn network_stable_after_add_and_after_remove_peer() -> Result<()> {
+    const PIPELINE_TIME: Duration = Duration::from_millis(300);
 
-#[test]
-fn network_stable_after_add_and_after_remove_peer() -> Result<()> {
     // Given a network
-    let (mut network, rt) = NetworkBuilder::new()
-        .with_default_pipeline_time()
+    let mut network = NetworkBuilder::new()
+        .with_pipeline_time(PIPELINE_TIME)
         .with_peers(4)
-        .start_blocking()?;
-    let main_client = network.client();
-    let (account_id, asset_definition_id) = init(&main_client)?;
-    rt.block_on(async { network.ensure_blocks(2).await })?;
+        .with_genesis_instruction(SetParameter::new(Parameter::Block(
+            BlockParameter::MaxTransactions(nonzero!(1_u64)),
+        )))
+        .start()
+        .await?;
+    let client = network.client();
 
-    // When assets are minted
-    mint(
-        &main_client,
-        &asset_definition_id,
-        &account_id,
-        numeric!(100),
-    )?;
-    rt.block_on(async { network.ensure_blocks(3).await })?;
+    let (account, _account_keypair) = gen_account_in("domain");
+    let asset_def: AssetDefinitionId = "xor#domain".parse()?;
+    {
+        let client = client.clone();
+        let account = account.clone();
+        let asset_def = asset_def.clone();
+        spawn_blocking(move || {
+            client.submit_all_blocking::<InstructionBox>([
+                Register::domain(Domain::new("domain".parse()?)).into(),
+                Register::account(Account::new(account)).into(),
+                Register::asset_definition(AssetDefinition::numeric(asset_def)).into(),
+            ])
+        })
+        .await??; // blocks=2
+    }
+    // When assets are minted
+    mint(&client, &asset_def, &account, numeric!(100)).await?;
+    network.ensure_blocks(3).await?;
     // and a new peer is registered
     let new_peer = NetworkPeer::generate();
     let new_peer_id = new_peer.id();
     let new_peer_client = new_peer.client();
-    network.add_peer(new_peer.clone());
-    rt.block_on(async { new_peer.start(network.config(), None).await });
-    main_client.submit_blocking(Register::peer(Peer::new(new_peer_id.clone())))?;
-    rt.block_on(async { network.ensure_blocks(4).await })?;
+    network.add_peer(&new_peer);
+    new_peer.start(network.config(), None).await;
+    {
+        let client = client.clone();
+        let id = new_peer_id.clone();
+        spawn_blocking(move || client.submit_blocking(Register::peer(Peer::new(id)))).await??;
+    }
+    network.ensure_blocks(4).await?;
 
     // Then the new peer should already have the mint result.
-    check_assets(
-        &new_peer_client,
-        &account_id,
-        &asset_definition_id,
-        numeric!(100),
-    )?;
+    assert_eq!(
+        find_asset(&new_peer_client, &account, &asset_def).await?,
+        numeric!(100)
+    );
 
-    // Also, when a peer is unregistered
-    let remove_peer = Unregister::peer(new_peer_id);
-    main_client.submit_blocking(remove_peer)?;
-    rt.block_on(async { network.ensure_blocks(5).await })?;
+    // When a peer is unregistered
+    {
+        let client = client.clone();
+        spawn_blocking(move || client.submit_blocking(Unregister::peer(new_peer_id))).await??;
+        // blocks=6
+    }
+    network.remove_peer(&new_peer);
     // We can mint without an error.
-    mint(
-        &main_client,
-        &asset_definition_id,
-        &account_id,
-        numeric!(200),
-    )?;
+    mint(&client, &asset_def, &account, numeric!(200)).await?;
     // Assets are increased on the main network.
-    check_assets(
-        &main_client,
-        &account_id,
-        &asset_definition_id,
-        numeric!(300),
-    )?;
+    network.ensure_blocks(6).await?;
+    assert_eq!(
+        find_asset(&client, &account, &asset_def).await?,
+        numeric!(300)
+    );
     // But not on the unregistered peer's network.
-    std::thread::sleep(network.pipeline_time());
-    check_assets(
-        &new_peer_client,
-        &account_id,
-        &asset_definition_id,
-        numeric!(100),
-    )?;
+    sleep(PIPELINE_TIME * 5).await;
+    assert_eq!(
+        find_asset(&new_peer_client, &account, &asset_def).await?,
+        numeric!(100)
+    );
 
     Ok(())
 }
 
-fn check_assets(
+async fn find_asset(
     client: &Client,
-    account_id: &AccountId,
-    asset_definition_id: &AssetDefinitionId,
-    quantity: Numeric,
-) -> Result<()> {
-    let account_id = account_id.clone();
-    let assets = client
-        .query(client::asset::all())
-        .filter_with(|asset| asset.id.account.eq(account_id.clone()))
-        .execute_all()?;
+    account: &AccountId,
+    asset_definition: &AssetDefinitionId,
+) -> Result<Numeric> {
+    let account_id = account.clone();
+    let client = client.clone();
+    let asset = spawn_blocking(move || {
+        client
+            .query(client::asset::all())
+            .filter_with(|asset| asset.id.account.eq(account_id.clone()))
+            .execute_all()
+    })
+    .await??
+    .into_iter()
+    .find(|asset| asset.id().definition() == asset_definition)
+    .expect("asset should be there");
 
-    if assets.iter().any(|asset| {
-        asset.id().definition() == asset_definition_id
-            && *asset.value() == AssetValue::Numeric(quantity)
-    }) {
-        Ok(())
-    } else {
-        Err(eyre!("assets mismatch"))
-    }
+    assert_matches!(asset.value(), AssetValue::Numeric(quantity) => Ok(quantity.clone()))
 }
 
-fn mint(
+async fn mint(
     client: &Client,
     asset_definition_id: &AssetDefinitionId,
     account_id: &AccountId,
@@ -106,26 +120,7 @@
         quantity,
         AssetId::new(asset_definition_id.clone(), account_id.clone()),
     );
-    client.submit_blocking(mint_asset)?;
+    let client = client.clone();
+    spawn_blocking(move || client.submit_blocking(mint_asset)).await??;
     Ok(())
 }
-
-fn init(client: &Client) -> Result<(AccountId, AssetDefinitionId)> {
-    let set_max_txns_in_block = SetParameter::new(Parameter::Block(
-        BlockParameter::MaxTransactions(nonzero!(1_u64)),
-    ));
-
-    let create_domain = Register::domain(Domain::new("domain".parse()?));
-    let (account_id, _account_keypair) = gen_account_in("domain");
-    let create_account = Register::account(Account::new(account_id.clone()));
-    let asset_definition_id: AssetDefinitionId = "xor#domain".parse()?;
-    let create_asset =
-        Register::asset_definition(AssetDefinition::numeric(asset_definition_id.clone()));
-    client.submit_all_blocking::<InstructionBox>([
-        set_max_txns_in_block.into(),
-        create_domain.into(),
-        create_account.into(),
-        create_asset.into(),
-    ])?;
-    Ok((account_id, asset_definition_id))
-}
diff --git a/crates/iroha_test_network/Cargo.toml b/crates/iroha_test_network/Cargo.toml
index de196fb5a3e..4b80aafce7f 100644
--- a/crates/iroha_test_network/Cargo.toml
+++ b/crates/iroha_test_network/Cargo.toml
@@ -38,3 +38,4 @@ fslock = "0.2.1"
 serde = { workspace = true, features = ["derive"] }
 derive_more = { workspace = true }
 which = "6.0.3"
+nix = { version = "0.29.0", features = ["signal"] }
diff --git a/crates/iroha_test_network/src/lib.rs b/crates/iroha_test_network/src/lib.rs
index 1ec192fc93c..b69cb521cae 100644
--- a/crates/iroha_test_network/src/lib.rs
+++ b/crates/iroha_test_network/src/lib.rs
@@ -41,6 +41,7 @@ use tempfile::TempDir;
 use tokio::{
     fs::File,
     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
+    process::Child,
     runtime::{self, Runtime},
     sync::{broadcast, oneshot, watch, Mutex},
     task::{spawn_blocking, JoinSet},
@@ -51,7 +52,7 @@ use toml::Table;
 const INSTANT_PIPELINE_TIME: Duration = Duration::from_millis(10);
 const DEFAULT_BLOCK_SYNC: Duration = Duration::from_millis(150);
 const PEER_START_TIMEOUT: Duration = Duration::from_secs(30);
-const PEER_KILL_TIMEOUT: Duration = Duration::from_secs(5);
+const PEER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
 const SYNC_TIMEOUT: Duration = Duration::from_secs(30);
 
 fn iroha_bin() -> impl AsRef<Path> {
@@ -87,8 +88,13 @@ pub struct Network {
 
 impl Network {
     /// Add a peer to the network.
-    pub fn add_peer(&mut self, peer: NetworkPeer) {
-        self.peers.push(peer);
+    pub fn add_peer(&mut self, peer: &NetworkPeer) {
+        self.peers.push(peer.clone());
+    }
+
+    /// Remove a peer from the network.
+    pub fn remove_peer(&mut self, peer: &NetworkPeer) {
+        self.peers.retain(|x| x != peer);
     }
 
     /// Access network peers
@@ -180,7 +186,7 @@ impl Network {
         self.peers
             .iter()
             .filter(|peer| peer.is_running())
-            .map(|peer| peer.kill())
+            .map(|peer| peer.shutdown())
             .collect::<FuturesUnordered<_>>()
             .collect::<Vec<_>>()
             .await;
@@ -403,7 +409,7 @@ impl Signatory {
 #[derive(Debug)]
 struct PeerRun {
     tasks: JoinSet<()>,
-    kill: oneshot::Sender<()>,
+    shutdown: oneshot::Sender<()>,
 }
 
 /// Lifecycle events of a peer
@@ -594,30 +600,20 @@ impl NetworkPeer {
             });
         }
 
-        let (kill_tx, kill_rx) = oneshot::channel::<()>();
-        {
-            let log_prefix = log_prefix.clone();
-            let is_running = self.is_running.clone();
-            let events_tx = self.events.clone();
-            let block_height_tx = self.block_height.clone();
-            tasks.spawn(async move {
-                tokio::select! {
-                    result = child.wait() => {
-                        let status = result.expect("peer process waiting should not fail");
-                        eprintln!("{log_prefix} process terminated: {status}");
-                        let _ = events_tx.send(PeerLifecycleEvent::Terminated { status });
-                    }
-                    _ = kill_rx => {
-                        child.kill().await.expect("kill failed");
-                        eprintln!("{log_prefix} process killed");
-                        // TODO: child.wait?
-                        let _ = events_tx.send(PeerLifecycleEvent::Killed);
-                    }
-                };
-                is_running.store(false, Ordering::Relaxed);
-                block_height_tx.send_modify(|x| *x = None);
-            });
-        }
+        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+        let peer_exit = PeerExit {
+            child,
+            log_prefix: log_prefix.clone(),
+            is_running: self.is_running.clone(),
+            events: self.events.clone(),
+            block_height: self.block_height.clone(),
+        };
+        tasks.spawn(async move {
+            if let Err(err) = peer_exit.monitor(shutdown_rx).await {
+                eprintln!("something went very bad during peer exit monitoring: {err}");
+                panic!()
+            }
+        });
 
         {
             let log_prefix = log_prefix.clone();
@@ -682,7 +678,7 @@
 
         *run_guard = Some(PeerRun {
             tasks,
-            kill: kill_tx,
+            shutdown: shutdown_tx,
         });
     }
 
@@ -690,17 +686,16 @@
     ///
     /// # Panics
     /// If peer was not started.
-    pub async fn kill(&self) {
+    pub async fn shutdown(&self) {
        let mut guard = self.run.lock().await;
        let Some(run) = (*guard).take() else {
            panic!("peer is not running, nothing to shut down");
        };
        if self.is_running() {
-            // TODO: send graceful shutdown signal?
-            let _ = run.kill.send(());
-            timeout(PEER_KILL_TIMEOUT, run.tasks.join_all())
+            let _ = run.shutdown.send(());
+            timeout(PEER_SHUTDOWN_TIMEOUT, run.tasks.join_all())
                .await
-                .expect("run-related tasks should terminate within timeout");
+                .expect("run-related tasks should exit within timeout");
            assert!(!self.is_running());
        }
    }
@@ -851,6 +846,58 @@ impl From for Box {
     }
 }
 
+struct PeerExit {
+    child: Child,
+    log_prefix: String,
+    is_running: Arc<AtomicBool>,
+    events: broadcast::Sender<PeerLifecycleEvent>,
+    block_height: watch::Sender<Option<u64>>,
+}
+
+impl PeerExit {
+    async fn monitor(mut self, shutdown: oneshot::Receiver<()>) -> Result<()> {
+        let status = tokio::select! {
+            status = self.child.wait() => status?,
+            _ = shutdown => self.shutdown_or_kill().await?,
+        };
+
+        eprintln!("{} {status}", self.log_prefix);
+        let _ = self.events.send(PeerLifecycleEvent::Terminated { status });
+        self.is_running.store(false, Ordering::Relaxed);
+        self.block_height.send_modify(|x| *x = None);
+
+        Ok(())
+    }
+
+    async fn shutdown_or_kill(&mut self) -> Result<ExitStatus> {
+        use nix::{sys::signal, unistd::Pid};
+        const TIMEOUT: Duration = Duration::from_secs(5);
+
+        eprintln!("{} sending SIGTERM", self.log_prefix);
+        signal::kill(
+            Pid::from_raw(self.child.id().ok_or(eyre!("race condition"))? as i32),
+            signal::Signal::SIGTERM,
+        )
+        .wrap_err("failed to send SIGTERM")?;
+
+        if let Ok(status) = timeout(TIMEOUT, self.child.wait()).await {
+            eprintln!("{} exited gracefully", self.log_prefix);
+            return status.wrap_err("wait failure");
+        };
+        eprintln!(
+            "{} process didn't terminate after {TIMEOUT:?}, killing",
+            self.log_prefix
+        );
+        timeout(TIMEOUT, async move {
+            self.child.kill().await.expect("not a recoverable failure");
+            self.child.wait().await
+        })
+        .await
+        .wrap_err("didn't terminate after SIGKILL")?
+        .wrap_err("wait failure")
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
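
Note on a pattern the rewritten tests above rely on: the iroha `Client` calls such as `submit_blocking` and `submit_all_blocking` block the calling thread, so inside the new `#[tokio::test]` functions they are wrapped in `tokio::task::spawn_blocking`, with owned clones moved into the closure and the result unwrapped twice via `.await??` (once for the join error, once for the call's own error). A minimal standalone sketch of that shape, using a hypothetical `expensive_blocking_call` as a stand-in for the real client API:

    use tokio::task::spawn_blocking;

    // Stand-in for a blocking call such as `client.submit_blocking(...)`.
    fn expensive_blocking_call(input: String) -> Result<usize, String> {
        Ok(input.len())
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let data = String::from("xor#domain");
        // Clone what the closure needs so it is 'static and can move to the blocking pool.
        let input = data.clone();
        // The first `?` handles the JoinError, the second the call's own error.
        let len = spawn_blocking(move || expensive_blocking_call(input)).await??;
        println!("{data} has length {len}");
        Ok(())
    }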
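The other substantive change is in `iroha_test_network`: instead of killing peer processes outright, `PeerExit::shutdown_or_kill` now sends SIGTERM through the newly added `nix` dependency and only falls back to SIGKILL after a timeout. Below is a minimal standalone sketch of that escalation, not part of the patch; it assumes the `nix` crate with the `signal` feature plus tokio's `process`, `time`, and runtime features, and uses `sleep 60` as a stand-in child process:

    use std::time::Duration;

    use nix::{sys::signal, unistd::Pid};
    use tokio::{process::Command, time::timeout};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Spawn a long-running child; `sleep` stands in for the real peer binary.
        let mut child = Command::new("sleep").arg("60").spawn()?;

        // Ask the process to terminate gracefully first.
        let pid = Pid::from_raw(child.id().expect("child already reaped") as i32);
        signal::kill(pid, signal::Signal::SIGTERM)?;

        // Give it a grace period, then escalate to SIGKILL.
        match timeout(Duration::from_secs(5), child.wait()).await {
            Ok(status) => println!("exited gracefully: {}", status?),
            Err(_elapsed) => {
                child.kill().await?; // sends SIGKILL and reaps the process
                println!("killed after timeout: {}", child.wait().await?);
            }
        }
        Ok(())
    }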