Skip to content

Commit

Permalink
fix(torii): close ws properly in blocks/events stream
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Sep 25, 2024
1 parent c3ad903 commit 2a4e918
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 248 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 27 additions & 21 deletions crates/iroha/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,14 +716,23 @@ pub mod stream_api {
impl<E> Drop for SyncIterator<E> {
fn drop(&mut self) {
let mut close = || -> eyre::Result<()> {
self.stream.close(None)?;
let msg = self.stream.read()?;
if !msg.is_close() {
return Err(eyre!(
"Server hasn't sent `Close` message for websocket handshake"
));
match self.stream.close(None) {
Ok(()) => {}
Err(WebSocketError::ConnectionClosed | WebSocketError::AlreadyClosed) => {
return Ok(())
}
Err(error) => Err(error)?,
}
// NOTE: drive close handshake to completion
loop {
match self.stream.read() {
Ok(_) => {}
Err(WebSocketError::ConnectionClosed | WebSocketError::AlreadyClosed) => {
return Ok(())
}
Err(error) => Err(error)?,
}
}
Ok(())
};

trace!("Closing WebSocket connection");
Expand Down Expand Up @@ -776,17 +785,12 @@ pub mod stream_api {
/// - Closing the websocket connection itself fails.
pub async fn close(mut self) {
let close = async {
self.stream.close(None).await?;
if let Some(msg) = self.stream.next().await {
if !msg?.is_close() {
eyre::bail!("Server hasn't sent `Close` message for websocket handshake");
}
}
<_ as SinkExt<_>>::close(&mut self.stream).await?;
Ok(())
};

trace!("Closing WebSocket connection");
let _ = close.await.map_err(|e| error!(%e));
let _ = close.await.map_err(|e: eyre::Report| error!(%e));
trace!("WebSocket connection closed");
}
}
Expand All @@ -798,13 +802,15 @@ pub mod stream_api {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match futures_util::ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(WebSocketMessage::Binary(message))) => {
std::task::Poll::Ready(Some(self.handler.message(message)))
}
Some(Ok(_)) => std::task::Poll::Pending,
Some(Err(err)) => std::task::Poll::Ready(Some(Err(err.into()))),
None => std::task::Poll::Ready(None),
loop {
break match futures_util::ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(WebSocketMessage::Binary(message))) => {
std::task::Poll::Ready(Some(self.handler.message(message)))
}
Some(Ok(_)) => continue,
Some(Err(err)) => std::task::Poll::Ready(Some(Err(err.into()))),
None => std::task::Poll::Ready(None),
};
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/iroha_torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ parity-scale-codec = { workspace = true, features = ["derive"] }
pprof = { git = " https://github.com/Erigara/pprof-rs", branch = "fix_pointer_align", optional = true, default-features = false, features = ["protobuf-codec", "frame-pointer", "cpp"] }
nonzero_ext = { workspace = true }
pretty-error-debug = "0.3.0"
tungstenite = { workspace = true }

[dev-dependencies]
http-body-util = "0.1.2"
74 changes: 74 additions & 0 deletions crates/iroha_torii/src/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::{num::NonZeroU64, sync::Arc};

use iroha_core::kura::Kura;
use iroha_data_model::block::{
stream::{BlockMessage, BlockSubscriptionRequest},
SignedBlock,
};

use crate::stream::{self, WebSocketScale};

/// Type of error for `Consumer`
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error from provided stream/websocket
#[error("Stream error: {0}")]
Stream(Box<stream::Error>),
}

impl From<stream::Error> for Error {
fn from(error: stream::Error) -> Self {
Self::Stream(Box::new(error))
}
}

/// Result type for `Consumer`
pub type Result<T> = core::result::Result<T, Error>;

/// Consumer for Iroha `Block`(s).
/// Passes the blocks over the corresponding connection `stream`.
#[derive(Debug)]
pub struct Consumer<'ws> {
pub stream: &'ws mut WebSocketScale,
height: NonZeroU64,
kura: Arc<Kura>,
}

impl<'ws> Consumer<'ws> {
/// Constructs [`Consumer`], which forwards blocks through the `stream`.
///
/// # Errors
/// Can fail due to timeout or without message at websocket or during decoding request
#[iroha_futures::telemetry_future]
pub async fn new(stream: &'ws mut WebSocketScale, kura: Arc<Kura>) -> Result<Self> {
let BlockSubscriptionRequest(height) = stream.recv().await?;
Ok(Consumer {
stream,
height,
kura,
})
}

/// Forwards block if block for given height already exists
///
/// # Errors
/// Can fail due to timeout. Also receiving might fail
#[iroha_futures::telemetry_future]
pub async fn consume(&mut self) -> Result<()> {
if let Some(block) = self.kura.get_block_by_height(
self.height
.try_into()
.expect("INTERNAL BUG: Number of blocks exceeds usize::MAX"),
) {
// TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts
self.stream
.send(BlockMessage(SignedBlock::clone(&block)))
.await?;
self.height = self
.height
.checked_add(1)
.expect("Maximum block height is achieved.");
}
Ok(())
}
}
62 changes: 12 additions & 50 deletions crates/iroha_torii/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
//! Iroha is a quite dynamic system so many events can happen.
//! This module contains descriptions of such an events and
//! utility Iroha Special Instructions to work with them.
use axum::extract::ws::WebSocket;
use futures::TryStreamExt;
use iroha_data_model::events::prelude::*;
use iroha_macro::error::ErrorTryFromEnum;
use crate::stream::{self, Sink, Stream, StreamMessage as _};
use iroha_data_model::events::prelude::*;

/// Type of Stream error
pub type StreamError = stream::Error<<WebSocket as Stream<EventSubscriptionRequest>>::Err>;
use crate::stream::{self, WebSocketScale};

/// Type of error for `Consumer`
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error from provided stream/websocket
#[error("Stream error: {0}")]
Stream(Box<StreamError>),
/// Error from converting received message to filter
#[error("Can't retrieve subscription filter: {0}")]
CantRetrieveSubscriptionFilter(
#[from] ErrorTryFromEnum<EventSubscriptionRequest, EventFilterBox>,
),
/// Error from provided websocket
#[error("WebSocket error: {0}")]
WebSocket(#[from] axum::Error),
/// Error that occurs than `WebSocket::next()` call returns `None`
#[error("Can't receive message from stream")]
CantReceiveMessage,
Stream(Box<stream::Error>),
}

impl From<StreamError> for Error {
fn from(error: StreamError) -> Self {
impl From<stream::Error> for Error {
fn from(error: stream::Error) -> Self {
Self::Stream(Box::new(error))
}
}
Expand All @@ -42,20 +26,19 @@ pub type Result<T> = core::result::Result<T, Error>;
/// Consumer for Iroha `Event`(s).
/// Passes the events over the corresponding connection `stream` if they match the `filter`.
#[derive(Debug)]
pub struct Consumer {
stream: WebSocket,
pub struct Consumer<'ws> {
pub stream: &'ws mut WebSocketScale,
filters: Vec<EventFilterBox>,
}

impl Consumer {
impl<'ws> Consumer<'ws> {
/// Constructs [`Consumer`], which consumes `Event`s and forwards it through the `stream`.
///
/// # Errors
/// Can fail due to timeout or without message at websocket or during decoding request
#[iroha_futures::telemetry_future]
pub async fn new(mut stream: WebSocket) -> Result<Self> {
let EventSubscriptionRequest(filters) =
Stream::<EventSubscriptionRequest>::recv(&mut stream).await?;
pub async fn new(stream: &'ws mut WebSocketScale) -> Result<Self> {
let EventSubscriptionRequest(filters) = stream.recv::<EventSubscriptionRequest>().await?;
Ok(Consumer { stream, filters })
}

Expand All @@ -69,30 +52,9 @@ impl Consumer {
return Ok(());
}

Sink::<_>::send(&mut self.stream, EventMessage(event))
self.stream
.send(EventMessage(event))
.await
.map_err(Into::into)
}

/// Listen for `Close` message in loop
///
/// # Errors
/// Can fail if can't receive message from stream for some reason
pub async fn stream_closed(&mut self) -> Result<()> {
while let Some(message) = self.stream.try_next().await? {
if message.is_close() {
return Ok(());
}
iroha_logger::warn!("Unexpected message received: {:?}", message);
}
Err(Error::CantReceiveMessage)
}

/// Close stream. See [`WebSocket::close()`]
///
/// # Errors
/// Throws up [`WebSocket::close()`] errors
pub async fn close_stream(self) -> Result<()> {
self.stream.close().await.map_err(Into::into)
}
}
6 changes: 4 additions & 2 deletions crates/iroha_torii/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use utils::{

#[macro_use]
pub(crate) mod utils;
mod block;
mod event;
mod routing;
mod stream;
Expand Down Expand Up @@ -208,7 +209,7 @@ impl Torii {
move |ws: WebSocketUpgrade| {
core::future::ready(ws.on_upgrade(|ws| async move {
if let Err(error) =
routing::subscription::handle_subscription(events, ws).await
routing::event::handle_events_stream(events, ws).await
{
iroha_logger::error!(%error, "Failure during event streaming");
}
Expand All @@ -222,7 +223,8 @@ impl Torii {
let kura = self.kura.clone();
move |ws: WebSocketUpgrade| {
core::future::ready(ws.on_upgrade(|ws| async move {
if let Err(error) = routing::handle_blocks_stream(kura, ws).await {
if let Err(error) = routing::block::handle_blocks_stream(kura, ws).await
{
iroha_logger::error!(%error, "Failure during block streaming");
}
}))
Expand Down
Loading

0 comments on commit 2a4e918

Please sign in to comment.