Skip to content

Commit

Permalink
perf(jetsocat,dgw): reuse buffer in JMUX sender task (#975)
Browse files Browse the repository at this point in the history
Performance is increased by ~26.3%.

Before this patch:

> 0.0000-19.0245 sec 25.2 GBytes 11.4 Gbits/sec

After this patch:

> 0.0000-17.1307 sec  28.8 GBytes  14.4 Gbits/sec
  • Loading branch information
CBenoit authored Aug 14, 2024
1 parent aaf7765 commit 8ebfd23
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 25 deletions.
8 changes: 4 additions & 4 deletions crates/jmux-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,21 @@ impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::PacketOversized { packet_size, max } => {
write!(f, "Packet oversized: max is {max}, got {packet_size}")
write!(f, "packet oversized: max is {max}, got {packet_size}")
}
Error::NotEnoughBytes {
name,
received,
expected,
} => write!(
f,
"Not enough bytes provided to decode {name}: received {received} bytes, expected {expected} bytes"
"not enough bytes provided to decode {name}: received {received} bytes, expected {expected} bytes"
),
Error::InvalidPacket { name, field, reason } => {
write!(f, "Invalid `{field}` in {name}: {reason}")
write!(f, "invalid `{field}` in {name}: {reason}")
}
Error::InvalidDestinationUrl { value, reason } => {
write!(f, "Invalid destination URL `{value}`: {reason}")
write!(f, "invalid destination URL `{value}`: {reason}")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/jmux-proto/tests/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn message_type_try_err_on_invalid_bytes() {
fn header_decode_buffer_too_short_err() {
let err = Header::decode(Bytes::from_static(&[])).err().unwrap();
assert_eq!(
"Not enough bytes provided to decode HEADER: received 0 bytes, expected 4 bytes",
"not enough bytes provided to decode HEADER: received 0 bytes, expected 4 bytes",
err.to_string()
);
}
Expand Down Expand Up @@ -156,7 +156,7 @@ pub fn error_on_oversized_packet() {
.encode(&mut buf)
.err()
.unwrap();
assert_eq!("Packet oversized: max is 65535, got 65543", err.to_string());
assert_eq!("packet oversized: max is 65535, got 65543", err.to_string());
}

#[test]
Expand Down
45 changes: 26 additions & 19 deletions crates/jmux-proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! [Specification document](https://github.com/awakecoding/qmux/blob/protocol-update/SPEC.md)
//! [Specification document][source]
//!
//! [source]: https://github.com/Devolutions/devolutions-gateway/blob/master/docs/JMUX-spec.md
#[macro_use]
extern crate tracing;
Expand All @@ -14,7 +16,6 @@ use self::codec::JmuxCodec;
use self::id_allocator::IdAllocator;
use anyhow::Context as _;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use jmux_proto::{ChannelData, DistantChannelId, Header, LocalChannelId, Message, ReasonCode};
use std::collections::HashMap;
use std::convert::TryFrom;
Expand All @@ -26,7 +27,7 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, Notify};
use tokio::task::JoinHandle;
use tokio_util::codec::{FramedRead, FramedWrite};
use tokio_util::codec::FramedRead;
use tracing::{Instrument as _, Span};

// PERF/FIXME: changing this parameter to 16 * 1024 greatly improves the throughput,
Expand Down Expand Up @@ -104,12 +105,6 @@ impl JmuxProxy {
self
}

// TODO: consider using something like ChildTask<T> more widely in Devolutions Gateway
pub fn spawn(self) -> JoinHandle<anyhow::Result<()>> {
let fut = self.run();
tokio::spawn(fut)
}

pub async fn run(self) -> anyhow::Result<()> {
let span = Span::current();
run_proxy_impl(self, span.clone()).instrument(span).await
Expand All @@ -127,10 +122,9 @@ async fn run_proxy_impl(proxy: JmuxProxy, span: Span) -> anyhow::Result<()> {
let (msg_to_send_tx, msg_to_send_rx) = mpsc::unbounded_channel::<Message>();

let jmux_stream = FramedRead::new(jmux_reader, JmuxCodec);
let jmux_sink = FramedWrite::new(jmux_writer, JmuxCodec);

let sender_task_handle = JmuxSenderTask {
jmux_sink,
jmux_writer,
msg_to_send_rx,
}
.spawn(span.clone());
Expand Down Expand Up @@ -243,7 +237,7 @@ enum InternalMessage {
// ---------------------- //

struct JmuxSenderTask<T: AsyncWrite + Unpin + Send + 'static> {
jmux_sink: FramedWrite<T, JmuxCodec>,
jmux_writer: T,
msg_to_send_rx: MessageReceiver,
}

Expand All @@ -256,16 +250,26 @@ impl<T: AsyncWrite + Unpin + Send + 'static> JmuxSenderTask<T> {
#[instrument("sender", skip_all)]
async fn run(self) -> anyhow::Result<()> {
let Self {
mut jmux_sink,
mut jmux_writer,
mut msg_to_send_rx,
} = self;

let mut buf = bytes::BytesMut::new();

while let Some(msg) = msg_to_send_rx.recv().await {
trace!(?msg, "Send channel message");
jmux_sink.feed(msg).await?;
jmux_sink.flush().await?;

buf.clear();
msg.encode(&mut buf)?;

jmux_writer.write_all(&buf).await?;

jmux_writer.flush().await?;
}

// TODO: send a signal to the main scheduler when we are done processing channel data messages
// and adjust windows for all the channels only then.

info!("Closing JMUX sender task...");

Ok(())
Expand All @@ -292,6 +296,8 @@ impl<T: AsyncRead + Unpin + Send + 'static> JmuxSchedulerTask<T> {

#[instrument("scheduler", skip_all)]
async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSchedulerTask<T>) -> anyhow::Result<()> {
use futures_util::StreamExt as _;

let JmuxSchedulerTask {
cfg,
mut jmux_stream,
Expand Down Expand Up @@ -342,7 +348,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
anyhow::bail!("detected two streams with the same ID {}", id);
}

// Send leftover bytes if any
// Send leftover bytes if any.
if let Some(leftover) = leftover {
if let Err(error) = msg_to_send_tx.send(Message::data(channel.distant_id, leftover)) {
error!(%error, "Couldn't send leftover bytes");
Expand Down Expand Up @@ -483,9 +489,8 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc

nb_consecutive_pipe_failures += 1;
if nb_consecutive_pipe_failures > MAX_CONSECUTIVE_PIPE_FAILURES {
// Some underlying `AsyncRead` implementations might handle errors poorly
// and cause infinite polling on errors such as broken pipe (this should
// stop instead of returning the same error indefinitely).
// Some underlying `AsyncRead` implementations might handle errors poorly and cause infinite polling on errors such as broken pipe.
// (This should stop instead of returning the same error indefinitely.)
// Hence, this safety net to escape from such infinite loops.
anyhow::bail!("forced JMUX proxy shutdown because of too many consecutive pipe failures");
} else {
Expand Down Expand Up @@ -744,6 +749,8 @@ impl DataReaderTask {
}

async fn run(self) -> anyhow::Result<()> {
use futures_util::StreamExt as _;

let Self {
reader,
local_id,
Expand Down

0 comments on commit 8ebfd23

Please sign in to comment.