Skip to content

Commit

Permalink
[refactor] #4315: split pipeline events
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Mar 14, 2024
1 parent be0e67f commit 4b628a6
Show file tree
Hide file tree
Showing 23 changed files with 890 additions and 547 deletions.
2 changes: 1 addition & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Iroha {
},
);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender));
match Self::start_telemetry(&logger, &config).await? {
TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
Expand Down
37 changes: 30 additions & 7 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{
BlockStatus, PipelineEventBox, PipelineEventFilterBox, TransactionEventFilter,
TransactionStatus,
},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -606,7 +612,7 @@ impl Client {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(
PipelineEventFilter::new().hash(hash.into()).into(),
TransactionEventFilter::default().with_hash(hash).into(),
),
)
.await
Expand All @@ -633,17 +639,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err(reason.clone().into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.height) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down
5 changes: 3 additions & 2 deletions core/benches/blocks/apply_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ impl WsvApplyBlocks {
.into_iter()
.map(|instructions| {
let block = create_block(&mut wsv, instructions, account_id.clone(), &key_pair);
wsv.apply_without_execution(&block).map(|()| block)
let _wsv_events = wsv.apply_without_execution(&block);
block
})
.collect::<Result<Vec<_>, _>>()?
.collect::<Vec<_>>()
};

Ok(Self { wsv, blocks })
Expand Down
2 changes: 2 additions & 0 deletions core/benches/blocks/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ pub fn create_block(
)
.chain(0, wsv)
.sign(key_pair)
.unpack(|_| {})
.commit(&topology)
.unpack(|_| {})
.unwrap();

// Verify that transactions are valid
Expand Down
2 changes: 1 addition & 1 deletion core/benches/blocks/validate_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl WsvValidateBlocks {
for (instructions, i) in instructions.into_iter().zip(1..) {
finalized_wsv = wsv.clone();
let block = create_block(&mut wsv, instructions, account_id.clone(), &key_pair);
wsv.apply_without_execution(&block)?;
let _wsv_events = wsv.apply_without_execution(&block);
assert_eq!(wsv.height(), i);
assert_eq!(wsv.height(), finalized_wsv.height() + 1);
}
Expand Down
3 changes: 2 additions & 1 deletion core/benches/kura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ async fn measure_block_size_for_n_executors(n_executors: u32) {
let topology = Topology::new(UniqueVec::new());
let mut block = BlockBuilder::new(vec![tx], topology, Vec::new())
.chain(0, &mut wsv)
.sign(&KeyPair::random());
.sign(&KeyPair::random())
.unpack(|_| {});

for _ in 1..n_executors {
block = block.sign(&KeyPair::random());
Expand Down
2 changes: 1 addition & 1 deletion core/benches/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ fn sign_blocks(criterion: &mut Criterion) {
b.iter_batched(
|| block.clone(),
|block| {
let _: ValidBlock = block.sign(&key_pair);
let _: ValidBlock = block.sign(&key_pair).unpack(|_| {});
count += 1;
},
BatchSize::SmallInput,
Expand Down
Loading

0 comments on commit 4b628a6

Please sign in to comment.