From 5949c120dc6b9ed9b45a04b7f24ebafa0a15e01b Mon Sep 17 00:00:00 2001 From: Jose Daniel Hernandez Date: Tue, 20 Feb 2024 23:34:45 -0600 Subject: [PATCH] server: Respond WS text request as text as well Make the `WS` server to respond to requests in text when they arrive in text format instead of always replying in binary. --- derive/src/service.rs | 2 +- server/src/lib.rs | 58 ++++++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/derive/src/service.rs b/derive/src/service.rs index b588507..1815c29 100644 --- a/derive/src/service.rs +++ b/derive/src/service.rs @@ -90,7 +90,7 @@ fn impl_service(im: &mut ItemImpl, args: &ServiceMeta) -> TokenStream { async fn dispatch( &mut self, request: ::nimiq_jsonrpc_core::Request, - tx: Option<&::tokio::sync::mpsc::Sender<::std::vec::Vec>>, + tx: Option<&::tokio::sync::mpsc::Sender<::nimiq_jsonrpc_server::Message>>, stream_id: u64, ) -> Option<::nimiq_jsonrpc_core::Response> { match request.method.as_str() { diff --git a/server/src/lib.rs b/server/src/lib.rs index f16f90d..9e5dcd3 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -30,6 +30,7 @@ use serde::{de::Deserialize, ser::Serialize}; use serde_json::Value; use thiserror::Error; use tokio::sync::{mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard}; +pub use warp::filters::ws::Message; use warp::Filter; use nimiq_jsonrpc_core::{ @@ -45,7 +46,7 @@ pub enum Error { /// Error from the message queues, that are used internally. #[error("Queue error: {0}")] - Mpsc(#[from] tokio::sync::mpsc::error::SendError>), + Mpsc(#[from] tokio::sync::mpsc::error::SendError), /// JSON error #[error("JSON error: {0}")] @@ -150,14 +151,14 @@ impl Server { .and_then(move |body: Bytes| { let inner = Arc::clone(&inner); async move { - let data = Self::handle_raw_request(inner, &body, None) + let data = Self::handle_raw_request(inner, &Message::binary(body), None) .await - .unwrap_or_default(); + .unwrap_or(Message::binary([])); let response = http::response::Builder::new() .status(200) .header("Content-Type", "application/json-rpc") - .body(data) + .body(data.as_bytes().to_owned()) .unwrap(); // As long as the hard-coded status code and content-type is correct, this won't fail. Ok::<_, warp::Rejection>(response) @@ -219,7 +220,7 @@ impl Server { // Forwards multiplexer queue output to websocket let forward_fut = async move { while let Some(data) = multiplex_rx.recv().await { - tx.send(warp::ws::Message::binary(data)).await?; + tx.send(data).await?; } Ok::<(), Error>(()) }; @@ -235,22 +236,17 @@ impl Server { if let Some((code, reason)) = message.close_frame() { // If the close message contains a code and a reason, we need to echo it back multiplex_tx - .send( - warp::ws::Message::close_with(code, reason.to_owned()) - .into_bytes(), - ) + .send(warp::ws::Message::close_with(code, reason.to_owned())) .await?; } else { // Otherwise we echo an empty close message - multiplex_tx - .send(warp::ws::Message::close().into_bytes()) - .await?; + multiplex_tx.send(warp::ws::Message::close()).await?; } // Then we exit the loop which closes the connection break; } else if let Some(response) = Self::handle_raw_request( Arc::clone(&inner), - message.as_bytes(), + &message, Some(&multiplex_tx), ) .await @@ -281,10 +277,10 @@ impl Server { /// async fn handle_raw_request( inner: Arc>, - request: &[u8], - tx: Option<&mpsc::Sender>>, - ) -> Option> { - match serde_json::from_slice(request) { + request: &Message, + tx: Option<&mpsc::Sender>, + ) -> Option { + match serde_json::from_slice(request.as_bytes()) { Ok(request) => Self::handle_request(inner, request, tx).await, Err(_e) => { log::error!("Received invalid JSON from client"); @@ -295,7 +291,16 @@ impl Server { } } .map(|response| { - serde_json::to_vec(&response).expect("Failed to serialize JSON RPC response") + if request.is_text() { + Message::text( + serde_json::to_string(&response) + .expect("Failed to serialize JSON RPC response"), + ) + } else { + Message::binary( + serde_json::to_vec(&response).expect("Failed to serialize JSON RPC response"), + ) + } }) } @@ -311,7 +316,7 @@ impl Server { async fn handle_request( inner: Arc>, request: SingleOrBatch, - tx: Option<&mpsc::Sender>>, + tx: Option<&mpsc::Sender>, ) -> Option> { match request { SingleOrBatch::Single(request) => Self::handle_single_request(inner, request, tx) @@ -342,7 +347,7 @@ impl Server { async fn handle_single_request( inner: Arc>, request: Request, - tx: Option<&mpsc::Sender>>, + tx: Option<&mpsc::Sender>, ) -> Option { let mut dispatcher = inner.dispatcher.write().await; // This ID is only used for streams @@ -366,7 +371,7 @@ pub trait Dispatcher: Send + Sync + 'static { async fn dispatch( &mut self, request: Request, - tx: Option<&mpsc::Sender>>, + tx: Option<&mpsc::Sender>, id: u64, ) -> Option; @@ -406,7 +411,7 @@ impl Dispatcher for ModularDispatcher { async fn dispatch( &mut self, request: Request, - tx: Option<&mpsc::Sender>>, + tx: Option<&mpsc::Sender>, id: u64, ) -> Option { for dispatcher in &mut self.dispatchers { @@ -475,7 +480,7 @@ where async fn dispatch( &mut self, request: Request, - tx: Option<&mpsc::Sender>>, + tx: Option<&mpsc::Sender>, id: u64, ) -> Option { if self.is_allowed(&request.method) { @@ -628,7 +633,7 @@ pub fn method_not_found(request: Request) -> Option { async fn forward_notification( item: T, - tx: &mut mpsc::Sender>, + tx: &mut mpsc::Sender, id: &SubscriptionId, method: &str, ) -> Result<(), Error> @@ -644,7 +649,8 @@ where log::debug!("Sending notification: {:?}", notification); - tx.send(serde_json::to_vec(¬ification)?).await?; + tx.send(Message::binary(serde_json::to_vec(¬ification)?)) + .await?; Ok(()) } @@ -664,7 +670,7 @@ where /// pub fn connect_stream( stream: S, - tx: &mpsc::Sender>, + tx: &mpsc::Sender, stream_id: u64, method: String, ) -> SubscriptionId