Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dgw): support TRP streaming #1188

Merged
merged 9 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions crates/ascii-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ edition = "2021"

[dependencies]
anyhow = "1.0"
serde_json = "1.0"
tokio = { version = "1.42", features = ["io-util", "sync"] }
tokio = { version = "1.42", features = ["io-util", "sync", "macros", "rt-multi-thread", "time"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Can you document why more features are necessary now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in Cargo.toml or here in Github?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s better to document at the relevant places as GitHub PRs are not something we can refer to easily.

tracing = "0.1"

[lints]
workspace = true

[dev-dependencies]
tokio = { version = "1.42", features = ["fs"] }
32 changes: 32 additions & 0 deletions crates/ascii-streamer/examples/decode_trp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use tokio::io::AsyncWriteExt;

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
let mut arg = std::env::args();
let input = arg
.find(|arg| arg.starts_with("--input"))
.ok_or(anyhow::anyhow!("input path is required"))?;

let input = input
.split("=")
.last()
.ok_or(anyhow::anyhow!("file path is required"))?;

let output = arg
.find(|arg| arg.starts_with("--output"))
.ok_or(anyhow::anyhow!("output path is required"))?;

let output = output
.split("=")
.last()
.ok_or(anyhow::anyhow!("output path is required"))?;
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved

let file = tokio::fs::File::open(input).await?;
let (_task, mut output_reader) = ascii_streamer::trp_decoder::decode_stream(file)?;
let mut output_file = tokio::fs::File::create(output).await?;

tokio::io::copy(&mut output_reader, &mut output_file).await?;
output_file.flush().await?;

Ok(())
}
53 changes: 47 additions & 6 deletions crates/ascii-streamer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
pub mod trp_decoder;

#[macro_use]
extern crate tracing;

use std::{future::Future, sync::Arc};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use tokio::{
io::{AsyncBufReadExt, BufReader},
io::{AsyncBufReadExt, AsyncRead, BufReader, ReadBuf},
sync::Notify,
};

Expand All @@ -13,18 +20,32 @@ pub trait AsciiStreamSocket {
fn close(&mut self) -> impl Future<Output = ()> + Send;
}

pub enum InputStreamType {
Asciinema,
Trp,
}

#[tracing::instrument(skip_all)]
pub async fn ascii_stream(
mut websocket: impl AsciiStreamSocket,
input_stream: impl tokio::io::AsyncRead + Unpin,
input_stream: impl AsyncRead + Unpin + Send + 'static,
shutdown_signal: Arc<Notify>,
input_type: InputStreamType,
when_new_chunk_appended: impl Fn() -> tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<()> {
info!("Starting ASCII streaming");
let mut trp_task_handle = None;
// Write all the data from the input stream to the output stream.
let buf_reader = BufReader::new(input_stream);
let mut lines = BufReader::new(buf_reader).lines();
let either = match input_type {
InputStreamType::Asciinema => Either::Left(input_stream),
InputStreamType::Trp => {
let (task, stream) = trp_decoder::decode_stream(input_stream)?;
trp_task_handle = Some(task);
Either::Right(stream)
}
};

let mut lines = BufReader::new(either).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => {
Expand All @@ -49,7 +70,6 @@ pub async fn ascii_stream(
websocket.send(line.clone()).await?;
}
Ok(None) => {
debug!("EOF reached");
break;
}
Err(e) => {
Expand All @@ -69,7 +89,28 @@ pub async fn ascii_stream(
// but we still needs to send 1000 code to the client
// as it is what is expected for the ascii-player to end the playback properly
websocket.close().await;
if let Some(task) = trp_task_handle {
task.abort();
}
debug!("Shutting down ASCII streaming");

Ok(())
}

pub enum Either<A, B> {
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved
Left(A),
Right(B),
}

impl<A, B> AsyncRead for Either<A, B>
where
A: AsyncRead + Unpin + Send,
B: AsyncRead + Unpin + Send,
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
Either::Left(left) => Pin::new(left).poll_read(cx, buf),
Either::Right(right) => Pin::new(right).poll_read(cx, buf),
}
}
}
223 changes: 223 additions & 0 deletions crates/ascii-streamer/src/trp_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};

#[derive(Debug)]
struct AsciinemaHeader {
version: u16,
row: u16,
col: u16,
}
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved

impl Default for AsciinemaHeader {
fn default() -> Self {
Self {
version: 2,
row: 24,
col: 80,
}
}
}

#[derive(Debug)]
enum AsciinemaEvent {
TerminalOutput { payload: String, time: f64 },
UserInput { payload: String, time: f64 },
Resize { width: u16, height: u16, time: f64 },
}

impl AsciinemaHeader {
fn to_json(&self) -> String {
format!(
r#"{{"version": {}, "row": {}, "col": {}}}"#,
self.version, self.row, self.col
)
}
}

impl AsciinemaEvent {
fn to_json(&self) -> String {
match self {
AsciinemaEvent::TerminalOutput { payload, time } => {
let escaped_payload = Self::sanitize_payload(payload);
format!(r#"[{},"o","{}"]"#, time, escaped_payload)
}
AsciinemaEvent::UserInput { payload, time } => {
let escaped_payload = Self::sanitize_payload(payload);
format!(r#"[{},"i","{}"]"#, time, escaped_payload)
}
AsciinemaEvent::Resize { width, height, time } => {
format!(r#"[{},"r","{}x{}"]"#, time, width, height)
}
}
}

fn sanitize_payload(payload: &str) -> String {
CBenoit marked this conversation as resolved.
Show resolved Hide resolved
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved
payload
.chars()
.map(|c| {
if c.is_ascii_control() {
format!("\\u{:04x}", c as u32)
} else {
c.to_string()
}
})
.collect::<String>()
}
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn decode_stream(
mut input_stream: impl AsyncRead + Unpin + Send + 'static,
) -> anyhow::Result<(tokio::task::JoinHandle<()>, impl AsyncRead + Unpin + Send + 'static)> {
let (mut tx, rx) = tokio::sync::mpsc::channel(10);

let mut time = 0.0;
// Store everything until we have a terminal setup
let mut before_setup_cache = Some(Vec::new());
let mut header = AsciinemaHeader::default();
let task = tokio::spawn(async move {
let final_tx = tx.clone();
let task = async move {
loop {
let mut packet_head_buffer = [0u8; 8];
if let Err(e) = input_stream.read_exact(&mut packet_head_buffer).await {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
}
anyhow::bail!(e);
}

let time_delta = u32::from_le_bytes(packet_head_buffer[0..4].try_into()?);
let event_type = u16::from_le_bytes(packet_head_buffer[4..6].try_into()?);
let size = u16::from_le_bytes(packet_head_buffer[6..8].try_into()?);
time += f64::from(time_delta) / 1000.0;
let mut event_payload = vec![0u8; size as usize];
if let Err(e) = input_stream.read_exact(&mut event_payload).await {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
}
anyhow::bail!(e);
}

match event_type {
0 => {
// Terminal output
let event_payload = String::from_utf8_lossy(&event_payload).into_owned();
let event = AsciinemaEvent::TerminalOutput {
payload: event_payload,
time,
};
match before_setup_cache {
Some(ref mut cache) => {
cache.push(event);
}
None => {
send(&mut tx, event.to_json()).await?;
}
}
}
1 => {
let event_payload = String::from_utf8_lossy(&event_payload).into_owned();
let event = AsciinemaEvent::UserInput {
payload: event_payload,
time,
};
match before_setup_cache {
Some(ref mut cache) => {
cache.push(event);
}
None => {
send(&mut tx, event.to_json()).await?;
}
}
}
2 => {
// Terminal size change
if before_setup_cache.is_some() {
header.row = u16::from_le_bytes(event_payload[0..2].try_into()?);
header.col = u16::from_le_bytes(event_payload[2..4].try_into()?);
} else {
let event = AsciinemaEvent::Resize {
width: header.col,
height: header.row,
time,
};
send(&mut tx, event.to_json()).await?;
}
}
4 => {
// Terminal setup
if before_setup_cache.is_some() {
let header_json = header.to_json();
send(&mut tx, header_json).await?;
if let Some(ref mut cache) = before_setup_cache {
for event in cache.drain(..) {
send(&mut tx, event.to_json()).await?;
}
}
before_setup_cache = None;
} else {
warn!("Termianl setup event cache is empty and we got a setup event");
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved
}
}
_ => {}
}
}
};

info!("TRP decoder task finished");
set_return_type::<anyhow::Result<()>, _>(&task);
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved
if let Err(e) = task.await {
final_tx.send(Err(e)).await.ok();
}
});

return Ok((task, AsyncReadChannel::new(rx)));

fn set_return_type<T, F: Future<Output = T>>(_arg: &F) {}
async fn send(
sender: &mut tokio::sync::mpsc::Sender<anyhow::Result<String>>,
mut json: String,
) -> anyhow::Result<()> {
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved
json.push('\n');
sender.send(Ok(json)).await?;
Ok(())
}
}

struct AsyncReadChannel {
receiver: tokio::sync::mpsc::Receiver<anyhow::Result<String>>,
}

impl AsyncReadChannel {
fn new(receiver: tokio::sync::mpsc::Receiver<anyhow::Result<String>>) -> Self {
Self { receiver }
}
}

impl AsyncRead for AsyncReadChannel {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
let res = Pin::new(&mut self.receiver).poll_recv(cx);
match res {
Poll::Ready(Some(Ok(data))) => {
buf.put_slice(data.as_bytes());
Poll::Ready(Ok(()))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))),
Poll::Ready(None) => {
// Channel is closed - only then we signal EOF
Poll::Ready(Ok(()))
}
Poll::Pending => {
// No data available yet, but channel is still open
Poll::Pending
}
}
}
}
Loading
Loading