Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dgw): support TRP streaming #1188

Merged
merged 9 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 0 additions & 13 deletions crates/ascii-streamer/Cargo.toml

This file was deleted.

15 changes: 15 additions & 0 deletions crates/terminal-streamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "terminal-streamer"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0"
tokio = { version = "1.42", features = ["io-util", "sync", "macros", "rt-multi-thread", "time"] }
tracing = "0.1"

[lints]
workspace = true

[dev-dependencies]
tokio = { version = "1.42", features = ["fs"] }
27 changes: 27 additions & 0 deletions crates/terminal-streamer/examples/decode_trp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Context;
use tokio::io::AsyncWriteExt;

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
let mut arg = std::env::args();
let input = arg
.find(|arg| arg.starts_with("--input"))
.context("input path is required")?;

let input = input.split("=").last().context("file path is required")?;

let output = arg
.find(|arg| arg.starts_with("--output"))
.context("output path is required")?;

let output = output.split("=").last().context("output path is required")?;

let file = tokio::fs::File::open(input).await?;
let (_task, mut output_reader) = terminal_streamer::trp_decoder::decode_stream(file)?;
let mut output_file = tokio::fs::File::create(output).await?;

tokio::io::copy(&mut output_reader, &mut output_file).await?;
output_file.flush().await?;

Ok(())
}
74 changes: 74 additions & 0 deletions crates/terminal-streamer/src/asciinema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#[derive(Debug)]
pub(crate) struct AsciinemaHeader {
pub(crate) version: u16,
pub(crate) row: u16,
pub(crate) col: u16,
}

impl Default for AsciinemaHeader {
fn default() -> Self {
Self {
version: 2,
row: 24,
col: 80,
}
}
}

#[derive(Debug)]
pub(crate) enum AsciinemaEvent {
TerminalOutput { payload: String, time: f64 },
UserInput { payload: String, time: f64 },
Resize { width: u16, height: u16, time: f64 },
}

impl AsciinemaHeader {
pub(crate) fn to_json(&self) -> String {
format!(
r#"{{"version": {}, "row": {}, "col": {}}}"#,
self.version, self.row, self.col
)
}
}

impl AsciinemaEvent {
pub(crate) fn to_json(&self) -> String {
match self {
AsciinemaEvent::TerminalOutput { payload, time } => {
let escaped_payload = Self::sanitize_payload(payload);
format!(r#"[{},"o","{}"]"#, time, escaped_payload)
}
AsciinemaEvent::UserInput { payload, time } => {
let escaped_payload = Self::sanitize_payload(payload);
format!(r#"[{},"i","{}"]"#, time, escaped_payload)
}
AsciinemaEvent::Resize { width, height, time } => {
format!(r#"[{},"r","{}x{}"]"#, time, width, height)
}
}
}

/// Sanitizes a string payload for JSON output by escaping ASCII control characters.
///
/// This function converts ASCII control characters (0x00-0x1F, 0x7F) into their Unicode
/// escape sequence representation (e.g., '\u001b' for ESC), while leaving other characters unchanged.
/// This ensures the resulting string is valid JSON and control characters are preserved in a readable format.
///
/// # Arguments
/// * `payload` - The string to sanitize
///
/// # Returns
/// A new string with all control characters escaped
fn sanitize_payload(payload: &str) -> String {
payload
.chars()
.fold(String::with_capacity(payload.len()), |mut acc, c| {
if c.is_ascii_control() {
acc.push_str(&format!("\\u{:04x}", c as u32));
} else {
acc.push(c);
}
acc
})
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,50 @@
pub(crate) mod asciinema;
pub mod trp_decoder;

#[macro_use]
extern crate tracing;

use std::{future::Future, sync::Arc};

use tokio::{
io::{AsyncBufReadExt, BufReader},
io::{AsyncBufReadExt, AsyncRead, BufReader},
sync::Notify,
};

pub trait AsciiStreamSocket {
pub trait TerminalStreamSocket {
fn send(&mut self, value: String) -> impl Future<Output = anyhow::Result<()>> + Send;
fn close(&mut self) -> impl Future<Output = ()> + Send;
}

pub enum InputStreamType {
Asciinema,
Trp,
}

#[tracing::instrument(skip_all)]
pub async fn ascii_stream(
mut websocket: impl AsciiStreamSocket,
input_stream: impl tokio::io::AsyncRead + Unpin,
pub async fn terminal_stream(
mut websocket: impl TerminalStreamSocket,
input_stream: impl AsyncRead + Unpin + Send + 'static,
shutdown_signal: Arc<Notify>,
input_type: InputStreamType,
when_new_chunk_appended: impl Fn() -> tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<()> {
info!("Starting ASCII streaming");

let mut trp_task_handle = None;
// Write all the data from the input stream to the output stream.
let buf_reader = BufReader::new(input_stream);
let mut lines = BufReader::new(buf_reader).lines();
let boxed_stream = match input_type {
InputStreamType::Asciinema => Box::new(input_stream) as Box<dyn AsyncRead + Unpin + Send + 'static>,
InputStreamType::Trp => {
let (task, stream) = trp_decoder::decode_stream(input_stream)?;
trp_task_handle = Some(task);
Box::new(stream) as Box<dyn AsyncRead + Unpin + Send + 'static>
}
};

let mut lines = BufReader::new(boxed_stream).lines();

// iterate and drain all the lines from the input stream
loop {
match lines.next_line().await {
Ok(Some(line)) => {
Expand All @@ -49,7 +69,6 @@ pub async fn ascii_stream(
websocket.send(line.clone()).await?;
}
Ok(None) => {
debug!("EOF reached");
break;
}
Err(e) => {
Expand All @@ -69,6 +88,9 @@ pub async fn ascii_stream(
// but we still needs to send 1000 code to the client
// as it is what is expected for the ascii-player to end the playback properly
websocket.close().await;
if let Some(task) = trp_task_handle {
task.abort();
}
debug!("Shutting down ASCII streaming");

Ok(())
Expand Down
Loading
Loading