diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index b6bc087b1ba..51cb1f0cc10 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -242,12 +242,12 @@ Set the `LOG_FILE_PATH` environment variable to an appropriate location to store
Expand to learn how to compile iroha with tokio console support.
-Sometimes it might be helpful for debugging to analyze tokio tasks using [tokio-console](https://github.com/tokio-rs/console).
+Sometimes it might be helpful for debugging to analyze tokio tasks using [tokio_console](https://github.com/tokio-rs/console).
In this case, compile iroha with tokio console support like this:
```bash
-RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
+RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
```
The port for tokio console can be configured through the `LOG_TOKIO_CONSOLE_ADDR` configuration parameter (or environment variable).
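For example (a sketch only; it assumes the debug peer binary at `./target/debug/iroha` — adjust the invocation to your setup):
```bash
# Run a peer with the tokio console listener bound to a custom address
LOG_TOKIO_CONSOLE_ADDR=127.0.0.1:5555 ./target/debug/iroha
```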
@@ -257,11 +257,11 @@ Example of running iroha with tokio console support using `scripts/test_env.sh`:
```bash
# 1. Compile iroha
-RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
+RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
# 2. Run iroha with TRACE log level
LOG_LEVEL=TRACE ./scripts/test_env.sh setup
# 3. Access iroha. Peers will be available on ports 5555, 5556, ...
-tokio-console http://127.0.0.1:5555
+tokio_console http://127.0.0.1:5555
```
@@ -272,7 +272,7 @@ tokio-console http://127.0.0.1:5555
To optimize performance it's useful to profile iroha.
-To do that you should compile iroha with `profiling` profile and with `profiling` feature:
+To do that, compile iroha with the `profiling` profile and the `profiling` feature:
```bash
RUSTFLAGS="-C force-frame-pointers=on" cargo +nightly -Z build-std build --target your-desired-target --profile profiling --features profiling
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 4ab631e22e2..da3f1a714e5 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -18,19 +18,19 @@ categories.workspace = true
workspace = true
[features]
-default = ["telemetry", "schema-endpoint"]
+default = ["telemetry", "schema_endpoint"]
# Support lightweight telemetry, including diagnostics
telemetry = ["iroha_telemetry", "iroha_core/telemetry", "iroha_torii/telemetry"]
# Support developer-specific telemetry.
# Should not be enabled on production builds.
-dev-telemetry = ["iroha_core/dev-telemetry", "iroha_telemetry"]
+dev_telemetry = ["iroha_core/dev_telemetry", "iroha_telemetry"]
# Support schema generation from the `schema` endpoint in the local binary.
# Useful for debugging issues with decoding in SDKs.
-schema-endpoint = ["iroha_torii/schema"]
+schema_endpoint = ["iroha_torii/schema"]
# Support internal testing infrastructure for integration tests.
# Disable in production.
-test-network = ["thread-local-panic-hook"]
+test_network = ["thread-local-panic-hook"]
[badges]
is-it-maintained-issue-resolution = { repository = "https://github.com/hyperledger/iroha" }
@@ -79,8 +79,8 @@ vergen = { workspace = true, features = ["cargo"] }
[package.metadata.cargo-all-features]
denylist = [
- "schema-endpoint",
+ "schema_endpoint",
"telemetry",
- "test-network",
+ "test_network",
]
skip_optional_dependencies = true
diff --git a/cli/README.md b/cli/README.md
index 5ba8d269b39..6ecceb3f6f9 100644
--- a/cli/README.md
+++ b/cli/README.md
@@ -25,14 +25,14 @@ The results of the compilation can be found in `/target/release
To add optional features, use ``--features``. For example, to add the support for _dev_telemetry_, run:
```bash
-cargo build --release --features dev-telemetry
+cargo build --release --features dev_telemetry
```
A full list of features can be found in the [cargo manifest file](Cargo.toml) for this crate.
### Disable default features
-By default, the Iroha binary is compiled with the `telemetry`, and `schema-endpoint` features. If you wish to remove those features, add `--no-default-features` to the command.
+By default, the Iroha binary is compiled with the `telemetry` and `schema_endpoint` features. If you wish to remove those features, add `--no-default-features` to the command.
```bash
cargo build --release --no-default-features
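# The flags compose as usual for cargo; e.g., to drop the defaults but keep
# only telemetry (standard cargo feature flags, shown as a sketch):
cargo build --release --no-default-features --features telemetry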
diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index eb631467a21..c6c08a7747f 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -140,7 +140,7 @@ impl NetworkRelay {
impl Iroha {
fn prepare_panic_hook(notify_shutdown: Arc<Notify>) {
- #[cfg(not(feature = "test-network"))]
+ #[cfg(not(feature = "test_network"))]
use std::panic::set_hook;
// This is a hot-fix for tests
@@ -160,7 +160,7 @@ impl Iroha {
//
// Remove this when all Rust integration tests have been converted to
// separate Python tests.
- #[cfg(feature = "test-network")]
+ #[cfg(feature = "test_network")]
use thread_local_panic_hook::set_hook;
set_hook(Box::new(move |info| {
@@ -251,7 +251,7 @@ impl Iroha {
});
let state = Arc::new(state);
- let queue = Arc::new(Queue::from_config(config.queue));
+ let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));
match Self::start_telemetry(&logger, &config).await? {
TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
@@ -369,7 +369,7 @@ impl Iroha {
///
/// # Errors
/// - Forwards initialisation error.
- #[cfg(feature = "test-network")]
+ #[cfg(feature = "test_network")]
pub fn start_as_task(&mut self) -> Result>> {
iroha_logger::info!("Starting Iroha as task");
let torii = self
@@ -386,7 +386,7 @@ impl Iroha {
logger: &LoggerHandle,
config: &Config,
) -> Result<TelemetryStartStatus> {
- #[cfg(feature = "dev-telemetry")]
+ #[cfg(feature = "dev_telemetry")]
{
if let Some(config) = &config.dev_telemetry {
let receiver = logger
@@ -539,7 +539,7 @@ mod tests {
use super::*;
- #[cfg(not(feature = "test-network"))]
+ #[cfg(not(feature = "test_network"))]
mod no_test_network {
use std::{iter::repeat, panic, thread};
diff --git a/client/Cargo.toml b/client/Cargo.toml
index 1d38df505b9..4ae8ad2a1f1 100644
--- a/client/Cargo.toml
+++ b/client/Cargo.toml
@@ -24,24 +24,24 @@ maintenance = { status = "actively-developed" }
[features]
# Use rustls by default to avoid OpenSSL dependency, simplifying compilation with musl
-default = ["tls-rustls-native-roots"]
+default = ["tls_rustls_native_roots"]
-tls-native = [
+tls_native = [
"attohttpc/tls-native",
"tokio-tungstenite/native-tls",
"tungstenite/native-tls",
]
-tls-native-vendored = [
+tls_native_vendored = [
"attohttpc/tls-native-vendored",
"tokio-tungstenite/native-tls-vendored",
"tungstenite/native-tls-vendored",
]
-tls-rustls-native-roots = [
+tls_rustls_native_roots = [
"attohttpc/tls-rustls-native-roots",
"tokio-tungstenite/rustls-tls-native-roots",
"tungstenite/rustls-tls-native-roots",
]
-tls-rustls-webpki-roots = [
+tls_rustls_webpki_roots = [
"attohttpc/tls-rustls-webpki-roots",
"tokio-tungstenite/rustls-tls-webpki-roots",
"tungstenite/rustls-tls-webpki-roots",
@@ -83,7 +83,7 @@ iroha_wasm_builder = { workspace = true }
# TODO: These three activate `transparent_api` but client should never activate this feature.
# Additionally there is a dependency on iroha_core in dev-dependencies in telemetry/derive
# Hopefully, once the integration tests migration is finished these can be removed
-iroha = { workspace = true, features = ["dev-telemetry", "telemetry"] }
+iroha = { workspace = true, features = ["dev_telemetry", "telemetry"] }
iroha_genesis = { workspace = true }
test_network = { workspace = true }
diff --git a/client/benches/tps/utils.rs b/client/benches/tps/utils.rs
index d215d1ce203..6e2f74d83fc 100644
--- a/client/benches/tps/utils.rs
+++ b/client/benches/tps/utils.rs
@@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
+use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;
@@ -172,13 +173,11 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
- let event_filter = PipelineEventFilter::new()
- .for_entity(PipelineEntityKind::Block)
- .for_status(PipelineStatusKind::Committed);
+ let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
- let mut event_iterator = listener.listen_for_events(event_filter)?;
+ let mut event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for i in 1..=blocks_expected {
let _event = event_iterator.next().expect("Event stream closed")?;
diff --git a/client/src/client.rs b/client/src/client.rs
index ce942c1752c..b30a0d67193 100644
--- a/client/src/client.rs
+++ b/client/src/client.rs
@@ -14,7 +14,13 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
-use iroha_data_model::query::QueryOutputBox;
+use iroha_data_model::{
+ events::pipeline::{
+ BlockEventFilter, BlockStatus, PipelineEventBox, PipelineEventFilterBox,
+ TransactionEventFilter, TransactionStatus,
+ },
+ query::QueryOutputBox,
+};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
@@ -603,14 +609,19 @@ impl Client {
rt.block_on(async {
let mut event_iterator = {
- let event_iterator_result = tokio::time::timeout_at(
- deadline,
- self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
- )
- .await
- .map_err(Into::into)
- .and_then(std::convert::identity)
- .wrap_err("Failed to establish event listener connection");
+ let filters = vec![
+ TransactionEventFilter::default().for_hash(hash).into(),
+ PipelineEventFilterBox::from(
+ BlockEventFilter::default().for_status(BlockStatus::Applied),
+ ),
+ ];
+
+ let event_iterator_result =
+ tokio::time::timeout_at(deadline, self.listen_for_events_async(filters))
+ .await
+ .map_err(Into::into)
+ .and_then(std::convert::identity)
+ .wrap_err("Failed to establish event listener connection");
let _send_result = init_sender.send(event_iterator_result.is_ok());
event_iterator_result?
};
@@ -631,17 +642,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
+ let mut block_height = None;
+
while let Some(event) = event_iterator.next().await {
- if let Event::Pipeline(this_event) = event? {
- match this_event.status() {
- PipelineStatus::Validating => {}
- PipelineStatus::Rejected(ref reason) => {
- return Err(reason.clone().into());
+ if let EventBox::Pipeline(this_event) = event? {
+ match this_event {
+ PipelineEventBox::Transaction(transaction_event) => {
+ match transaction_event.status() {
+ TransactionStatus::Queued => {}
+ TransactionStatus::Approved => {
+ block_height = transaction_event.block_height;
+ }
+ TransactionStatus::Rejected(reason) => {
+ return Err((Clone::clone(&**reason)).into());
+ }
+ TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
+ }
+ }
+ PipelineEventBox::Block(block_event) => {
+ if Some(block_event.header().height()) == block_height {
+ if let BlockStatus::Applied = block_event.status() {
+ return Ok(hash);
+ }
+ }
}
- PipelineStatus::Committed => return Ok(hash),
}
}
}
+
Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
@@ -903,11 +931,9 @@ impl Client {
/// - Forwards from [`events_api::EventIterator::new`]
pub fn listen_for_events(
&self,
- event_filter: impl Into<EventFilterBox>,
- ) -> Result<impl Iterator<Item = Result<Event>>> {
- let event_filter = event_filter.into();
- iroha_logger::trace!(?event_filter);
- events_api::EventIterator::new(self.events_handler(event_filter)?)
+ event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
+ ) -> Result<impl Iterator<Item = Result<EventBox>>> {
+ events_api::EventIterator::new(self.events_handler(event_filters)?)
}
/// Connect asynchronously (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events.
@@ -917,11 +943,9 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
- event_filter: impl Into<EventFilterBox> + Send,
+ event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>> + Send,
) -> Result {
- let event_filter = event_filter.into();
- iroha_logger::trace!(?event_filter, "Async listening with");
- events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
+ events_api::AsyncEventStream::new(self.events_handler(event_filters)?).await
}
/// Constructs an Events API handler. With it, you can use any WS client you want.
@@ -931,10 +955,10 @@ impl Client {
#[inline]
pub fn events_handler(
&self,
- event_filter: impl Into<EventFilterBox>,
+ event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result {
events_api::flow::Init::new(
- event_filter.into(),
+ event_filters,
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
@@ -1237,12 +1261,12 @@ pub mod events_api {
/// Initialization struct for Events API flow.
pub struct Init {
- /// Event filter
- filter: EventFilterBox,
- /// HTTP request headers
- headers: HashMap<String, String>,
/// TORII URL
url: Url,
+ /// HTTP request headers
+ headers: HashMap<String, String>,
+ /// Event filter
+ filters: Vec<EventFilterBox>,
}
impl Init {
@@ -1252,14 +1276,14 @@ pub mod events_api {
/// Fails if [`transform_ws_url`] fails.
#[inline]
pub(in super::super) fn new(
- filter: EventFilterBox,
+ filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
headers: HashMap<String, String>,
url: Url,
) -> Result<Self> {
Ok(Self {
- filter,
- headers,
url: transform_ws_url(url)?,
+ headers,
+ filters: filters.into_iter().map(Into::into).collect(),
})
}
}
@@ -1269,12 +1293,12 @@ pub mod events_api {
fn init(self) -> InitData<R> {
let Self {
- filter,
- headers,
url,
+ headers,
+ filters,
} = self;
- let msg = EventSubscriptionRequest::new(filter).encode();
+ let msg = EventSubscriptionRequest::new(filters).encode();
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
}
}
@@ -1284,7 +1308,7 @@ pub mod events_api {
pub struct Events;
impl FlowEvents for Events {
- type Event = crate::data_model::prelude::Event;
+ type Event = crate::data_model::prelude::EventBox;
fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
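For orientation, a minimal usage sketch of the multi-filter subscription API introduced above (the `client` value and surrounding error-handling context are assumed; the types are the ones named in this diff):
```rust
use iroha_data_model::events::pipeline::{
    BlockEventFilter, BlockStatus, PipelineEventFilterBox, TransactionEventFilter,
};

// Subscribe to transaction events and applied-block events in one stream.
let filters = vec![
    PipelineEventFilterBox::from(TransactionEventFilter::default()),
    PipelineEventFilterBox::from(BlockEventFilter::default().for_status(BlockStatus::Applied)),
];

for event in client.listen_for_events(filters)? {
    // Each item is a `Result<EventBox>`.
    println!("{:?}", event?);
}
```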
diff --git a/client/src/config.rs b/client/src/config.rs
index 34b7e8663c7..72bb909d8c7 100644
--- a/client/src/config.rs
+++ b/client/src/config.rs
@@ -9,7 +9,7 @@ use iroha_config::{
base,
base::{FromEnv, StdEnv, UnwrapPartial},
};
-use iroha_crypto::prelude::*;
+use iroha_crypto::KeyPair;
use iroha_data_model::{prelude::*, ChainId};
use iroha_primitives::small::SmallStr;
use serde::{Deserialize, Serialize};
diff --git a/client/tests/integration/asset.rs b/client/tests/integration/asset.rs
index fe95e30f348..34a102afd93 100644
--- a/client/tests/integration/asset.rs
+++ b/client/tests/integration/asset.rs
@@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
asset::{AssetId, AssetValue, AssetValueType},
isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
+ transaction::error::TransactionRejectionReason,
};
use serde_json::json;
use test_network::*;
@@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
.expect_err("Should be rejected due to non integer value");
let rejection_reason = err
- .downcast_ref::<PipelineRejectionReason>()
- .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
+ .downcast_ref::<TransactionRejectionReason>()
+ .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));
assert_eq!(
rejection_reason,
- &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
- ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
- InstructionEvaluationError::Type(TypeError::from(Mismatch {
+ &TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
+ InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
+ TypeError::from(Mismatch {
expected: AssetValueType::Numeric(NumericSpec::integer()),
actual: AssetValueType::Numeric(NumericSpec::fractional(2))
- }))
+ })
))
))
);
diff --git a/client/tests/integration/domain_owner_permissions.rs b/client/tests/integration/domain_owner_permissions.rs
index e0945b85f70..af78eff12ac 100644
--- a/client/tests/integration/domain_owner_permissions.rs
+++ b/client/tests/integration/domain_owner_permissions.rs
@@ -3,6 +3,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::{account::SignatureCheckCondition, prelude::*},
};
+use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;
@@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
.expect_err("Tx should fail due to permissions");
let rejection_reason = err
- .downcast_ref::<PipelineRejectionReason>()
- .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
+ .downcast_ref::<TransactionRejectionReason>()
+ .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));
assert!(matches!(
rejection_reason,
- &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
- ValidationFail::NotPermitted(_)
- ))
+ &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));
// "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
diff --git a/client/tests/integration/events/data.rs b/client/tests/integration/events/data.rs
index 4250ff2b682..9a6d6986cc2 100644
--- a/client/tests/integration/events/data.rs
+++ b/client/tests/integration/events/data.rs
@@ -140,7 +140,7 @@ fn transaction_execution_should_produce_events(
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
- let event_iterator = listener.listen_for_events(event_filter)?;
+ let event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for event in event_iterator {
event_sender.send(event)?
@@ -184,7 +184,7 @@ fn produce_multiple_events() -> Result<()> {
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
- let event_iterator = listener.listen_for_events(event_filter)?;
+ let event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for event in event_iterator {
event_sender.send(event)?
diff --git a/client/tests/integration/events/notification.rs b/client/tests/integration/events/notification.rs
index bf26feb351b..c060d1e1e64 100644
--- a/client/tests/integration/events/notification.rs
+++ b/client/tests/integration/events/notification.rs
@@ -33,11 +33,9 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
- let mut event_it = thread_client.listen_for_events(
- TriggerCompletedEventFilter::new()
- .for_trigger(trigger_id)
- .for_outcome(TriggerCompletedOutcomeType::Success),
- )?;
+ let mut event_it = thread_client.listen_for_events([TriggerCompletedEventFilter::new()
+ .for_trigger(trigger_id)
+ .for_outcome(TriggerCompletedOutcomeType::Success)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
@@ -79,11 +77,9 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
- let mut event_it = thread_client.listen_for_events(
- TriggerCompletedEventFilter::new()
- .for_trigger(trigger_id)
- .for_outcome(TriggerCompletedOutcomeType::Failure),
- )?;
+ let mut event_it = thread_client.listen_for_events([TriggerCompletedEventFilter::new()
+ .for_trigger(trigger_id)
+ .for_outcome(TriggerCompletedOutcomeType::Failure)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
diff --git a/client/tests/integration/events/pipeline.rs b/client/tests/integration/events/pipeline.rs
index 30f17528219..cd8288e0f05 100644
--- a/client/tests/integration/events/pipeline.rs
+++ b/client/tests/integration/events/pipeline.rs
@@ -9,6 +9,14 @@ use iroha_client::{
},
};
use iroha_config::parameters::actual::Root as Config;
+use iroha_data_model::{
+ events::pipeline::{
+ BlockEvent, BlockEventFilter, BlockStatus, TransactionEventFilter, TransactionStatus,
+ },
+ isi::error::InstructionExecutionError,
+ transaction::error::TransactionRejectionReason,
+ ValidationFail,
+};
use test_network::*;
// Needed to re-enable ignored tests.
@@ -17,24 +25,28 @@ const PEER_COUNT: usize = 7;
#[ignore = "ignore, more in #2851"]
#[test]
fn transaction_with_no_instructions_should_be_committed() -> Result<()> {
- test_with_instruction_and_status_and_port(None, PipelineStatusKind::Committed, 10_250)
+ test_with_instruction_and_status_and_port(None, &TransactionStatus::Approved, 10_250)
}
#[ignore = "ignore, more in #2851"]
// #[ignore = "Experiment"]
#[test]
fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> {
- let fail = Fail::new("Should be rejected".to_owned());
+ let msg = "Should be rejected".to_owned();
+
+ let fail = Fail::new(msg.clone());
test_with_instruction_and_status_and_port(
Some(fail.into()),
- PipelineStatusKind::Rejected,
+ &TransactionStatus::Rejected(Box::new(TransactionRejectionReason::Validation(
+ ValidationFail::InstructionFailed(InstructionExecutionError::Fail(msg)),
+ ))),
10_350,
)
}
fn test_with_instruction_and_status_and_port(
instruction: Option<InstructionBox>,
- should_be: PipelineStatusKind,
+ should_be: &TransactionStatus,
port: u16,
) -> Result<()> {
let (_rt, network, client) =
@@ -56,9 +68,9 @@ fn test_with_instruction_and_status_and_port(
let mut handles = Vec::new();
for listener in clients {
let checker = Checker { listener, hash };
- let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating);
+ let handle_validating = checker.clone().spawn(TransactionStatus::Queued);
handles.push(handle_validating);
- let handle_validated = checker.spawn(should_be);
+ let handle_validated = checker.spawn(should_be.clone());
handles.push(handle_validated);
}
// When
@@ -78,16 +90,13 @@ struct Checker {
}
impl Checker {
- fn spawn(self, status_kind: PipelineStatusKind) -> JoinHandle<()> {
+ fn spawn(self, status_kind: TransactionStatus) -> JoinHandle<()> {
thread::spawn(move || {
let mut event_iterator = self
.listener
- .listen_for_events(
- PipelineEventFilter::new()
- .for_entity(PipelineEntityKind::Transaction)
- .for_status(status_kind)
- .for_hash(*self.hash),
- )
+ .listen_for_events([TransactionEventFilter::default()
+ .for_status(status_kind)
+ .for_hash(self.hash)])
.expect("Failed to create event iterator.");
let event_result = event_iterator.next().expect("Stream closed");
let _event = event_result.expect("Must be valid");
@@ -96,36 +105,30 @@ impl Checker {
}
#[test]
-fn committed_block_must_be_available_in_kura() {
+fn applied_block_must_be_available_in_kura() {
let (_rt, peer, client) = <PeerBuilder>::new().with_port(11_040).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);
- let event_filter = PipelineEventFilter::new()
- .for_entity(PipelineEntityKind::Block)
- .for_status(PipelineStatusKind::Committed);
+ let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let mut event_iter = client
- .listen_for_events(event_filter)
+ .listen_for_events([event_filter])
.expect("Failed to subscribe for events");
client
.submit(Fail::new("Dummy instruction".to_owned()))
.expect("Failed to submit transaction");
- let event = event_iter.next().expect("Block must be committed");
- let Ok(Event::Pipeline(PipelineEvent {
- entity_kind: PipelineEntityKind::Block,
- status: PipelineStatus::Committed,
- hash,
- })) = event
- else {
- panic!("Received unexpected event")
- };
- let hash = HashOf::from_untyped_unchecked(hash);
+ let event: BlockEvent = event_iter
+ .next()
+ .expect("Block must be committed")
+ .expect("Block must be committed")
+ .try_into()
+ .expect("Received unexpected event");
peer.iroha
.as_ref()
.expect("Must be some")
.kura
- .get_block_height_by_hash(&hash)
- .expect("Block committed event was received earlier");
+ .get_block_by_height(event.header().height())
+ .expect("Block applied event was received earlier");
}
diff --git a/client/tests/integration/permissions.rs b/client/tests/integration/permissions.rs
index e7fea53ac18..9a4578b8660 100644
--- a/client/tests/integration/permissions.rs
+++ b/client/tests/integration/permissions.rs
@@ -6,7 +6,9 @@ use iroha_client::{
crypto::KeyPair,
data_model::prelude::*,
};
-use iroha_data_model::permission::PermissionToken;
+use iroha_data_model::{
+ permission::PermissionToken, transaction::error::TransactionRejectionReason,
+};
use iroha_genesis::GenesisNetwork;
use serde_json::json;
use test_network::{PeerBuilder, *};
@@ -104,14 +106,12 @@ fn permissions_disallow_asset_transfer() {
.submit_transaction_blocking(&transfer_tx)
.expect_err("Transaction was not rejected.");
let rejection_reason = err
- .downcast_ref::<PipelineRejectionReason>()
- .expect("Error {err} is not PipelineRejectionReason");
+ .downcast_ref::<TransactionRejectionReason>()
+ .expect("Error {err} is not TransactionRejectionReason");
//Then
assert!(matches!(
rejection_reason,
- &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
- ValidationFail::NotPermitted(_)
- ))
+ &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));
let alice_assets = get_assets(&iroha_client, &alice_id);
assert_eq!(alice_assets, alice_start_assets);
@@ -156,14 +156,12 @@ fn permissions_disallow_asset_burn() {
.submit_transaction_blocking(&burn_tx)
.expect_err("Transaction was not rejected.");
let rejection_reason = err
- .downcast_ref::<PipelineRejectionReason>()
- .expect("Error {err} is not PipelineRejectionReason");
+ .downcast_ref::<TransactionRejectionReason>()
+ .expect("Error {err} is not TransactionRejectionReason");
assert!(matches!(
rejection_reason,
- &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
- ValidationFail::NotPermitted(_)
- ))
+ &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));
let alice_assets = get_assets(&iroha_client, &alice_id);
diff --git a/client/tests/integration/roles.rs b/client/tests/integration/roles.rs
index 12a03f333c1..6f260e3709f 100644
--- a/client/tests/integration/roles.rs
+++ b/client/tests/integration/roles.rs
@@ -6,6 +6,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::prelude::*,
};
+use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;
@@ -164,14 +165,12 @@ fn role_with_invalid_permissions_is_not_accepted() -> Result<()> {
.expect_err("Submitting role with invalid permission token should fail");
let rejection_reason = err
- .downcast_ref::<PipelineRejectionReason>()
- .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
+ .downcast_ref::<TransactionRejectionReason>()
+ .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));
assert!(matches!(
rejection_reason,
- &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
- ValidationFail::NotPermitted(_)
- ))
+ &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));
Ok(())
diff --git a/client/tests/integration/smartcontracts/query_assets_and_save_cursor/src/lib.rs b/client/tests/integration/smartcontracts/query_assets_and_save_cursor/src/lib.rs
index 5028ca4e01d..feadea44447 100644
--- a/client/tests/integration/smartcontracts/query_assets_and_save_cursor/src/lib.rs
+++ b/client/tests/integration/smartcontracts/query_assets_and_save_cursor/src/lib.rs
@@ -26,7 +26,7 @@ fn main(owner: AccountId) {
.execute()
.dbg_unwrap();
- let (_batch, cursor) = asset_cursor.into_raw_parts();
+ let (_batch, cursor) = asset_cursor.into_parts();
SetKeyValue::account(
owner,
diff --git a/client/tests/integration/triggers/by_call_trigger.rs b/client/tests/integration/triggers/by_call_trigger.rs
index a2c2ac2b41d..ff76caf34f2 100644
--- a/client/tests/integration/triggers/by_call_trigger.rs
+++ b/client/tests/integration/triggers/by_call_trigger.rs
@@ -58,11 +58,9 @@ fn execute_trigger_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
- let mut event_it = thread_client.listen_for_events(
- ExecuteTriggerEventFilter::new()
- .for_trigger(trigger_id)
- .under_authority(account_id),
- )?;
+ let mut event_it = thread_client.listen_for_events([ExecuteTriggerEventFilter::new()
+ .for_trigger(trigger_id)
+ .under_authority(account_id)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
diff --git a/client/tests/integration/triggers/time_trigger.rs b/client/tests/integration/triggers/time_trigger.rs
index 1f29a0d8ba9..8c335d894b3 100644
--- a/client/tests/integration/triggers/time_trigger.rs
+++ b/client/tests/integration/triggers/time_trigger.rs
@@ -5,12 +5,30 @@ use iroha_client::{
client::{self, Client, QueryResult},
data_model::{prelude::*, transaction::WasmSmartContract},
};
-use iroha_config::parameters::defaults::chain_wide::DEFAULT_CONSENSUS_ESTIMATION;
+use iroha_config::parameters::defaults::chain_wide::{DEFAULT_BLOCK_TIME, DEFAULT_COMMIT_TIME};
+use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use iroha_logger::info;
use test_network::*;
use crate::integration::new_account_with_random_public_key;
+const DEFAULT_CONSENSUS_ESTIMATION: Duration =
+ match DEFAULT_BLOCK_TIME.checked_add(match DEFAULT_COMMIT_TIME.checked_div(2) {
+ Some(x) => x,
+ None => unreachable!(),
+ }) {
+ Some(x) => x,
+ None => unreachable!(),
+ };
+
+fn curr_time() -> core::time::Duration {
+ use std::time::SystemTime;
+
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Failed to get the current system time")
+}
+
/// Macro to abort compilation, if `e` isn't `true`
macro_rules! const_assert {
($e:expr) => {
@@ -33,7 +51,7 @@ fn time_trigger_execution_count_error_should_be_less_than_15_percent() -> Result
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_775).start_with_runtime();
wait_for_genesis_committed(&vec![test_client.clone()], 0);
- let start_time = current_time();
+ let start_time = curr_time();
// Start listening BEFORE submitting any transaction not to miss any block committed event
let event_listener = get_block_committed_event_listener(&test_client)?;
@@ -66,7 +84,7 @@ fn time_trigger_execution_count_error_should_be_less_than_15_percent() -> Result
)?;
std::thread::sleep(DEFAULT_CONSENSUS_ESTIMATION);
- let finish_time = current_time();
+ let finish_time = curr_time();
let average_count = finish_time.saturating_sub(start_time).as_millis() / PERIOD.as_millis();
let actual_value = get_asset_value(&mut test_client, asset_id);
@@ -92,7 +110,7 @@ fn change_asset_metadata_after_1_sec() -> Result<()> {
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_660).start_with_runtime();
wait_for_genesis_committed(&vec![test_client.clone()], 0);
- let start_time = current_time();
+ let start_time = curr_time();
// Start listening BEFORE submitting any transaction not to miss any block committed event
let event_listener = get_block_committed_event_listener(&test_client)?;
@@ -220,7 +238,7 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
let event_listener = get_block_committed_event_listener(&test_client)?;
// Registering trigger
- let start_time = current_time();
+ let start_time = curr_time();
let schedule =
TimeSchedule::starting_at(start_time).with_period(Duration::from_millis(TRIGGER_PERIOD_MS));
let register_trigger = Register::trigger(Trigger::new(
@@ -272,11 +290,9 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
/// Get block committed event listener
fn get_block_committed_event_listener(
client: &Client,
-) -> Result<impl Iterator<Item = Result<Event>>> {
- let block_filter = PipelineEventFilter::new()
- .for_entity(PipelineEntityKind::Block)
- .for_status(PipelineStatusKind::Committed);
- client.listen_for_events(block_filter)
+) -> Result<impl Iterator<Item = Result<EventBox>>> {
+ let block_filter = BlockEventFilter::default().for_status(BlockStatus::Committed);
+ client.listen_for_events([block_filter])
}
/// Get asset numeric value
@@ -292,7 +308,7 @@ fn get_asset_value(client: &mut Client, asset_id: AssetId) -> Numeric {
/// Submit some sample ISIs to create new blocks
fn submit_sample_isi_on_every_block_commit(
- block_committed_event_listener: impl Iterator<Item = Result<Event>>,
+ block_committed_event_listener: impl Iterator<Item = Result<EventBox>>,
test_client: &mut Client,
account_id: &AccountId,
timeout: Duration,
diff --git a/client_cli/src/main.rs b/client_cli/src/main.rs
index 807d504a280..7a817316e57 100644
--- a/client_cli/src/main.rs
+++ b/client_cli/src/main.rs
@@ -249,13 +249,17 @@ mod filter {
mod events {
+ use iroha_client::data_model::events::pipeline::{BlockEventFilter, TransactionEventFilter};
+
use super::*;
/// Get event stream from iroha peer
#[derive(clap::Subcommand, Debug, Clone, Copy)]
pub enum Args {
- /// Gets pipeline events
- Pipeline,
+ /// Gets block pipeline events
+ BlockPipeline,
+ /// Gets transaction pipeline events
+ TransactionPipeline,
/// Gets data events
Data,
/// Get execute trigger events
@@ -267,7 +271,8 @@ mod events {
impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
match self {
- Args::Pipeline => listen(PipelineEventFilter::new(), context),
+ Args::TransactionPipeline => listen(TransactionEventFilter::default(), context),
+ Args::BlockPipeline => listen(BlockEventFilter::default(), context),
Args::Data => listen(DataEventFilter::Any, context),
Args::ExecuteTrigger => listen(ExecuteTriggerEventFilter::new(), context),
Args::TriggerCompleted => listen(TriggerCompletedEventFilter::new(), context),
@@ -280,7 +285,7 @@ mod events {
let iroha_client = context.client_from_config();
eprintln!("Listening to events with filter: {filter:?}");
iroha_client
- .listen_for_events(filter)
+ .listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;
Ok(())
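Assuming clap's default kebab-case naming for the new subcommand variants, the split pipeline listeners above would be invoked roughly like this (the binary name is illustrative and may differ per your build):
```bash
iroha_client_cli events block-pipeline
iroha_client_cli events transaction-pipeline
```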
diff --git a/config/Cargo.toml b/config/Cargo.toml
index 8c354f4372b..8daf3edca93 100644
--- a/config/Cargo.toml
+++ b/config/Cargo.toml
@@ -44,4 +44,4 @@ trybuild = { workspace = true }
hex = { workspace = true }
[features]
-tokio-console = []
+tokio_console = []
diff --git a/config/src/parameters/defaults.rs b/config/src/parameters/defaults.rs
index ff55704ee09..a212d4dfb56 100644
--- a/config/src/parameters/defaults.rs
+++ b/config/src/parameters/defaults.rs
@@ -24,7 +24,7 @@ pub mod kura {
pub const DEFAULT_STORE_DIR: &str = "./storage";
}
-#[cfg(feature = "tokio-console")]
+#[cfg(feature = "tokio_console")]
pub mod logger {
use iroha_primitives::addr::{socket_addr, SocketAddr};
@@ -61,16 +61,6 @@ pub mod chain_wide {
// TODO: wrap into a `Bytes` newtype
pub const DEFAULT_WASM_MAX_MEMORY_BYTES: u32 = 500 * 2_u32.pow(20);
- /// Default estimation of consensus duration.
- pub const DEFAULT_CONSENSUS_ESTIMATION: Duration =
- match DEFAULT_BLOCK_TIME.checked_add(match DEFAULT_COMMIT_TIME.checked_div(2) {
- Some(x) => x,
- None => unreachable!(),
- }) {
- Some(x) => x,
- None => unreachable!(),
- };
-
/// Default limits for metadata
pub const DEFAULT_METADATA_LIMITS: MetadataLimits =
MetadataLimits::new(2_u32.pow(20), 2_u32.pow(12));
diff --git a/config/src/parameters/user.rs b/config/src/parameters/user.rs
index d395f50fbb6..9fc5f3255d4 100644
--- a/config/src/parameters/user.rs
+++ b/config/src/parameters/user.rs
@@ -461,7 +461,7 @@ pub struct Queue {
pub future_threshold: Duration,
}
-#[allow(missing_copy_implementations)] // triggered without tokio-console
+#[allow(missing_copy_implementations)] // triggered without tokio_console
#[derive(Debug, Clone)]
pub struct Logger {
/// Level of logging verbosity
@@ -471,18 +471,18 @@ pub struct Logger {
pub level: Level,
/// Output format
pub format: LoggerFormat,
- #[cfg(feature = "tokio-console")]
- /// Address of tokio console (only available under "tokio-console" feature)
+ #[cfg(feature = "tokio_console")]
+ /// Address of tokio console (only available under "tokio_console" feature)
pub tokio_console_address: SocketAddr,
}
-#[allow(clippy::derivable_impls)] // triggers in absence of `tokio-console` feature
+#[allow(clippy::derivable_impls)] // triggers in absence of `tokio_console` feature
impl Default for Logger {
fn default() -> Self {
Self {
level: Level::default(),
format: LoggerFormat::default(),
- #[cfg(feature = "tokio-console")]
+ #[cfg(feature = "tokio_console")]
tokio_console_address: super::defaults::logger::DEFAULT_TOKIO_CONSOLE_ADDR,
}
}
diff --git a/config/src/parameters/user/boilerplate.rs b/config/src/parameters/user/boilerplate.rs
index 34474918934..922ae429ce8 100644
--- a/config/src/parameters/user/boilerplate.rs
+++ b/config/src/parameters/user/boilerplate.rs
@@ -502,7 +502,7 @@ impl FromEnvDefaultFallback for QueuePartial {}
/// 'Logger' configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Default, Merge)]
-// `tokio_console_addr` is not `Copy`, but warning appears without `tokio-console` feature
+// `tokio_console_addr` is not `Copy`, but warning appears without `tokio_console` feature
#[allow(missing_copy_implementations)]
#[serde(deny_unknown_fields, default)]
pub struct LoggerPartial {
@@ -510,8 +510,8 @@ pub struct LoggerPartial {
pub level: UserField<Level>,
/// Output format
pub format: UserField<LoggerFormat>,
- #[cfg(feature = "tokio-console")]
- /// Address of tokio console (only available under "tokio-console" feature)
+ #[cfg(feature = "tokio_console")]
+ /// Address of tokio console (only available under "tokio_console" feature)
pub tokio_console_address: UserField<SocketAddr>,
}
@@ -522,7 +522,7 @@ impl UnwrapPartial for LoggerPartial {
Ok(Logger {
level: self.level.unwrap_or_default(),
format: self.format.unwrap_or_default(),
- #[cfg(feature = "tokio-console")]
+ #[cfg(feature = "tokio_console")]
tokio_console_address: self.tokio_console_address.get().unwrap_or_else(|| {
super::super::defaults::logger::DEFAULT_TOKIO_CONSOLE_ADDR.clone()
}),
diff --git a/config/tests/fixtures/full.toml b/config/tests/fixtures/full.toml
index f38ad0e38ef..dedb06adae0 100644
--- a/config/tests/fixtures/full.toml
+++ b/config/tests/fixtures/full.toml
@@ -58,7 +58,7 @@ min_retry_period = 5_000
max_retry_delay_exponent = 4
[telemetry.dev]
-out_file = "./dev-telemetry.json5"
+out_file = "./dev_telemetry.json5"
[chain_wide]
max_transactions_in_block = 512
diff --git a/configs/peer.template.toml b/configs/peer.template.toml
index bc01942940e..68cb6c63e70 100644
--- a/configs/peer.template.toml
+++ b/configs/peer.template.toml
@@ -64,4 +64,4 @@
[telemetry.dev]
## FIXME: is it JSON5?
-# out_file = "./dev-telemetry.json5"
+# out_file = "./dev_telemetry.json5"
diff --git a/configs/swarm/executor.wasm b/configs/swarm/executor.wasm
index fb05db1652e..15dd310a9d8 100644
Binary files a/configs/swarm/executor.wasm and b/configs/swarm/executor.wasm differ
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 17d88b76cd3..798cb092590 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -26,9 +26,7 @@ telemetry = []
cli = []
# Support developer-specific telemetry.
# Should not be enabled on production builds.
-dev-telemetry = ["telemetry", "iroha_telemetry/dev-telemetry"]
-# Support Prometheus metrics. See https://prometheus.io/.
-expensive-telemetry = ["iroha_telemetry/metric-instrumentation"]
+dev_telemetry = ["telemetry", "iroha_telemetry/dev_telemetry"]
# Profiler integration for wasmtime
profiling = []
@@ -109,8 +107,8 @@ path = "benches/blocks/validate_blocks_oneshot.rs"
[package.metadata.cargo-all-features]
denylist = [
-"schema-endpoint",
+"schema_endpoint",
"telemetry",
-"test-network"
+"test_network"
]
skip_optional_dependencies = true
diff --git a/core/benches/blocks/apply_blocks.rs b/core/benches/blocks/apply_blocks.rs
index f85921695d5..104f6728dca 100644
--- a/core/benches/blocks/apply_blocks.rs
+++ b/core/benches/blocks/apply_blocks.rs
@@ -39,7 +39,7 @@ impl StateApplyBlocks {
let state = build_state(rt, &account_id, &key_pair);
instructions
.into_iter()
- .map(|instructions| -> Result<_> {
+ .map(|instructions| {
let mut state_block = state.block();
let block = create_block(
&mut state_block,
@@ -47,11 +47,11 @@ impl StateApplyBlocks {
account_id.clone(),
&key_pair,
);
- state_block.apply_without_execution(&block)?;
+ let _wsv_events = state_block.apply_without_execution(&block);
state_block.commit();
- Ok(block)
+ block
})
- .collect::<Result<Vec<_>, _>>()?
+ .collect::<Vec<_>>()
};
Ok(Self { state, blocks })
diff --git a/core/benches/blocks/common.rs b/core/benches/blocks/common.rs
index e4070b458c5..d88514f7c9f 100644
--- a/core/benches/blocks/common.rs
+++ b/core/benches/blocks/common.rs
@@ -42,7 +42,9 @@ pub fn create_block(
)
.chain(0, state)
.sign(key_pair)
+ .unpack(|_| {})
.commit(&topology)
+ .unpack(|_| {})
.unwrap();
// Verify that transactions are valid
diff --git a/core/benches/blocks/validate_blocks.rs b/core/benches/blocks/validate_blocks.rs
index 3390d7aaebe..6aa027d5f65 100644
--- a/core/benches/blocks/validate_blocks.rs
+++ b/core/benches/blocks/validate_blocks.rs
@@ -1,4 +1,3 @@
-use eyre::Result;
use iroha_core::{prelude::*, state::State};
use iroha_data_model::{isi::InstructionBox, prelude::*};
@@ -21,11 +20,11 @@ impl StateValidateBlocks {
/// - Failed to parse [`AccountId`]
/// - Failed to generate [`KeyPair`]
/// - Failed to create instructions for block
- pub fn setup(rt: &tokio::runtime::Handle) -> Result<Self> {
+ pub fn setup(rt: &tokio::runtime::Handle) -> Self {
let domains = 100;
let accounts_per_domain = 1000;
let assets_per_domain = 1000;
- let account_id: AccountId = "alice@wonderland".parse()?;
+ let account_id: AccountId = "alice@wonderland".parse().unwrap();
let key_pair = KeyPair::random();
let state = build_state(rt, &account_id, &key_pair);
@@ -38,12 +37,12 @@ impl StateValidateBlocks {
.into_iter()
.collect::<Vec<_>>();
- Ok(Self {
+ Self {
state,
instructions,
key_pair,
account_id,
- })
+ }
}
/// Run benchmark body.
@@ -61,7 +60,7 @@ impl StateValidateBlocks {
key_pair,
account_id,
}: Self,
- ) -> Result<()> {
+ ) {
for (instructions, i) in instructions.into_iter().zip(1..) {
let mut state_block = state.block();
let block = create_block(
@@ -70,11 +69,9 @@ impl StateValidateBlocks {
account_id.clone(),
&key_pair,
);
- state_block.apply_without_execution(&block)?;
+ let _wsv_events = state_block.apply_without_execution(&block);
assert_eq!(state_block.height(), i);
state_block.commit();
}
-
- Ok(())
}
}
diff --git a/core/benches/blocks/validate_blocks_benchmark.rs b/core/benches/blocks/validate_blocks_benchmark.rs
index 454e07e3f4c..c3592b506f2 100644
--- a/core/benches/blocks/validate_blocks_benchmark.rs
+++ b/core/benches/blocks/validate_blocks_benchmark.rs
@@ -15,10 +15,8 @@ fn validate_blocks(c: &mut Criterion) {
group.significance_level(0.1).sample_size(10);
group.bench_function("validate_blocks", |b| {
b.iter_batched(
- || StateValidateBlocks::setup(rt.handle()).expect("Failed to setup benchmark"),
- |bench| {
- StateValidateBlocks::measure(bench).expect("Failed to execute benchmark");
- },
+ || StateValidateBlocks::setup(rt.handle()),
+ StateValidateBlocks::measure,
criterion::BatchSize::SmallInput,
);
});
diff --git a/core/benches/blocks/validate_blocks_oneshot.rs b/core/benches/blocks/validate_blocks_oneshot.rs
index 118ce739b99..8c8b20b1343 100644
--- a/core/benches/blocks/validate_blocks_oneshot.rs
+++ b/core/benches/blocks/validate_blocks_oneshot.rs
@@ -20,6 +20,6 @@ fn main() {
}
iroha_logger::test_logger();
iroha_logger::info!("Starting...");
- let bench = StateValidateBlocks::setup(rt.handle()).expect("Failed to setup benchmark");
- StateValidateBlocks::measure(bench).expect("Failed to execute bnechmark");
+ let bench = StateValidateBlocks::setup(rt.handle());
+ StateValidateBlocks::measure(bench);
}
diff --git a/core/benches/kura.rs b/core/benches/kura.rs
index 06f78dcfc9b..521e242f60e 100644
--- a/core/benches/kura.rs
+++ b/core/benches/kura.rs
@@ -56,6 +56,7 @@ async fn measure_block_size_for_n_executors(n_executors: u32) {
BlockBuilder::new(vec![tx], topology, Vec::new())
.chain(0, &mut state_block)
.sign(&KeyPair::random())
+ .unpack(|_| {})
};
for _ in 1..n_executors {
diff --git a/core/benches/validation.rs b/core/benches/validation.rs
index 8aff8c01ce0..d7e5459f090 100644
--- a/core/benches/validation.rs
+++ b/core/benches/validation.rs
@@ -186,7 +186,7 @@ fn sign_blocks(criterion: &mut Criterion) {
b.iter_batched(
|| block.clone(),
|block| {
- let _: ValidBlock = block.sign(&key_pair);
+ let _: ValidBlock = block.sign(&key_pair).unpack(|_| {});
count += 1;
},
BatchSize::SmallInput,
diff --git a/core/src/block.rs b/core/src/block.rs
index 4a6f210502e..3134f1109b0 100644
--- a/core/src/block.rs
+++ b/core/src/block.rs
@@ -6,7 +6,6 @@
//! [`Block`]s are organised into a linear sequence over time (also known as the block chain).
use std::error::Error as _;
-use iroha_config::parameters::defaults::chain_wide::DEFAULT_CONSENSUS_ESTIMATION;
use iroha_crypto::{HashOf, KeyPair, MerkleTree, SignatureOf, SignaturesOf};
use iroha_data_model::{
block::*,
@@ -18,6 +17,7 @@ use iroha_genesis::GenesisTransaction;
use iroha_primitives::unique_vec::UniqueVec;
use thiserror::Error;
+pub(crate) use self::event::WithEvents;
pub use self::{chained::Chained, commit::CommittedBlock, valid::ValidBlock};
use crate::{prelude::*, sumeragi::network_topology::Topology, tx::AcceptTransactionFail};
@@ -37,20 +37,27 @@ pub enum TransactionValidationError {
pub enum BlockValidationError {
/// Block has committed transactions
HasCommittedTransactions,
- /// Mismatch between the actual and expected hashes of the latest block. Expected: {expected:?}, actual: {actual:?}
- LatestBlockHashMismatch {
+ /// Mismatch between the actual and expected hashes of the previous block. Expected: {expected:?}, actual: {actual:?}
+ PrevBlockHashMismatch {
/// Expected value
expected: Option<HashOf<SignedBlock>>,
/// Actual value
actual: Option<HashOf<SignedBlock>>,
},
- /// Mismatch between the actual and expected height of the latest block. Expected: {expected}, actual: {actual}
- LatestBlockHeightMismatch {
+ /// Mismatch between the actual and expected height of the previous block. Expected: {expected}, actual: {actual}
+ PrevBlockHeightMismatch {
/// Expected value
expected: u64,
/// Actual value
actual: u64,
},
+ /// Mismatch between the actual and expected hashes of the current block. Expected: {expected:?}, actual: {actual:?}
+ IncorrectHash {
+ /// Expected value
+ expected: HashOf<SignedBlock>,
+ /// Actual value
+ actual: HashOf<SignedBlock>,
+ },
/// The transaction hash stored in the block header does not match the actual transaction hash
TransactionHashMismatch,
/// Error during transaction validation
@@ -93,6 +100,8 @@ pub enum SignatureVerificationError {
pub struct BlockBuilder<B>(B);
mod pending {
+ use std::time::SystemTime;
+
use iroha_data_model::transaction::TransactionValue;
use super::*;
@@ -110,7 +119,7 @@ mod pending {
/// Transaction will be validated when block is chained.
transactions: Vec<AcceptedTransaction>,
/// Event recommendations for use in triggers and off-chain work
- event_recommendations: Vec<Event>,
+ event_recommendations: Vec<EventBox>,
}
impl BlockBuilder {
@@ -123,7 +132,7 @@ mod pending {
pub fn new(
transactions: Vec<AcceptedTransaction>,
commit_topology: Topology,
- event_recommendations: Vec<Event>,
+ event_recommendations: Vec<EventBox>,
) -> Self {
assert!(!transactions.is_empty(), "Empty block created");
@@ -136,27 +145,26 @@ mod pending {
fn make_header(
previous_height: u64,
- previous_block_hash: Option<HashOf<SignedBlock>>,
+ prev_block_hash: Option<HashOf<SignedBlock>>,
view_change_index: u64,
transactions: &[TransactionValue],
) -> BlockHeader {
BlockHeader {
- timestamp_ms: iroha_data_model::current_time()
- .as_millis()
- .try_into()
- .expect("Time should fit into u64"),
- consensus_estimation_ms: DEFAULT_CONSENSUS_ESTIMATION
+ creation_time_ms: SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Failed to get the current system time")
.as_millis()
.try_into()
.expect("Time should fit into u64"),
height: previous_height + 1,
view_change_index,
- previous_block_hash,
+ prev_block_hash,
transactions_hash: transactions
.iter()
.map(|value| value.as_ref().hash())
.collect::<MerkleTree<_>>()
- .hash(),
+ .hash()
+ .unwrap(),
}
}
@@ -222,16 +230,16 @@ mod chained {
impl BlockBuilder<Chained> {
/// Sign this block and get [`SignedBlock`].
- pub fn sign(self, key_pair: &KeyPair) -> ValidBlock {
+ pub fn sign(self, key_pair: &KeyPair) -> WithEvents<ValidBlock> {
let signature = SignatureOf::new(key_pair, &self.0 .0);
- ValidBlock(
+ WithEvents::new(ValidBlock(
SignedBlockV1 {
payload: self.0 .0,
signatures: SignaturesOf::from(signature),
}
.into(),
- )
+ ))
}
}
}
@@ -245,7 +253,7 @@ mod valid {
/// Block that was validated and accepted
#[derive(Debug, Clone)]
#[repr(transparent)]
- pub struct ValidBlock(pub(crate) SignedBlock);
+ pub struct ValidBlock(pub(super) SignedBlock);
impl ValidBlock {
/// Validate a block against the current state of the world.
@@ -264,7 +272,7 @@ mod valid {
topology: &Topology,
expected_chain_id: &ChainId,
state_block: &mut StateBlock<'_>,
- ) -> Result<ValidBlock, (SignedBlock, BlockValidationError)> {
+ ) -> WithEvents<Result<ValidBlock, (SignedBlock, BlockValidationError)>> {
if !block.header().is_genesis() {
let actual_commit_topology = block.commit_topology();
let expected_commit_topology = &topology.ordered_peers;
@@ -272,20 +280,23 @@ mod valid {
if actual_commit_topology != expected_commit_topology {
let actual_commit_topology = actual_commit_topology.clone();
- return Err((
+ return WithEvents::new(Err((
block,
BlockValidationError::TopologyMismatch {
expected: expected_commit_topology.clone(),
actual: actual_commit_topology,
},
- ));
+ )));
}
if topology
.filter_signatures_by_roles(&[Role::Leader], block.signatures())
.is_empty()
{
- return Err((block, SignatureVerificationError::LeaderMissing.into()));
+ return WithEvents::new(Err((
+ block,
+ SignatureVerificationError::LeaderMissing.into(),
+ )));
}
}
@@ -293,48 +304,51 @@ mod valid {
let actual_height = block.header().height;
if expected_block_height != actual_height {
- return Err((
+ return WithEvents::new(Err((
block,
- BlockValidationError::LatestBlockHeightMismatch {
+ BlockValidationError::PrevBlockHeightMismatch {
expected: expected_block_height,
actual: actual_height,
},
- ));
+ )));
}
- let expected_previous_block_hash = state_block.latest_block_hash();
- let actual_block_hash = block.header().previous_block_hash;
+ let expected_prev_block_hash = state_block.latest_block_hash();
+ let actual_prev_block_hash = block.header().prev_block_hash;
- if expected_previous_block_hash != actual_block_hash {
- return Err((
+ if expected_prev_block_hash != actual_prev_block_hash {
+ return WithEvents::new(Err((
block,
- BlockValidationError::LatestBlockHashMismatch {
- expected: expected_previous_block_hash,
- actual: actual_block_hash,
+ BlockValidationError::PrevBlockHashMismatch {
+ expected: expected_prev_block_hash,
+ actual: actual_prev_block_hash,
},
- ));
+ )));
}
if block
.transactions()
.any(|tx| state_block.has_transaction(tx.as_ref().hash()))
{
- return Err((block, BlockValidationError::HasCommittedTransactions));
+ return WithEvents::new(Err((
+ block,
+ BlockValidationError::HasCommittedTransactions,
+ )));
}
if let Err(error) = Self::validate_transactions(&block, expected_chain_id, state_block)
{
- return Err((block, error.into()));
+ return WithEvents::new(Err((block, error.into())));
}
let SignedBlock::V1(block) = block;
- Ok(ValidBlock(
+ WithEvents::new(Ok(ValidBlock(
SignedBlockV1 {
payload: block.payload,
signatures: block.signatures,
}
.into(),
- ))
+ )))
}
fn validate_transactions(
@@ -379,24 +393,44 @@ mod valid {
///
/// - Not enough signatures
/// - Not signed by proxy tail
- pub(crate) fn commit_with_signatures(
+ pub fn commit_with_signatures(
mut self,
topology: &Topology,
signatures: SignaturesOf<BlockPayload>,
- ) -> Result<CommittedBlock, (ValidBlock, BlockValidationError)> {
+ expected_hash: HashOf<SignedBlock>,
+ ) -> WithEvents<Result<CommittedBlock, (ValidBlock, BlockValidationError)>> {
if topology
.filter_signatures_by_roles(&[Role::Leader], &signatures)
.is_empty()
{
- return Err((self, SignatureVerificationError::LeaderMissing.into()));
+ return WithEvents::new(Err((
+ self,
+ SignatureVerificationError::LeaderMissing.into(),
+ )));
}
if !self.as_ref().signatures().is_subset(&signatures) {
- return Err((self, SignatureVerificationError::SignatureMissing.into()));
+ return WithEvents::new(Err((
+ self,
+ SignatureVerificationError::SignatureMissing.into(),
+ )));
}
if !self.0.replace_signatures(signatures) {
- return Err((self, SignatureVerificationError::UnknownSignature.into()));
+ return WithEvents::new(Err((
+ self,
+ SignatureVerificationError::UnknownSignature.into(),
+ )));
+ }
+
+ let actual_block_hash = self.as_ref().hash();
+ if actual_block_hash != expected_hash {
+ let err = BlockValidationError::IncorrectHash {
+ expected: expected_hash,
+ actual: actual_block_hash,
+ };
+
+ return WithEvents::new(Err((self, err)));
}
self.commit(topology)
@@ -411,19 +445,19 @@ mod valid {
pub fn commit(
self,
topology: &Topology,
- ) -> Result<CommittedBlock, (ValidBlock, BlockValidationError)> {
+ ) -> WithEvents<Result<CommittedBlock, (ValidBlock, BlockValidationError)>> {
if !self.0.header().is_genesis() {
if let Err(err) = self.verify_signatures(topology) {
- return Err((self, err.into()));
+ return WithEvents::new(Err((self, err.into())));
}
}
- Ok(CommittedBlock(self))
+ WithEvents::new(Ok(CommittedBlock(self)))
}
/// Add additional signatures for [`Self`].
#[must_use]
- pub fn sign(self, key_pair: &KeyPair) -> Self {
+ pub fn sign(self, key_pair: &KeyPair) -> ValidBlock {
ValidBlock(self.0.sign(key_pair))
}
@@ -443,21 +477,20 @@ mod valid {
pub(crate) fn new_dummy() -> Self {
BlockBuilder(Chained(BlockPayload {
header: BlockHeader {
- timestamp_ms: 0,
- consensus_estimation_ms: DEFAULT_CONSENSUS_ESTIMATION
- .as_millis()
- .try_into()
- .expect("Should never overflow?"),
height: 2,
+ creation_time_ms: 0,
+ prev_block_hash: None,
+ transactions_hash: HashOf::from_untyped_unchecked(Hash::prehashed(
+ [0_u8; Hash::LENGTH],
+ )),
view_change_index: 0,
- previous_block_hash: None,
- transactions_hash: None,
},
transactions: Vec::new(),
commit_topology: UniqueVec::new(),
event_recommendations: Vec::new(),
}))
.sign(&KeyPair::random())
+ .unpack(|_| {})
}
/// Check if block's signatures meet requirements for given topology.
@@ -628,31 +661,7 @@ mod commit {
/// Represents a block accepted by consensus.
/// Every [`Self`] will have a different height.
#[derive(Debug, Clone)]
- pub struct CommittedBlock(pub(crate) ValidBlock);
-
- impl CommittedBlock {
- pub(crate) fn produce_events(&self) -> Vec<PipelineEvent> {
- let tx = self.as_ref().transactions().map(|tx| {
- let status = tx.error.as_ref().map_or_else(
- || PipelineStatus::Committed,
- |error| PipelineStatus::Rejected(error.clone().into()),
- );
-
- PipelineEvent {
- entity_kind: PipelineEntityKind::Transaction,
- status,
- hash: tx.as_ref().hash().into(),
- }
- });
- let current_block = core::iter::once(PipelineEvent {
- entity_kind: PipelineEntityKind::Block,
- status: PipelineStatus::Committed,
- hash: self.as_ref().hash().into(),
- });
-
- tx.chain(current_block).collect()
- }
- }
+ pub struct CommittedBlock(pub(super) ValidBlock);
impl From<CommittedBlock> for ValidBlock {
fn from(source: CommittedBlock) -> Self {
@@ -666,12 +675,105 @@ mod commit {
}
}
- // Invariants of [`CommittedBlock`] can't be violated through immutable reference
impl AsRef<SignedBlock> for CommittedBlock {
fn as_ref(&self) -> &SignedBlock {
&self.0 .0
}
}
+
+ #[cfg(test)]
+ impl AsMut<SignedBlock> for CommittedBlock {
+ fn as_mut(&mut self) -> &mut SignedBlock {
+ &mut self.0 .0
+ }
+ }
+}
+
+mod event {
+ use super::*;
+
+ pub trait EventProducer {
+ fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox>;
+ }
+
+ #[derive(Debug)]
+ #[must_use]
+ pub struct WithEvents<B>(B);
+
+ impl<B> WithEvents<B> {
+ pub(super) fn new(source: B) -> Self {
+ Self(source)
+ }
+ }
+
+ impl<B: EventProducer, E: EventProducer> WithEvents<Result<B, E>> {
+ pub fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> Result<B, E> {
+ match self.0 {
+ Ok(ok) => Ok(WithEvents(ok).unpack(f)),
+ Err(err) => Err(WithEvents(err).unpack(f)),
+ }
+ }
+ }
+ impl<B: EventProducer> WithEvents<B> {
+ pub fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> B {
+ self.0.produce_events().for_each(f);
+ self.0
+ }
+ }
+
+ impl<B, E: EventProducer> WithEvents<(B, E)> {
+ pub(crate) fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> (B, E) {
+ self.0 .1.produce_events().for_each(f);
+ self.0
+ }
+ }
+
+ impl EventProducer for ValidBlock {
+ fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox> {
+ let block_height = self.as_ref().header().height;
+
+ let tx_events = self.as_ref().transactions().map(move |tx| {
+ let status = tx.error.as_ref().map_or_else(
+ || TransactionStatus::Approved,
+ |error| TransactionStatus::Rejected(error.clone().into()),
+ );
+
+ TransactionEvent {
+ block_height: Some(block_height),
+ hash: tx.as_ref().hash(),
+ status,
+ }
+ });
+
+ let block_event = core::iter::once(BlockEvent {
+ header: self.as_ref().header().clone(),
+ hash: self.as_ref().hash(),
+ status: BlockStatus::Approved,
+ });
+
+ tx_events
+ .map(PipelineEventBox::from)
+ .chain(block_event.map(Into::into))
+ }
+ }
+
+ impl EventProducer for CommittedBlock {
+ fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox> {
+ let block_event = core::iter::once(BlockEvent {
+ header: self.as_ref().header().clone(),
+ hash: self.as_ref().hash(),
+ status: BlockStatus::Committed,
+ });
+
+ block_event.map(Into::into)
+ }
+ }
+
+ impl EventProducer for BlockValidationError {
+ fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox> {
+ core::iter::empty()
+ }
+ }
}
#[cfg(test)]
@@ -690,12 +792,13 @@ mod tests {
pub fn committed_and_valid_block_hashes_are_equal() {
let valid_block = ValidBlock::new_dummy();
let topology = Topology::new(UniqueVec::new());
- let committed_block = valid_block.clone().commit(&topology).unwrap();
+ let committed_block = valid_block
+ .clone()
+ .commit(&topology)
+ .unpack(|_| {})
+ .unwrap();
- assert_eq!(
- valid_block.0.hash_of_payload(),
- committed_block.as_ref().hash_of_payload()
- )
+ assert_eq!(valid_block.0.hash(), committed_block.as_ref().hash())
}
#[tokio::test]
@@ -733,13 +836,26 @@ mod tests {
let topology = Topology::new(UniqueVec::new());
let valid_block = BlockBuilder::new(transactions, topology, Vec::new())
.chain(0, &mut state_block)
- .sign(&alice_keys);
+ .sign(&alice_keys)
+ .unpack(|_| {});
// The first transaction should be confirmed
- assert!(valid_block.0.transactions().next().unwrap().error.is_none());
+ assert!(valid_block
+ .as_ref()
+ .transactions()
+ .next()
+ .unwrap()
+ .error
+ .is_none());
// The second transaction should be rejected
- assert!(valid_block.0.transactions().nth(1).unwrap().error.is_some());
+ assert!(valid_block
+ .as_ref()
+ .transactions()
+ .nth(1)
+ .unwrap()
+ .error
+ .is_some());
}
#[tokio::test]
@@ -795,13 +911,26 @@ mod tests {
let topology = Topology::new(UniqueVec::new());
let valid_block = BlockBuilder::new(transactions, topology, Vec::new())
.chain(0, &mut state_block)
- .sign(&alice_keys);
+ .sign(&alice_keys)
+ .unpack(|_| {});
// The first transaction should fail
- assert!(valid_block.0.transactions().next().unwrap().error.is_some());
+ assert!(valid_block
+ .as_ref()
+ .transactions()
+ .next()
+ .unwrap()
+ .error
+ .is_some());
// The third transaction should succeed
- assert!(valid_block.0.transactions().nth(2).unwrap().error.is_none());
+ assert!(valid_block
+ .as_ref()
+ .transactions()
+ .nth(2)
+ .unwrap()
+ .error
+ .is_none());
}
#[tokio::test]
@@ -852,17 +981,30 @@ mod tests {
let topology = Topology::new(UniqueVec::new());
let valid_block = BlockBuilder::new(transactions, topology, Vec::new())
.chain(0, &mut state_block)
- .sign(&alice_keys);
+ .sign(&alice_keys)
+ .unpack(|_| {});
// The first transaction should be rejected
assert!(
- valid_block.0.transactions().next().unwrap().error.is_some(),
+ valid_block
+ .as_ref()
+ .transactions()
+ .next()
+ .unwrap()
+ .error
+ .is_some(),
"The first transaction should be rejected, as it contains `Fail`."
);
// The second transaction should be accepted
assert!(
- valid_block.0.transactions().nth(1).unwrap().error.is_none(),
+ valid_block
+ .as_ref()
+ .transactions()
+ .nth(1)
+ .unwrap()
+ .error
+ .is_none(),
"The second transaction should be accepted."
);
}
diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs
index d2e5c6b7219..ef7f5b8c10a 100644
--- a/core/src/block_sync.rs
+++ b/core/src/block_sync.rs
@@ -91,16 +91,13 @@ impl BlockSynchronizer {
/// Sends request for latest blocks to a chosen peer
async fn request_latest_blocks_from_peer(&mut self, peer_id: PeerId) {
- let (previous_hash, latest_hash) = {
+ let (prev_hash, latest_hash) = {
let state_view = self.state.view();
- (
- state_view.previous_block_hash(),
- state_view.latest_block_hash(),
- )
+ (state_view.prev_block_hash(), state_view.latest_block_hash())
};
message::Message::GetBlocksAfter(message::GetBlocksAfter::new(
latest_hash,
- previous_hash,
+ prev_hash,
self.peer_id.clone(),
))
.send_to(&self.network, peer_id)
@@ -138,7 +135,7 @@ pub mod message {
/// Hash of latest available block
pub latest_hash: Option<HashOf<SignedBlock>>,
/// Hash of second to latest block
- pub previous_hash: Option<HashOf<SignedBlock>>,
+ pub prev_hash: Option<HashOf<SignedBlock>>,
/// Peer id
pub peer_id: PeerId,
}
@@ -147,12 +144,12 @@ pub mod message {
/// Construct [`GetBlocksAfter`].
pub const fn new(
latest_hash: Option<HashOf<SignedBlock>>,
- previous_hash: Option<HashOf<SignedBlock>>,
+ prev_hash: Option<HashOf<SignedBlock>>,
peer_id: PeerId,
) -> Self {
Self {
latest_hash,
- previous_hash,
+ prev_hash,
peer_id,
}
}
@@ -190,21 +187,21 @@ pub mod message {
match self {
Message::GetBlocksAfter(GetBlocksAfter {
latest_hash,
- previous_hash,
+ prev_hash,
peer_id,
}) => {
let local_latest_block_hash = block_sync.state.view().latest_block_hash();
if *latest_hash == local_latest_block_hash
- || *previous_hash == local_latest_block_hash
+ || *prev_hash == local_latest_block_hash
{
return;
}
- let start_height = match previous_hash {
+ let start_height = match prev_hash {
Some(hash) => match block_sync.kura.get_block_height_by_hash(hash) {
None => {
- error!(?previous_hash, "Block hash not found");
+ error!(?prev_hash, "Block hash not found");
return;
}
Some(height) => height + 1, // It's get blocks *after*, so we add 1.
@@ -223,9 +220,9 @@ pub mod message {
// The only case where the blocks array could be empty is if we got queried for blocks
// after the latest hash. There is a check earlier in the function that returns early
// so it should not be possible for us to get here.
- error!(hash=?previous_hash, "Blocks array is empty but shouldn't be.");
+ error!(hash=?prev_hash, "Blocks array is empty but shouldn't be.");
} else {
- trace!(hash=?previous_hash, "Sharing blocks after hash");
+ trace!(hash=?prev_hash, "Sharing blocks after hash");
Message::ShareBlocks(ShareBlocks::new(blocks, block_sync.peer_id.clone()))
.send_to(&block_sync.network, peer_id.clone())
.await;
diff --git a/core/src/kura.rs b/core/src/kura.rs
index 3dc536f9c2d..49bbf9d401a 100644
--- a/core/src/kura.rs
+++ b/core/src/kura.rs
@@ -154,7 +154,7 @@ impl Kura {
let mut block_indices = vec![BlockIndex::default(); block_index_count];
block_store.read_block_indices(0, &mut block_indices)?;
- let mut previous_block_hash = None;
+ let mut prev_block_hash = None;
for block in block_indices {
// This is re-allocated every iteration. This could cause a problem.
let mut block_data_buffer = vec![0_u8; block.length.try_into()?];
@@ -162,13 +162,13 @@ impl Kura {
match block_store.read_block_data(block.start, &mut block_data_buffer) {
Ok(()) => match SignedBlock::decode_all_versioned(&block_data_buffer) {
Ok(decoded_block) => {
- if previous_block_hash != decoded_block.header().previous_block_hash {
+ if prev_block_hash != decoded_block.header().prev_block_hash {
error!("Block has wrong previous block hash. Not reading any blocks beyond this height.");
break;
}
let decoded_block_hash = decoded_block.hash();
block_hashes.push(decoded_block_hash);
- previous_block_hash = Some(decoded_block_hash);
+ prev_block_hash = Some(decoded_block_hash);
}
Err(error) => {
error!(?error, "Encountered malformed block. Not reading any blocks beyond this height.");
diff --git a/core/src/lib.rs b/core/src/lib.rs
index ab0b9be0d6b..06a0bd4103f 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -18,7 +18,7 @@ use core::time::Duration;
use gossiper::TransactionGossip;
use indexmap::IndexSet;
-use iroha_data_model::prelude::*;
+use iroha_data_model::{events::EventBox, prelude::*};
use iroha_primitives::unique_vec::UniqueVec;
use parity_scale_codec::{Decode, Encode};
use tokio::sync::broadcast;
@@ -41,8 +41,8 @@ pub type PeersIds = UniqueVec<PeerId>;
/// Parameters set.
pub type Parameters = IndexSet<Parameter>;
-/// Type of `Sender<Event>` which should be used for channels of `Event` messages.
-pub type EventsSender = broadcast::Sender<Event>;
+/// Type of `Sender<EventBox>` which should be used for channels of `EventBox` messages.
+pub type EventsSender = broadcast::Sender<EventBox>;
/// The network message
#[derive(Clone, Debug, Encode, Decode)]
diff --git a/core/src/queue.rs b/core/src/queue.rs
index d463a655a4c..cf5e18c0e5b 100644
--- a/core/src/queue.rs
+++ b/core/src/queue.rs
@@ -1,6 +1,6 @@
//! Module with queue actor
use core::time::Duration;
-use std::num::NonZeroUsize;
+use std::{num::NonZeroUsize, time::SystemTime};
use crossbeam_queue::ArrayQueue;
use dashmap::{mapref::entry::Entry, DashMap};
@@ -8,13 +8,17 @@ use eyre::Result;
use indexmap::IndexSet;
use iroha_config::parameters::actual::Queue as Config;
use iroha_crypto::HashOf;
-use iroha_data_model::{account::AccountId, transaction::prelude::*};
+use iroha_data_model::{
+ account::AccountId,
+ events::pipeline::{TransactionEvent, TransactionStatus},
+ transaction::prelude::*,
+};
use iroha_logger::{trace, warn};
use iroha_primitives::must_use::MustUse;
use rand::seq::IteratorRandom;
use thiserror::Error;
-use crate::prelude::*;
+use crate::{prelude::*, EventsSender};
impl AcceptedTransaction {
// TODO: We should have another type of transaction like `CheckedTransaction` in the type system?
@@ -48,6 +52,7 @@ impl AcceptedTransaction {
/// Multiple producers, single consumer
#[derive(Debug)]
pub struct Queue {
+ events_sender: EventsSender,
/// The queue for transactions
tx_hashes: ArrayQueue<HashOf<SignedTransaction>>,
/// [`AcceptedTransaction`]s addressed by `Hash`
@@ -96,8 +101,9 @@ pub struct Failure {
impl Queue {
/// Makes queue from configuration
- pub fn from_config(cfg: Config) -> Self {
+ pub fn from_config(cfg: Config, events_sender: EventsSender) -> Self {
Self {
+ events_sender,
tx_hashes: ArrayQueue::new(cfg.capacity.get()),
accepted_txs: DashMap::new(),
txs_per_user: DashMap::new(),
@@ -121,13 +127,19 @@ impl Queue {
|tx_time_to_live| core::cmp::min(self.tx_time_to_live, tx_time_to_live),
);
- iroha_data_model::current_time().saturating_sub(tx_creation_time) > time_limit
+ let curr_time = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Failed to get the current system time");
+ curr_time.saturating_sub(tx_creation_time) > time_limit
}
/// If `true`, this transaction is regarded to have been tampered to have a future timestamp.
fn is_in_future(&self, tx: &AcceptedTransaction) -> bool {
let tx_timestamp = tx.as_ref().creation_time();
- tx_timestamp.saturating_sub(iroha_data_model::current_time()) > self.future_threshold
+ let curr_time = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Failed to get the current system time");
+ tx_timestamp.saturating_sub(curr_time) > self.future_threshold
}
/// Returns all pending transactions.
@@ -226,6 +238,14 @@ impl Queue {
err: Error::Full,
}
})?;
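+ // Notify subscribers that the transaction entered the queue; a send error only means nobody is listening.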
+ let _ = self.events_sender.send(
+ TransactionEvent {
+ hash,
+ block_height: None,
+ status: TransactionStatus::Queued,
+ }
+ .into(),
+ );
trace!("Transaction queue length = {}", self.tx_hashes.len(),);
Ok(())
}
@@ -281,12 +301,7 @@ impl Queue {
max_txs_in_block: usize,
) -> Vec<AcceptedTransaction> {
let mut transactions = Vec::with_capacity(max_txs_in_block);
- self.get_transactions_for_block(
- state_view,
- max_txs_in_block,
- &mut transactions,
- &mut Vec::new(),
- );
+ self.get_transactions_for_block(state_view, max_txs_in_block, &mut transactions);
transactions
}
@@ -298,17 +313,16 @@ impl Queue {
state_view: &StateView,
max_txs_in_block: usize,
transactions: &mut Vec<AcceptedTransaction>,
- expired_transactions: &mut Vec<AcceptedTransaction>,
) {
if transactions.len() >= max_txs_in_block {
return;
}
let mut seen_queue = Vec::new();
- let mut expired_transactions_queue = Vec::new();
+ let mut expired_transactions = Vec::new();
let txs_from_queue = core::iter::from_fn(|| {
- self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions_queue)
+ self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions)
});
let transactions_hashes: IndexSet<HashOf<SignedTransaction>> =
@@ -322,7 +336,17 @@ impl Queue {
.into_iter()
.try_for_each(|hash| self.tx_hashes.push(hash))
.expect("Exceeded the number of transactions pending");
- expired_transactions.extend(expired_transactions_queue);
+
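+ // Emit an `Expired` event for every transaction that was dropped from the queue.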
+ expired_transactions
+ .into_iter()
+ .map(|tx| TransactionEvent {
+ hash: tx.as_ref().hash(),
+ block_height: None,
+ status: TransactionStatus::Expired,
+ })
+ .for_each(|e| {
+ let _ = self.events_sender.send(e.into());
+ });
}
/// Check that the user adhered to the maximum transaction per user limit and increment their transaction count.
@@ -381,6 +405,21 @@ pub mod tests {
PeersIds,
};
+ impl Queue {
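+ /// Construct a [`Queue`] for tests, wiring events into a dummy channel that has no subscribers.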
+ pub fn test(cfg: Config) -> Self {
+ Self {
+ events_sender: tokio::sync::broadcast::Sender::new(1),
+ tx_hashes: ArrayQueue::new(cfg.capacity.get()),
+ accepted_txs: DashMap::new(),
+ txs_per_user: DashMap::new(),
+ capacity: cfg.capacity,
+ capacity_per_user: cfg.capacity_per_user,
+ tx_time_to_live: cfg.transaction_time_to_live,
+ future_threshold: cfg.future_threshold,
+ }
+ }
+ }
+
fn accepted_tx(account_id: &str, key: &KeyPair) -> AcceptedTransaction {
let chain_id = ChainId::from("0");
@@ -437,7 +476,7 @@ pub mod tests {
));
let state_view = state.view();
- let queue = Queue::from_config(config_factory());
+ let queue = Queue::test(config_factory());
queue
.push(accepted_tx("alice@wonderland", &key_pair), &state_view)
@@ -458,7 +497,7 @@ pub mod tests {
));
let state_view = state.view();
- let queue = Queue::from_config(Config {
+ let queue = Queue::test(Config {
transaction_time_to_live: Duration::from_secs(100),
capacity,
..Config::default()
@@ -504,7 +543,7 @@ pub mod tests {
};
let state_view = state.view();
- let queue = Queue::from_config(config_factory());
+ let queue = Queue::test(config_factory());
let instructions: [InstructionBox; 0] = [];
let tx =
TransactionBuilder::new(chain_id.clone(), "alice@wonderland".parse().expect("Valid"))
@@ -560,7 +599,7 @@ pub mod tests {
query_handle,
));
let state_view = state.view();
- let queue = Queue::from_config(Config {
+ let queue = Queue::test(Config {
transaction_time_to_live: Duration::from_secs(100),
..config_factory()
});
@@ -590,7 +629,7 @@ pub mod tests {
state_block.transactions.insert(tx.as_ref().hash(), 1);
state_block.commit();
let state_view = state.view();
- let queue = Queue::from_config(config_factory());
+ let queue = Queue::test(config_factory());
assert!(matches!(
queue.push(tx, &state_view),
Err(Failure {
@@ -613,7 +652,7 @@ pub mod tests {
query_handle,
);
let tx = accepted_tx("alice@wonderland", &alice_key);
- let queue = Queue::from_config(config_factory());
+ let queue = Queue::test(config_factory());
queue.push(tx.clone(), &state.view()).unwrap();
let mut state_block = state.block();
state_block.transactions.insert(tx.as_ref().hash(), 1);
@@ -639,7 +678,7 @@ pub mod tests {
query_handle,
));
let state_view = state.view();
- let queue = Queue::from_config(Config {
+ let queue = Queue::test(Config {
transaction_time_to_live: Duration::from_millis(300),
..config_factory()
});
@@ -687,7 +726,7 @@ pub mod tests {
query_handle,
));
let state_view = state.view();
- let queue = Queue::from_config(config_factory());
+ let queue = Queue::test(config_factory());
queue
.push(accepted_tx("alice@wonderland", &alice_key), &state_view)
.expect("Failed to push tx into queue");
@@ -722,7 +761,9 @@ pub mod tests {
query_handle,
));
let state_view = state.view();
- let queue = Queue::from_config(config_factory());
+ let mut queue = Queue::test(config_factory());
+ let (event_sender, mut event_receiver) = tokio::sync::broadcast::channel(1);
+ queue.events_sender = event_sender;
let instructions = [Fail {
message: "expired".to_owned(),
}];
@@ -737,18 +778,26 @@ pub mod tests {
max_instruction_number: 4096,
max_wasm_size_bytes: 0,
};
+ let tx_hash = tx.hash();
let tx = AcceptedTransaction::accept(tx, &chain_id, &limits)
.expect("Failed to accept Transaction.");
queue
.push(tx.clone(), &state_view)
.expect("Failed to push tx into queue");
let mut txs = Vec::new();
- let mut expired_txs = Vec::new();
thread::sleep(Duration::from_millis(TTL_MS));
- queue.get_transactions_for_block(&state_view, max_txs_in_block, &mut txs, &mut expired_txs);
+ queue.get_transactions_for_block(&state_view, max_txs_in_block, &mut txs);
assert!(txs.is_empty());
- assert_eq!(expired_txs.len(), 1);
- assert_eq!(expired_txs[0], tx);
+
+ assert_eq!(
+ event_receiver.recv().await.unwrap(),
+ TransactionEvent {
+ hash: tx_hash,
+ block_height: None,
+ status: TransactionStatus::Expired,
+ }
+ .into()
+ )
}
#[test]
@@ -763,7 +812,7 @@ pub mod tests {
query_handle,
));
- let queue = Arc::new(Queue::from_config(Config {
+ let queue = Arc::new(Queue::test(Config {
transaction_time_to_live: Duration::from_secs(100),
capacity: 100_000_000.try_into().unwrap(),
..Config::default()
@@ -837,7 +886,7 @@ pub mod tests {
));
let state_view = state.view();
- let queue = Queue::from_config(Config {
+ let queue = Queue::test(Config {
future_threshold,
..Config::default()
});
@@ -898,7 +947,7 @@ pub mod tests {
let query_handle = LiveQueryStore::test().start();
let state = State::new(world, kura, query_handle);
- let queue = Queue::from_config(Config {
+ let queue = Queue::test(Config {
transaction_time_to_live: Duration::from_secs(100),
capacity: 100.try_into().unwrap(),
capacity_per_user: 1.try_into().unwrap(),
diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs
index 1b8f8715ad8..e74c18ee217 100644
--- a/core/src/smartcontracts/isi/query.rs
+++ b/core/src/smartcontracts/isi/query.rs
@@ -316,7 +316,9 @@ mod tests {
let first_block = BlockBuilder::new(transactions.clone(), topology.clone(), Vec::new())
.chain(0, &mut state_block)
.sign(&ALICE_KEYS)
+ .unpack(|_| {})
.commit(&topology)
+ .unpack(|_| {})
.expect("Block is valid");
state_block.apply(&first_block)?;
@@ -326,7 +328,9 @@ mod tests {
let block = BlockBuilder::new(transactions.clone(), topology.clone(), Vec::new())
.chain(0, &mut state_block)
.sign(&ALICE_KEYS)
+ .unpack(|_| {})
.commit(&topology)
+ .unpack(|_| {})
.expect("Block is valid");
state_block.apply(&block)?;
@@ -466,7 +470,9 @@ mod tests {
let vcb = BlockBuilder::new(vec![va_tx.clone()], topology.clone(), Vec::new())
.chain(0, &mut state_block)
.sign(&ALICE_KEYS)
+ .unpack(|_| {})
.commit(&topology)
+ .unpack(|_| {})
.expect("Block is valid");
state_block.apply(&vcb)?;
diff --git a/core/src/smartcontracts/isi/triggers/set.rs b/core/src/smartcontracts/isi/triggers/set.rs
index d7bfca0b769..63d7732e92b 100644
--- a/core/src/smartcontracts/isi/triggers/set.rs
+++ b/core/src/smartcontracts/isi/triggers/set.rs
@@ -58,8 +58,8 @@ type WasmSmartContractMap = IndexMap<HashOf<WasmSmartContract>, (WasmSmartContra
pub struct Set {
/// Triggers using [`DataEventFilter`]
data_triggers: IndexMap<TriggerId, LoadedAction<DataEventFilter>>,
- /// Triggers using [`PipelineEventFilter`]
- pipeline_triggers: IndexMap<TriggerId, LoadedAction<PipelineEventFilter>>,
+ /// Triggers using [`PipelineEventFilterBox`]
+ pipeline_triggers: IndexMap<TriggerId, LoadedAction<PipelineEventFilterBox>>,
/// Triggers using [`TimeEventFilter`]
time_triggers: IndexMap<TriggerId, LoadedAction<TimeEventFilter>>,
/// Triggers using [`ExecuteTriggerEventFilter`]
@@ -70,7 +70,7 @@ pub struct Set {
original_contracts: WasmSmartContractMap,
/// List of actions that should be triggered by events provided by `handle_*` methods.
/// Vector is used to save the exact triggers order.
- matched_ids: Vec<(Event, TriggerId)>,
+ matched_ids: Vec<(EventBox, TriggerId)>,
}
/// Helper struct for serializing triggers.
@@ -177,7 +177,7 @@ impl<'de> DeserializeSeed<'de> for WasmSeed<'_, Set> {
"pipeline_triggers" => {
let triggers: IndexMap<
TriggerId,
- SpecializedAction<PipelineEventFilter>,
+ SpecializedAction<PipelineEventFilterBox>,
> = map.next_value()?;
for (id, action) in triggers {
set.add_pipeline_trigger(
@@ -259,7 +259,7 @@ impl Set {
})
}
- /// Add trigger with [`PipelineEventFilter`]
+ /// Add trigger with [`PipelineEventFilterBox`]
///
/// Return `false` if a trigger with given id already exists
///
@@ -270,7 +270,7 @@ impl Set {
pub fn add_pipeline_trigger(
&mut self,
engine: &wasmtime::Engine,
- trigger: SpecializedTrigger<PipelineEventFilter>,
+ trigger: SpecializedTrigger<PipelineEventFilterBox>,
) -> Result<bool> {
self.add_to(engine, trigger, TriggeringEventType::Pipeline, |me| {
&mut me.pipeline_triggers
@@ -721,18 +721,6 @@ impl Set {
};
}
- /// Handle [`PipelineEvent`].
- ///
- /// Find all actions that are triggered by `event` and store them.
- /// These actions are inspected in the next [`Set::inspect_matched()`] call.
- // Passing by value to follow other `handle_` methods interface
- #[allow(clippy::needless_pass_by_value)]
- pub fn handle_pipeline_event(&mut self, event: PipelineEvent) {
- self.pipeline_triggers.iter().for_each(|entry| {
- Self::match_and_insert_trigger(&mut self.matched_ids, event.clone(), entry)
- });
- }
-
/// Handle [`TimeEvent`].
///
/// Find all actions that are triggered by `event` and store them.
@@ -747,7 +735,7 @@ impl Set {
continue;
}
- let ids = core::iter::repeat_with(|| (Event::Time(event), id.clone())).take(
+ let ids = core::iter::repeat_with(|| (EventBox::Time(event), id.clone())).take(
count
.try_into()
.expect("`u32` should always fit in `usize`"),
@@ -761,8 +749,8 @@ impl Set {
/// Skips insertion:
/// - If the action's filter doesn't match an event
/// - If the action's repeats count equals to 0
- fn match_and_insert_trigger<E: Into<Event>, F: EventFilter<Event = E>>(
- matched_ids: &mut Vec<(Event, TriggerId)>,
+ fn match_and_insert_trigger<E: Into<EventBox>, F: EventFilter<Event = E>>(
+ matched_ids: &mut Vec<(EventBox, TriggerId)>,
event: E,
(id, action): (&TriggerId, &LoadedAction<F>),
) {
@@ -825,7 +813,7 @@ impl Set {
}
/// Extract `matched_id`
- pub fn extract_matched_ids(&mut self) -> Vec<(Event, TriggerId)> {
+ pub fn extract_matched_ids(&mut self) -> Vec<(EventBox, TriggerId)> {
core::mem::take(&mut self.matched_ids)
}
}
diff --git a/core/src/smartcontracts/isi/triggers/specialized.rs b/core/src/smartcontracts/isi/triggers/specialized.rs
index 09e898b126d..24aa7b34500 100644
--- a/core/src/smartcontracts/isi/triggers/specialized.rs
+++ b/core/src/smartcontracts/isi/triggers/specialized.rs
@@ -103,7 +103,7 @@ macro_rules! impl_try_from_box {
impl_try_from_box! {
Data => DataEventFilter,
- Pipeline => PipelineEventFilter,
+ Pipeline => PipelineEventFilterBox,
Time => TimeEventFilter,
ExecuteTrigger => ExecuteTriggerEventFilter,
}
@@ -228,7 +228,7 @@ mod tests {
.unwrap()
}
TriggeringEventFilterBox::Pipeline(_) => {
- SpecializedTrigger::<PipelineEventFilter>::try_from(boxed)
+ SpecializedTrigger::<PipelineEventFilterBox>::try_from(boxed)
.map(|_| ())
.unwrap()
}
diff --git a/core/src/smartcontracts/wasm.rs b/core/src/smartcontracts/wasm.rs
index dd8df4bd163..25f27e25675 100644
--- a/core/src/smartcontracts/wasm.rs
+++ b/core/src/smartcontracts/wasm.rs
@@ -465,7 +465,7 @@ pub mod state {
#[derive(Constructor)]
pub struct Trigger {
/// Event which activated this trigger
- pub(in super::super) triggering_event: Event,
+ pub(in super::super) triggering_event: EventBox,
}
pub mod executor {
@@ -977,7 +977,7 @@ impl<'wrld, 'block: 'wrld, 'state: 'block> Runtime Result<()> {
let span = wasm_log_span!("Trigger execution", %id, %authority);
let state = state::Trigger::new(
diff --git a/core/src/state.rs b/core/src/state.rs
index b9291530cbf..febb38f6c20 100644
--- a/core/src/state.rs
+++ b/core/src/state.rs
@@ -7,7 +7,12 @@ use iroha_crypto::HashOf;
use iroha_data_model::{
account::AccountId,
block::SignedBlock,
- events::trigger_completed::{TriggerCompletedEvent, TriggerCompletedOutcome},
+ events::{
+ pipeline::BlockEvent,
+ time::TimeEvent,
+ trigger_completed::{TriggerCompletedEvent, TriggerCompletedOutcome},
+ EventBox,
+ },
isi::error::{InstructionExecutionError as Error, MathError},
parameter::{Parameter, ParameterValueBox},
permission::{PermissionTokenSchema, Permissions},
@@ -95,7 +100,7 @@ pub struct WorldBlock<'world> {
/// Runtime Executor
pub(crate) executor: CellBlock<'world, Executor>,
/// Events produced during execution of block
- pub(crate) events_buffer: Vec<Event>,
+ events_buffer: Vec<EventBox>,
}
/// Struct for single transaction's aggregated changes
@@ -126,7 +131,7 @@ pub struct WorldTransaction<'block, 'world> {
/// Wrapper for event's buffer to apply transaction rollback
struct TransactionEventBuffer<'block> {
/// Events produced during execution of block
- events_buffer: &'block mut Vec<Event>,
+ events_buffer: &'block mut Vec<EventBox>,
/// Number of events produced during execution current transaction
events_created_in_transaction: usize,
}
@@ -285,7 +290,7 @@ impl World {
}
}
- /// Create struct to apply block's changes while reverting changes made in the latest block
+ /// Create struct to apply block's changes while reverting changes made in the latest block
pub fn block_and_revert(&self) -> WorldBlock {
WorldBlock {
parameters: self.parameters.block_and_revert(),
@@ -895,14 +900,14 @@ impl WorldTransaction<'_, '_> {
}
impl TransactionEventBuffer<'_> {
- fn push(&mut self, event: Event) {
+ fn push(&mut self, event: EventBox) {
self.events_created_in_transaction += 1;
self.events_buffer.push(event);
}
}
-impl Extend<Event> for TransactionEventBuffer<'_> {
- fn extend<T: IntoIterator<Item = Event>>(&mut self, iter: T) {
+impl Extend<EventBox> for TransactionEventBuffer<'_> {
+ fn extend<T: IntoIterator<Item = EventBox>>(&mut self, iter: T) {
let len_before = self.events_buffer.len();
self.events_buffer.extend(iter);
let len_after = self.events_buffer.len();
@@ -1024,7 +1029,7 @@ pub trait StateReadOnly {
}
/// Return the hash of the block one before the latest block
- fn previous_block_hash(&self) -> Option<HashOf<SignedBlock>> {
+ fn prev_block_hash(&self) -> Option<HashOf<SignedBlock>> {
self.block_hashes().iter().nth_back(1).copied()
}
@@ -1087,7 +1092,7 @@ pub trait StateReadOnly {
let opt = self
.kura()
.get_block_by_height(1)
- .map(|genesis_block| genesis_block.header().timestamp());
+ .map(|genesis_block| genesis_block.header().creation_time());
if opt.is_none() {
error!("Failed to get genesis block from Kura.");
@@ -1183,13 +1188,10 @@ impl<'state> StateBlock<'state> {
deprecated(note = "This function is to be used in testing only. ")
)]
#[iroha_logger::log(skip_all, fields(block_height))]
- pub fn apply(&mut self, block: &CommittedBlock) -> Result<()> {
+ pub fn apply(&mut self, block: &CommittedBlock) -> Result<Vec<EventBox>> {
self.execute_transactions(block)?;
debug!("All block transactions successfully executed");
-
- self.apply_without_execution(block)?;
-
- Ok(())
+ Ok(self.apply_without_execution(block))
}
/// Execute `block` transactions and store their hashes as well as
@@ -1217,12 +1219,12 @@ impl<'state> StateBlock<'state> {
/// Apply transactions without actually executing them.
/// It's assumed that block's transaction was already executed (as part of validation for example).
#[iroha_logger::log(skip_all, fields(block_height = block.as_ref().header().height))]
- pub fn apply_without_execution(&mut self, block: &CommittedBlock) -> Result<()> {
+ pub fn apply_without_execution(&mut self, block: &CommittedBlock) -> Vec<EventBox> {
let block_hash = block.as_ref().hash();
trace!(%block_hash, "Applying block");
let time_event = self.create_time_event(block);
- self.world.events_buffer.push(Event::Time(time_event));
+ self.world.events_buffer.push(time_event.into());
let block_height = block.as_ref().header().height;
block
@@ -1248,24 +1250,44 @@ impl<'state> StateBlock<'state> {
self.block_hashes.push(block_hash);
self.apply_parameters();
-
- Ok(())
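+ // Record that the block was applied, then drain the buffered events so the caller can publish them.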
+ self.world.events_buffer.push(
+ BlockEvent {
+ header: block.as_ref().header().clone(),
+ hash: block.as_ref().hash(),
+ status: BlockStatus::Applied,
+ }
+ .into(),
+ );
+ core::mem::take(&mut self.world.events_buffer)
}
/// Create time event using previous and current blocks
fn create_time_event(&self, block: &CommittedBlock) -> TimeEvent {
+ use iroha_config::parameters::defaults::chain_wide::{
+ DEFAULT_BLOCK_TIME, DEFAULT_COMMIT_TIME,
+ };
+
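+ // Estimated consensus duration: block time plus half of the commit time, computed in a const context.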
+ const DEFAULT_CONSENSUS_ESTIMATION: Duration =
+ match DEFAULT_BLOCK_TIME.checked_add(match DEFAULT_COMMIT_TIME.checked_div(2) {
+ Some(x) => x,
+ None => unreachable!(),
+ }) {
+ Some(x) => x,
+ None => unreachable!(),
+ };
+
let prev_interval = self.latest_block_ref().map(|latest_block| {
let header = &latest_block.as_ref().header();
TimeInterval {
- since: header.timestamp(),
- length: header.consensus_estimation(),
+ since: header.creation_time(),
+ length: DEFAULT_CONSENSUS_ESTIMATION,
}
});
let interval = TimeInterval {
- since: block.as_ref().header().timestamp(),
- length: block.as_ref().header().consensus_estimation(),
+ since: block.as_ref().header().creation_time(),
+ length: DEFAULT_CONSENSUS_ESTIMATION,
};
TimeEvent {
@@ -1388,7 +1410,7 @@ impl StateTransaction<'_, '_> {
&mut self,
id: &TriggerId,
action: &dyn LoadedActionTrait,
- event: Event,
+ event: EventBox,
) -> Result<()> {
use triggers::set::LoadedExecutable::*;
let authority = action.authority();
@@ -1751,7 +1773,7 @@ mod tests {
/// Used to inject faulty payload for testing
fn payload_mut(block: &mut CommittedBlock) -> &mut BlockPayload {
- let SignedBlock::V1(signed) = &mut block.0 .0;
+ let SignedBlock::V1(signed) = block.as_mut();
&mut signed.payload
}
@@ -1760,7 +1782,10 @@ mod tests {
const BLOCK_CNT: usize = 10;
let topology = Topology::new(UniqueVec::new());
- let block = ValidBlock::new_dummy().commit(&topology).unwrap();
+ let block = ValidBlock::new_dummy()
+ .commit(&topology)
+ .unpack(|_| {})
+ .unwrap();
let kura = Kura::blank_kura_for_testing();
let query_handle = LiveQueryStore::test().start();
let state = State::new(World::default(), kura, query_handle);
@@ -1771,7 +1796,7 @@ mod tests {
let mut block = block.clone();
payload_mut(&mut block).header.height = i as u64;
- payload_mut(&mut block).header.previous_block_hash = block_hashes.last().copied();
+ payload_mut(&mut block).header.prev_block_hash = block_hashes.last().copied();
block_hashes.push(block.as_ref().hash());
state_block.apply(&block).unwrap();
@@ -1788,7 +1813,10 @@ mod tests {
const BLOCK_CNT: usize = 10;
let topology = Topology::new(UniqueVec::new());
- let block = ValidBlock::new_dummy().commit(&topology).unwrap();
+ let block = ValidBlock::new_dummy()
+ .commit(&topology)
+ .unpack(|_| {})
+ .unwrap();
let kura = Kura::blank_kura_for_testing();
let query_handle = LiveQueryStore::test().start();
let state = State::new(World::default(), kura.clone(), query_handle);
@@ -1806,7 +1834,7 @@ mod tests {
&state_block
.all_blocks()
.skip(7)
- .map(|block| *block.header().height())
+ .map(|block| block.header().height())
.collect::<Vec<_>>(),
&[8, 9, 10]
);
diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs
index 13bb94bb01a..0af6b5cb5cc 100644
--- a/core/src/sumeragi/main_loop.rs
+++ b/core/src/sumeragi/main_loop.rs
@@ -2,10 +2,7 @@
use std::sync::mpsc;
use iroha_crypto::HashOf;
-use iroha_data_model::{
- block::*, events::pipeline::PipelineEvent, peer::PeerId,
- transaction::error::TransactionRejectionReason,
-};
+use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId};
use iroha_p2p::UpdateTopology;
use tracing::{span, Level};
@@ -82,17 +79,19 @@ impl Sumeragi {
#[allow(clippy::needless_pass_by_value, single_use_lifetimes)] // TODO: uncomment when anonymous lifetimes are stable
fn broadcast_packet_to<'peer_id>(
&self,
- msg: BlockMessage,
+ msg: impl Into<BlockMessage>,
ids: impl IntoIterator<Item = &'peer_id PeerId> + Send,
) {
+ let msg = msg.into();
+
for peer_id in ids {
self.post_packet_to(msg.clone(), peer_id);
}
}
- fn broadcast_packet(&self, msg: BlockMessage) {
+ fn broadcast_packet(&self, msg: impl Into<BlockMessage>) {
let broadcast = iroha_p2p::Broadcast {
- data: NetworkMessage::SumeragiBlock(Box::new(msg)),
+ data: NetworkMessage::SumeragiBlock(Box::new(msg.into())),
};
self.network.broadcast(broadcast);
}
@@ -116,17 +115,8 @@ impl Sumeragi {
self.block_time + self.commit_time
}
- fn send_events(&self, events: impl IntoIterator<Item = impl Into<Event>>) {
- let addr = &self.peer_id.address;
-
- if self.events_sender.receiver_count() > 0 {
- for event in events {
- self.events_sender
- .send(event.into())
- .map_err(|err| warn!(%addr, ?err, "Event not sent"))
- .unwrap_or(0);
- }
- }
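+ // A failed send only means there are no subscribers, so the result can be ignored.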
+ fn send_event(&self, event: impl Into<EventBox>) {
+ let _ = self.events_sender.send(event.into());
}
fn receive_network_packet(
@@ -239,13 +229,15 @@ impl Sumeragi {
&self.chain_id,
&mut state_block,
)
+ .unpack(|e| self.send_event(e))
.and_then(|block| {
block
.commit(&self.current_topology)
+ .unpack(|e| self.send_event(e))
.map_err(|(block, error)| (block.into(), error))
}) {
Ok(block) => block,
- Err((_, error)) => {
+ Err(error) => {
error!(?error, "Received invalid genesis block");
continue;
}
@@ -280,12 +272,14 @@ impl Sumeragi {
let mut state_block = state.block();
let genesis = BlockBuilder::new(transactions, self.current_topology.clone(), vec![])
.chain(0, &mut state_block)
- .sign(&self.key_pair);
+ .sign(&self.key_pair)
+ .unpack(|e| self.send_event(e));
- let genesis_msg = BlockCreated::from(genesis.clone()).into();
+ let genesis_msg = BlockCreated::from(genesis.clone());
let genesis = genesis
.commit(&self.current_topology)
+ .unpack(|e| self.send_event(e))
.expect("Genesis invalid");
assert!(
@@ -319,24 +313,17 @@ impl Sumeragi {
info!(
addr=%self.peer_id.address,
role=%self.current_topology.role(&self.peer_id),
- block_height=%state_block.height(),
- block_hash=%block.as_ref().hash(),
+ block=%block.as_ref().header().height,
"{}", Strategy::LOG_MESSAGE,
);
- state_block
- .apply_without_execution(&block)
- .expect("Failed to apply block on state. Bailing.");
-
- let state_events = core::mem::take(&mut state_block.world.events_buffer);
- self.send_events(state_events);
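+ // Apply the block and collect its events; they are published only after the public state copy is updated below.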
+ let state_events = state_block.apply_without_execution(&block);
let new_topology = Topology::recreate_topology(
block.as_ref(),
0,
state_block.world.peers().cloned().collect(),
);
- let events = block.produce_events();
// https://github.com/hyperledger/iroha/issues/3396
// Kura should store the block only upon successful application to the internal state to avoid storing a corrupted block.
@@ -346,6 +333,7 @@ impl Sumeragi {
// Parameters are updated before updating public copy of sumeragi
self.update_params(&state_block);
self.cache_transaction(&state_block);
+
self.current_topology = new_topology;
self.connect_peers(&self.current_topology);
@@ -353,7 +341,7 @@ impl Sumeragi {
state_block.commit();
// NOTE: This sends "Block committed" event,
// so it should be done AFTER public facing state update
- self.send_events(events);
+ state_events.into_iter().for_each(|e| self.send_event(e));
}
fn update_params(&mut self, state_block: &StateBlock<'_>) {
@@ -385,22 +373,23 @@ impl Sumeragi {
topology: &Topology,
BlockCreated { block }: BlockCreated,
) -> Option<VotingBlock<'state>> {
- let block_hash = block.hash_of_payload();
+ let block_height = block.header().height;
let addr = &self.peer_id.address;
let role = self.current_topology.role(&self.peer_id);
- trace!(%addr, %role, block_hash=%block_hash, "Block received, voting...");
+ trace!(%addr, %role, block=%block_height, "Block received, voting...");
let mut state_block = state.block();
- let block = match ValidBlock::validate(block, topology, &self.chain_id, &mut state_block) {
+ let block = match ValidBlock::validate(block, topology, &self.chain_id, &mut state_block)
+ .unpack(|e| self.send_event(e))
+ {
Ok(block) => block,
- Err((_, error)) => {
+ Err(error) => {
warn!(%addr, %role, ?error, "Block validation failed");
return None;
}
};
let signed_block = block.sign(&self.key_pair);
-
Some(VotingBlock::new(signed_block, state_block))
}
@@ -433,31 +422,31 @@ impl Sumeragi {
#[allow(clippy::suspicious_operation_groupings)]
match (message, role) {
(BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }), _) => {
- let block_hash = block.hash();
- info!(%addr, %role, hash=%block_hash, "Block sync update received");
+ let block_height = block.header().height;
+ info!(%addr, %role, block=%block_height, "Block sync update received");
// Release writer before handling block sync
let _ = voting_block.take();
- match handle_block_sync(&self.chain_id, block, state) {
+ match handle_block_sync(&self.chain_id, block, state, &|e| self.send_event(e)) {
Ok(BlockSyncOk::CommitBlock(block, state_block)) => {
- self.commit_block(block, state_block)
+ self.commit_block(block, state_block);
}
Ok(BlockSyncOk::ReplaceTopBlock(block, state_block)) => {
warn!(
%addr, %role,
peer_latest_block_hash=?state_block.latest_block_hash(),
peer_latest_block_view_change_index=?state_block.latest_block_view_change_index(),
- consensus_latest_block_hash=%block.as_ref().hash(),
+ consensus_latest_block=%block.as_ref().header().height,
consensus_latest_block_view_change_index=%block.as_ref().header().view_change_index,
"Soft fork occurred: peer in inconsistent state. Rolling back and replacing top block."
);
self.replace_top_block(block, state_block)
}
Err((_, BlockSyncError::BlockNotValid(error))) => {
- error!(%addr, %role, %block_hash, ?error, "Block not valid.")
+ error!(%addr, %role, block=%block_height, ?error, "Block not valid.")
}
Err((_, BlockSyncError::SoftForkBlockNotValid(error))) => {
- error!(%addr, %role, %block_hash, ?error, "Soft-fork block not valid.")
+ error!(%addr, %role, %block_height, ?error, "Soft-fork block not valid.")
}
Err((
_,
@@ -470,7 +459,7 @@ impl Sumeragi {
%addr, %role,
peer_latest_block_hash=?state.view().latest_block_hash(),
peer_latest_block_view_change_index=?peer_view_change_index,
- consensus_latest_block_hash=%block_hash,
+ consensus_latest_block=%block_height,
consensus_latest_block_view_change_index=%block_view_change_index,
"Soft fork doesn't occurred: block has the same or smaller view change index"
);
@@ -482,7 +471,7 @@ impl Sumeragi {
block_height,
},
)) => {
- warn!(%addr, %role, %block_hash, %block_height, %peer_height, "Other peer send irrelevant or outdated block to the peer (it's neither `peer_height` nor `peer_height + 1`).")
+ warn!(%addr, %role, block=%block_height, %peer_height, "Other peer sent an irrelevant or outdated block to this peer (it is neither `peer_height` nor `peer_height + 1`).")
}
}
}
@@ -496,28 +485,30 @@ impl Sumeragi {
{
error!(%addr, %role, "Received BlockCommitted message, but shouldn't");
} else if let Some(voted_block) = voting_block.take() {
- let voting_block_hash = voted_block.block.as_ref().hash_of_payload();
-
- if hash == voting_block_hash {
- match voted_block
- .block
- .commit_with_signatures(current_topology, signatures)
- {
- Ok(committed_block) => {
- self.commit_block(committed_block, voted_block.state_block)
- }
- Err((_, error)) => {
- error!(%addr, %role, %hash, ?error, "Block failed to be committed")
- }
- };
- } else {
- error!(
- %addr, %role, committed_block_hash=%hash, %voting_block_hash,
- "The hash of the committed block does not match the hash of the block stored by the peer."
- );
-
- *voting_block = Some(voted_block);
- };
+ match voted_block
+ .block
+ .commit_with_signatures(current_topology, signatures, hash)
+ .unpack(|e| self.send_event(e))
+ {
+ Ok(committed_block) => {
+ self.commit_block(committed_block, voted_block.state_block)
+ }
+ Err((
+ valid_block,
+ BlockValidationError::IncorrectHash { expected, actual },
+ )) => {
+ error!(%addr, %role, %expected, %actual, "The hash of the committed block does not match the hash of the block stored by the peer.");
+
+ *voting_block = Some(VotingBlock {
+ voted_at: voted_block.voted_at,
+ block: valid_block,
+ state_block: voted_block.state_block,
+ });
+ }
+ Err((_, error)) => {
+ error!(%addr, %role, %hash, ?error, "Block failed to be committed")
+ }
+ }
} else {
error!(%addr, %role, %hash, "Peer missing voting block")
}
@@ -531,33 +522,32 @@ impl Sumeragi {
let _ = voting_block.take();
if let Some(v_block) = self.vote_for_block(state, &current_topology, block_created)
{
- let block_hash = v_block.block.as_ref().hash_of_payload();
-
- let msg = BlockSigned::from(v_block.block.clone()).into();
+ let block_height = v_block.block.as_ref().header().height;
+ let msg = BlockSigned::from(&v_block.block);
self.broadcast_packet_to(msg, [current_topology.proxy_tail()]);
- info!(%addr, %block_hash, "Block validated, signed and forwarded");
+ info!(%addr, block=%block_height, "Block validated, signed and forwarded");
*voting_block = Some(v_block);
}
}
(BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => {
let current_topology = current_topology.is_consensus_required().expect(
- "Peer has `ObservingPeer` role, which mean that current topology require consensus",
- );
+ "Peer has `ObservingPeer` role, which mean that current topology require consensus"
+ );
// Release block writer before creating new one
let _ = voting_block.take();
if let Some(v_block) = self.vote_for_block(state, &current_topology, block_created)
{
if current_view_change_index >= 1 {
- let block_hash = v_block.block.as_ref().hash();
+ let block_height = v_block.block.as_ref().header().height;
self.broadcast_packet_to(
- BlockSigned::from(v_block.block.clone()).into(),
+ BlockSigned::from(&v_block.block),
[current_topology.proxy_tail()],
);
- info!(%addr, %block_hash, "Block validated, signed and forwarded");
+ info!(%addr, block=%block_height, "Block validated, signed and forwarded");
*voting_block = Some(v_block);
} else {
error!(%addr, %role, "Received BlockCreated message, but shouldn't");
@@ -641,33 +631,35 @@ impl Sumeragi {
event_recommendations,
)
.chain(current_view_change_index, &mut state_block)
- .sign(&self.key_pair);
+ .sign(&self.key_pair)
+ .unpack(|e| self.send_event(e));
let created_in = create_block_start_time.elapsed();
if let Some(current_topology) = current_topology.is_consensus_required() {
- info!(%addr, created_in_ms=%created_in.as_millis(), block_payload_hash=%new_block.as_ref().hash_of_payload(), "Block created");
+ info!(%addr, created_in_ms=%created_in.as_millis(), block=%new_block.as_ref().header().height, "Block created");
if created_in > self.pipeline_time() / 2 {
warn!("Creating block takes too much time. This might prevent consensus from operating. Consider increasing `commit_time` or decreasing `max_transactions_in_block`");
}
*voting_block = Some(VotingBlock::new(new_block.clone(), state_block));
- let msg = BlockCreated::from(new_block).into();
+ let msg = BlockCreated::from(new_block);
if current_view_change_index >= 1 {
self.broadcast_packet(msg);
} else {
self.broadcast_packet_to(msg, current_topology.voting_peers());
}
} else {
- match new_block.commit(current_topology) {
+ match new_block
+ .commit(current_topology)
+ .unpack(|e| self.send_event(e))
+ {
Ok(committed_block) => {
- self.broadcast_packet(
- BlockCommitted::from(committed_block.clone()).into(),
- );
+ self.broadcast_packet(BlockCommitted::from(&committed_block));
self.commit_block(committed_block, state_block);
}
- Err((_, error)) => error!(%addr, role=%Role::Leader, ?error),
- }
+ Err(error) => error!(%addr, role=%Role::Leader, ?error),
+ };
}
}
}
@@ -677,12 +669,15 @@ impl Sumeragi {
let voted_at = voted_block.voted_at;
let state_block = voted_block.state_block;
- match voted_block.block.commit(current_topology) {
+ match voted_block
+ .block
+ .commit(current_topology)
+ .unpack(|e| self.send_event(e))
+ {
Ok(committed_block) => {
- info!(voting_block_hash = %committed_block.as_ref().hash(), "Block reached required number of votes");
-
- let msg = BlockCommitted::from(committed_block.clone()).into();
+ info!(block=%committed_block.as_ref().header().height, "Block reached required number of votes");
+ let msg = BlockCommitted::from(&committed_block);
let current_topology = current_topology
.is_consensus_required()
.expect("Peer has `ProxyTail` role, which mean that current topology require consensus");
@@ -863,14 +858,11 @@ pub(crate) fn run(
expired
});
- let mut expired_transactions = Vec::new();
sumeragi.queue.get_transactions_for_block(
&state_view,
sumeragi.max_txs_in_block,
&mut sumeragi.transaction_cache,
- &mut expired_transactions,
);
- sumeragi.send_events(expired_transactions.iter().map(expired_event));
let current_view_change_index = sumeragi
.prune_view_change_proofs_and_calculate_current_index(
@@ -928,7 +920,7 @@ pub(crate) fn run(
if node_expects_block {
if let Some(VotingBlock { block, .. }) = voting_block.as_ref() {
// NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader
- warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, block=%block.as_ref().hash_of_payload(), "Block not committed in due time, requesting view change...");
+ warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, block=%block.as_ref().header().height, "Block not committed in due time, requesting view change...");
} else {
// NOTE: Suspecting the leader node because it hasn't produced a block
// If the current node has a transaction, the leader should have as well
@@ -1001,18 +993,6 @@ fn add_signatures(
}
}
-/// Create expired pipeline event for the given transaction.
-fn expired_event(txn: &AcceptedTransaction) -> Event {
- PipelineEvent {
- entity_kind: PipelineEntityKind::Transaction,
- status: PipelineStatus::Rejected(PipelineRejectionReason::Transaction(
- TransactionRejectionReason::Expired,
- )),
- hash: txn.as_ref().hash().into(),
- }
- .into()
-}
-
/// Type enumerating early return types to reduce cyclomatic
/// complexity of the main loop items and allow direct short
/// circuiting with the `?` operator. Candidate for `impl
@@ -1092,10 +1072,11 @@ enum BlockSyncError {
},
}
-fn handle_block_sync<'state>(
+fn handle_block_sync<'state, F: Fn(PipelineEventBox)>(
chain_id: &ChainId,
block: SignedBlock,
state: &'state State,
+ handle_events: &F,
) -> Result<BlockSyncOk<'state>, (SignedBlock, BlockSyncError)> {
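+ // `handle_events` receives the pipeline events produced while validating and committing the synced block.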
let block_height = block.header().height;
let state_height = state.view().height();
@@ -1111,9 +1092,11 @@ fn handle_block_sync<'state>(
Topology::recreate_topology(&last_committed_block, view_change_index, new_peers)
};
ValidBlock::validate(block, &topology, chain_id, &mut state_block)
+ .unpack(handle_events)
.and_then(|block| {
block
.commit(&topology)
+ .unpack(handle_events)
.map_err(|(block, err)| (block.into(), err))
})
.map(|block| BlockSyncOk::CommitBlock(block, state_block))
@@ -1144,9 +1127,11 @@ fn handle_block_sync<'state>(
Topology::recreate_topology(&last_committed_block, view_change_index, new_peers)
};
ValidBlock::validate(block, &topology, chain_id, &mut state_block)
+ .unpack(handle_events)
.and_then(|block| {
block
.commit(&topology)
+ .unpack(handle_events)
.map_err(|(block, err)| (block.into(), err))
})
.map_err(|(block, error)| (block, BlockSyncError::SoftForkBlockNotValid(error)))
@@ -1214,9 +1199,13 @@ mod tests {
// Creating a block of two identical transactions and validating it
let block = BlockBuilder::new(vec![tx.clone(), tx], topology.clone(), Vec::new())
.chain(0, &mut state_block)
- .sign(leader_key_pair);
+ .sign(leader_key_pair)
+ .unpack(|_| {});
- let genesis = block.commit(topology).expect("Block is valid");
+ let genesis = block
+ .commit(topology)
+ .unpack(|_| {})
+ .expect("Block is valid");
state_block.apply(&genesis).expect("Failed to apply block");
state_block.commit();
kura.store_block(genesis);
@@ -1256,6 +1245,7 @@ mod tests {
BlockBuilder::new(vec![tx1, tx2], topology.clone(), Vec::new())
.chain(0, &mut state_block)
.sign(leader_key_pair)
+ .unpack(|_| {})
};
(state, kura, block.into())
@@ -1276,7 +1266,7 @@ mod tests {
// Malform block to make it invalid
payload_mut(&mut block).commit_topology.clear();
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(result, Err((_, BlockSyncError::BlockNotValid(_)))))
}
@@ -1292,12 +1282,14 @@ mod tests {
let (state, kura, mut block) = create_data_for_test(&chain_id, &topology, &leader_key_pair);
let mut state_block = state.block();
- let validated_block =
- ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block).unwrap();
- let committed_block = validated_block.commit(&topology).expect("Block is valid");
- state_block
- .apply_without_execution(&committed_block)
- .expect("Failed to apply block");
+ let committed_block =
+ ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block)
+ .unpack(|_| {})
+ .unwrap()
+ .commit(&topology)
+ .unpack(|_| {})
+ .expect("Block is valid");
+ let _wsv_events = state_block.apply_without_execution(&committed_block);
state_block.commit();
kura.store_block(committed_block);
@@ -1305,7 +1297,7 @@ mod tests {
payload_mut(&mut block).commit_topology.clear();
payload_mut(&mut block).header.view_change_index = 1;
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(
result,
Err((_, BlockSyncError::SoftForkBlockNotValid(_)))
@@ -1324,7 +1316,7 @@ mod tests {
// Change block height
payload_mut(&mut block).header.height = 42;
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(
result,
Err((
@@ -1348,7 +1340,7 @@ mod tests {
leader_key_pair.public_key().clone(),
)]);
let (state, _, block) = create_data_for_test(&chain_id, &topology, &leader_key_pair);
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(result, Ok(BlockSyncOk::CommitBlock(_, _))))
}
@@ -1364,12 +1356,14 @@ mod tests {
let (state, kura, mut block) = create_data_for_test(&chain_id, &topology, &leader_key_pair);
let mut state_block = state.block();
- let validated_block =
- ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block).unwrap();
- let committed_block = validated_block.commit(&topology).expect("Block is valid");
- state_block
- .apply_without_execution(&committed_block)
- .expect("Failed to apply block");
+ let committed_block =
+ ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block)
+ .unpack(|_| {})
+ .unwrap()
+ .commit(&topology)
+ .unpack(|_| {})
+ .expect("Block is valid");
+ let _wsv_events = state_block.apply_without_execution(&committed_block);
state_block.commit();
kura.store_block(committed_block);
@@ -1378,7 +1372,7 @@ mod tests {
// Increase block view change index
payload_mut(&mut block).header.view_change_index = 42;
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(result, Ok(BlockSyncOk::ReplaceTopBlock(_, _))))
}
@@ -1397,12 +1391,14 @@ mod tests {
payload_mut(&mut block).header.view_change_index = 42;
let mut state_block = state.block();
- let validated_block =
- ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block).unwrap();
- let committed_block = validated_block.commit(&topology).expect("Block is valid");
- state_block
- .apply_without_execution(&committed_block)
- .expect("Failed to apply block");
+ let committed_block =
+ ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block)
+ .unpack(|_| {})
+ .unwrap()
+ .commit(&topology)
+ .unpack(|_| {})
+ .expect("Block is valid");
+ let _wsv_events = state_block.apply_without_execution(&committed_block);
state_block.commit();
kura.store_block(committed_block);
assert_eq!(state.view().latest_block_view_change_index(), 42);
@@ -1410,7 +1406,7 @@ mod tests {
// Decrease block view change index back
payload_mut(&mut block).header.view_change_index = 0;
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(
result,
Err((
@@ -1437,7 +1433,7 @@ mod tests {
payload_mut(&mut block).header.view_change_index = 42;
payload_mut(&mut block).header.height = 1;
- let result = handle_block_sync(&chain_id, block, &state);
+ let result = handle_block_sync(&chain_id, block, &state, &|_| {});
assert!(matches!(
result,
Err((
diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs
index b0a80207072..c5d4fa27fa7 100644
--- a/core/src/sumeragi/message.rs
+++ b/core/src/sumeragi/message.rs
@@ -62,14 +62,14 @@ pub struct BlockSigned {
pub signatures: SignaturesOf<BlockPayload>,
}
-impl From<ValidBlock> for BlockSigned {
- fn from(block: ValidBlock) -> Self {
+impl From<&ValidBlock> for BlockSigned {
+ fn from(block: &ValidBlock) -> Self {
let block_hash = block.as_ref().hash_of_payload();
- let SignedBlock::V1(block) = block.into();
+ let block_signatures = block.as_ref().signatures().clone();
Self {
hash: block_hash,
- signatures: block.signatures,
+ signatures: block_signatures,
}
}
}
@@ -79,14 +79,14 @@ impl From<ValidBlock> for BlockSigned {
#[non_exhaustive]
pub struct BlockCommitted {
/// Hash of the block being signed.
- pub hash: HashOf<BlockPayload>,
+ pub hash: HashOf<SignedBlock>,
/// Set of signatures.
pub signatures: SignaturesOf<BlockPayload>,
}
-impl From<CommittedBlock> for BlockCommitted {
- fn from(block: CommittedBlock) -> Self {
- let block_hash = block.as_ref().hash_of_payload();
+impl From<&CommittedBlock> for BlockCommitted {
+ fn from(block: &CommittedBlock) -> Self {
+ let block_hash = block.as_ref().hash();
let block_signatures = block.as_ref().signatures().clone();
Self {
diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs
index 1e10895b992..f59a7ee6259 100644
--- a/core/src/sumeragi/mod.rs
+++ b/core/src/sumeragi/mod.rs
@@ -4,7 +4,7 @@
use std::{
fmt::{self, Debug, Formatter},
sync::{mpsc, Arc},
- time::{Duration, Instant},
+ time::{Duration, Instant, SystemTime},
};
use eyre::{Result, WrapErr as _};
@@ -129,9 +129,13 @@ impl SumeragiHandle {
#[allow(clippy::cast_possible_truncation)]
if let Some(timestamp) = state_view.genesis_timestamp() {
+ let curr_time = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Failed to get the current system time");
+
// this will overflow in 584942417years.
self.metrics.uptime_since_genesis_ms.set(
- (current_time() - timestamp)
+ (curr_time - timestamp)
.as_millis()
.try_into()
.expect("Timestamp should fit into u64"),
@@ -193,24 +197,33 @@ impl SumeragiHandle {
chain_id: &ChainId,
block: &SignedBlock,
state_block: &mut StateBlock<'_>,
+ events_sender: &EventsSender,
mut current_topology: Topology,
) -> Topology {
// NOTE: topology need to be updated up to block's view_change_index
current_topology.rotate_all_n(block.header().view_change_index);
let block = ValidBlock::validate(block.clone(), &current_topology, chain_id, state_block)
- .expect("Kura blocks should be valid")
+ .unpack(|e| {
+ let _ = events_sender.send(e.into());
+ })
+ .expect("Kura: Invalid block")
.commit(&current_topology)
- .expect("Kura blocks should be valid");
+ .unpack(|e| {
+ let _ = events_sender.send(e.into());
+ })
+ .expect("Kura: Invalid block");
if block.as_ref().header().is_genesis() {
*state_block.world.trusted_peers_ids = block.as_ref().commit_topology().clone();
}
- state_block.apply_without_execution(&block).expect(
- "Block application in init should not fail. \
- Blocks loaded from kura assumed to be valid",
- );
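+ // Forward the events produced while replaying the block to any subscribers.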
+ state_block
+ .apply_without_execution(&block)
+ .into_iter()
+ .for_each(|e| {
+ let _ = events_sender.send(e);
+ });
Topology::recreate_topology(
block.as_ref(),
@@ -278,6 +291,7 @@ impl SumeragiHandle {
&common_config.chain_id,
&block,
&mut state_block,
+ &events_sender,
current_topology,
);
state_block.commit();
@@ -356,16 +370,21 @@ pub const PEERS_CONNECT_INTERVAL: Duration = Duration::from_secs(1);
pub const TELEMETRY_INTERVAL: Duration = Duration::from_secs(5);
/// Structure represents a block that is currently in discussion.
-#[non_exhaustive]
pub struct VotingBlock<'state> {
+ /// Valid Block
+ block: ValidBlock,
/// At what time has this peer voted for this block
pub voted_at: Instant,
- /// Valid Block
- pub block: ValidBlock,
/// [`WorldState`] after applying transactions to it but before it was committed
pub state_block: StateBlock<'state>,
}
+impl AsRef<ValidBlock> for VotingBlock<'_> {
+ fn as_ref(&self) -> &ValidBlock {
+ &self.block
+ }
+}
+
impl VotingBlock<'_> {
/// Construct new `VotingBlock` with current time.
pub fn new(block: ValidBlock, state_block: StateBlock<'_>) -> VotingBlock {
@@ -382,8 +401,8 @@ impl VotingBlock<'_> {
voted_at: Instant,
) -> VotingBlock {
VotingBlock {
- voted_at,
block,
+ voted_at,
state_block,
}
}
diff --git a/core/src/tx.rs b/core/src/tx.rs
index 26e0858a4e5..1ae6b4b0b2b 100644
--- a/core/src/tx.rs
+++ b/core/src/tx.rs
@@ -14,7 +14,7 @@ pub use iroha_data_model::prelude::*;
use iroha_data_model::{
isi::error::Mismatch,
query::error::FindError,
- transaction::{error::TransactionLimitError, TransactionLimits},
+ transaction::{error::TransactionLimitError, TransactionLimits, TransactionPayload},
};
use iroha_genesis::GenesisTransaction;
use iroha_logger::{debug, error};
diff --git a/core/test_network/Cargo.toml b/core/test_network/Cargo.toml
index 22cbae6888a..183eb00e4d5 100644
--- a/core/test_network/Cargo.toml
+++ b/core/test_network/Cargo.toml
@@ -8,7 +8,7 @@ authors.workspace = true
license.workspace = true
[dependencies]
-iroha = { workspace = true, features = ["test-network"] }
+iroha = { workspace = true, features = ["test_network"] }
iroha_crypto = { workspace = true }
iroha_client = { workspace = true }
iroha_core = { workspace = true }
diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs
index 012f475eda0..df2843afa06 100644
--- a/core/test_network/src/lib.rs
+++ b/core/test_network/src/lib.rs
@@ -14,7 +14,7 @@ use iroha_client::{
};
use iroha_config::parameters::actual::Root as Config;
pub use iroha_core::state::StateReadOnly;
-use iroha_crypto::prelude::*;
+use iroha_crypto::KeyPair;
use iroha_data_model::{query::QueryOutputBox, ChainId};
use iroha_genesis::{GenesisNetwork, RawGenesisBlockFile};
use iroha_logger::InstrumentFutures;
@@ -54,11 +54,11 @@ pub fn get_chain_id() -> ChainId {
/// Get a standardised key-pair from the hard-coded literals.
pub fn get_key_pair() -> KeyPair {
KeyPair::new(
- PublicKey::from_str(
+ iroha_crypto::PublicKey::from_str(
"ed01207233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C0",
).unwrap(),
- PrivateKey::from_hex(
- Algorithm::Ed25519,
+ iroha_crypto::PrivateKey::from_hex(
+ iroha_crypto::Algorithm::Ed25519,
"9AC47ABF59B356E0BD7DCBBBB4DEC080E302156A48CA907E47CB6AEA1D32719E7233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C0"
).unwrap()
).unwrap()
@@ -689,7 +689,7 @@ pub trait TestClient: Sized {
fn test_with_account(api_url: &SocketAddr, keys: KeyPair, account_id: &AccountId) -> Self;
/// Loop for events with filter and handler function
-    fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<Event>));
+    fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<EventBox>));
/// Submit instruction with polling
///
@@ -828,9 +828,9 @@ impl TestClient for Client {
Client::new(config)
}
-    fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<Event>)) {
+    fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<EventBox>)) {
for event_result in self
- .listen_for_events(event_filter)
+ .listen_for_events([event_filter])
.expect("Failed to create event iterator.")
{
f(event_result)
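
`listen_for_events` now accepts a collection of filters, so a lone filter travels as a one-element array and several filters can share a single subscription. A hedged client-side sketch (import paths and the method signature are assumed from the preludes):

```rust
use iroha_client::client::Client;
use iroha_data_model::prelude::*;

// Sketch: a lone filter is passed as a one-element array; more filters
// can be added to the same subscription.
fn watch_transactions(client: &Client) {
    for event in client
        .listen_for_events([TransactionEventFilter::default()])
        .expect("Failed to create event iterator.")
    {
        // Events now arrive as `EventBox` rather than `Event`.
        println!("{event:?}");
    }
}
```
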
diff --git a/crypto/src/lib.rs b/crypto/src/lib.rs
index f1662780479..aafd7868459 100755
--- a/crypto/src/lib.rs
+++ b/crypto/src/lib.rs
@@ -27,8 +27,6 @@ use alloc::{
};
use core::{borrow::Borrow, fmt, str::FromStr};
-#[cfg(feature = "base64")]
-pub use base64;
#[cfg(not(feature = "ffi_import"))]
pub use blake2;
use derive_more::Display;
@@ -857,11 +855,6 @@ mod ffi {
pub(crate) use ffi_item;
}
-/// The prelude re-exports most commonly used items from this crate.
-pub mod prelude {
- pub use super::{Algorithm, Hash, KeyPair, PrivateKey, PublicKey, Signature};
-}
-
#[cfg(test)]
mod tests {
use parity_scale_codec::{Decode, Encode};
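
Downstream crates that relied on the removed prelude now import the same items by name, as `test_network` does above; the one-line replacement:

```rust
// Replacement for the removed `use iroha_crypto::prelude::*;`,
// covering exactly the items the prelude used to re-export.
use iroha_crypto::{Algorithm, Hash, KeyPair, PrivateKey, PublicKey, Signature};
```
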
diff --git a/data_model/derive/src/enum_ref.rs b/data_model/derive/src/enum_ref.rs
index 8215be75795..eefb58fab78 100644
--- a/data_model/derive/src/enum_ref.rs
+++ b/data_model/derive/src/enum_ref.rs
@@ -151,7 +151,7 @@ impl ToTokens for EnumRef {
quote! {
#attrs
- pub(crate) enum #ident<'a> #impl_generics #where_clause {
+ pub(super) enum #ident<'a> #impl_generics #where_clause {
#(#variants),*
}
}
diff --git a/data_model/derive/src/model.rs b/data_model/derive/src/model.rs
index a5fdb7a7510..0547fc99ab7 100644
--- a/data_model/derive/src/model.rs
+++ b/data_model/derive/src/model.rs
@@ -7,7 +7,6 @@ use syn::{parse_quote, Attribute};
pub fn impl_model(emitter: &mut Emitter, input: &syn::ItemMod) -> TokenStream {
let syn::ItemMod {
attrs,
- vis,
mod_token,
ident,
content,
@@ -15,14 +14,6 @@ pub fn impl_model(emitter: &mut Emitter, input: &syn::ItemMod) -> TokenStream {
..
} = input;
- let syn::Visibility::Public(vis_public) = vis else {
- emit!(
- emitter,
- input,
- "The `model` attribute can only be used on public modules"
- );
- return quote!();
- };
if ident != "model" {
emit!(
emitter,
@@ -38,7 +29,7 @@ pub fn impl_model(emitter: &mut Emitter, input: &syn::ItemMod) -> TokenStream {
quote! {
#(#attrs)*
#[allow(missing_docs)]
- #vis_public #mod_token #ident {
+ #mod_token #ident {
#(#items_code)*
}#semi
}
diff --git a/data_model/src/account.rs b/data_model/src/account.rs
index 2383bdc21ac..ce3f9582770 100644
--- a/data_model/src/account.rs
+++ b/data_model/src/account.rs
@@ -431,6 +431,8 @@ pub mod prelude {
#[cfg(test)]
mod tests {
+ #[cfg(not(feature = "std"))]
+ use alloc::{vec, vec::Vec};
use core::cmp::Ordering;
use iroha_crypto::{KeyPair, PublicKey};
diff --git a/data_model/src/block.rs b/data_model/src/block.rs
index 93ce5bec045..e9d3c102074 100644
--- a/data_model/src/block.rs
+++ b/data_model/src/block.rs
@@ -9,7 +9,6 @@ use alloc::{boxed::Box, format, string::String, vec::Vec};
use core::{fmt::Display, time::Duration};
use derive_more::Display;
-use getset::Getters;
#[cfg(all(feature = "std", feature = "transparent_api"))]
use iroha_crypto::KeyPair;
use iroha_crypto::{HashOf, MerkleTree, SignaturesOf};
@@ -26,6 +25,8 @@ use crate::{events::prelude::*, peer, transaction::prelude::*};
#[model]
pub mod model {
+ use getset::{CopyGetters, Getters};
+
use super::*;
#[derive(
@@ -37,6 +38,7 @@ pub mod model {
PartialOrd,
Ord,
Getters,
+ CopyGetters,
Decode,
Encode,
Deserialize,
@@ -48,24 +50,24 @@ pub mod model {
display(fmt = "Block â„–{height} (hash: {});", "HashOf::new(&self)")
)]
    #[cfg_attr(not(feature = "std"), display(fmt = "Block №{height}"))]
- #[getset(get = "pub")]
#[allow(missing_docs)]
#[ffi_type]
pub struct BlockHeader {
/// Number of blocks in the chain including this block.
+ #[getset(get_copy = "pub")]
pub height: u64,
/// Creation timestamp (unix time in milliseconds).
#[getset(skip)]
- pub timestamp_ms: u64,
+ pub creation_time_ms: u64,
/// Hash of the previous block in the chain.
-        pub previous_block_hash: Option<HashOf<SignedBlock>>,
+ #[getset(get = "pub")]
+        pub prev_block_hash: Option<HashOf<SignedBlock>>,
/// Hash of merkle tree root of transactions' hashes.
-        pub transactions_hash: Option<HashOf<MerkleTree<SignedTransaction>>>,
+ #[getset(get = "pub")]
+        pub transactions_hash: HashOf<MerkleTree<SignedTransaction>>,
/// Value of view change index. Used to resolve soft forks.
- pub view_change_index: u64,
#[getset(skip)]
- /// Estimation of consensus duration (in milliseconds).
- pub consensus_estimation_ms: u64,
+ pub view_change_index: u64,
}
#[derive(
@@ -76,7 +78,6 @@ pub mod model {
Eq,
PartialOrd,
Ord,
- Getters,
Decode,
Encode,
Deserialize,
@@ -84,45 +85,28 @@ pub mod model {
IntoSchema,
)]
#[display(fmt = "({header})")]
- #[getset(get = "pub")]
#[allow(missing_docs)]
- #[ffi_type]
- pub struct BlockPayload {
+ pub(crate) struct BlockPayload {
/// Block header
pub header: BlockHeader,
/// Topology of the network at the time of block commit.
- #[getset(skip)] // FIXME: Because ffi related issues
        pub commit_topology: UniqueVec<peer::PeerId>,
/// array of transactions, which successfully passed validation and consensus step.
- #[getset(skip)] // FIXME: Because ffi related issues
        pub transactions: Vec<CommittedTransaction>,
/// Event recommendations.
- #[getset(skip)] // NOTE: Unused ATM
-        pub event_recommendations: Vec<Event>,
+        pub event_recommendations: Vec<EventBox>,
}
/// Signed block
#[version_with_scale(version = 1, versioned_alias = "SignedBlock")]
#[derive(
- Debug,
- Display,
- Clone,
- PartialEq,
- Eq,
- PartialOrd,
- Ord,
- Getters,
- Encode,
- Serialize,
- IntoSchema,
+ Debug, Display, Clone, PartialEq, Eq, PartialOrd, Ord, Encode, Serialize, IntoSchema,
)]
#[cfg_attr(not(feature = "std"), display(fmt = "Signed block"))]
#[cfg_attr(feature = "std", display(fmt = "{}", "self.hash()"))]
- #[getset(get = "pub")]
#[ffi_type]
pub struct SignedBlockV1 {
/// Signatures of peers which approved this block.
- #[getset(skip)]
        pub signatures: SignaturesOf<BlockPayload>,
/// Block payload
pub payload: BlockPayload,
@@ -134,13 +118,6 @@ declare_versioned!(SignedBlock 1..2, Debug, Clone, PartialEq, Eq, PartialOrd, Or
#[cfg(all(not(feature = "ffi_export"), not(feature = "ffi_import")))]
declare_versioned!(SignedBlock 1..2, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, FromVariant, IntoSchema);
-impl BlockPayload {
- /// Calculate block payload [`Hash`](`iroha_crypto::HashOf`).
-    pub fn hash(&self) -> iroha_crypto::HashOf<Self> {
- iroha_crypto::HashOf::new(self)
- }
-}
-
impl BlockHeader {
/// Checks if it's a header of a genesis block.
#[inline]
@@ -150,13 +127,8 @@ impl BlockHeader {
}
/// Creation timestamp
- pub fn timestamp(&self) -> Duration {
- Duration::from_millis(self.timestamp_ms)
- }
-
- /// Consensus estimation
- pub fn consensus_estimation(&self) -> Duration {
- Duration::from_millis(self.consensus_estimation_ms)
+ pub fn creation_time(&self) -> Duration {
+ Duration::from_millis(self.creation_time_ms)
}
}
@@ -168,21 +140,21 @@ impl SignedBlockV1 {
}
impl SignedBlock {
- /// Block transactions
+ /// Block header
#[inline]
-    pub fn transactions(&self) -> impl ExactSizeIterator<Item = &CommittedTransaction>
-    {
+ pub fn header(&self) -> &BlockHeader {
let SignedBlock::V1(block) = self;
- block.payload.transactions.iter()
+ &block.payload.header
}
- /// Block header
+ /// Block transactions
#[inline]
- pub fn header(&self) -> &BlockHeader {
+    pub fn transactions(&self) -> impl ExactSizeIterator<Item = &CommittedTransaction>
+    {
let SignedBlock::V1(block) = self;
- block.payload.header()
+ block.payload.transactions.iter()
}
- /// Block commit topology
+ /// Topology of the network at the time of block commit.
#[inline]
#[cfg(feature = "transparent_api")]
    pub fn commit_topology(&self) -> &UniqueVec<peer::PeerId> {
@@ -202,18 +174,19 @@ impl SignedBlock {
    pub fn hash(&self) -> HashOf<Self> {
iroha_crypto::HashOf::new(self)
}
+}
+#[cfg(feature = "transparent_api")]
+impl SignedBlock {
/// Calculate block payload [`Hash`](`iroha_crypto::HashOf`).
#[inline]
#[cfg(feature = "std")]
- #[cfg(feature = "transparent_api")]
    pub fn hash_of_payload(&self) -> iroha_crypto::HashOf<BlockPayload> {
let SignedBlock::V1(block) = self;
iroha_crypto::HashOf::new(&block.payload)
}
/// Add additional signatures to this block
- #[cfg(feature = "transparent_api")]
#[must_use]
pub fn sign(mut self, key_pair: &KeyPair) -> Self {
let SignedBlock::V1(block) = &mut self;
@@ -227,7 +200,6 @@ impl SignedBlock {
/// # Errors
///
/// If given signature doesn't match block hash
- #[cfg(feature = "transparent_api")]
pub fn add_signature(
&mut self,
        signature: iroha_crypto::SignatureOf<BlockPayload>,
@@ -242,7 +214,6 @@ impl SignedBlock {
}
/// Add additional signatures to this block
- #[cfg(feature = "transparent_api")]
pub fn replace_signatures(
&mut self,
        signatures: iroha_crypto::SignaturesOf<BlockPayload>,
@@ -292,7 +263,7 @@ mod candidate {
}
fn validate_header(&self) -> Result<(), &'static str> {
- let actual_txs_hash = self.payload.header().transactions_hash;
+ let actual_txs_hash = self.payload.header.transactions_hash;
let expected_txs_hash = self
.payload
@@ -300,7 +271,8 @@ mod candidate {
.iter()
.map(|value| value.as_ref().hash())
            .collect::<MerkleTree<_>>()
- .hash();
+ .hash()
+ .unwrap();
if expected_txs_hash != actual_txs_hash {
return Err("Transactions' hash incorrect. Expected: {expected_txs_hash:?}, actual: {actual_txs_hash:?}");
diff --git a/data_model/src/events/data/filters.rs b/data_model/src/events/data/filters.rs
index 4edc08c828e..92743725aaf 100644
--- a/data_model/src/events/data/filters.rs
+++ b/data_model/src/events/data/filters.rs
@@ -705,7 +705,6 @@ impl EventFilter for DataEventFilter {
(DataEvent::Peer(event), Peer(filter)) => filter.matches(event),
(DataEvent::Trigger(event), Trigger(filter)) => filter.matches(event),
(DataEvent::Role(event), Role(filter)) => filter.matches(event),
- (DataEvent::PermissionToken(_), PermissionTokenSchemaUpdate) => true,
(DataEvent::Configuration(event), Configuration(filter)) => filter.matches(event),
(DataEvent::Executor(event), Executor(filter)) => filter.matches(event),
diff --git a/data_model/src/events/mod.rs b/data_model/src/events/mod.rs
index 94c003526bc..d9e59fd6a8c 100644
--- a/data_model/src/events/mod.rs
+++ b/data_model/src/events/mod.rs
@@ -7,9 +7,11 @@ use iroha_data_model_derive::model;
use iroha_macro::FromVariant;
use iroha_schema::IntoSchema;
use parity_scale_codec::{Decode, Encode};
+use pipeline::{BlockEvent, TransactionEvent};
use serde::{Deserialize, Serialize};
pub use self::model::*;
+use self::pipeline::{BlockEventFilter, TransactionEventFilter};
pub mod data;
pub mod execute_trigger;
@@ -37,9 +39,9 @@ pub mod model {
IntoSchema,
)]
#[ffi_type]
- pub enum Event {
+ pub enum EventBox {
/// Pipeline event.
- Pipeline(pipeline::PipelineEvent),
+ Pipeline(pipeline::PipelineEventBox),
/// Data event.
Data(data::DataEvent),
/// Time event.
@@ -85,7 +87,7 @@ pub mod model {
#[ffi_type(opaque)]
pub enum EventFilterBox {
/// Listen to pipeline events with filter.
- Pipeline(pipeline::PipelineEventFilter),
+ Pipeline(pipeline::PipelineEventFilterBox),
/// Listen to data events with filter.
Data(data::DataEventFilter),
/// Listen to time events with filter.
@@ -116,7 +118,7 @@ pub mod model {
#[ffi_type(opaque)]
pub enum TriggeringEventFilterBox {
/// Listen to pipeline events with filter.
- Pipeline(pipeline::PipelineEventFilter),
+ Pipeline(pipeline::PipelineEventFilterBox),
/// Listen to data events with filter.
Data(data::DataEventFilter),
/// Listen to time events with filter.
@@ -126,6 +128,62 @@ pub mod model {
}
}
+impl From<TransactionEvent> for EventBox {
+ fn from(source: TransactionEvent) -> Self {
+ Self::Pipeline(source.into())
+ }
+}
+
+impl From<BlockEvent> for EventBox {
+ fn from(source: BlockEvent) -> Self {
+ Self::Pipeline(source.into())
+ }
+}
+
+impl From<TransactionEventFilter> for EventFilterBox {
+ fn from(source: TransactionEventFilter) -> Self {
+ Self::Pipeline(source.into())
+ }
+}
+
+impl From<BlockEventFilter> for EventFilterBox {
+ fn from(source: BlockEventFilter) -> Self {
+ Self::Pipeline(source.into())
+ }
+}
+
+impl TryFrom<EventBox> for TransactionEvent {
+    type Error = iroha_macro::error::ErrorTryFromEnum<EventBox, Self>;
+
+    fn try_from(event: EventBox) -> Result<Self, Self::Error> {
+ use iroha_macro::error::ErrorTryFromEnum;
+
+ let EventBox::Pipeline(pipeline_event) = event else {
+ return Err(ErrorTryFromEnum::default());
+ };
+
+ pipeline_event
+ .try_into()
+ .map_err(|_| ErrorTryFromEnum::default())
+ }
+}
+
+impl TryFrom<EventBox> for BlockEvent {
+    type Error = iroha_macro::error::ErrorTryFromEnum<EventBox, Self>;
+
+    fn try_from(event: EventBox) -> Result<Self, Self::Error> {
+ use iroha_macro::error::ErrorTryFromEnum;
+
+ let EventBox::Pipeline(pipeline_event) = event else {
+ return Err(ErrorTryFromEnum::default());
+ };
+
+ pipeline_event
+ .try_into()
+ .map_err(|_| ErrorTryFromEnum::default())
+ }
+}
+
/// Trait for filters
#[cfg(feature = "transparent_api")]
pub trait EventFilter {
@@ -156,25 +214,27 @@ pub trait EventFilter {
#[cfg(feature = "transparent_api")]
impl EventFilter for EventFilterBox {
- type Event = Event;
+ type Event = EventBox;
/// Apply filter to event.
- fn matches(&self, event: &Event) -> bool {
+ fn matches(&self, event: &EventBox) -> bool {
match (event, self) {
- (Event::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event),
- (Event::Data(event), Self::Data(filter)) => filter.matches(event),
- (Event::Time(event), Self::Time(filter)) => filter.matches(event),
- (Event::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => filter.matches(event),
- (Event::TriggerCompleted(event), Self::TriggerCompleted(filter)) => {
+ (EventBox::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event),
+ (EventBox::Data(event), Self::Data(filter)) => filter.matches(event),
+ (EventBox::Time(event), Self::Time(filter)) => filter.matches(event),
+ (EventBox::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => {
+ filter.matches(event)
+ }
+ (EventBox::TriggerCompleted(event), Self::TriggerCompleted(filter)) => {
filter.matches(event)
}
// Fail to compile in case when new variant to event or filter is added
(
- Event::Pipeline(_)
- | Event::Data(_)
- | Event::Time(_)
- | Event::ExecuteTrigger(_)
- | Event::TriggerCompleted(_),
+ EventBox::Pipeline(_)
+ | EventBox::Data(_)
+ | EventBox::Time(_)
+ | EventBox::ExecuteTrigger(_)
+ | EventBox::TriggerCompleted(_),
Self::Pipeline(_)
| Self::Data(_)
| Self::Time(_)
@@ -187,22 +247,24 @@ impl EventFilter for EventFilterBox {
#[cfg(feature = "transparent_api")]
impl EventFilter for TriggeringEventFilterBox {
- type Event = Event;
+ type Event = EventBox;
/// Apply filter to event.
- fn matches(&self, event: &Event) -> bool {
+ fn matches(&self, event: &EventBox) -> bool {
match (event, self) {
- (Event::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event),
- (Event::Data(event), Self::Data(filter)) => filter.matches(event),
- (Event::Time(event), Self::Time(filter)) => filter.matches(event),
- (Event::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => filter.matches(event),
+ (EventBox::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event),
+ (EventBox::Data(event), Self::Data(filter)) => filter.matches(event),
+ (EventBox::Time(event), Self::Time(filter)) => filter.matches(event),
+ (EventBox::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => {
+ filter.matches(event)
+ }
// Fail to compile in case when new variant to event or filter is added
(
- Event::Pipeline(_)
- | Event::Data(_)
- | Event::Time(_)
- | Event::ExecuteTrigger(_)
- | Event::TriggerCompleted(_),
+ EventBox::Pipeline(_)
+ | EventBox::Data(_)
+ | EventBox::Time(_)
+ | EventBox::ExecuteTrigger(_)
+ | EventBox::TriggerCompleted(_),
Self::Pipeline(_) | Self::Data(_) | Self::Time(_) | Self::ExecuteTrigger(_),
) => false,
}
@@ -279,16 +341,16 @@ pub mod stream {
/// Event sent by the peer.
#[derive(Debug, Clone, Decode, Encode, IntoSchema)]
#[repr(transparent)]
- pub struct EventMessage(pub Event);
+ pub struct EventMessage(pub EventBox);
/// Message sent by the stream consumer.
/// Request sent by the client to subscribe to events.
#[derive(Debug, Clone, Constructor, Decode, Encode, IntoSchema)]
#[repr(transparent)]
- pub struct EventSubscriptionRequest(pub EventFilterBox);
+    pub struct EventSubscriptionRequest(pub Vec<EventFilterBox>);
}
-    impl From<EventMessage> for Event {
+    impl From<EventMessage> for EventBox {
fn from(source: EventMessage) -> Self {
source.0
}
@@ -303,7 +365,7 @@ pub mod prelude {
pub use super::EventFilter;
pub use super::{
data::prelude::*, execute_trigger::prelude::*, pipeline::prelude::*, time::prelude::*,
- trigger_completed::prelude::*, Event, EventFilterBox, TriggeringEventFilterBox,
+ trigger_completed::prelude::*, EventBox, EventFilterBox, TriggeringEventFilterBox,
TriggeringEventType,
};
}
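
Taken together, the widened `EventSubscriptionRequest` and the new `From`/`TryFrom` impls compose as follows; a hedged sketch using the `Default`-constructed filters from `pipeline.rs` below (module paths are assumptions):

```rust
use iroha_data_model::events::stream::EventSubscriptionRequest;
use iroha_data_model::prelude::*;

// Sketch: several filters are carried by one subscription request, and
// a concrete pipeline event is recovered from an `EventBox`.
fn compose(event: EventBox) {
    let filters: Vec<EventFilterBox> = vec![
        TransactionEventFilter::default().into(),
        BlockEventFilter::default().into(),
    ];
    // The `Constructor` derive supplies `new`; the request wraps a Vec now.
    let _request = EventSubscriptionRequest::new(filters);

    // `TryFrom<EventBox>` fails when the variant does not match.
    if let Ok(tx) = TransactionEvent::try_from(event) {
        println!("transaction {:?} is now {:?}", tx.hash(), tx.status());
    }
}
```
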
diff --git a/data_model/src/events/pipeline.rs b/data_model/src/events/pipeline.rs
index 5d3b962144f..0a4554adbda 100644
--- a/data_model/src/events/pipeline.rs
+++ b/data_model/src/events/pipeline.rs
@@ -1,59 +1,55 @@
//! Pipeline events.
#[cfg(not(feature = "std"))]
-use alloc::{format, string::String, vec::Vec};
+use alloc::{boxed::Box, format, string::String, vec::Vec};
-use getset::Getters;
-use iroha_crypto::Hash;
+use iroha_crypto::HashOf;
use iroha_data_model_derive::model;
use iroha_macro::FromVariant;
use iroha_schema::IntoSchema;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};
-use strum::EnumDiscriminants;
pub use self::model::*;
+use crate::{
+ block::{BlockHeader, SignedBlock},
+ transaction::SignedTransaction,
+};
#[model]
pub mod model {
+ use getset::Getters;
+
use super::*;
- /// [`Event`] filter.
#[derive(
Debug,
Clone,
- Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
- Default,
- Getters,
+ FromVariant,
Decode,
Encode,
- Serialize,
Deserialize,
+ Serialize,
IntoSchema,
)]
- pub struct PipelineEventFilter {
-        /// If `Some::<EntityKind>`, filter by the [`EntityKind`]. If `None`, accept all the [`EntityKind`].
-        pub(super) entity_kind: Option<PipelineEntityKind>,
-        /// If `Some::<StatusKind>`, filter by the [`StatusKind`]. If `None`, accept all the [`StatusKind`].
-        pub(super) status_kind: Option<PipelineStatusKind>,
-        /// If `Some::<Hash>`, filter by the [`struct@Hash`]. If `None`, accept all the [`struct@Hash`].
-        // TODO: Can we make hash typed like HashOf?
-        pub(super) hash: Option<Hash>,
+ #[ffi_type(opaque)]
+ pub enum PipelineEventBox {
+ Transaction(TransactionEvent),
+ Block(BlockEvent),
}
- /// The kind of the pipeline entity.
#[derive(
Debug,
Clone,
- Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
+ Getters,
Decode,
Encode,
Deserialize,
@@ -61,15 +57,13 @@ pub mod model {
IntoSchema,
)]
#[ffi_type]
- #[repr(u8)]
- pub enum PipelineEntityKind {
- /// Block
- Block,
- /// Transaction
- Transaction,
+ #[getset(get = "pub")]
+ pub struct BlockEvent {
+ pub header: BlockHeader,
+        pub hash: HashOf<SignedBlock>,
+ pub status: BlockStatus,
}
- /// Strongly-typed [`Event`] that tells the receiver the kind and the hash of the changed entity as well as its [`Status`].
#[derive(
Debug,
Clone,
@@ -84,18 +78,15 @@ pub mod model {
Serialize,
IntoSchema,
)]
- #[getset(get = "pub")]
#[ffi_type]
- pub struct PipelineEvent {
- /// [`EntityKind`] of the entity that caused this [`Event`].
- pub entity_kind: PipelineEntityKind,
- /// [`Status`] of the entity that caused this [`Event`].
- pub status: PipelineStatus,
- /// [`struct@Hash`] of the entity that caused this [`Event`].
- pub hash: Hash,
+ #[getset(get = "pub")]
+ pub struct TransactionEvent {
+        pub hash: HashOf<SignedTransaction>,
+        pub block_height: Option<u64>,
+ pub status: TransactionStatus,
}
- /// [`Status`] of the entity.
+ /// Report of block's status in the pipeline
#[derive(
Debug,
Clone,
@@ -103,129 +94,221 @@ pub mod model {
Eq,
PartialOrd,
Ord,
- FromVariant,
- EnumDiscriminants,
Decode,
Encode,
+ Deserialize,
Serialize,
+ IntoSchema,
+ )]
+ #[ffi_type(opaque)]
+ pub enum BlockStatus {
+ /// Block was approved to participate in consensus
+ Approved,
+ /// Block was rejected by consensus
+ Rejected(crate::block::error::BlockRejectionReason),
+ /// Block has passed consensus successfully
+ Committed,
+ /// Changes have been reflected in the WSV
+ Applied,
+ }
+
+ /// Report of transaction's status in the pipeline
+ #[derive(
+ Debug,
+ Clone,
+ PartialEq,
+ Eq,
+ PartialOrd,
+ Ord,
+ Decode,
+ Encode,
Deserialize,
+ Serialize,
IntoSchema,
)]
- #[strum_discriminants(
- name(PipelineStatusKind),
- derive(PartialOrd, Ord, Decode, Encode, Deserialize, Serialize, IntoSchema,)
+ #[ffi_type(opaque)]
+ pub enum TransactionStatus {
+ /// Transaction was received and enqueued
+ Queued,
+        /// Transaction was dropped (not stored in a block)
+ Expired,
+ /// Transaction was stored in the block as valid
+ Approved,
+ /// Transaction was stored in the block as invalid
+        Rejected(Box<crate::transaction::error::TransactionRejectionReason>),
+ }
+
+ #[derive(
+ Debug,
+ Clone,
+ PartialEq,
+ Eq,
+ PartialOrd,
+ Ord,
+ FromVariant,
+ Decode,
+ Encode,
+ Deserialize,
+ Serialize,
+ IntoSchema,
)]
#[ffi_type]
- pub enum PipelineStatus {
- /// Entity has been seen in the blockchain but has not passed validation.
- Validating,
- /// Entity was rejected during validation.
- Rejected(PipelineRejectionReason),
- /// Entity has passed validation.
- Committed,
+ pub enum PipelineEventFilterBox {
+ Transaction(TransactionEventFilter),
+ Block(BlockEventFilter),
}
- /// The reason for rejecting pipeline entity such as transaction or block.
#[derive(
Debug,
- displaydoc::Display,
Clone,
PartialEq,
Eq,
PartialOrd,
Ord,
- FromVariant,
+ Default,
+ Getters,
Decode,
Encode,
Deserialize,
Serialize,
IntoSchema,
)]
- #[cfg_attr(feature = "std", derive(thiserror::Error))]
#[ffi_type]
- pub enum PipelineRejectionReason {
- /// Block was rejected
- Block(#[cfg_attr(feature = "std", source)] crate::block::error::BlockRejectionReason),
- /// Transaction was rejected
- Transaction(
- #[cfg_attr(feature = "std", source)]
- crate::transaction::error::TransactionRejectionReason,
- ),
+ #[getset(get = "pub")]
+ pub struct BlockEventFilter {
+        pub height: Option<u64>,
+        pub status: Option<BlockStatus>,
+ }
+
+ #[derive(
+ Debug,
+ Clone,
+ PartialEq,
+ Eq,
+ PartialOrd,
+ Ord,
+ Default,
+ Getters,
+ Decode,
+ Encode,
+ Deserialize,
+ Serialize,
+ IntoSchema,
+ )]
+ #[ffi_type]
+ #[getset(get = "pub")]
+ pub struct TransactionEventFilter {
+        pub hash: Option<HashOf<SignedTransaction>>,
+ #[getset(skip)]
+        pub block_height: Option<Option<u64>>,