Skip to content

Commit

Permalink
feat(dgw): support asciicast streaming (#1165)
Browse files Browse the repository at this point in the history
Co-authored-by: Benoît Cortier <[email protected]>
  • Loading branch information
irvingoujAtDevolution and CBenoit authored Jan 8, 2025
1 parent 0ed70d2 commit 8a52585
Show file tree
Hide file tree
Showing 33 changed files with 555 additions and 130 deletions.
57 changes: 34 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions crates/ascii-streamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "ascii-streamer"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0"
serde_json = "1.0"
tokio = { version = "1.42", features = ["io-util", "sync"] }
tracing = "0.1"

[lints]
workspace = true
75 changes: 75 additions & 0 deletions crates/ascii-streamer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#[macro_use]
extern crate tracing;

use std::{future::Future, sync::Arc};

use tokio::{
io::{AsyncBufReadExt, BufReader},
sync::Notify,
};

pub trait AsciiStreamSocket {
fn send(&mut self, value: String) -> impl Future<Output = anyhow::Result<()>> + Send;
fn close(&mut self) -> impl Future<Output = ()> + Send;
}

#[tracing::instrument(skip_all)]
pub async fn ascii_stream(
mut websocket: impl AsciiStreamSocket,
input_stream: impl tokio::io::AsyncRead + Unpin,
shutdown_signal: Arc<Notify>,
when_new_chunk_appended: impl Fn() -> tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<()> {
info!("Starting ASCII streaming");
// Write all the data from the input stream to the output stream.
let buf_reader = BufReader::new(input_stream);
let mut lines = BufReader::new(buf_reader).lines();

loop {
match lines.next_line().await {
Ok(Some(line)) => {
websocket.send(line.clone()).await?;
}
Ok(None) => {
break;
}
Err(e) => {
warn!(error=%e, "Error reading line");
continue;
}
}
}

loop {
tokio::select! {
_ = when_new_chunk_appended() => {
loop {
match lines.next_line().await {
Ok(Some(line)) => {
websocket.send(line.clone()).await?;
}
Ok(None) => {
debug!("EOF reached");
break;
}
Err(e) => {
warn!(error=%e, "Error reading line");
continue;
}
}
}
}
_ = shutdown_signal.notified() => {
break;
}
}
}

// Note: though sometimes we end the loop with error
// but we still needs to send 1000 code to the client
// as it is what is expected for the ascii-player to end the playback properly
websocket.close().await;
debug!("Shutting down ASCII streaming");

Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "streamer"
name = "video-streamer"
version = "0.0.0"
authors = ["Devolutions Inc. <[email protected]>"]
edition = "2021"
Expand All @@ -20,7 +20,7 @@ tracing = "0.1"
webm-iterable = { version = "0.6", features = ["futures"] }
cadeau = { version = "0.5", features = ["dlopen"] }
thiserror = "2"
num_cpus = "1.16.0"
num_cpus = "1.16"

[dev-dependencies]
tracing-subscriber = "0.3"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
#![allow(clippy::print_stdout)]
#![allow(clippy::unwrap_used)]

use std::{env, path::Path, process::exit};
use std::{env, path::Path, process::exit, sync::Arc};

use anyhow::Context;
use cadeau::xmf;
use local_websocket::create_local_websocket;
use streamer::{config::CpuCount, webm_stream, ReOpenableFile, StreamingConfig};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::watch::Sender,
sync::{watch::Sender, Notify},
};
use tracing::{error, info};
use video_streamer::{config::CpuCount, webm_stream, ReOpenableFile, StreamingConfig};

pub struct TokioSignal {
signal: tokio::sync::watch::Receiver<()>,
}

impl streamer::Signal for TokioSignal {
async fn wait(&mut self) {
let _ = self.signal.changed().await;
}
}

mod local_websocket;

#[tokio::main]
Expand Down Expand Up @@ -53,10 +47,11 @@ async fn main() -> anyhow::Result<()> {
xmf::init(args.lib_xmf_path)?;
}

let notify = Arc::new(Notify::new());

let input_path = Path::new(args.input_path);
let (eof_sender, eof_receiver) = tokio::sync::watch::channel(());
let (file_written_sender, file_written_receiver) = tokio::sync::broadcast::channel(1);
let intermediate_file = get_slowly_written_file(input_path, eof_sender, file_written_sender).await?;
let intermediate_file = get_slowly_written_file(input_path, notify.clone(), file_written_sender).await?;

let (client, server) = create_local_websocket().await;
let output_file = tokio::fs::OpenOptions::new()
Expand All @@ -68,13 +63,11 @@ async fn main() -> anyhow::Result<()> {

run_client(client, output_file);

let shutdown_signal = TokioSignal { signal: eof_receiver };

tokio::task::spawn_blocking(move || {
webm_stream(
server,
intermediate_file,
shutdown_signal,
notify,
StreamingConfig {
encoder_threads: CpuCount::default(),
},
Expand All @@ -99,7 +92,7 @@ async fn main() -> anyhow::Result<()> {

async fn get_slowly_written_file(
input_path: &Path,
eof_signal: Sender<()>,
eof_signal: Arc<Notify>,
file_written_sender: tokio::sync::broadcast::Sender<()>,
) -> anyhow::Result<ReOpenableFile> {
let input_file_name = input_path
Expand Down Expand Up @@ -152,7 +145,7 @@ async fn get_slowly_written_file(
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}

eof_signal.send(()).unwrap();
eof_signal.notify_waiters();
Ok::<_, anyhow::Error>(())
});

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ extern crate tracing;
pub use config::StreamingConfig;
pub use streamer::reopenable_file::ReOpenableFile;
pub use streamer::signal_writer::SignalWriter;
pub use streamer::{webm_stream, Signal};
pub use streamer::webm_stream;
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@ pub(crate) mod tag_writers;

use crate::{reopenable::Reopenable, StreamingConfig};

pub trait Signal: Send + 'static {
fn wait(&mut self) -> impl std::future::Future<Output = ()> + Send;
}

#[instrument(skip_all)]
pub fn webm_stream(
output_stream: impl tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin + Send + 'static, // A websocket usually
input_stream: impl std::io::Read + Reopenable, // A file usually
shutdown_signal: impl Signal,
shutdown_signal: Arc<Notify>,
config: StreamingConfig,
when_new_chunk_appended: impl Fn() -> tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -181,7 +177,7 @@ fn spawn_sending_task<W>(
ws_frame: Framed<W, ProtocolCodeC>,
mut chunk_receiver: ChannelWriterReceiver,
codec: Option<String>,
mut shutdown_signal: impl Signal,
shutdown_signal: Arc<Notify>,
mut error_receiver: mpsc::Receiver<UserFriendlyError>,
stop_notifier: Arc<Notify>,
) where
Expand Down Expand Up @@ -245,7 +241,7 @@ fn spawn_sending_task<W>(
continue;
}
},
_ = shutdown_signal.wait() => {
_ = shutdown_signal.notified() => {
info!("Received shutdown signal");
ws_send(&ws_frame_clone, protocol::ServerMessage::End).await;
break;
Expand Down
File renamed without changes.
File renamed without changes.
3 changes: 2 additions & 1 deletion devolutions-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ ironrdp-rdcleanpath = { version = "0.1", git = "https://github.com/Devolutions/I
ceviche = "0.6.1"
picky-krb = "0.9"
network-scanner = { version = "0.0.0", path = "../crates/network-scanner" }
streamer = { path = "../crates/streamer" }
video-streamer = { path = "../crates/video-streamer" }
ascii-streamer = { path = "../crates/ascii-streamer" }

# Serialization
serde = { version = "1", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion devolutions-gateway/src/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use devolutions_gateway_task::{ShutdownSignal, Task};
use futures::future::Either;
use parking_lot::Mutex;
use serde::Serialize;
use streamer::SignalWriter;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::sync::{mpsc, oneshot, Notify};
use tokio::{fs, io};
use typed_builder::TypedBuilder;
use uuid::Uuid;
use video_streamer::SignalWriter;

use crate::job_queue::JobQueueHandle;
use crate::session::SessionMessageSender;
Expand Down
Loading

0 comments on commit 8a52585

Please sign in to comment.