Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add listen timeouts to iroha cli #5241

Merged
merged 20 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/iroha_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ serde = { workspace = true }
serde_json = { workspace = true }
erased-serde = "0.4.5"
supports-color = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
futures = { workspace = true }

[build-dependencies]
vergen = { version = "8.3.1", default-features = false }
color-eyre = "0.6.3"

[dev-dependencies]
iroha_test_network = { workspace = true }
aoyako marked this conversation as resolved.
Show resolved Hide resolved
203 changes: 179 additions & 24 deletions crates/iroha_cli/src/main.rs
aoyako marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use std::{
use erased_serde::Serialize;
use error_stack::{fmt::ColorMode, IntoReportCompat, ResultExt};
use eyre::{eyre, Error, Result, WrapErr};
use futures::TryStreamExt;
use iroha::{client::Client, config::Config, data_model::prelude::*};
use iroha_primitives::json::Json;
use thiserror::Error;
use tokio::{runtime::Runtime, time::Duration};

/// Re-usable clap `--metadata <PATH>` (`-m`) argument.
/// Should be combined with `#[command(flatten)]` attr.
Expand Down Expand Up @@ -309,37 +311,85 @@ mod events {
#[derive(clap::Subcommand, Debug, Clone, Copy)]
pub enum Args {
/// Gets block pipeline events
BlockPipeline,
BlockPipeline(Options),
/// Gets transaction pipeline events
TransactionPipeline,
TransactionPipeline(Options),
/// Gets data events
Data,
Data(Options),
/// Get execute trigger events
ExecuteTrigger,
ExecuteTrigger(Options),
/// Get trigger completed events
TriggerCompleted,
TriggerCompleted(Options),
}

#[derive(clap::Args, Debug, Clone, Copy)]
pub struct Options {
/// Wait timeout in seconds
#[clap(short, long)]
timeout: Option<u64>,
}
aoyako marked this conversation as resolved.
Show resolved Hide resolved

impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
match self {
Args::TransactionPipeline => listen(TransactionEventFilter::default(), context),
Args::BlockPipeline => listen(BlockEventFilter::default(), context),
Args::Data => listen(DataEventFilter::Any, context),
Args::ExecuteTrigger => listen(ExecuteTriggerEventFilter::new(), context),
Args::TriggerCompleted => listen(TriggerCompletedEventFilter::new(), context),
Args::TransactionPipeline(Options { timeout }) => listen(
TransactionEventFilter::default(),
context,
timeout.map(Duration::from_secs),
),
Args::BlockPipeline(Options { timeout }) => listen(
BlockEventFilter::default(),
context,
timeout.map(Duration::from_secs),
),
Args::Data(Options { timeout }) => listen(
DataEventFilter::Any,
context,
timeout.map(Duration::from_secs),
),
Args::ExecuteTrigger(Options { timeout }) => listen(
ExecuteTriggerEventFilter::new(),
context,
timeout.map(Duration::from_secs),
),
Args::TriggerCompleted(Options { timeout }) => listen(
TriggerCompletedEventFilter::new(),
context,
timeout.map(Duration::from_secs),
),
}
}
}

fn listen(filter: impl Into<EventFilterBox>, context: &mut dyn RunContext) -> Result<()> {
pub fn listen(
filter: impl Into<EventFilterBox>,
context: &mut dyn RunContext,
timeout: Option<Duration>,
) -> Result<()> {
let filter = filter.into();
let client = context.client_from_config();
eprintln!("Listening to events with filter: {filter:?}");
client
.listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;

if let Some(timeout) = timeout {
eprintln!("Listening to events with filter: {filter:?} and timeout: {timeout:?}");
let rt = Runtime::new().wrap_err("Failed to create runtime.")?;
rt.block_on(async {
let mut stream = client
.listen_for_events_async([filter])
.await
.expect("Failed to listen for events.");
while let Ok(event) = tokio::time::timeout(timeout, stream.try_next()).await {
context.print_data(&event?)?;
}
eprintln!("Timeout period has expired.");
Result::<()>::Ok(())
})?;
} else {
eprintln!("Listening to events with filter: {filter:?}");
client
.listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;
}
Ok(())
}
}
Expand All @@ -354,22 +404,46 @@ mod blocks {
pub struct Args {
/// Block height from which to start streaming blocks
height: NonZeroU64,

/// Wait timeout in seconds
DCNick3 marked this conversation as resolved.
Show resolved Hide resolved
#[clap(short, long)]
timeout: Option<u64>,
}

impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Args { height } = self;
listen(height, context)
let Args { height, timeout } = self;
listen(height, context, timeout.map(Duration::from_secs))
}
}

