diff --git a/Cargo.lock b/Cargo.lock index 96e897678c5..8e402b83a6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,13 +388,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "itoa", "matchit", "memchr", @@ -403,7 +403,44 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "base64 0.21.7", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "multer", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper 1.0.1", + "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -419,9 +456,29 @@ dependencies = [ "bytes", "futures-util", "http 0.2.12", - "http-body", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", ] @@ -2488,30 +2545,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "headers" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" -dependencies = [ - "base64 0.21.7", - "bytes", - "headers-core", - "http 0.2.12", - "httpdate", - "mime", - "sha1", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http 0.2.12", -] - [[package]] name = "heck" version = "0.4.1" @@ -2614,6 +2647,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -2644,7 +2700,7 @@ dependencies = [ "futures-util", "h2", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -2656,18 +2712,52 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -3040,7 +3130,6 @@ dependencies = [ "strum 0.25.0", "thiserror", "trybuild", - "warp", ] [[package]] @@ -3417,6 +3506,7 @@ name = "iroha_torii" version = "2.0.0-pre-rc.21" dependencies = [ "async-trait", + "axum 0.7.5", "displaydoc 0.2.4 (git+https://github.com/akonradi-signal/displaydoc.git?branch=anonymous-const)", "eyre", "futures", @@ -3427,10 +3517,10 @@ dependencies = [ "iroha_logger", "iroha_macro", "iroha_primitives", + "iroha_schema", "iroha_schema_gen", "iroha_telemetry", "iroha_torii_const", - "iroha_torii_derive", "iroha_version", "nonzero_ext", "parity-scale-codec", @@ -3439,7 +3529,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "warp", + "tower-http", ] [[package]] @@ -3449,17 +3539,6 @@ dependencies = [ "iroha_primitives", ] -[[package]] -name = "iroha_torii_derive" -version = "2.0.0-pre-rc.21" -dependencies = [ - "manyhow", - "proc-macro2", - "quote", - "syn 2.0.66", - "warp", -] - [[package]] name = "iroha_trigger" version = "2.0.0-pre-rc.21" @@ -3493,7 +3572,6 @@ dependencies = [ "serde", "serde_json", "thiserror", - "warp", ] [[package]] @@ -3938,16 +4016,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3983,16 +4051,15 @@ dependencies = [ [[package]] name = "multer" -version = "2.1.0" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" dependencies = [ "bytes", "encoding_rs", "futures-util", - "http 0.2.12", + "http 1.1.0", "httparse", - "log", "memchr", "mime", "spin", @@ -5021,12 +5088,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -5148,6 +5209,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.6" @@ -5599,6 +5670,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tap" version = "1.0.1" @@ -5949,13 +6026,13 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", "h2", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-timeout", "percent-encoding", "pin-project", @@ -5988,6 +6065,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.5.0", + "bytes", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -6156,15 +6251,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" -[[package]] -name = "unicase" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.15" @@ -6368,35 +6454,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "warp" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "headers", - "http 0.2.12", - "hyper", - "log", - "mime", - "mime_guess", - "multer", - "percent-encoding", - "pin-project", - "scoped-tls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-tungstenite", - "tokio-util", - "tower-service", - "tracing", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 19669dc7ba9..0de2ab17305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ iroha_core = { version = "=2.0.0-pre-rc.21 ", path = "core" } irohad = { version = "=2.0.0-pre-rc.21", path = "cli" } iroha_torii = { version = "=2.0.0-pre-rc.21", path = "torii" } -iroha_torii_derive = { version = "=2.0.0-pre-rc.21", path = "torii/derive" } iroha_torii_const = { version = "=2.0.0-pre-rc.21", path = "torii/const" } iroha = { version = "=2.0.0-pre-rc.21", path = "client" } @@ -117,10 +116,10 @@ getset = "0.1.2" hex-literal = "0.4.1" rand = { version = "0.8.5", default-features = false, features = ["getrandom", "alloc"] } -warp = { version = "0.3.7", default-features = false } +axum = { version = "0.7.5", default-features = false } wasmtime = "15.0.1" -tracing = "0.1.40" +tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", default-features = false } dashmap = "5.5.3" @@ -245,7 +244,6 @@ members = [ "tools/wasm_builder_cli", "tools/wasm_test_runner", "torii", - "torii/derive", "torii/const", "version", "version/derive", diff --git a/client/Cargo.toml b/client/Cargo.toml index 42c820a2a0d..0d8e38e99cd 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -57,7 +57,7 @@ iroha_primitives = { workspace = true } iroha_logger = { workspace = true } iroha_telemetry = { workspace = true } iroha_torii_const = { workspace = true } -iroha_version = { workspace = true, features = ["http"] } +iroha_version = { workspace = true } test_samples = { workspace = true } attohttpc = { version = "0.28.0", default-features = false } diff --git a/configs/swarm/executor.wasm b/configs/swarm/executor.wasm index c3d58d763d5..9a3d26974ba 100644 Binary files a/configs/swarm/executor.wasm and b/configs/swarm/executor.wasm differ diff --git a/data_model/Cargo.toml b/data_model/Cargo.toml index a348477d243..33ad0f38467 100644 --- a/data_model/Cargo.toml +++ b/data_model/Cargo.toml @@ -23,7 +23,7 @@ default = ["std"] # Please refer to https://docs.rust-embedded.org/book/intro/no-std.html std = ["iroha_macro/std", "iroha_version/std", "iroha_crypto/std", "iroha_primitives/std", "thiserror", "displaydoc/std", "strum/std", "once_cell"] # Enable API for HTTP requests. Should be activated for HTTP clients -http = ["std", "warp", "iroha_version/http"] +http = ["std"] # Replace structures and methods with FFI equivalents to facilitate dynamic linkage (mainly used in smartcontracts) #ffi_import = ["iroha_ffi", "iroha_primitives/ffi_import", "iroha_crypto/ffi_import"] @@ -46,7 +46,6 @@ derive_more = { workspace = true, features = ["as_ref", "display", "constructor" serde = { workspace = true, features = ["derive"] } serde_with = { workspace = true, features = ["macros"] } serde_json = { workspace = true } -warp = { workspace = true, optional = true } thiserror = { workspace = true, optional = true } displaydoc = { workspace = true } getset = { workspace = true } diff --git a/genesis/Cargo.toml b/genesis/Cargo.toml index 964d5ee1aa8..4b2663eb800 100644 --- a/genesis/Cargo.toml +++ b/genesis/Cargo.toml @@ -16,7 +16,7 @@ iroha_data_model = { workspace = true, features = ["http"] } derive_more = { workspace = true, features = ["deref"] } serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } +serde_json = { workspace = true, features = ["std"] } once_cell = { workspace = true } tracing = { workspace = true } eyre = { workspace = true } diff --git a/torii/Cargo.toml b/torii/Cargo.toml index 620c0776e07..08811df2224 100644 --- a/torii/Cargo.toml +++ b/torii/Cargo.toml @@ -23,7 +23,7 @@ telemetry = ["iroha_telemetry", "iroha_core/telemetry", "serde_json"] # Enables profiling endpoint profiling = ["pprof"] # Enables Data Model Schema endpoint -schema = ["iroha_schema_gen"] +schema = ["iroha_schema", "iroha_schema_gen"] [dependencies] iroha_core = { workspace = true } @@ -31,18 +31,19 @@ iroha_config = { workspace = true } iroha_primitives = { workspace = true } iroha_logger = { workspace = true } iroha_data_model = { workspace = true, features = ["http"] } -iroha_version = { workspace = true, features = ["http"] } -iroha_torii_derive = { workspace = true } +iroha_version = { workspace = true } iroha_torii_const = { workspace = true } iroha_futures = { workspace = true } iroha_macro = { workspace = true } +iroha_schema = { workspace = true, optional = true } iroha_schema_gen = { workspace = true, optional = true } iroha_telemetry = { workspace = true, optional = true } thiserror = { workspace = true } displaydoc = { workspace = true } futures = { workspace = true, features = ["std", "async-await"] } -warp = { workspace = true, features = ["multipart", "websocket"] } +axum = { workspace = true, features = ["multipart", "ws", "query", "json", "tokio", "http1"] } +tower-http = { version = "0.5.0", features = ["trace", "timeout"] } tokio = { workspace = true, features = ["sync", "time", "macros"] } eyre = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/torii/const/src/lib.rs b/torii/const/src/lib.rs index 080149b851b..0330e4e52bf 100644 --- a/torii/const/src/lib.rs +++ b/torii/const/src/lib.rs @@ -7,30 +7,30 @@ pub mod uri { pub const DEFAULT_API_ADDR: iroha_primitives::addr::SocketAddr = iroha_primitives::addr::socket_addr!(127.0.0.1:8080); /// Query URI is used to handle incoming Query requests. - pub const QUERY: &str = "query"; + pub const QUERY: &str = "/query"; /// Transaction URI is used to handle incoming ISI requests. - pub const TRANSACTION: &str = "transaction"; + pub const TRANSACTION: &str = "/transaction"; /// Block URI is used to handle incoming Block requests. - pub const CONSENSUS: &str = "consensus"; + pub const CONSENSUS: &str = "/consensus"; /// Health URI is used to handle incoming Healthcheck requests. - pub const HEALTH: &str = "health"; + pub const HEALTH: &str = "/health"; /// The URI used for block synchronization. - pub const BLOCK_SYNC: &str = "block/sync"; + pub const BLOCK_SYNC: &str = "/block/sync"; /// The web socket uri used to subscribe to block and transactions statuses. - pub const SUBSCRIPTION: &str = "events"; + pub const SUBSCRIPTION: &str = "/events"; /// The web socket uri used to subscribe to blocks stream. - pub const BLOCKS_STREAM: &str = "block/stream"; + pub const BLOCKS_STREAM: &str = "/block/stream"; /// The URI for local config changing inspecting - pub const CONFIGURATION: &str = "configuration"; + pub const CONFIGURATION: &str = "/configuration"; /// URI to report status for administration - pub const STATUS: &str = "status"; + pub const STATUS: &str = "/status"; /// Metrics URI is used to export metrics according to [Prometheus /// Guidance](https://prometheus.io/docs/instrumenting/writing_exporters/). - pub const METRICS: &str = "metrics"; + pub const METRICS: &str = "/metrics"; /// URI for retrieving the schema with which Iroha was built. - pub const SCHEMA: &str = "schema"; + pub const SCHEMA: &str = "/schema"; /// URI for getting the API version currently used - pub const API_VERSION: &str = "api_version"; + pub const API_VERSION: &str = "/api_version"; /// URI for getting cpu profile - pub const PROFILE: &str = "debug/pprof/profile"; + pub const PROFILE: &str = "/debug/pprof/profile"; } diff --git a/torii/derive/Cargo.toml b/torii/derive/Cargo.toml deleted file mode 100644 index 249ae035fd8..00000000000 --- a/torii/derive/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "iroha_torii_derive" - -edition.workspace = true -version.workspace = true -authors.workspace = true - -license.workspace = true - -[lints] -workspace = true - -[lib] -proc-macro = true - -[dependencies] -syn = { workspace = true } -quote = { workspace = true } -proc-macro2 = { workspace = true } -manyhow = { workspace = true } - -[dev-dependencies] -warp = { workspace = true } diff --git a/torii/derive/src/lib.rs b/torii/derive/src/lib.rs deleted file mode 100644 index 1f302cab7b2..00000000000 --- a/torii/derive/src/lib.rs +++ /dev/null @@ -1,161 +0,0 @@ -//! Crate with a proc macro for torii endpoint generation - -use manyhow::{manyhow, Result}; -use proc_macro2::{Span, TokenStream}; -use quote::quote; -use syn::{ - parse::{Parse, ParseStream}, - punctuated::Punctuated, - Ident, LitInt, Token, -}; - -/// Generate warp filters for endpoints, accepting functions -/// with any positive number of arguments within the range of `u8`. -/// -/// Only the endpoint functions stated explicitly in the macro invocation -/// are created. -/// -/// There are two kinds of accepted arguments. One is supplying -/// an integer literal denoting the number of arguments in a function -/// that the endpoint accepts. The endpoint name is generated automatically -/// in this case and will be in the shape of `endpoint{arg_count}`. -/// -/// Another kind is a colon-separated string literal -/// followed by an integer literal, denoting custom name of the endpoint being -/// created and the number of arguments in a function that it accepts. -/// -/// Also relies on `WarpResult` custom wrapper, -/// and thus any module using this macro should also reexport -/// the former, as well as some types from `warp` (see example). -/// -/// # Panics: -/// 1) When provided with neither a string nor integer literal. -/// 2) When any of the argument count literals are not unique. -/// 3) When the colon-separated form has spaces in the provided name. -/// -/// # Examples -/// -/// ```rust -/// use warp::{Rejection, Filter}; -/// use std::{convert::Infallible, marker::PhantomData}; -/// pub struct WarpResult(Result); -/// use iroha_torii_derive::generate_endpoints; -/// -/// // An example with arguments of both acceptable kinds. -/// // This would generate endpoints accepting functions with -/// // 2, 3, 4 and 5 arguments. The first and the last of them -/// // have the custom names provided, whereas the other two have -/// // defaults, such as `endpoint3`. -/// generate_endpoints!(3, my_endpoint: 2, 4, anotherOne: 5, ); -/// ``` -#[manyhow] -#[proc_macro] -pub fn generate_endpoints(input: TokenStream) -> Result { - let EndpointList(list) = syn::parse2(input)?; - let lazy_arg_names = (1_u8..).map(|count| { - Ident::new( - format!("__endpoint_arg_{count}").as_str(), - Span::call_site(), - ) - }); - let lazy_arg_types = (1_u8..).map(|count| { - Ident::new( - format!("__Endpoint_Arg_{count}").as_str(), - Span::call_site(), - ) - }); - let mut endpoints = Vec::new(); - - for item in list { - let (fun_name, arg_count) = match item { - EndpointItem::ArgCount(arg_count) => { - let fun_name = Ident::new(&format!("endpoint{arg_count}"), Span::call_site()); - (fun_name, arg_count) - } - EndpointItem::NameAndArgCount { - name: fun_name, - arg_count, - } => (*fun_name, arg_count), - }; - - let count = arg_count - .base10_parse::() - .expect("Already checked at parse stage"); - let arg_names = lazy_arg_names.clone().take(count).collect::>(); - let arg_types = lazy_arg_types.clone().take(count).collect::>(); - - let expanded = quote! { - #[inline] - #[allow(clippy::redundant_pub_crate)] - pub(crate) fn #fun_name < O, E, F, Fut, Fil, #( #arg_types ),* > ( - f: F, - router: Fil, - ) -> impl Filter,), Error = Rejection> + Clone - where - Fil: Filter + Clone, - F: Fn( #( #arg_types ),* ) -> Fut + Copy + Send + Sync + 'static, - Fut: std::future::Future> + Send, - #( #arg_types: Send ),* - { - router.and_then(move | #( #arg_names ),* | async move { - Ok::<_, Infallible>(WarpResult(f( #( #arg_names ),* ).await)) - }) - } - }; - - endpoints.push(expanded); - } - - Ok(quote! { - #( #endpoints )* - }) -} - -#[derive(Debug)] -struct EndpointList(Vec); - -#[derive(Debug)] -enum EndpointItem { - NameAndArgCount { arg_count: LitInt, name: Box }, - ArgCount(LitInt), -} - -impl Parse for EndpointList { - fn parse(input: ParseStream) -> syn::Result { - let items = Punctuated::::parse_terminated(input)?; - let mut seen_arg_counts = Vec::new(); - for item in &items { - match item { - EndpointItem::NameAndArgCount { arg_count, .. } - | EndpointItem::ArgCount(arg_count) => { - let curr_count = arg_count.base10_parse::()?; - if seen_arg_counts.contains(&curr_count) { - return Err(syn::Error::new_spanned( - arg_count.token(), - "argument counts for all endpoints should be distinct", - )); - } - seen_arg_counts.push(curr_count); - } - } - } - - Ok(Self(items.into_iter().collect())) - } -} - -impl Parse for EndpointItem { - fn parse(input: ParseStream) -> syn::Result { - let lookahead = input.lookahead1(); - if lookahead.peek(LitInt) { - input.parse().map(EndpointItem::ArgCount) - } else if lookahead.peek(Ident) { - let name = input.parse()?; - let _semicolon: Token![:] = input.parse()?; - let arg_count = input.parse()?; - Ok(Self::NameAndArgCount { name, arg_count }) - } else { - Err(lookahead.error()) - } - } -} diff --git a/torii/src/event.rs b/torii/src/event.rs index ee9d72757b0..258a57c79bc 100644 --- a/torii/src/event.rs +++ b/torii/src/event.rs @@ -1,12 +1,12 @@ //! Iroha is a quite dynamic system so many events can happen. //! This module contains descriptions of such an events and //! utility Iroha Special Instructions to work with them. +use axum::extract::ws::WebSocket; use futures::TryStreamExt; use iroha_data_model::events::prelude::*; use iroha_macro::error::ErrorTryFromEnum; -use warp::ws::WebSocket; -use crate::stream::{self, Sink, Stream}; +use crate::stream::{self, Sink, Stream, StreamMessage as _}; /// Type of Stream error pub type StreamError = stream::Error<>::Err>; @@ -24,7 +24,7 @@ pub enum Error { ), /// Error from provided websocket #[error("WebSocket error: {0}")] - WebSocket(#[from] warp::Error), + WebSocket(#[from] axum::Error), /// Error that occurs than `WebSocket::next()` call returns `None` #[error("Can't receive message from stream")] CantReceiveMessage, @@ -54,7 +54,8 @@ impl Consumer { /// Can fail due to timeout or without message at websocket or during decoding request #[iroha_futures::telemetry_future] pub async fn new(mut stream: WebSocket) -> Result { - let EventSubscriptionRequest(filters) = stream.recv().await?; + let EventSubscriptionRequest(filters) = + Stream::::recv(&mut stream).await?; Ok(Consumer { stream, filters }) } @@ -68,8 +69,7 @@ impl Consumer { return Ok(()); } - self.stream - .send(EventMessage(event)) + Sink::<_>::send(&mut self.stream, EventMessage(event)) .await .map_err(Into::into) } diff --git a/torii/src/lib.rs b/torii/src/lib.rs index 3ec7f69c55f..62f6f7bc47b 100644 --- a/torii/src/lib.rs +++ b/torii/src/lib.rs @@ -5,14 +5,16 @@ //! - `telemetry`: enables Status, Metrics, and API Version endpoints //! - `schema`: enables Data Model Schema endpoint -use std::{ - convert::Infallible, - fmt::{Debug, Write as _}, - net::ToSocketAddrs, - sync::Arc, -}; +use std::{fmt::Debug, net::ToSocketAddrs, sync::Arc, time::Duration}; -use futures::{stream::FuturesUnordered, StreamExt}; +use axum::{ + extract::{DefaultBodyLimit, WebSocketUpgrade}, + http::StatusCode, + response::{IntoResponse, Json, Response}, + routing::{get, post}, + Router, +}; +use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; use iroha_config::{base::util::Bytes, parameters::actual::Torii as Config}; #[cfg(feature = "telemetry")] use iroha_core::metrics::MetricsReporter; @@ -28,13 +30,14 @@ use iroha_core::{ use iroha_data_model::ChainId; use iroha_primitives::addr::SocketAddr; use iroha_torii_const::uri; -use tokio::{sync::Notify, task}; -use utils::*; -use warp::{ - http::StatusCode, - reply::{self, Json, Response}, - ws::{WebSocket, Ws}, - Filter as _, Reply, +use tokio::{net::TcpListener, sync::Notify, task}; +use tower_http::{ + timeout::TimeoutLayer, + trace::{DefaultMakeSpan, TraceLayer}, +}; +use utils::{ + body::{ClientQueryRequestExtractor, ScaleVersioned}, + Scale, }; #[macro_use] @@ -43,6 +46,8 @@ mod event; mod routing; mod stream; +const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60); + /// Main network handler and the only entrypoint of the Iroha. pub struct Torii { chain_id: Arc, @@ -92,171 +97,172 @@ impl Torii { /// Helper function to create router. This router can be tested without starting up an HTTP server #[allow(clippy::too_many_lines)] - fn create_api_router(&self) -> impl warp::Filter + Clone + Send { - let health_route = warp::get() - .and(warp::path(uri::HEALTH)) - .and_then(|| async { Ok::<_, Infallible>(routing::handle_health()) }); - - let get_router = warp::get().and( - warp::path(uri::CONFIGURATION) - .and(add_state!(self.kiso)) - .and_then(|kiso| async move { - Ok::<_, Infallible>(WarpResult(routing::handle_get_configuration(kiso).await)) + fn create_api_router(&self) -> axum::Router { + let router = Router::new() + .route(uri::HEALTH, get(routing::handle_health)) + .route( + uri::CONFIGURATION, + get({ + let kiso = self.kiso.clone(); + move || routing::handle_get_configuration(kiso) }), - ); + ); #[cfg(feature = "telemetry")] - let get_router = get_router - .or(warp::path(uri::STATUS) - .and(add_state!(self.metrics_reporter.clone())) - .and(warp::header::optional(warp::http::header::ACCEPT.as_str())) - .and(warp::path::tail()) - .and_then( - |metrics_reporter, accept: Option, tail| async move { - Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_status( + let router = router + .route( + &format!("{}/*tail", uri::STATUS), + get({ + let metrics_reporter = self.metrics_reporter.clone(); + move |headers: axum::http::header::HeaderMap, axum::extract::Path(tail): axum::extract::Path| { + let accept = headers.get(axum::http::header::ACCEPT); + core::future::ready(routing::handle_status( &metrics_reporter, - accept.as_ref(), - &tail, - ))) - }, - )) - .or(warp::path(uri::METRICS) - .and(add_state!(self.metrics_reporter)) - .and_then(|metrics_reporter| async move { - Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_metrics( - &metrics_reporter, - ))) - })) - .or(warp::path(uri::API_VERSION) - .and(add_state!(self.state.clone())) - .and_then(|state| async { - Ok::<_, Infallible>(routing::handle_version(state).await) - })); + accept, + Some(&tail), + )) + } + }), + ) + .route( + uri::STATUS, + get({ + let metrics_reporter = self.metrics_reporter.clone(); + move |headers: axum::http::header::HeaderMap| { + let accept = headers.get(axum::http::header::ACCEPT); + core::future::ready(routing::handle_status(&metrics_reporter, accept, None)) + } + }), + ) + .route( + uri::METRICS, + get({ + let metrics_reporter = self.metrics_reporter.clone(); + move || core::future::ready(routing::handle_metrics(&metrics_reporter)) + }), + ) + .route( + uri::API_VERSION, + get({ + let state = self.state.clone(); + move || routing::handle_version(state) + }), + ); #[cfg(feature = "schema")] - let get_router = get_router.or(warp::path(uri::SCHEMA) - .and_then(|| async { Ok::<_, Infallible>(routing::handle_schema().await) })); + let router = router.route(uri::SCHEMA, get(routing::handle_schema)); #[cfg(feature = "profiling")] - let get_router = { - // `warp` panics if there is `/` in the string given to the `warp::path` filter - // Path filter has to be boxed to have a single uniform type during iteration - let profile_router_path = uri::PROFILE - .split('/') - .skip_while(|p| p.is_empty()) - .fold(warp::any().boxed(), |path_filter, path| { - path_filter.and(warp::path(path)).boxed() - }); - - let profiling_lock = std::sync::Arc::new(tokio::sync::Mutex::new(())); - get_router.or(profile_router_path - .and(warp::query::()) - .and_then(move |params| { + let router = router.route( + uri::PROFILE, + get({ + let profiling_lock = std::sync::Arc::new(tokio::sync::Mutex::new(())); + move |axum::extract::Query(params): axum::extract::Query<_>| { let profiling_lock = Arc::clone(&profiling_lock); - async move { - Ok::<_, Infallible>( - routing::profiling::handle_profile(params, profiling_lock).await, - ) - } - })) - }; + routing::profiling::handle_profile(params, profiling_lock) + } + }), + ); - let post_router = warp::post() - .and( - endpoint4( - routing::handle_transaction, - warp::path(uri::TRANSACTION) - .and(add_state!(self.chain_id, self.queue, self.state.clone())) - .and(warp::body::content_length_limit( - self.transaction_max_content_len.get(), - )) - .and(body::versioned()), - ) - .or(endpoint3( - routing::handle_queries, - warp::path(uri::QUERY) - .and(add_state!(self.query_service, self.state.clone(),)) - .and(routing::client_query_request()), - )) - .or(endpoint2( - routing::handle_post_configuration, - warp::path(uri::CONFIGURATION) - .and(add_state!(self.kiso)) - .and(warp::body::json()), + let router = router + .route( + uri::TRANSACTION, + post({ + let chain_id = self.chain_id.clone(); + let queue = self.queue.clone(); + let state = self.state.clone(); + move |ScaleVersioned(transaction): ScaleVersioned<_>| { + routing::handle_transaction(chain_id, queue, state, transaction) + } + }) + .layer(DefaultBodyLimit::max( + self.transaction_max_content_len + .get() + .try_into() + .expect("should't exceed usize"), )), ) - .recover(|rejection| async move { body::recover_versioned(rejection) }); - - let events_ws_router = warp::path(uri::SUBSCRIPTION) - .and(add_state!(self.events)) - .and(warp::ws()) - .map(|events, ws: Ws| { - ws.on_upgrade(|this_ws| async move { - if let Err(error) = - routing::subscription::handle_subscription(events, this_ws).await - { - iroha_logger::error!(%error, "Failure during subscription"); + .route( + uri::QUERY, + post({ + let query_service = self.query_service.clone(); + let state = self.state.clone(); + move |ClientQueryRequestExtractor(query_request): ClientQueryRequestExtractor| { + routing::handle_queries(query_service, state, query_request) } - }) - }); - - // `warp` panics if there is `/` in the string given to the `warp::path` filter - // Path filter has to be boxed to have a single uniform type during iteration - let block_ws_router_path = uri::BLOCKS_STREAM - .split('/') - .skip_while(|p| p.is_empty()) - .fold(warp::any().boxed(), |path_filter, path| { - path_filter.and(warp::path(path)).boxed() - }); + }), + ) + .route( + uri::CONFIGURATION, + post({ + let kiso = self.kiso.clone(); + move |Json(config): Json<_>| routing::handle_post_configuration(kiso, config) + }), + ); - let blocks_ws_router = block_ws_router_path - .and(add_state!(self.kura)) - .and(warp::ws()) - .map(|sumeragi: Arc<_>, ws: Ws| { - ws.on_upgrade(|this_ws| async move { - if let Err(error) = routing::handle_blocks_stream(sumeragi, this_ws).await { - iroha_logger::error!(%error, "Failed to subscribe to blocks stream"); + let router = router + .route(uri::SUBSCRIPTION, get({ + let events = self.events.clone(); + move |ws: WebSocketUpgrade| { + core::future::ready( + ws.on_upgrade(|ws| async move { + if let Err(error) = + routing::subscription::handle_subscription(events, ws).await + { + iroha_logger::error!(%error, "Failure during subscription"); + } + }) + ) + } + })) + .route(uri::BLOCKS_STREAM, + post({ + let kura = self.kura.clone(); + move |ws: WebSocketUpgrade| { + core::future::ready( + ws.on_upgrade(|ws| async move { + if let Err(error) = routing::handle_blocks_stream(kura, ws).await { + iroha_logger::error!(%error, "Failed to subscribe to blocks stream"); + } + }) + ) } }) - }); - - let ws_router = events_ws_router.or(blocks_ws_router); - - warp::any() - .and( - // we want to avoid logging for the "health" endpoint. - // we have to place it **first** so that warp's trace will - // not log 404 if it doesn't find "/health" which might be placed - // **after** `.with(trace)` - health_route, ) - .or(ws_router - .or(get_router) - .or(post_router) - .with(warp::trace::request())) + ; + + router.layer(( + TraceLayer::new_for_http() + .make_span_with(DefaultMakeSpan::default().include_headers(true)), + // Graceful shutdown will wait for outstanding requests to complete. + // Add a timeout so requests don't hang forever. + TimeoutLayer::new(SERVER_SHUTDOWN_TIMEOUT), + )) } /// Start main API endpoints. /// /// # Errors /// Can fail due to listening to network or if http server fails - fn start_api(self: Arc) -> eyre::Result>> { + fn start_api(self: Arc) -> eyre::Result>>> { let torii_address = &self.address; let handles = torii_address .to_socket_addrs()? .map(|addr| { let torii = Arc::clone(&self); - let api_router = torii.create_api_router(); - let signal_fut = async move { torii.notify_shutdown.notified().await }; - // FIXME: warp panics if fails to bind! - // handle this properly, report address origin after Axum - // migration: https://github.com/hyperledger/iroha/issues/3776 - let (_, serve_fut) = - warp::serve(api_router).bind_with_graceful_shutdown(addr, signal_fut); - task::spawn(serve_fut) + let signal = async move { torii.notify_shutdown.notified().await }; + + let serve_fut = async move { + let listener = TcpListener::bind(addr).await?; + axum::serve(listener, api_router) + .with_graceful_shutdown(signal) + .await + }; + + task::spawn(serve_fut.map_err(eyre::Report::from)) }) .collect(); @@ -278,10 +284,15 @@ impl Torii { .into_iter() .collect::>() .for_each(|handle| { - if let Err(error) = handle { - iroha_logger::error!(%error, "Join handle error"); + match handle { + Err(error) => { + iroha_logger::error!(%error, "Join handle error"); + } + Ok(Err(error)) => { + iroha_logger::error!(%error, "Error while running torii"); + } + _ => {} } - futures::future::ready(()) }) .await; @@ -316,14 +327,11 @@ pub enum Error { StatusSegmentNotFound(#[source] eyre::Report), } -impl Reply for Error { +impl IntoResponse for Error { fn into_response(self) -> Response { match self { - Self::Query(err) => { - reply::with_status(utils::Scale(&err), Self::query_status_code(&err)) - .into_response() - } - _ => reply::with_status(Self::to_string(&self), self.status_code()).into_response(), + Self::Query(err) => (Self::query_status_code(&err), utils::Scale(err)).into_response(), + _ => (self.status_code(), self.to_string()).into_response(), } } } @@ -375,18 +383,6 @@ impl Error { } } } - - fn to_string(err: &dyn std::error::Error) -> String { - let mut s = "Error:\n".to_owned(); - let mut idx = 0_i32; - let mut err_opt = Some(err); - while let Some(e) = err_opt { - write!(s, " {idx}: {}", &e.to_string()).expect("Valid"); - idx += 1_i32; - err_opt = e.source() - } - s - } } /// Result type diff --git a/torii/src/routing.rs b/torii/src/routing.rs index 87c2381673e..daf08354fe7 100644 --- a/torii/src/routing.rs +++ b/torii/src/routing.rs @@ -2,9 +2,7 @@ //! Iroha you should add it here by creating a `handle_*` function, //! and add it to impl Torii. -// FIXME: This can't be fixed, because one trait in `warp` is private. -#![allow(opaque_hidden_inferred_bound)] - +use axum::extract::ws::WebSocket; #[cfg(feature = "telemetry")] use eyre::{eyre, WrapErr}; use futures::TryStreamExt; @@ -16,41 +14,24 @@ use iroha_data_model::{ SignedBlock, }, prelude::*, - query::{cursor::ForwardCursor, http, QueryOutputBox, QueryRequest}, + query::{http, QueryOutputBox, QueryRequest}, BatchedResponse, }; #[cfg(feature = "telemetry")] use iroha_telemetry::metrics::Status; +use stream::StreamMessage as _; use tokio::task; use super::*; use crate::stream::{Sink, Stream}; -/// Filter for warp which extracts [`http::ClientQueryRequest`] -pub fn client_query_request( -) -> impl warp::Filter + Copy { - body::versioned::() - .and_then(|signed_query| async move { - Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::query(signed_query)) - }) - .or(cursor().and_then(|cursor| async move { - Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::cursor(cursor)) - })) - .unify() -} - -/// Filter for warp which extracts cursor -fn cursor() -> impl warp::Filter + Copy { - warp::query() -} - #[iroha_futures::telemetry_future] pub async fn handle_transaction( chain_id: Arc, queue: Arc, state: Arc, transaction: SignedTransaction, -) -> Result { +) -> Result<()> { let state_view = state.view(); let transaction_limits = state_view.config.transaction_limits; let transaction = AcceptedTransaction::accept(transaction, &chain_id, transaction_limits) @@ -66,7 +47,6 @@ pub async fn handle_transaction( Box::new(err) }) .map_err(Error::PushIntoQueue) - .map(|()| Empty) } #[iroha_futures::telemetry_future] @@ -97,37 +77,42 @@ pub async fn handle_queries( .map_err(Into::into) } +/// Health status #[derive(serde::Serialize)] #[non_exhaustive] -enum Health { +pub enum Health { Healthy, } -pub fn handle_health() -> Json { - reply::json(&Health::Healthy) +pub async fn handle_health() -> Json { + Json(Health::Healthy) } #[iroha_futures::telemetry_future] #[cfg(feature = "schema")] -pub async fn handle_schema() -> Json { - reply::json(&iroha_schema_gen::build_schemas()) +pub async fn handle_schema() -> Json { + Json(iroha_schema_gen::build_schemas()) } #[iroha_futures::telemetry_future] -pub async fn handle_get_configuration(kiso: KisoHandle) -> Result { +pub async fn handle_get_configuration(kiso: KisoHandle) -> Result> { let dto = kiso.get_dto().await?; - Ok(reply::json(&dto)) + Ok(Json(dto)) } #[iroha_futures::telemetry_future] -pub async fn handle_post_configuration(kiso: KisoHandle, value: ConfigDTO) -> Result { +pub async fn handle_post_configuration( + kiso: KisoHandle, + value: ConfigDTO, +) -> Result { kiso.update_with_dto(value).await?; - Ok(reply::with_status(reply::reply(), StatusCode::ACCEPTED)) + Ok((StatusCode::ACCEPTED, ())) } #[iroha_futures::telemetry_future] pub async fn handle_blocks_stream(kura: Arc, mut stream: WebSocket) -> eyre::Result<()> { - let BlockSubscriptionRequest(mut from_height) = stream.recv().await?; + let BlockSubscriptionRequest(mut from_height) = + Stream::::recv(&mut stream).await?; let mut interval = tokio::time::interval(std::time::Duration::from_millis(10)); loop { @@ -154,10 +139,8 @@ pub async fn handle_blocks_stream(kura: Arc, mut stream: WebSocket) -> eyr // This branch sends blocks _ = interval.tick() => { if let Some(block) = kura.get_block_by_height(from_height.try_into().expect("INTERNAL BUG: Number of blocks exceeds usize::MAX")) { - stream - // TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts - .send(BlockMessage(SignedBlock::clone(&block))) - .await?; + // TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts + Sink::::send(&mut stream, BlockMessage(SignedBlock::clone(&block))).await?; from_height = from_height.checked_add(1).expect("Maximum block height is achieved."); } } @@ -182,7 +165,7 @@ pub mod subscription { /// Event reception error Event(#[from] tokio::sync::broadcast::error::RecvError), /// `WebSocket` error - WebSocket(#[from] warp::Error), + WebSocket(#[from] axum::Error), /// A `Close` message is received. Not strictly an Error CloseMessage, } @@ -206,9 +189,6 @@ pub mod subscription { /// /// Subscribes `stream` for `events` filtered by filter that is /// received through the `stream` - /// - /// There should be a [`warp::filters::ws::Message::close()`] - /// message to end subscription #[iroha_futures::telemetry_future] pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { let mut consumer = event::Consumer::new(stream).await?; @@ -250,7 +230,7 @@ pub mod subscription { #[iroha_futures::telemetry_future] #[cfg(feature = "telemetry")] -pub async fn handle_version(state: Arc) -> Json { +pub async fn handle_version(state: Arc) -> Json { use iroha_version::Version; let state_view = state.view(); @@ -259,7 +239,8 @@ pub async fn handle_version(state: Arc) -> Json { .expect("Genesis not applied. Nothing we can do. Solve the issue and rerun.") .version() .to_string(); - reply::json(&string) + + Json(string) } #[cfg(feature = "telemetry")] @@ -282,22 +263,15 @@ pub fn handle_metrics(metrics_reporter: &MetricsReporter) -> Result { #[allow(clippy::unnecessary_wraps)] pub fn handle_status( metrics_reporter: &MetricsReporter, - accept: Option>, - tail: &warp::path::Tail, + accept: Option>, + tail: Option<&str>, ) -> Result { use eyre::ContextCompat; update_metrics_gracefully(metrics_reporter); let status = Status::from(&metrics_reporter.metrics()); - let tail = tail.as_str(); - if tail.is_empty() { - if accept.is_some_and(|x| x.as_ref() == PARITY_SCALE_MIME_TYPE) { - Ok(Scale(status).into_response()) - } else { - Ok(reply::json(&status).into_response()) - } - } else { + if let Some(tail) = tail { // TODO: This probably can be optimised to elide the full // structure. Ideally there should remain a list of fields and // field aliases somewhere in `serde` macro output, which can @@ -312,9 +286,13 @@ pub fn handle_status( .try_fold(&value, serde_json::Value::get) .wrap_err_with(|| eyre!("Path not found: \"{}\"", tail)) .map_err(Error::StatusSegmentNotFound) - .map(|segment| reply::json(segment).into_response())?; + .map(|segment| Json(segment).into_response())?; Ok(reply) + } else if accept.is_some_and(|x| x.as_ref() == utils::PARITY_SCALE_MIME_TYPE.as_bytes()) { + Ok(Scale(status).into_response()) + } else { + Ok(Json(status).into_response()) } } diff --git a/torii/src/stream.rs b/torii/src/stream.rs index 7c34599c977..b0dd881d7bf 100644 --- a/torii/src/stream.rs +++ b/torii/src/stream.rs @@ -4,6 +4,7 @@ use core::{result::Result, time::Duration}; +use axum::extract::ws::Message; use futures::{SinkExt, StreamExt}; use iroha_version::prelude::*; use parity_scale_codec::DecodeAll; @@ -43,11 +44,8 @@ pub trait StreamMessage { /// Construct new binary message fn binary(source: Vec) -> Self; - /// Decodes the message into byte slice - fn as_bytes(&self) -> &[u8]; - - /// Returns `true` if the message is binary - fn is_binary(&self) -> bool; + /// Check if message is binary and if so return payload + fn try_binary(self) -> Option>; /// Returns `true` if it's a closing message fn is_close(&self) -> bool; @@ -100,64 +98,43 @@ pub trait Stream: return Err(Error::CloseMessage); } - if !subscription_request_message.is_binary() { - return Err(Error::NonBinaryMessage); + if let Some(binary) = subscription_request_message.try_binary() { + Ok(R::decode_all(&mut binary.as_slice())?) + } else { + Err(Error::NonBinaryMessage) } - - Ok(R::decode_all(&mut subscription_request_message.as_bytes())?) } } -impl StreamMessage for warp::ws::Message { +impl StreamMessage for axum::extract::ws::Message { fn binary(source: Vec) -> Self { - warp::ws::Message::binary(source) - } - - fn as_bytes(&self) -> &[u8] { - self.as_bytes() + axum::extract::ws::Message::Binary(source) } - fn is_binary(&self) -> bool { - self.is_binary() + fn try_binary(self) -> Option> { + if let Message::Binary(binary) = self { + Some(binary) + } else { + None + } } fn is_close(&self) -> bool { - self.is_close() + matches!(self, axum::extract::ws::Message::Close(_)) } } #[async_trait::async_trait] -impl Sink for warp::ws::WebSocket +impl Sink for axum::extract::ws::WebSocket where M: Encode + Send + Sync + 'static, { - type Err = warp::Error; - type Message = warp::ws::Message; + type Err = axum::Error; + type Message = axum::extract::ws::Message; } #[async_trait::async_trait] -impl Stream for warp::ws::WebSocket { - type Err = warp::Error; - type Message = warp::ws::Message; -} - -#[cfg(test)] -mod ws_client { - use warp::test::WsClient; - - use super::*; - - #[async_trait::async_trait] - impl Stream for WsClient { - type Err = warp::test::WsError; - type Message = warp::ws::Message; - } - #[async_trait::async_trait] - impl Sink for WsClient - where - M: Encode + Send + Sync + 'static, - { - type Err = warp::test::WsError; - type Message = warp::ws::Message; - } +impl Stream for axum::extract::ws::WebSocket { + type Err = axum::Error; + type Message = axum::extract::ws::Message; } diff --git a/torii/src/utils.rs b/torii/src/utils.rs index eb7a5e30d7b..cdadaa10c54 100644 --- a/torii/src/utils.rs +++ b/torii/src/utils.rs @@ -1,22 +1,9 @@ -use std::convert::Infallible; - -use iroha_version::prelude::*; -use warp::{ +use axum::{ http::{header::CONTENT_TYPE, HeaderValue}, - hyper::body::Bytes, - reply::Response, - Filter, Rejection, Reply, + response::{IntoResponse, Response}, }; - -/// Structure for empty response body -#[derive(Clone, Copy)] -pub struct Empty; - -impl Reply for Empty { - fn into_response(self) -> Response { - Response::new(vec![].into()) - } -} +use iroha_data_model::query::http::{ClientQueryRequest, SignedQuery}; +use iroha_version::prelude::*; /// MIME used in Torii for SCALE encoding // note: no elegant way to associate it with generic `Scale` @@ -26,7 +13,7 @@ pub const PARITY_SCALE_MIME_TYPE: &'_ str = "application/x-parity-scale"; #[derive(Debug)] pub struct Scale(pub T); -impl Reply for Scale { +impl IntoResponse for Scale { fn into_response(self) -> Response { let mut res = Response::new(self.0.encode().into()); res.headers_mut().insert( @@ -37,48 +24,73 @@ impl Reply for Scale { } } -/// Adds state to filter -macro_rules! add_state { - ( $( $state : expr ),* $(,)? ) => { - warp::any().map({ - let state = ($( $state.clone(), )*); - move || state.clone() - }).untuple_one() - } -} - pub mod body { - use iroha_version::error::Error as VersionError; + use axum::{ + async_trait, + body::Bytes, + extract::{FromRequest, FromRequestParts, Query, Request}, + }; + use iroha_data_model::query::cursor::ForwardCursor; use super::*; - /// Decode body as versioned scale codec - pub fn versioned() -> impl Filter + Copy + /// Extractor of scale encoded versioned data from body + #[derive(Clone, Copy, Debug)] + pub struct ScaleVersioned(pub T); + + #[async_trait] + impl FromRequest for ScaleVersioned + where + Bytes: FromRequest, + S: Send + Sync, + T: DecodeVersioned, { - warp::body::bytes().and_then(|body: Bytes| async move { - T::decode_all_versioned(body.as_ref()).map_err(warp::reject::custom) - }) - } + type Rejection = Response; - /// Recover from failure in `versioned` - pub fn recover_versioned(rejection: Rejection) -> Result { - rejection - .find::() - .map(warp::Reply::into_response) - .ok_or(rejection) + async fn from_request(req: Request, state: &S) -> Result { + let body = Bytes::from_request(req, state) + .await + .map_err(IntoResponse::into_response)?; + + T::decode_all_versioned(&body) + .map(ScaleVersioned) + .map_err(|err| { + ( + axum::http::StatusCode::BAD_REQUEST, + format!("Transaction Rejected (Malformed), Reason : '{err}'"), + ) + .into_response() + }) + } } -} -/// Warp result response type -pub struct WarpResult(pub Result); + /// Extractor for [`ClientQueryRequest`] + /// + /// First try to deserialize body as [`SignedQuery`] if fail try to parse query parameters for [`ForwardCursor`] values + #[derive(Clone, Debug)] + pub struct ClientQueryRequestExtractor(pub ClientQueryRequest); + + #[async_trait] + impl FromRequest for ClientQueryRequestExtractor + where + Bytes: FromRequest, + S: Send + Sync, + { + type Rejection = Response; -impl Reply for WarpResult { - fn into_response(self) -> warp::reply::Response { - match self { - Self(Ok(ok)) => ok.into_response(), - Self(Err(err)) => err.into_response(), + async fn from_request(req: Request, state: &S) -> Result { + let (mut parts, body) = req.into_parts(); + let cursor = Query::::from_request_parts(&mut parts, &state) + .await + .map(|Query(cursor)| ClientQueryRequest::cursor(cursor)); + let req = Request::from_parts(parts, body); + ScaleVersioned::::from_request(req, state) + .await + .map(|ScaleVersioned(query)| ClientQueryRequest::query(query)) + .or(cursor) + // TODO: custom error to show that neither SignedQuery nor ForwardCursor + .map_err(IntoResponse::into_response) + .map(ClientQueryRequestExtractor) } } } - -iroha_torii_derive::generate_endpoints!(2, 3, 4, 5, 6, 7); diff --git a/version/Cargo.toml b/version/Cargo.toml index 89b396ff8a9..3a835e1305d 100644 --- a/version/Cargo.toml +++ b/version/Cargo.toml @@ -21,8 +21,6 @@ derive = ["iroha_version_derive"] scale = ["parity-scale-codec/full"] # Support JSON (de)serialisation json = ["serde", "serde_json"] -# Enable API for HTTP requests. Should be activated for HTTP clients -http = ["std", "warp"] [dependencies] iroha_version_derive = { workspace = true, optional = true } @@ -32,7 +30,6 @@ parity-scale-codec = { workspace = true, optional = true, features = ["derive"] serde_json = { workspace = true, optional = true, features = ["alloc"] } serde = { workspace = true, optional = true, features = ["derive"] } thiserror = { workspace = true, optional = true } -warp = { workspace = true, optional = true } [dev-dependencies] iroha_data_model = { workspace = true } diff --git a/version/src/lib.rs b/version/src/lib.rs index 8bfbe1f1fea..a0887da1c6a 100644 --- a/version/src/lib.rs +++ b/version/src/lib.rs @@ -109,28 +109,6 @@ pub mod error { } } - #[cfg(feature = "http")] - impl Error { - /// Returns status code for this error - #[allow(clippy::unused_self)] - pub const fn status_code(&self) -> warp::http::StatusCode { - warp::http::StatusCode::BAD_REQUEST - } - } - - #[cfg(feature = "http")] - impl warp::Reply for &Error { - fn into_response(self) -> warp::reply::Response { - warp::reply::with_status( - format!("Transaction Rejected (Malformed), Reason : '{self}'"), - self.status_code(), - ) - .into_response() - } - } - #[cfg(feature = "http")] - impl warp::reject::Reject for Error {} - /// Result type for versioning pub type Result = core::result::Result; }