From 11efaa5cfe1a87d3880c82a27e37a4da9d38ed4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Cortier?= Date: Wed, 14 Aug 2024 13:58:49 -0400 Subject: [PATCH] perf(jetsocat,dgw): limit number of syscalls in JMUX sender task (#976) Number of syscalls is reduced by using a `BufWriter` and waiting for write operations to stop before calling `flush()`. Performance is increased by ~28.4%. Before this patch: > 0.0000-17.1307 sec 28.8 GBytes 14.4 Gbits/sec After this patch: > 0.0000-13.8483 sec 29.9 GBytes 18.5 Gbits/sec --- crates/jmux-proxy/src/lib.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/crates/jmux-proxy/src/lib.rs b/crates/jmux-proxy/src/lib.rs index 7b464b790..1359d13be 100644 --- a/crates/jmux-proxy/src/lib.rs +++ b/crates/jmux-proxy/src/lib.rs @@ -250,21 +250,34 @@ impl JmuxSenderTask { #[instrument("sender", skip_all)] async fn run(self) -> anyhow::Result<()> { let Self { - mut jmux_writer, + jmux_writer, mut msg_to_send_rx, } = self; + let mut jmux_writer = tokio::io::BufWriter::with_capacity(16 * 1024, jmux_writer); let mut buf = bytes::BytesMut::new(); + let mut needs_flush = false; - while let Some(msg) = msg_to_send_rx.recv().await { - trace!(?msg, "Send channel message"); + loop { + tokio::select! { + msg = msg_to_send_rx.recv() => { + let Some(msg) = msg else { + break; + }; - buf.clear(); - msg.encode(&mut buf)?; + trace!(?msg, "Send channel message"); - jmux_writer.write_all(&buf).await?; + buf.clear(); + msg.encode(&mut buf)?; - jmux_writer.flush().await?; + jmux_writer.write_all(&buf).await?; + needs_flush = true; + } + _ = tokio::time::sleep(core::time::Duration::from_millis(10)), if needs_flush => { + jmux_writer.flush().await?; + needs_flush = false; + } + } } // TODO: send a signal to the main scheduler when we are done processing channel data messages @@ -272,6 +285,8 @@ impl JmuxSenderTask { info!("Closing JMUX sender task..."); + jmux_writer.flush().await?; + Ok(()) } }