fn listen(height: NonZeroU64, context: &mut dyn RunContext) -> Result<()> {
pub fn listen(
height: NonZeroU64,
context: &mut dyn RunContext,
timeout: Option<Duration>,
) -> Result<()> {
let client = context.client_from_config();
eprintln!("Listening to blocks from height: {height}");
client
.listen_for_blocks(height)
.wrap_err("Failed to listen for blocks.")?
.try_for_each(|event| context.print_data(&event?))?;
if let Some(timeout) = timeout {
eprintln!("Listening to blocks from height: {height} and timeout: {timeout:?}");
let rt = Runtime::new().wrap_err("Failed to create runtime.")?;
rt.block_on(async {
let mut stream = client
.listen_for_blocks_async(height)
.await
.expect("Failed to listen for blocks.");
while let Ok(event) = tokio::time::timeout(timeout, stream.try_next()).await {
context.print_data(&event?)?;
}
eprintln!("Timeout period has expired.");
Result::<()>::Ok(())
})?;
} else {
eprintln!("Listening to blocks from height: {height}");
client
.listen_for_blocks(height)
.wrap_err("Failed to listen for blocks.")?
.try_for_each(|event| context.print_data(&event?))?;
}
Ok(())
}
}
Expand Down Expand Up @@ -1377,8 +1451,13 @@ mod multisig {
Ok(())
}
}

#[cfg(test)]
mod tests {
use iroha::crypto::KeyPair;
use iroha_test_network::*;
aoyako marked this conversation as resolved.
Show resolved Hide resolved
use serde_json::to_string;

use super::*;

#[test]
Expand All @@ -1403,4 +1482,80 @@ mod tests {
let json_str = r#"{"Vec":[{"String":"a"},{"String":"b"}]}"#;
case!(json_str, serde_json::from_str(json_str).unwrap());
}

struct MockContext {
network: Network,
config: Config,
datastream: String,
}

impl MockContext {
fn test_config() -> Config {
return Config{
chain: ChainId::from("00000000-0000-0000-0000-000000000000"),
account: "ed0120CE7FA46C9DCE7EA4B125E2E36BDB63EA33073E7590AC92816AE1E861B7048B03@wonderland".parse().expect("Can't parse mock account"),
key_pair: KeyPair::random(),
basic_auth: None,
torii_api_url: "http://127.0.0.1:8080/".parse().expect("Can't parse mock url"),
transaction_ttl: Duration::from_millis(100_000),
transaction_status_timeout: Duration::from_millis(100_000),
transaction_add_nonce: false,
};
}
}

impl RunContext for MockContext {
fn configuration(&self) -> &Config {
return &self.config;
}

fn client_from_config(&self) -> Client {
self.network.client()
}

fn print_data(&mut self, data: &dyn Serialize) -> Result<()> {
self.datastream.push_str(&to_string(data)?);
Ok(())
}
}

#[test]
fn listen_events_timeouts() {
let (network, _rt) = NetworkBuilder::new()
.start_blocking()
.expect("Failed to start network.");
let mut tc = MockContext {
network,
config: MockContext::test_config(),
datastream: String::new(),
};

assert!(events::listen(
ExecuteTriggerEventFilter::new(),
&mut tc,
Some(Duration::from_secs(1))
)
.is_ok());
}

#[test]
fn listen_blocks_timeouts() {
use std::num::NonZeroU64;

let (network, _rt) = NetworkBuilder::new()
.start_blocking()
.expect("Failed to start network.");
let mut tc = MockContext {
network,
config: MockContext::test_config(),
datastream: String::new(),
};

assert!(blocks::listen(
NonZeroU64::new(1).expect("Blocks cannot be zero"),
&mut tc,
Some(Duration::from_secs(1))
)
.is_ok());
}
}
Loading