diff --git a/crates/jmux-proxy/src/lib.rs b/crates/jmux-proxy/src/lib.rs index 7b464b790..1359d13be 100644 --- a/crates/jmux-proxy/src/lib.rs +++ b/crates/jmux-proxy/src/lib.rs @@ -250,21 +250,34 @@ impl JmuxSenderTask { #[instrument("sender", skip_all)] async fn run(self) -> anyhow::Result<()> { let Self { - mut jmux_writer, + jmux_writer, mut msg_to_send_rx, } = self; + let mut jmux_writer = tokio::io::BufWriter::with_capacity(16 * 1024, jmux_writer); let mut buf = bytes::BytesMut::new(); + let mut needs_flush = false; - while let Some(msg) = msg_to_send_rx.recv().await { - trace!(?msg, "Send channel message"); + loop { + tokio::select! { + msg = msg_to_send_rx.recv() => { + let Some(msg) = msg else { + break; + }; - buf.clear(); - msg.encode(&mut buf)?; + trace!(?msg, "Send channel message"); - jmux_writer.write_all(&buf).await?; + buf.clear(); + msg.encode(&mut buf)?; - jmux_writer.flush().await?; + jmux_writer.write_all(&buf).await?; + needs_flush = true; + } + _ = tokio::time::sleep(core::time::Duration::from_millis(10)), if needs_flush => { + jmux_writer.flush().await?; + needs_flush = false; + } + } } // TODO: send a signal to the main scheduler when we are done processing channel data messages @@ -272,6 +285,8 @@ impl JmuxSenderTask { info!("Closing JMUX sender task..."); + jmux_writer.flush().await?; + Ok(()) } }