From e33208ddf89aba047412ec21730c954fbae5e399 Mon Sep 17 00:00:00 2001 From: 0x009922 <43530070+0x009922@users.noreply.github.com> Date: Fri, 15 Dec 2023 08:09:44 +0000 Subject: [PATCH] [refactor]: split `iroha_torii` from `iroha` (#4139) * [refactor]: split `iroha_torii` from `iroha` Signed-off-by: Dmitry Balashov <43530070+0x009922@users.noreply.github.com> * [test]: fix doctest Signed-off-by: Dmitry Balashov <43530070+0x009922@users.noreply.github.com> * [refactor]: rename `iroha_torii_` to `*_` Signed-off-by: Dmitry Balashov <43530070+0x009922@users.noreply.github.com> --------- Signed-off-by: Dmitry Balashov <43530070+0x009922@users.noreply.github.com> --- Cargo.lock | 63 +++-- Cargo.toml | 6 +- cli/Cargo.toml | 33 +-- cli/src/lib.rs | 5 +- cli/src/torii/mod.rs | 145 ---------- torii/Cargo.toml | 48 ++++ {cli/derive => torii/macro}/Cargo.toml | 2 +- {cli/derive => torii/macro}/src/lib.rs | 3 +- {cli => torii}/src/event.rs | 0 torii/src/lib.rs | 359 ++++++++++++++++++++++++ {cli/src/torii => torii/src}/routing.rs | 238 ++-------------- {cli => torii}/src/stream.rs | 0 {cli/src/torii => torii/src}/utils.rs | 2 +- 13 files changed, 480 insertions(+), 424 deletions(-) delete mode 100644 cli/src/torii/mod.rs create mode 100644 torii/Cargo.toml rename {cli/derive => torii/macro}/Cargo.toml (93%) rename {cli/derive => torii/macro}/src/lib.rs (98%) rename {cli => torii}/src/event.rs (100%) create mode 100644 torii/src/lib.rs rename {cli/src/torii => torii/src}/routing.rs (57%) rename {cli => torii}/src/stream.rs (100%) rename {cli/src/torii => torii/src}/utils.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index fb0b976dace..1ff201d4026 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2610,14 +2610,8 @@ dependencies = [ name = "iroha" version = "2.0.0-pre-rc.20" dependencies = [ - "async-trait", "color-eyre", - "dashmap", - "derive_more", - "displaydoc", "eyre", - "futures", - "iroha_cli_derive", "iroha_config", "iroha_core", "iroha_crypto", @@ -2625,37 +2619,19 @@ dependencies = [ "iroha_futures", "iroha_genesis", "iroha_logger", - "iroha_macro", "iroha_p2p", "iroha_primitives", - "iroha_schema_gen", "iroha_telemetry", - "iroha_version", + "iroha_torii", "iroha_wasm_builder", "once_cell", "owo-colors", - "parity-scale-codec", - "serde", - "serde_json", "serial_test", "supports-color 2.1.0", - "tempfile", - "thiserror", "thread-local-panic-hook", "tokio", "tracing", "vergen", - "warp", -] - -[[package]] -name = "iroha_cli_derive" -version = "2.0.0-pre-rc.20" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "warp", ] [[package]] @@ -3240,6 +3216,43 @@ dependencies = [ "trybuild", ] +[[package]] +name = "iroha_torii" +version = "2.0.0-pre-rc.20" +dependencies = [ + "async-trait", + "displaydoc", + "eyre", + "futures", + "iroha_config", + "iroha_core", + "iroha_data_model", + "iroha_futures", + "iroha_logger", + "iroha_macro", + "iroha_primitives", + "iroha_schema_gen", + "iroha_telemetry", + "iroha_torii_macro", + "iroha_version", + "parity-scale-codec", + "serde", + "serde_json", + "thiserror", + "tokio", + "warp", +] + +[[package]] +name = "iroha_torii_macro" +version = "2.0.0-pre-rc.20" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", + "warp", +] + [[package]] name = "iroha_trigger" version = "2.0.0-pre-rc.20" diff --git a/Cargo.toml b/Cargo.toml index 3da58f35d73..66d4334ade9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,8 @@ categories = ["cryptography::cryptocurrencies"] [workspace.dependencies] iroha = { path = "cli" } iroha_dsl = { version = "=2.0.0-pre-rc.20", path = "dsl" } -iroha_cli_derive = { version = "=2.0.0-pre-rc.20", path = "cli/derive" } +iroha_torii = { version = "=2.0.0-pre-rc.20", path = "torii" } +iroha_torii_macro = { version = "=2.0.0-pre-rc.20", path = "torii/macro" } iroha_macro_utils = { version = "=2.0.0-pre-rc.20", path = "macro/utils" } iroha_telemetry = { version = "=2.0.0-pre-rc.20", path = "telemetry" } iroha_telemetry_derive = { version = "=2.0.0-pre-rc.20", path = "telemetry/derive" } @@ -205,7 +206,6 @@ clippy.wildcard_dependencies = "deny" resolver = "2" members = [ "cli", - "cli/derive", "client", "client_cli", "config", @@ -245,6 +245,8 @@ members = [ "tools/swarm", "tools/wasm_builder_cli", "tools/wasm_test_runner", + "torii", + "torii/macro", "version", "version/derive", "wasm_codec", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e0abd7c480b..0ca76872233 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -25,13 +25,13 @@ bridge = ["iroha_core/bridge"] # Support Decentralised Exchange, including functionality for atomic exchange instruction dex = ["iroha_core/dex"] # Support lightweight telemetry, including diagnostics -telemetry = ["iroha_telemetry", "iroha_core/telemetry"] +telemetry = ["iroha_telemetry", "iroha_core/telemetry", "iroha_torii/telemetry"] # Support developer-specific telemetry. # Should not be enabled on production builds. dev-telemetry = ["iroha_core/dev-telemetry", "iroha_telemetry"] # Support schema generation from the `schema` endpoint in the local binary. # Useful for debugging issues with decoding in SDKs. -schema-endpoint = ["iroha_schema_gen"] +schema-endpoint = ["iroha_torii/schema"] # Support internal testing infrastructure for integration tests. # Disable in production. test-network = ["thread-local-panic-hook"] @@ -43,40 +43,25 @@ maintenance = { status = "actively-developed" } [dependencies] iroha_core = { workspace = true } -iroha_macro = { workspace = true } iroha_logger = { workspace = true } iroha_futures = { workspace = true } iroha_data_model = { workspace = true, features = ["http"] } iroha_primitives = { workspace = true } iroha_telemetry = { workspace = true, optional = true } -iroha_version = { workspace = true, features = ["http"] } iroha_config = { workspace = true } iroha_crypto = { workspace = true } iroha_p2p = { workspace = true } -iroha_schema_gen = { workspace = true, optional = true } -iroha_cli_derive = { workspace = true } +iroha_torii = { workspace = true } iroha_genesis = { workspace = true } iroha_wasm_builder = { workspace = true } - -derive_more = { workspace = true } -async-trait = { workspace = true } color-eyre = { workspace = true } eyre = { workspace = true } tracing = { workspace = true } -futures = { workspace = true, features = ["std", "async-await"] } -parity-scale-codec = { workspace = true, features = ["derive"] } -serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } -thiserror = { workspace = true } -displaydoc = { workspace = true } -tokio = { workspace = true, features = ["sync", "time", "rt", "io-util", "rt-multi-thread", "macros", "fs", "signal"] } -warp = { workspace = true, features = ["multipart", "websocket"] } +tokio = { workspace = true, features = ["macros", "signal"] } once_cell = { workspace = true } owo-colors = { workspace = true, features = ["supports-colors"] } supports-color = { workspace = true } -tempfile = { workspace = true } -dashmap = { workspace = true } thread-local-panic-hook = { version = "0.1.0", optional = true } @@ -91,10 +76,10 @@ vergen = { workspace = true, features = ["cargo"] } [package.metadata.cargo-all-features] denylist = [ -"bridge", -"dex", -"schema-endpoint", -"telemetry", -"test-network" + "bridge", + "dex", + "schema-endpoint", + "telemetry", + "test-network", ] # TODO: remove `dex` and `bridge` once there's code for them. skip_optional_dependencies = true diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 2d8a9a9c078..53ee9fe3306 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -33,18 +33,15 @@ use iroha_core::{ use iroha_data_model::prelude::*; use iroha_genesis::GenesisNetwork; use iroha_logger::actor::LoggerHandle; +use iroha_torii::Torii; use tokio::{ signal, sync::{broadcast, mpsc, Notify}, task, }; -use torii::Torii; -mod event; pub mod samples; -mod stream; pub mod style; -pub mod torii; /// Arguments for Iroha2. Configuration for arguments is parsed from /// environment variables and then the appropriate object is diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs deleted file mode 100644 index 7780d4e5b74..00000000000 --- a/cli/src/torii/mod.rs +++ /dev/null @@ -1,145 +0,0 @@ -//! Translates to gateway. Request handling logic of Iroha. `Torii` -//! is used to receive, accept and route incoming instructions, -//! queries and messages. - -use std::{ - convert::Infallible, - fmt::{Debug, Write as _}, - net::ToSocketAddrs, - sync::Arc, -}; - -use futures::{stream::FuturesUnordered, StreamExt}; -use iroha_config::torii::Configuration as ToriiConfiguration; -use iroha_core::{ - kiso::{Error as KisoError, KisoHandle}, - kura::Kura, - prelude::*, - query::store::LiveQueryStoreHandle, - queue::{self, Queue}, - sumeragi::SumeragiHandle, - EventsSender, -}; -use iroha_primitives::addr::SocketAddr; -use tokio::sync::Notify; -use utils::*; -use warp::{ - http::StatusCode, - reply::{self, Json, Response}, - ws::{WebSocket, Ws}, - Filter as _, Reply, -}; - -#[macro_use] -pub(crate) mod utils; -mod routing; - -/// Main network handler and the only entrypoint of the Iroha. -pub struct Torii { - kiso: KisoHandle, - queue: Arc, - events: EventsSender, - notify_shutdown: Arc, - sumeragi: SumeragiHandle, - query_service: LiveQueryStoreHandle, - kura: Arc, - transaction_max_content_length: u64, - address: SocketAddr, -} - -/// Torii errors. -#[derive(Debug, thiserror::Error, displaydoc::Display)] -pub enum Error { - /// Failed to process query - Query(#[from] iroha_data_model::ValidationFail), - /// Failed to accept transaction - AcceptTransaction(#[from] iroha_core::tx::AcceptTransactionFail), - /// Error while getting or setting configuration - Config(#[source] eyre::Report), - /// Failed to push into queue - PushIntoQueue(#[from] Box), - #[cfg(feature = "telemetry")] - /// Error while getting Prometheus metrics - Prometheus(#[source] eyre::Report), - /// Internal error while getting status - StatusFailure(#[source] eyre::Report), - /// Failure caused by configuration subsystem - ConfigurationFailure(#[from] KisoError), - /// Cannot find status segment by provided path - StatusSegmentNotFound(#[source] eyre::Report), -} - -impl Reply for Error { - fn into_response(self) -> Response { - match self { - Self::Query(err) => { - reply::with_status(utils::Scale(&err), Self::query_status_code(&err)) - .into_response() - } - _ => reply::with_status(Self::to_string(&self), self.status_code()).into_response(), - } - } -} - -impl Error { - fn status_code(&self) -> StatusCode { - use Error::*; - - match self { - Query(e) => Self::query_status_code(e), - AcceptTransaction(_) => StatusCode::BAD_REQUEST, - Config(_) | StatusSegmentNotFound(_) => StatusCode::NOT_FOUND, - PushIntoQueue(err) => match **err { - queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR, - queue::Error::SignatureCondition { .. } => StatusCode::UNAUTHORIZED, - _ => StatusCode::BAD_REQUEST, - }, - #[cfg(feature = "telemetry")] - Prometheus(_) | StatusFailure(_) | ConfigurationFailure(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } - } - } - - fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode { - use iroha_data_model::{ - isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*, - ValidationFail::*, - }; - - match validation_error { - NotPermitted(_) => StatusCode::FORBIDDEN, - QueryFailed(query_error) - | InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error - { - Conversion(_) | UnknownCursor | FetchSizeTooBig => StatusCode::BAD_REQUEST, - Signature(_) => StatusCode::UNAUTHORIZED, - Find(_) => StatusCode::NOT_FOUND, - }, - TooComplex => StatusCode::UNPROCESSABLE_ENTITY, - InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, - InstructionFailed(error) => { - iroha_logger::error!( - ?error, - "Query validation failed with unexpected error. This means a bug inside Runtime Executor", - ); - StatusCode::INTERNAL_SERVER_ERROR - } - } - } - - fn to_string(err: &dyn std::error::Error) -> String { - let mut s = "Error:\n".to_owned(); - let mut idx = 0_i32; - let mut err_opt = Some(err); - while let Some(e) = err_opt { - write!(s, " {idx}: {}", &e.to_string()).expect("Valid"); - idx += 1_i32; - err_opt = e.source() - } - s - } -} - -/// Result type -pub type Result = std::result::Result; diff --git a/torii/Cargo.toml b/torii/Cargo.toml new file mode 100644 index 00000000000..5c6783dc63c --- /dev/null +++ b/torii/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "iroha_torii" + +edition.workspace = true +version.workspace = true +authors.workspace = true + +description.workspace = true +repository.workspace = true +homepage.workspace = true +documentation.workspace = true + +license.workspace = true +keywords.workspace = true +categories.workspace = true + +[lints] +workspace = true + +[features] +# Enables Telemetry (i.e. Status, Metrics, and API Version) endpoints +telemetry = ["iroha_telemetry", "iroha_core/telemetry", "serde_json"] +# Enables Data Model Schema endpoint +schema = ["iroha_schema_gen"] + +[dependencies] +iroha_core = { workspace = true } +iroha_config = { workspace = true } +iroha_primitives = { workspace = true } +iroha_logger = { workspace = true } +iroha_data_model = { workspace = true, features = ["http"] } +iroha_version = { workspace = true, features = ["http"] } +iroha_torii_macro = { workspace = true } +iroha_futures = { workspace = true } +iroha_macro = { workspace = true } +iroha_schema_gen = { workspace = true, optional = true } +iroha_telemetry = { workspace = true, optional = true } + +thiserror = { workspace = true } +displaydoc = { workspace = true } +futures = { workspace = true, features = ["std", "async-await"] } +warp = { workspace = true, features = ["multipart", "websocket"] } +tokio = { workspace = true, features = ["sync", "time", "macros"] } +eyre = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true, optional = true } +async-trait = { workspace = true } +parity-scale-codec = { workspace = true, features = ["derive"] } diff --git a/cli/derive/Cargo.toml b/torii/macro/Cargo.toml similarity index 93% rename from cli/derive/Cargo.toml rename to torii/macro/Cargo.toml index d258df86e55..3baec913c14 100644 --- a/cli/derive/Cargo.toml +++ b/torii/macro/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "iroha_cli_derive" +name = "iroha_torii_macro" edition.workspace = true version.workspace = true diff --git a/cli/derive/src/lib.rs b/torii/macro/src/lib.rs similarity index 98% rename from cli/derive/src/lib.rs rename to torii/macro/src/lib.rs index 00438a1df47..44bab73e304 100644 --- a/cli/derive/src/lib.rs +++ b/torii/macro/src/lib.rs @@ -39,9 +39,8 @@ use syn::{ /// ```rust /// use warp::{Rejection, Filter}; /// use std::{convert::Infallible, marker::PhantomData}; -/// // use iroha_cli::torii::utils::WarpResult; /// pub struct WarpResult(Result); -/// use iroha_cli_derive::generate_endpoints; +/// use iroha_torii_macro::generate_endpoints; /// /// // An example with arguments of both acceptable kinds. /// // This would generate endpoints accepting functions with diff --git a/cli/src/event.rs b/torii/src/event.rs similarity index 100% rename from cli/src/event.rs rename to torii/src/event.rs diff --git a/torii/src/lib.rs b/torii/src/lib.rs new file mode 100644 index 00000000000..31245d6ffc0 --- /dev/null +++ b/torii/src/lib.rs @@ -0,0 +1,359 @@ +//! The web server of Iroha. `Torii` translates to gateway. +//! +//! Crate provides the following features that are not enabled by default: +//! +//! - `telemetry`: enables Status, Metrics, and API Version endpoints +//! - `schema`: enables Data Model Schema endpoint + +use std::{ + convert::Infallible, + fmt::{Debug, Write as _}, + net::ToSocketAddrs, + sync::Arc, +}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use iroha_config::torii::{uri, Configuration as ToriiConfiguration}; +use iroha_core::{ + kiso::{Error as KisoError, KisoHandle}, + kura::Kura, + prelude::*, + query::store::LiveQueryStoreHandle, + queue::{self, Queue}, + sumeragi::SumeragiHandle, + EventsSender, +}; +use iroha_primitives::addr::SocketAddr; +use tokio::{sync::Notify, task}; +use utils::*; +use warp::{ + http::StatusCode, + reply::{self, Json, Response}, + ws::{WebSocket, Ws}, + Filter as _, Reply, +}; + +#[macro_use] +pub(crate) mod utils; +mod event; +mod routing; +mod stream; + +/// Main network handler and the only entrypoint of the Iroha. +pub struct Torii { + kiso: KisoHandle, + queue: Arc, + events: EventsSender, + notify_shutdown: Arc, + sumeragi: SumeragiHandle, + query_service: LiveQueryStoreHandle, + kura: Arc, + transaction_max_content_length: u64, + address: SocketAddr, +} + +impl Torii { + /// Construct `Torii`. + #[allow(clippy::too_many_arguments)] + pub fn new( + kiso: KisoHandle, + config: &ToriiConfiguration, + queue: Arc, + events: EventsSender, + notify_shutdown: Arc, + sumeragi: SumeragiHandle, + query_service: LiveQueryStoreHandle, + kura: Arc, + ) -> Self { + Self { + kiso, + queue, + events, + notify_shutdown, + sumeragi, + query_service, + kura, + address: config.api_url.clone(), + transaction_max_content_length: config.max_content_len.into(), + } + } + + /// Helper function to create router. This router can be tested without starting up an HTTP server + #[allow(clippy::too_many_lines)] + fn create_api_router(&self) -> impl warp::Filter + Clone + Send { + let health_route = warp::get() + .and(warp::path(uri::HEALTH)) + .and_then(|| async { Ok::<_, Infallible>(routing::handle_health()) }); + + let get_router = warp::get().and( + endpoint3( + routing::handle_pending_transactions, + warp::path(uri::PENDING_TRANSACTIONS) + .and(add_state!(self.queue, self.sumeragi,)) + .and(routing::paginate()), + ) + .or(warp::path(uri::CONFIGURATION) + .and(add_state!(self.kiso)) + .and_then(|kiso| async move { + Ok::<_, Infallible>(WarpResult(routing::handle_get_configuration(kiso).await)) + })), + ); + + #[cfg(feature = "telemetry")] + let get_router = get_router + .or(warp::path(uri::STATUS) + .and(add_state!(self.sumeragi.clone())) + .and(warp::header::optional(warp::http::header::ACCEPT.as_str())) + .and(warp::path::tail()) + .and_then(|sumeragi, accept: Option, tail| async move { + Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_status( + &sumeragi, + accept.as_ref(), + &tail, + ))) + })) + .or(warp::path(uri::METRICS) + .and(add_state!(self.sumeragi)) + .and_then(|sumeragi| async move { + Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_metrics( + &sumeragi, + ))) + })) + .or(warp::path(uri::API_VERSION) + .and(add_state!(self.sumeragi.clone())) + .and_then(|sumeragi| async { + Ok::<_, Infallible>(routing::handle_version(sumeragi).await) + })); + + #[cfg(feature = "schema")] + let get_router = get_router.or(warp::path(uri::SCHEMA) + .and_then(|| async { Ok::<_, Infallible>(routing::handle_schema().await) })); + + let post_router = warp::post() + .and( + endpoint3( + routing::handle_transaction, + warp::path(uri::TRANSACTION) + .and(add_state!(self.queue, self.sumeragi)) + .and(warp::body::content_length_limit( + self.transaction_max_content_length, + )) + .and(body::versioned()), + ) + .or(endpoint3( + routing::handle_queries, + warp::path(uri::QUERY) + .and(add_state!(self.query_service, self.sumeragi,)) + .and(routing::client_query_request()), + )) + .or(endpoint2( + routing::handle_post_configuration, + warp::path(uri::CONFIGURATION) + .and(add_state!(self.kiso)) + .and(warp::body::json()), + )), + ) + .recover(|rejection| async move { body::recover_versioned(rejection) }); + + let events_ws_router = warp::path(uri::SUBSCRIPTION) + .and(add_state!(self.events)) + .and(warp::ws()) + .map(|events, ws: Ws| { + ws.on_upgrade(|this_ws| async move { + if let Err(error) = + routing::subscription::handle_subscription(events, this_ws).await + { + iroha_logger::error!(%error, "Failure during subscription"); + } + }) + }); + + // `warp` panics if there is `/` in the string given to the `warp::path` filter + // Path filter has to be boxed to have a single uniform type during iteration + let block_ws_router_path = uri::BLOCKS_STREAM + .split('/') + .skip_while(|p| p.is_empty()) + .fold(warp::any().boxed(), |path_filter, path| { + path_filter.and(warp::path(path)).boxed() + }); + + let blocks_ws_router = block_ws_router_path + .and(add_state!(self.kura)) + .and(warp::ws()) + .map(|sumeragi: Arc<_>, ws: Ws| { + ws.on_upgrade(|this_ws| async move { + if let Err(error) = routing::handle_blocks_stream(sumeragi, this_ws).await { + iroha_logger::error!(%error, "Failed to subscribe to blocks stream"); + } + }) + }); + + let ws_router = events_ws_router.or(blocks_ws_router); + + warp::any() + .and( + // we want to avoid logging for the "health" endpoint. + // we have to place it **first** so that warp's trace will + // not log 404 if it doesn't find "/health" which might be placed + // **after** `.with(trace)` + health_route, + ) + .or(ws_router + .or(get_router) + .or(post_router) + .with(warp::trace::request())) + } + + /// Start main api endpoints. + /// + /// # Errors + /// Can fail due to listening to network or if http server fails + fn start_api(self: Arc) -> eyre::Result>> { + let torii_address = &self.address; + + let mut handles = vec![]; + match torii_address.to_socket_addrs() { + Ok(addrs) => { + for addr in addrs { + let torii = Arc::clone(&self); + + let api_router = torii.create_api_router(); + let signal_fut = async move { torii.notify_shutdown.notified().await }; + let (_, serve_fut) = + warp::serve(api_router).bind_with_graceful_shutdown(addr, signal_fut); + + handles.push(task::spawn(serve_fut)); + } + + Ok(handles) + } + Err(error) => { + iroha_logger::error!(%torii_address, %error, "API address configuration parse error"); + Err(eyre::Error::new(error)) + } + } + } + + /// To handle incoming requests `Torii` should be started first. + /// + /// # Errors + /// Can fail due to listening to network or if http server fails + #[iroha_futures::telemetry_future] + pub async fn start(self) -> eyre::Result<()> { + let torii = Arc::new(self); + let mut handles = vec![]; + + handles.extend(Arc::clone(&torii).start_api()?); + + handles + .into_iter() + .collect::>() + .for_each(|handle| { + if let Err(error) = handle { + iroha_logger::error!(%error, "Join handle error"); + } + + futures::future::ready(()) + }) + .await; + + Ok(()) + } +} + +/// Torii errors. +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub enum Error { + /// Failed to process query + Query(#[from] iroha_data_model::ValidationFail), + /// Failed to accept transaction + AcceptTransaction(#[from] iroha_core::tx::AcceptTransactionFail), + /// Error while getting or setting configuration + Config(#[source] eyre::Report), + /// Failed to push into queue + PushIntoQueue(#[from] Box), + #[cfg(feature = "telemetry")] + /// Error while getting Prometheus metrics + Prometheus(#[source] eyre::Report), + #[cfg(feature = "telemetry")] + /// Internal error while getting status + StatusFailure(#[source] eyre::Report), + /// Failure caused by configuration subsystem + ConfigurationFailure(#[from] KisoError), + /// Cannot find status segment by provided path + StatusSegmentNotFound(#[source] eyre::Report), +} + +impl Reply for Error { + fn into_response(self) -> Response { + match self { + Self::Query(err) => { + reply::with_status(utils::Scale(&err), Self::query_status_code(&err)) + .into_response() + } + _ => reply::with_status(Self::to_string(&self), self.status_code()).into_response(), + } + } +} + +impl Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + Query(e) => Self::query_status_code(e), + AcceptTransaction(_) => StatusCode::BAD_REQUEST, + Config(_) | StatusSegmentNotFound(_) => StatusCode::NOT_FOUND, + PushIntoQueue(err) => match **err { + queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR, + queue::Error::SignatureCondition { .. } => StatusCode::UNAUTHORIZED, + _ => StatusCode::BAD_REQUEST, + }, + #[cfg(feature = "telemetry")] + Prometheus(_) | StatusFailure(_) => StatusCode::INTERNAL_SERVER_ERROR, + ConfigurationFailure(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode { + use iroha_data_model::{ + isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*, + ValidationFail::*, + }; + + match validation_error { + NotPermitted(_) => StatusCode::FORBIDDEN, + QueryFailed(query_error) + | InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error + { + Conversion(_) | UnknownCursor | FetchSizeTooBig => StatusCode::BAD_REQUEST, + Signature(_) => StatusCode::UNAUTHORIZED, + Find(_) => StatusCode::NOT_FOUND, + }, + TooComplex => StatusCode::UNPROCESSABLE_ENTITY, + InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, + InstructionFailed(error) => { + iroha_logger::error!( + ?error, + "Query validation failed with unexpected error. This means a bug inside Runtime Executor", + ); + StatusCode::INTERNAL_SERVER_ERROR + } + } + } + + fn to_string(err: &dyn std::error::Error) -> String { + let mut s = "Error:\n".to_owned(); + let mut idx = 0_i32; + let mut err_opt = Some(err); + while let Some(e) = err_opt { + write!(s, " {idx}: {}", &e.to_string()).expect("Valid"); + idx += 1_i32; + err_opt = e.source() + } + s + } +} + +/// Result type +pub type Result = std::result::Result; diff --git a/cli/src/torii/routing.rs b/torii/src/routing.rs similarity index 57% rename from cli/src/torii/routing.rs rename to torii/src/routing.rs index 6a9298974ce..baf083998b6 100644 --- a/cli/src/torii/routing.rs +++ b/torii/src/routing.rs @@ -5,9 +5,10 @@ // FIXME: This can't be fixed, because one trait in `warp` is private. #![allow(opaque_hidden_inferred_bound)] +#[cfg(feature = "telemetry")] use eyre::{eyre, WrapErr}; use futures::TryStreamExt; -use iroha_config::{client_api::ConfigurationDTO, torii::uri}; +use iroha_config::client_api::ConfigurationDTO; use iroha_core::{ query::{pagination::Paginate, store::LiveQueryStoreHandle}, smartcontracts::query::ValidQueryRequest, @@ -33,7 +34,7 @@ use super::*; use crate::stream::{Sink, Stream}; /// Filter for warp which extracts [`http::ClientQueryRequest`] -fn client_query_request( +pub fn client_query_request( ) -> impl warp::Filter + Copy { body::versioned::() .and(sorting()) @@ -64,7 +65,7 @@ fn cursor() -> impl warp::Filter impl warp::Filter + Copy { +pub fn paginate() -> impl warp::Filter + Copy { warp::query() } @@ -74,7 +75,7 @@ fn fetch_size() -> impl warp::Filter, sumeragi: SumeragiHandle, transaction: SignedTransaction, @@ -98,13 +99,13 @@ async fn handle_transaction( } #[iroha_futures::telemetry_future] -async fn handle_queries( +pub async fn handle_queries( live_query_store: LiveQueryStoreHandle, sumeragi: SumeragiHandle, query_request: http::ClientQueryRequest, ) -> Result>> { - let handle = tokio::task::spawn_blocking(move || match query_request.0 { + let handle = task::spawn_blocking(move || match query_request.0 { QueryRequest::Query(QueryWithParameters { query: signed_query, sorting, @@ -134,18 +135,18 @@ enum Health { Healthy, } -fn handle_health() -> Json { +pub fn handle_health() -> Json { reply::json(&Health::Healthy) } #[iroha_futures::telemetry_future] -#[cfg(feature = "schema-endpoint")] -async fn handle_schema() -> Json { +#[cfg(feature = "schema")] +pub async fn handle_schema() -> Json { reply::json(&iroha_schema_gen::build_schemas()) } #[iroha_futures::telemetry_future] -async fn handle_pending_transactions( +pub async fn handle_pending_transactions( queue: Arc, sumeragi: SumeragiHandle, pagination: Pagination, @@ -164,13 +165,13 @@ async fn handle_pending_transactions( } #[iroha_futures::telemetry_future] -async fn handle_get_configuration(kiso: KisoHandle) -> Result { +pub async fn handle_get_configuration(kiso: KisoHandle) -> Result { let dto = kiso.get_dto().await?; Ok(reply::json(&dto)) } #[iroha_futures::telemetry_future] -async fn handle_post_configuration( +pub async fn handle_post_configuration( kiso: KisoHandle, value: ConfigurationDTO, ) -> Result { @@ -179,7 +180,7 @@ async fn handle_post_configuration( } #[iroha_futures::telemetry_future] -async fn handle_blocks_stream(kura: Arc, mut stream: WebSocket) -> eyre::Result<()> { +pub async fn handle_blocks_stream(kura: Arc, mut stream: WebSocket) -> eyre::Result<()> { let BlockSubscriptionRequest(mut from_height) = stream.recv().await?; let mut interval = tokio::time::interval(std::time::Duration::from_millis(10)); @@ -221,7 +222,7 @@ async fn handle_blocks_stream(kura: Arc, mut stream: WebSocket) -> eyre::R } } -mod subscription { +pub mod subscription { //! Contains the `handle_subscription` functions and used for general routing. use super::*; @@ -303,7 +304,7 @@ mod subscription { #[iroha_futures::telemetry_future] #[cfg(feature = "telemetry")] -async fn handle_version(sumeragi: SumeragiHandle) -> Json { +pub async fn handle_version(sumeragi: SumeragiHandle) -> Json { use iroha_version::Version; let string = sumeragi @@ -315,7 +316,7 @@ async fn handle_version(sumeragi: SumeragiHandle) -> Json { } #[cfg(feature = "telemetry")] -fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { +pub fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { if let Err(error) = sumeragi.update_metrics() { iroha_logger::error!(%error, "Error while calling sumeragi::update_metrics."); } @@ -333,7 +334,7 @@ fn update_metrics_gracefully(sumeragi: &SumeragiHandle) { #[cfg(feature = "telemetry")] #[allow(clippy::unnecessary_wraps)] -fn handle_status( +pub fn handle_status( sumeragi: &SumeragiHandle, accept: Option>, tail: &warp::path::Tail, @@ -370,206 +371,3 @@ fn handle_status( Ok(reply) } } - -impl Torii { - /// Construct `Torii`. - #[allow(clippy::too_many_arguments)] - pub fn new( - kiso: KisoHandle, - config: &ToriiConfiguration, - queue: Arc, - events: EventsSender, - notify_shutdown: Arc, - sumeragi: SumeragiHandle, - query_service: LiveQueryStoreHandle, - kura: Arc, - ) -> Self { - Self { - kiso, - queue, - events, - notify_shutdown, - sumeragi, - query_service, - kura, - address: config.api_url.clone(), - transaction_max_content_length: config.max_content_len.into(), - } - } - - /// Helper function to create router. This router can tested without starting up an HTTP server - #[allow(clippy::too_many_lines)] - fn create_api_router(&self) -> impl warp::Filter + Clone + Send { - let health_route = warp::get() - .and(warp::path(uri::HEALTH)) - .and_then(|| async { Ok::<_, Infallible>(handle_health()) }); - - let get_router = warp::get().and( - endpoint3( - handle_pending_transactions, - warp::path(uri::PENDING_TRANSACTIONS) - .and(add_state!(self.queue, self.sumeragi,)) - .and(paginate()), - ) - .or(warp::path(uri::CONFIGURATION) - .and(add_state!(self.kiso)) - .and_then(|kiso| async move { - Ok::<_, Infallible>(WarpResult(handle_get_configuration(kiso).await)) - })), - ); - - let get_router_status = warp::path(uri::STATUS) - .and(add_state!(self.sumeragi.clone())) - .and(warp::header::optional(warp::http::header::ACCEPT.as_str())) - .and(warp::path::tail()) - .and_then(|sumeragi, accept: Option, tail| async move { - Ok::<_, Infallible>(WarpResult(handle_status(&sumeragi, accept.as_ref(), &tail))) - }); - let get_router_metrics = warp::path(uri::METRICS) - .and(add_state!(self.sumeragi)) - .and_then(|sumeragi| async move { - Ok::<_, Infallible>(WarpResult(handle_metrics(&sumeragi))) - }); - let get_api_version = warp::path(uri::API_VERSION) - .and(add_state!(self.sumeragi.clone())) - .and_then(|sumeragi| async { Ok::<_, Infallible>(handle_version(sumeragi).await) }); - - #[cfg(feature = "telemetry")] - let get_router = get_router.or(warp::any() - .and(get_router_status) - .or(get_router_metrics) - .or(get_api_version)); - - #[cfg(feature = "schema-endpoint")] - let get_router = get_router.or(warp::path(uri::SCHEMA) - .and_then(|| async { Ok::<_, Infallible>(handle_schema().await) })); - - let post_router = warp::post() - .and( - endpoint3( - handle_transaction, - warp::path(uri::TRANSACTION) - .and(add_state!(self.queue, self.sumeragi)) - .and(warp::body::content_length_limit( - self.transaction_max_content_length, - )) - .and(body::versioned()), - ) - .or(endpoint3( - handle_queries, - warp::path(uri::QUERY) - .and(add_state!(self.query_service, self.sumeragi,)) - .and(client_query_request()), - )) - .or(endpoint2( - handle_post_configuration, - warp::path(uri::CONFIGURATION) - .and(add_state!(self.kiso)) - .and(warp::body::json()), - )), - ) - .recover(|rejection| async move { body::recover_versioned(rejection) }); - - let events_ws_router = warp::path(uri::SUBSCRIPTION) - .and(add_state!(self.events)) - .and(warp::ws()) - .map(|events, ws: Ws| { - ws.on_upgrade(|this_ws| async move { - if let Err(error) = subscription::handle_subscription(events, this_ws).await { - iroha_logger::error!(%error, "Failure during subscription"); - } - }) - }); - - // `warp` panics if there is `/` in the string given to the `warp::path` filter - // Path filter has to be boxed to have a single uniform type during iteration - let block_ws_router_path = uri::BLOCKS_STREAM - .split('/') - .skip_while(|p| p.is_empty()) - .fold(warp::any().boxed(), |path_filter, path| { - path_filter.and(warp::path(path)).boxed() - }); - - let blocks_ws_router = block_ws_router_path - .and(add_state!(self.kura)) - .and(warp::ws()) - .map(|sumeragi: Arc<_>, ws: Ws| { - ws.on_upgrade(|this_ws| async move { - if let Err(error) = handle_blocks_stream(sumeragi, this_ws).await { - iroha_logger::error!(%error, "Failed to subscribe to blocks stream"); - } - }) - }); - - let ws_router = events_ws_router.or(blocks_ws_router); - - warp::any() - .and( - // we want to avoid logging for the "health" endpoint. - // we have to place it **first** so that warp's trace will - // not log 404 if it doesn't find "/health" which might be placed - // **after** `.with(trace)` - health_route, - ) - .or(ws_router - .or(get_router) - .or(post_router) - .with(warp::trace::request())) - } - - /// Start main api endpoints. - /// - /// # Errors - /// Can fail due to listening to network or if http server fails - fn start_api(self: Arc) -> eyre::Result>> { - let torii_address = &self.address; - - let mut handles = vec![]; - match torii_address.to_socket_addrs() { - Ok(addrs) => { - for addr in addrs { - let torii = Arc::clone(&self); - - let api_router = torii.create_api_router(); - let signal_fut = async move { torii.notify_shutdown.notified().await }; - let (_, serve_fut) = - warp::serve(api_router).bind_with_graceful_shutdown(addr, signal_fut); - - handles.push(task::spawn(serve_fut)); - } - - Ok(handles) - } - Err(error) => { - iroha_logger::error!(%torii_address, %error, "API address configuration parse error"); - Err(eyre::Error::new(error)) - } - } - } - - /// To handle incoming requests `Torii` should be started first. - /// - /// # Errors - /// Can fail due to listening to network or if http server fails - #[iroha_futures::telemetry_future] - pub(crate) async fn start(self) -> eyre::Result<()> { - let torii = Arc::new(self); - let mut handles = vec![]; - - handles.extend(Arc::clone(&torii).start_api()?); - - handles - .into_iter() - .collect::>() - .for_each(|handle| { - if let Err(error) = handle { - iroha_logger::error!(%error, "Join handle error"); - } - - futures::future::ready(()) - }) - .await; - - Ok(()) - } -} diff --git a/cli/src/stream.rs b/torii/src/stream.rs similarity index 100% rename from cli/src/stream.rs rename to torii/src/stream.rs diff --git a/cli/src/torii/utils.rs b/torii/src/utils.rs similarity index 97% rename from cli/src/torii/utils.rs rename to torii/src/utils.rs index 6a5e2432fa6..805917872bb 100644 --- a/cli/src/torii/utils.rs +++ b/torii/src/utils.rs @@ -81,4 +81,4 @@ impl Reply for WarpResult { } } -iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 6, 7); +iroha_torii_macro::generate_endpoints!(2, 3, 4, 5, 6, 7);