Skip to content

Commit

Permalink
[refactor] #4315: split pipeline events
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Mar 27, 2024
1 parent 1f0e21d commit f28b04b
Show file tree
Hide file tree
Showing 89 changed files with 1,520 additions and 1,060 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ tokio-console http://127.0.0.1:5555

To optimize performance it's useful to profile iroha.

To do that you should compile iroha with `profiling` profile and with `profiling` feature:
To do that you should compile iroha with `profiling` profile and with `profiling` feature:

```bash
RUSTFLAGS="-C force-frame-pointers=on" cargo +nightly -Z build-std build --target your-desired-target --profile profiling --features profiling
Expand Down
2 changes: 1 addition & 1 deletion cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The results of the compilation can be found in `<IROHA REPO ROOT>/target/release

### Add features

To add optional features, use ``--features``. For example, to add the support for _dev_telemetry_, run:
To add optional features, use ``--features``. For example, to add the support for _dev telemetry_, run:

```bash
cargo build --release --features dev-telemetry
Expand Down
2 changes: 1 addition & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Iroha {
});
let state = Arc::new(state);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));

#[cfg(feature = "telemetry")]
Self::start_telemetry(&logger, &config).await?;
Expand Down
7 changes: 3 additions & 4 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -172,13 +173,11 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Block)
.for_status(PipelineStatusKind::Committed);
let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
let mut event_iterator = listener.listen_for_events(event_filter)?;
let mut event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for i in 1..=blocks_expected {
let _event = event_iterator.next().expect("Event stream closed")?;
Expand Down
98 changes: 61 additions & 37 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{
BlockEventFilter, BlockStatus, PipelineEventBox, PipelineEventFilterBox,
TransactionEventFilter, TransactionStatus,
},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -603,14 +609,19 @@ impl Client {

rt.block_on(async {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
)
.await
.map_err(Into::into)
.and_then(std::convert::identity)
.wrap_err("Failed to establish event listener connection");
let filters = vec![
TransactionEventFilter::default().for_hash(hash).into(),
PipelineEventFilterBox::from(
BlockEventFilter::default().for_status(BlockStatus::Applied),
),
];

let event_iterator_result =
tokio::time::timeout_at(deadline, self.listen_for_events_async(filters))
.await
.map_err(Into::into)
.and_then(std::convert::identity)
.wrap_err("Failed to establish event listener connection");
let _send_result = init_sender.send(event_iterator_result.is_ok());
event_iterator_result?
};
Expand All @@ -631,17 +642,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
if let EventBox::Pipeline(this_event) = event? {
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err((Clone::clone(&**reason)).into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.header().height()) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down Expand Up @@ -903,11 +931,9 @@ impl Client {
/// - Forwards from [`events_api::EventIterator::new`]
pub fn listen_for_events(
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<impl Iterator<Item = Result<Event>>> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);
events_api::EventIterator::new(self.events_handler(event_filter)?)
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result<impl Iterator<Item = Result<EventBox>>> {
events_api::EventIterator::new(self.events_handler(event_filters)?)
}

/// Connect asynchronously (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events.
Expand All @@ -917,11 +943,9 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
event_filter: impl Into<EventFilterBox> + Send,
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>> + Send,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
events_api::AsyncEventStream::new(self.events_handler(event_filters)?).await
}

/// Constructs an Events API handler. With it, you can use any WS client you want.
Expand All @@ -931,10 +955,10 @@ impl Client {
#[inline]
pub fn events_handler(
&self,
event_filter: impl Into<EventFilterBox>,
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result<events_api::flow::Init> {
events_api::flow::Init::new(
event_filter.into(),
event_filters.into_iter().map(Into::into).collect(),
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
Expand Down Expand Up @@ -1237,12 +1261,12 @@ pub mod events_api {

/// Initialization struct for Events API flow.
pub struct Init {
/// Event filter
filter: EventFilterBox,
/// HTTP request headers
headers: HashMap<String, String>,
/// TORII URL
url: Url,
/// HTTP request headers
headers: HashMap<String, String>,
/// Event filter
filters: Vec<EventFilterBox>,
}

impl Init {
Expand All @@ -1252,14 +1276,14 @@ pub mod events_api {
/// Fails if [`transform_ws_url`] fails.
#[inline]
pub(in super::super) fn new(
filter: EventFilterBox,
filters: Vec<EventFilterBox>,
headers: HashMap<String, String>,
url: Url,
) -> Result<Self> {
Ok(Self {
filter,
headers,
url: transform_ws_url(url)?,
headers,
filters,
})
}
}
Expand All @@ -1269,12 +1293,12 @@ pub mod events_api {

fn init(self) -> InitData<R, Self::Next> {
let Self {
filter,
headers,
url,
headers,
filters,
} = self;

let msg = EventSubscriptionRequest::new(filter).encode();
let msg = EventSubscriptionRequest::new(filters).encode();
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
}
}
Expand All @@ -1284,7 +1308,7 @@ pub mod events_api {
pub struct Events;

impl FlowEvents for Events {
type Event = crate::data_model::prelude::Event;
type Event = crate::data_model::prelude::EventBox;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Expand Down
2 changes: 1 addition & 1 deletion client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroha_config::{
base,
base::{FromEnv, StdEnv, UnwrapPartial},
};
use iroha_crypto::prelude::*;
use iroha_crypto::KeyPair;
use iroha_data_model::{prelude::*, ChainId};
use iroha_primitives::small::SmallStr;
use serde::{Deserialize, Serialize};
Expand Down
6 changes: 3 additions & 3 deletions client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub mod ws {
/// use eyre::Result;
/// use url::Url;
/// use iroha_client::{
/// data_model::prelude::Event,
/// data_model::prelude::EventBox,
/// client::events_api::flow as events_api_flow,
/// http::{
/// ws::conn_flow::{Events, Init, InitData},
Expand Down Expand Up @@ -203,7 +203,7 @@ pub mod ws {
/// }
/// }
///
/// fn collect_5_events(flow: events_api_flow::Init) -> Result<Vec<Event>> {
/// fn collect_5_events(flow: events_api_flow::Init) -> Result<Vec<EventBox>> {
/// // Constructing initial flow data
/// let InitData {
/// next: flow,
Expand All @@ -216,7 +216,7 @@ pub mod ws {
/// stream.send(first_message);
///
/// // And now we are able to collect events
/// let mut events: Vec<Event> = Vec::with_capacity(5);
/// let mut events: Vec<EventBox> = Vec::with_capacity(5);
/// while events.len() < 5 {
/// let msg = stream.get_next();
/// let event = flow.message(msg)?;
Expand Down
13 changes: 7 additions & 6 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
asset::{AssetId, AssetValue, AssetValueType},
isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
transaction::error::TransactionRejectionReason,
};
use serde_json::json;
use test_network::*;
Expand Down Expand Up @@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
.expect_err("Should be rejected due to non integer value");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert_eq!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
InstructionEvaluationError::Type(TypeError::from(Mismatch {
&TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
TypeError::from(Mismatch {
expected: AssetValueType::Numeric(NumericSpec::integer()),
actual: AssetValueType::Numeric(NumericSpec::fractional(2))
}))
})
))
))
);
Expand Down
9 changes: 4 additions & 5 deletions client/tests/integration/domain_owner_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::{account::SignatureCheckCondition, prelude::*},
};
use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;

Expand Down Expand Up @@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
.expect_err("Tx should fail due to permissions");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

// "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/events/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fn transaction_execution_should_produce_events(
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
let event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for event in event_iterator {
event_sender.send(event)?
Expand Down Expand Up @@ -184,7 +184,7 @@ fn produce_multiple_events() -> Result<()> {
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
let event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for event in event_iterator {
event_sender.send(event)?
Expand Down
16 changes: 6 additions & 10 deletions client/tests/integration/events/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client.listen_for_events(
TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Success),
)?;
let mut event_it = thread_client.listen_for_events([TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Success)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down Expand Up @@ -79,11 +77,9 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client.listen_for_events(
TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Failure),
)?;
let mut event_it = thread_client.listen_for_events([TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Failure)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down
Loading

0 comments on commit f28b04b

Please sign in to comment.