diff --git a/Cargo.lock b/Cargo.lock index 8769064b75a..d6c91ef7d6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3630,6 +3630,7 @@ dependencies = [ "thiserror", "tokio", "tower-http", + "tungstenite", ] [[package]] diff --git a/crates/iroha/src/client.rs b/crates/iroha/src/client.rs index dfcf644b0d3..9f165637f58 100644 --- a/crates/iroha/src/client.rs +++ b/crates/iroha/src/client.rs @@ -716,14 +716,23 @@ pub mod stream_api { impl Drop for SyncIterator { fn drop(&mut self) { let mut close = || -> eyre::Result<()> { - self.stream.close(None)?; - let msg = self.stream.read()?; - if !msg.is_close() { - return Err(eyre!( - "Server hasn't sent `Close` message for websocket handshake" - )); + match self.stream.close(None) { + Ok(()) => {} + Err(WebSocketError::ConnectionClosed | WebSocketError::AlreadyClosed) => { + return Ok(()) + } + Err(error) => Err(error)?, + } + // NOTE: drive close handshake to completion + loop { + match self.stream.read() { + Ok(_) => {} + Err(WebSocketError::ConnectionClosed | WebSocketError::AlreadyClosed) => { + return Ok(()) + } + Err(error) => Err(error)?, + } } - Ok(()) }; trace!("Closing WebSocket connection"); @@ -776,17 +785,12 @@ pub mod stream_api { /// - Closing the websocket connection itself fails. pub async fn close(mut self) { let close = async { - self.stream.close(None).await?; - if let Some(msg) = self.stream.next().await { - if !msg?.is_close() { - eyre::bail!("Server hasn't sent `Close` message for websocket handshake"); - } - } + <_ as SinkExt<_>>::close(&mut self.stream).await?; Ok(()) }; trace!("Closing WebSocket connection"); - let _ = close.await.map_err(|e| error!(%e)); + let _ = close.await.map_err(|e: eyre::Report| error!(%e)); trace!("WebSocket connection closed"); } } @@ -798,13 +802,15 @@ pub mod stream_api { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - match futures_util::ready!(self.stream.poll_next_unpin(cx)) { - Some(Ok(WebSocketMessage::Binary(message))) => { - std::task::Poll::Ready(Some(self.handler.message(message))) - } - Some(Ok(_)) => std::task::Poll::Pending, - Some(Err(err)) => std::task::Poll::Ready(Some(Err(err.into()))), - None => std::task::Poll::Ready(None), + loop { + break match futures_util::ready!(self.stream.poll_next_unpin(cx)) { + Some(Ok(WebSocketMessage::Binary(message))) => { + std::task::Poll::Ready(Some(self.handler.message(message))) + } + Some(Ok(_)) => continue, + Some(Err(err)) => std::task::Poll::Ready(Some(Err(err.into()))), + None => std::task::Poll::Ready(None), + }; } } } diff --git a/crates/iroha_torii/Cargo.toml b/crates/iroha_torii/Cargo.toml index 7a541e7fb86..54f1a870ba7 100644 --- a/crates/iroha_torii/Cargo.toml +++ b/crates/iroha_torii/Cargo.toml @@ -55,6 +55,7 @@ parity-scale-codec = { workspace = true, features = ["derive"] } pprof = { git = " https://github.com/Erigara/pprof-rs", branch = "fix_pointer_align", optional = true, default-features = false, features = ["protobuf-codec", "frame-pointer", "cpp"] } nonzero_ext = { workspace = true } pretty-error-debug = "0.3.0" +tungstenite = { workspace = true } [dev-dependencies] http-body-util = "0.1.2" diff --git a/crates/iroha_torii/src/block.rs b/crates/iroha_torii/src/block.rs new file mode 100644 index 00000000000..89c42e60abc --- /dev/null +++ b/crates/iroha_torii/src/block.rs @@ -0,0 +1,74 @@ +use std::{num::NonZeroU64, sync::Arc}; + +use iroha_core::kura::Kura; +use iroha_data_model::block::{ + stream::{BlockMessage, BlockSubscriptionRequest}, + SignedBlock, +}; + +use crate::stream::{self, WebSocketScale}; + +/// Type of error for `Consumer` +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Error from provided stream/websocket + #[error("Stream error: {0}")] + Stream(Box), +} + +impl From for Error { + fn from(error: stream::Error) -> Self { + Self::Stream(Box::new(error)) + } +} + +/// Result type for `Consumer` +pub type Result = core::result::Result; + +/// Consumer for Iroha `Block`(s). +/// Passes the blocks over the corresponding connection `stream`. +#[derive(Debug)] +pub struct Consumer<'ws> { + pub stream: &'ws mut WebSocketScale, + height: NonZeroU64, + kura: Arc, +} + +impl<'ws> Consumer<'ws> { + /// Constructs [`Consumer`], which forwards blocks through the `stream`. + /// + /// # Errors + /// Can fail due to timeout or without message at websocket or during decoding request + #[iroha_futures::telemetry_future] + pub async fn new(stream: &'ws mut WebSocketScale, kura: Arc) -> Result { + let BlockSubscriptionRequest(height) = stream.recv().await?; + Ok(Consumer { + stream, + height, + kura, + }) + } + + /// Forwards block if block for given height already exists + /// + /// # Errors + /// Can fail due to timeout. Also receiving might fail + #[iroha_futures::telemetry_future] + pub async fn consume(&mut self) -> Result<()> { + if let Some(block) = self.kura.get_block_by_height( + self.height + .try_into() + .expect("INTERNAL BUG: Number of blocks exceeds usize::MAX"), + ) { + // TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts + self.stream + .send(BlockMessage(SignedBlock::clone(&block))) + .await?; + self.height = self + .height + .checked_add(1) + .expect("Maximum block height is achieved."); + } + Ok(()) + } +} diff --git a/crates/iroha_torii/src/event.rs b/crates/iroha_torii/src/event.rs index 258a57c79bc..ff715cc56bc 100644 --- a/crates/iroha_torii/src/event.rs +++ b/crates/iroha_torii/src/event.rs @@ -1,37 +1,21 @@ //! Iroha is a quite dynamic system so many events can happen. //! This module contains descriptions of such an events and //! utility Iroha Special Instructions to work with them. -use axum::extract::ws::WebSocket; -use futures::TryStreamExt; -use iroha_data_model::events::prelude::*; -use iroha_macro::error::ErrorTryFromEnum; -use crate::stream::{self, Sink, Stream, StreamMessage as _}; +use iroha_data_model::events::prelude::*; -/// Type of Stream error -pub type StreamError = stream::Error<>::Err>; +use crate::stream::{self, WebSocketScale}; /// Type of error for `Consumer` #[derive(thiserror::Error, Debug)] pub enum Error { /// Error from provided stream/websocket #[error("Stream error: {0}")] - Stream(Box), - /// Error from converting received message to filter - #[error("Can't retrieve subscription filter: {0}")] - CantRetrieveSubscriptionFilter( - #[from] ErrorTryFromEnum, - ), - /// Error from provided websocket - #[error("WebSocket error: {0}")] - WebSocket(#[from] axum::Error), - /// Error that occurs than `WebSocket::next()` call returns `None` - #[error("Can't receive message from stream")] - CantReceiveMessage, + Stream(Box), } -impl From for Error { - fn from(error: StreamError) -> Self { +impl From for Error { + fn from(error: stream::Error) -> Self { Self::Stream(Box::new(error)) } } @@ -42,20 +26,19 @@ pub type Result = core::result::Result; /// Consumer for Iroha `Event`(s). /// Passes the events over the corresponding connection `stream` if they match the `filter`. #[derive(Debug)] -pub struct Consumer { - stream: WebSocket, +pub struct Consumer<'ws> { + pub stream: &'ws mut WebSocketScale, filters: Vec, } -impl Consumer { +impl<'ws> Consumer<'ws> { /// Constructs [`Consumer`], which consumes `Event`s and forwards it through the `stream`. /// /// # Errors /// Can fail due to timeout or without message at websocket or during decoding request #[iroha_futures::telemetry_future] - pub async fn new(mut stream: WebSocket) -> Result { - let EventSubscriptionRequest(filters) = - Stream::::recv(&mut stream).await?; + pub async fn new(stream: &'ws mut WebSocketScale) -> Result { + let EventSubscriptionRequest(filters) = stream.recv::().await?; Ok(Consumer { stream, filters }) } @@ -69,30 +52,9 @@ impl Consumer { return Ok(()); } - Sink::<_>::send(&mut self.stream, EventMessage(event)) + self.stream + .send(EventMessage(event)) .await .map_err(Into::into) } - - /// Listen for `Close` message in loop - /// - /// # Errors - /// Can fail if can't receive message from stream for some reason - pub async fn stream_closed(&mut self) -> Result<()> { - while let Some(message) = self.stream.try_next().await? { - if message.is_close() { - return Ok(()); - } - iroha_logger::warn!("Unexpected message received: {:?}", message); - } - Err(Error::CantReceiveMessage) - } - - /// Close stream. See [`WebSocket::close()`] - /// - /// # Errors - /// Throws up [`WebSocket::close()`] errors - pub async fn close_stream(self) -> Result<()> { - self.stream.close().await.map_err(Into::into) - } } diff --git a/crates/iroha_torii/src/lib.rs b/crates/iroha_torii/src/lib.rs index 2ff9261d9b4..693788b0562 100644 --- a/crates/iroha_torii/src/lib.rs +++ b/crates/iroha_torii/src/lib.rs @@ -47,6 +47,7 @@ use utils::{ #[macro_use] pub(crate) mod utils; +mod block; mod event; mod routing; mod stream; @@ -208,7 +209,7 @@ impl Torii { move |ws: WebSocketUpgrade| { core::future::ready(ws.on_upgrade(|ws| async move { if let Err(error) = - routing::subscription::handle_subscription(events, ws).await + routing::event::handle_events_stream(events, ws).await { iroha_logger::error!(%error, "Failure during event streaming"); } @@ -222,7 +223,8 @@ impl Torii { let kura = self.kura.clone(); move |ws: WebSocketUpgrade| { core::future::ready(ws.on_upgrade(|ws| async move { - if let Err(error) = routing::handle_blocks_stream(kura, ws).await { + if let Err(error) = routing::block::handle_blocks_stream(kura, ws).await + { iroha_logger::error!(%error, "Failure during block streaming"); } })) diff --git a/crates/iroha_torii/src/routing.rs b/crates/iroha_torii/src/routing.rs index 5881f4c2a2f..0bb9415862c 100644 --- a/crates/iroha_torii/src/routing.rs +++ b/crates/iroha_torii/src/routing.rs @@ -5,24 +5,18 @@ use axum::extract::ws::WebSocket; #[cfg(feature = "telemetry")] use eyre::{eyre, WrapErr}; -use futures::TryStreamExt; use iroha_config::client_api::ConfigDTO; use iroha_core::{query::store::LiveQueryStoreHandle, smartcontracts::query::ValidQueryRequest}; use iroha_data_model::{ - block::{ - stream::{BlockMessage, BlockSubscriptionRequest}, - SignedBlock, - }, + self, prelude::*, query::{QueryRequestWithAuthority, QueryResponse, SignedQuery}, }; #[cfg(feature = "telemetry")] use iroha_telemetry::metrics::Status; -use stream::StreamMessage as _; use tokio::task; use super::*; -use crate::stream::{Sink, Stream}; #[iroha_futures::telemetry_future] pub async fn handle_transaction( @@ -103,75 +97,99 @@ pub async fn handle_post_configuration( Ok((StatusCode::ACCEPTED, ())) } -#[iroha_futures::telemetry_future] -pub async fn handle_blocks_stream(kura: Arc, mut stream: WebSocket) -> eyre::Result<()> { - let BlockSubscriptionRequest(mut from_height) = - Stream::::recv(&mut stream).await?; - - let mut interval = tokio::time::interval(std::time::Duration::from_millis(10)); - loop { - // FIXME: cleanup. - - tokio::select! { - // This branch catches `Close` and unexpected messages - closed = async { - while let Some(message) = stream.try_next().await? { - if message.is_close() { - return Ok(()); - } - iroha_logger::warn!(?message, "Unexpected message received"); - } - eyre::bail!("Can't receive close message") - } => { - match closed { - Ok(()) => { - return stream.close().await.map_err(Into::into); - } - Err(err) => return Err(err) - } +pub mod block { + //! Blocks stream handler + + use stream::WebSocketScale; + + use super::*; + use crate::block; + + /// Type for any error during blocks streaming + #[derive(Debug, displaydoc::Display, thiserror::Error)] + enum Error { + /// Block consumption resulted in an error: {_0} + Consumer(#[from] Box), + /// Event reception error + Event(#[from] tokio::sync::broadcast::error::RecvError), + /// Connection is closed + Close, + } + + impl From for Error { + fn from(error: block::Error) -> Self { + match error { + block::Error::Stream(err) if matches!(*err, stream::Error::Closed) => Self::Close, + error => Self::Consumer(Box::new(error)), + } + } + } + + type Result = core::result::Result; + + #[iroha_futures::telemetry_future] + pub async fn handle_blocks_stream(kura: Arc, stream: WebSocket) -> eyre::Result<()> { + let mut stream = WebSocketScale(stream); + let init_and_subscribe = async { + let mut consumer = block::Consumer::new(&mut stream, kura).await?; + subscribe_forever(&mut consumer).await + }; + + match init_and_subscribe.await { + Ok(()) => stream.close().await.map_err(Into::into), + Err(Error::Close) => Ok(()), + Err(err) => { + // NOTE: try close websocket and return initial error + let _ = stream.close().await; + Err(err.into()) } - // This branch sends blocks - _ = interval.tick() => { - if let Some(block) = kura.get_block_by_height(from_height.try_into().expect("INTERNAL BUG: Number of blocks exceeds usize::MAX")) { - // TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts - Sink::::send(&mut stream, BlockMessage(SignedBlock::clone(&block))).await?; - from_height = from_height.checked_add(1).expect("Maximum block height is achieved."); + } + } + + /// Make endless `consumer` subscription for `blocks` + /// + /// Ideally should return `Result` cause it either runs forever or returns error + async fn subscribe_forever(consumer: &mut block::Consumer<'_>) -> Result<()> { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(10)); + loop { + tokio::select! { + // Wait for stream to be closed by client + closed = consumer.stream.closed() => { + match closed { + Ok(()) => return Err(Error::Close), + Err(err) => return Err(block::Error::from(err).into()) + } } + // This branch sends blocks + _ = interval.tick() => consumer.consume().await?, } - // Else branch to prevent panic i.e. I don't know what - // this does. - else => () } } } -pub mod subscription { - //! Contains the `handle_subscription` functions and used for general routing. +pub mod event { + //! Events stream handler + + use stream::WebSocketScale; use super::*; use crate::event; - /// Type for any error during subscription handling + /// Type for any error during events streaming #[derive(Debug, displaydoc::Display, thiserror::Error)] enum Error { - /// Event consumption resulted in an error + /// Event consumption resulted in an error: {_0} Consumer(#[from] Box), /// Event reception error Event(#[from] tokio::sync::broadcast::error::RecvError), - /// `WebSocket` error - WebSocket(#[from] axum::Error), - /// A `Close` message is received. Not strictly an Error - CloseMessage, + /// Connection is closed + Close, } impl From for Error { fn from(error: event::Error) -> Self { match error { - event::Error::Stream(box_err) - if matches!(*box_err, event::StreamError::CloseMessage) => - { - Self::CloseMessage - } + event::Error::Stream(err) if matches!(*err, stream::Error::Closed) => Self::Close, error => Self::Consumer(Box::new(error)), } } @@ -179,34 +197,43 @@ pub mod subscription { type Result = core::result::Result; - /// Handle subscription request - /// /// Subscribes `stream` for `events` filtered by filter that is /// received through the `stream` #[iroha_futures::telemetry_future] - pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { - let mut consumer = event::Consumer::new(stream).await?; - - match subscribe_forever(events, &mut consumer).await { - Ok(()) | Err(Error::CloseMessage) => consumer.close_stream().await.map_err(Into::into), - Err(err) => Err(err.into()), + pub async fn handle_events_stream(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { + let mut stream = WebSocketScale(stream); + let init_and_subscribe = async { + let mut consumer = event::Consumer::new(&mut stream).await?; + subscribe_forever(events, &mut consumer).await + }; + + match init_and_subscribe.await { + Ok(()) => stream.close().await.map_err(Into::into), + Err(Error::Close) => Ok(()), + Err(err) => { + // NOTE: try close websocket and return initial error + let _ = stream.close().await; + Err(err.into()) + } } } /// Make endless `consumer` subscription for `events` /// - /// Ideally should return `Result` cause it either runs forever - /// either returns `Err` variant - async fn subscribe_forever(events: EventsSender, consumer: &mut event::Consumer) -> Result<()> { + /// Ideally should return `Result` cause it either runs forever or returns error + async fn subscribe_forever( + events: EventsSender, + consumer: &mut event::Consumer<'_>, + ) -> Result<()> { let mut events = events.subscribe(); loop { tokio::select! { - // This branch catches `Close` and unexpected messages - closed = consumer.stream_closed() => { + // Wait for stream to be closed by client + closed = consumer.stream.closed() => { match closed { - Ok(()) => return Err(Error::CloseMessage), - Err(err) => return Err(err.into()) + Ok(()) => return Err(Error::Close), + Err(err) => return Err(event::Error::from(err).into()) } } // This branch catches and sends events @@ -215,8 +242,6 @@ pub mod subscription { iroha_logger::trace!(?event); consumer.consume(event).await?; } - // Else branch to prevent panic - else => () } } } diff --git a/crates/iroha_torii/src/stream.rs b/crates/iroha_torii/src/stream.rs index b0dd881d7bf..d627968d03d 100644 --- a/crates/iroha_torii/src/stream.rs +++ b/crates/iroha_torii/src/stream.rs @@ -1,10 +1,8 @@ -//! Extension to the [`futures::StreamExt`] and [`futures::SinkExt`]. -//! Adds support for sending custom Iroha messages over the stream, taking care -//! of encoding/decoding as well as timeouts +//! Adds support for sending/receiving custom Iroha messages over the WebSocket use core::{result::Result, time::Duration}; -use axum::extract::ws::Message; +use axum::extract::ws::{Message, WebSocket}; use futures::{SinkExt, StreamExt}; use iroha_version::prelude::*; use parity_scale_codec::DecodeAll; @@ -17,124 +15,97 @@ const TIMEOUT: Duration = Duration::from_millis(1000); /// Error type with generic for actual Stream/Sink error type #[derive(Debug, displaydoc::Display, thiserror::Error)] #[ignore_extra_doc_attributes] -pub enum Error -where - InternalStreamError: std::error::Error + Send + Sync + 'static, -{ +pub enum Error { /// Read message timeout ReadTimeout, /// Send message timeout SendTimeout, - /// An empty message was received - NoMessage, - /// Error in internal stream representation (typically `WebSocket`) - /// - /// Made without `from` macro because it will break `IrohaVersion` variant conversion - InternalStream(#[source] InternalStreamError), - /// `Close` message received - CloseMessage, - /// Unexpected non-binary message received - NonBinaryMessage, + /// WebSocket error: {_0} + WebSocket(#[source] axum::Error), /// Error during versioned message decoding Decode(#[from] parity_scale_codec::Error), + /// Connection is closed + Closed, } -/// Represents message used by the stream -pub trait StreamMessage { - /// Construct new binary message - fn binary(source: Vec) -> Self; +/// Wrapper to send/receive scale encoded messages +#[derive(Debug)] +pub struct WebSocketScale(pub(crate) WebSocket); - /// Check if message is binary and if so return payload - fn try_binary(self) -> Option>; - - /// Returns `true` if it's a closing message - fn is_close(&self) -> bool; -} - -/// Trait for writing custom messages into stream -#[async_trait::async_trait] -pub trait Sink: SinkExt + Unpin -where - S: Encode + Send + Sync + 'static, -{ - /// Error type returned by the sink - type Err: std::error::Error + Send + Sync + 'static; - - /// Message type used by the underlying sink - type Message: StreamMessage + Send; - - /// Encoded message and sends it to the stream - async fn send(&mut self, message: S) -> Result<(), Error> { - tokio::time::timeout( - TIMEOUT, - >::send(self, Self::Message::binary(message.encode())), - ) - .await - .map_err(|_err| Error::SendTimeout)? - .map_err(Error::InternalStream) - } -} - -/// Trait for reading custom messages from stream -#[async_trait::async_trait] -pub trait Stream: - StreamExt> + Unpin -{ - /// Error type returned by the stream - type Err: std::error::Error + Send + Sync + 'static; - - /// Message type used by the underlying stream - type Message: StreamMessage; - - /// Receives and decodes message from the stream - async fn recv(&mut self) -> Result> { - let subscription_request_message = tokio::time::timeout(TIMEOUT, self.next()) +impl WebSocketScale { + /// Send message encoded in scale + pub async fn send(&mut self, message: M) -> Result<(), Error> { + tokio::time::timeout(TIMEOUT, self.0.send(Message::Binary(message.encode()))) .await - .map_err(|_err| Error::ReadTimeout)? - .ok_or(Error::NoMessage)? - .map_err(Error::InternalStream)?; - - if subscription_request_message.is_close() { - return Err(Error::CloseMessage); - } - - if let Some(binary) = subscription_request_message.try_binary() { - Ok(R::decode_all(&mut binary.as_slice())?) - } else { - Err(Error::NonBinaryMessage) - } + .map_err(|_err| Error::SendTimeout)? + .map_err(extract_ws_closed) } -} -impl StreamMessage for axum::extract::ws::Message { - fn binary(source: Vec) -> Self { - axum::extract::ws::Message::Binary(source) + /// Recv message and try to decode it + pub async fn recv(&mut self) -> Result { + // NOTE: ignore non binary messages + loop { + let message = tokio::time::timeout(TIMEOUT, self.0.next()) + .await + .map_err(|_err| Error::ReadTimeout)? + // NOTE: `None` is the same as `ConnectionClosed` or `AlreadyClosed` + .ok_or(Error::Closed)? + .map_err(extract_ws_closed)?; + + match message { + Message::Binary(binary) => { + return Ok(M::decode_all(&mut binary.as_slice())?); + } + Message::Text(_) | Message::Ping(_) | Message::Pong(_) => { + iroha_logger::debug!(?message, "Unexpected message received"); + } + Message::Close(_) => { + iroha_logger::debug!(?message, "Close message received"); + } + } + } } - fn try_binary(self) -> Option> { - if let Message::Binary(binary) = self { - Some(binary) - } else { - None + /// Discard messages and wait for close message + pub async fn closed(&mut self) -> Result<(), Error> { + loop { + match self.0.next().await { + // NOTE: `None` is the same as `ConnectionClosed` or `AlreadyClosed` + None => return Ok(()), + Some(Ok(_)) => {} + // NOTE: technically `ConnectionClosed` or `AlreadyClosed` never returned + // from `Stream` impl of `tokio_tungstenite` but left `ConnectionClosed` extraction to protect from potential change + Some(Err(error)) => match extract_ws_closed(error) { + Error::Closed => return Ok(()), + error => return Err(error), + }, + } } } - fn is_close(&self) -> bool { - matches!(self, axum::extract::ws::Message::Close(_)) + /// Close websocket + pub async fn close(mut self) -> Result<(), Error> { + // NOTE: use `SinkExt::close` because it's not trying to write to closed socket + match <_ as SinkExt<_>>::close(&mut self.0) + .await + .map_err(extract_ws_closed) + { + Err(Error::Closed) | Ok(()) => Ok(()), + Err(error) => Err(error), + } } } -#[async_trait::async_trait] -impl Sink for axum::extract::ws::WebSocket -where - M: Encode + Send + Sync + 'static, -{ - type Err = axum::Error; - type Message = axum::extract::ws::Message; -} +/// Check if websocket was closed normally +pub fn extract_ws_closed(error: axum::Error) -> Error { + let error = error.into_inner(); + // NOTE: for this downcast to work versions of `tungstenite` here and in axum should match + if let Some(tungstenite::Error::ConnectionClosed) = error.downcast_ref::() { + return Error::Closed; + } + if let Some(tungstenite::Error::AlreadyClosed) = error.downcast_ref::() { + return Error::Closed; + } -#[async_trait::async_trait] -impl Stream for axum::extract::ws::WebSocket { - type Err = axum::Error; - type Message = axum::extract::ws::Message; + Error::WebSocket(axum::Error::new(error)) }