From 41b81a716c745305da7459851bd2f1a430505324 Mon Sep 17 00:00:00 2001 From: irving ou Date: Fri, 10 Jan 2025 12:02:20 -0500 Subject: [PATCH 1/3] parent f8913429193fc865a48ef1540363b34bcbf9cb49 author irving ou 1736528540 -0500 committer irving ou 1736530356 -0500 fix(dgw): fix streamer failure for RDM --- crates/video-streamer/src/streamer/mod.rs | 8 +++-- .../src/streamer/tag_writers.rs | 34 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/crates/video-streamer/src/streamer/mod.rs b/crates/video-streamer/src/streamer/mod.rs index 16de344a..4013d459 100644 --- a/crates/video-streamer/src/streamer/mod.rs +++ b/crates/video-streamer/src/streamer/mod.rs @@ -82,8 +82,8 @@ pub fn webm_stream( header_writer.write(header)?; } - let mut encode_writer = header_writer.into_encoded_writer(encode_writer_config)?; - + let (mut encode_writer, cut_block_hit_marker) = header_writer.into_encoded_writer(encode_writer_config)?; + let mut cut_block_hit_marker = Some(cut_block_hit_marker); // Start muxing from the last key frame. // The WebM project requires the muxer to ensure the first Block/SimpleBlock is a keyframe. // However, the WebM file emitted by the CaptureStream API in Chrome does not adhere to this requirement. @@ -119,7 +119,9 @@ pub fn webm_stream( } Some(Ok(tag)) => { if webm_itr.last_tag_position() == cut_block_position { - encode_writer.mark_cut_block_hit(); + if let Some(cut_block_hit_marker) = cut_block_hit_marker.take() { + encode_writer.mark_cut_block_hit(cut_block_hit_marker); + } } match encode_writer.write(tag) { diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index da5eae81..a2a0e25b 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -47,7 +47,10 @@ where Ok(()) } - pub(crate) fn into_encoded_writer(self, config: EncodeWriterConfig) -> anyhow::Result> { + pub(crate) fn into_encoded_writer( + self, + config: EncodeWriterConfig, + ) -> anyhow::Result<(CutCusterWriter, CutBlockHitMarker)> { let encoded_writer = CutCusterWriter::new(config, self)?; Ok(encoded_writer) } @@ -94,11 +97,13 @@ where cut_block_state: CutBlockState, } +pub(crate) struct CutBlockHitMarker; + impl CutCusterWriter where T: std::io::Write, { - fn new(config: EncodeWriterConfig, writer: HeaderWriter) -> anyhow::Result { + fn new(config: EncodeWriterConfig, writer: HeaderWriter) -> anyhow::Result<(Self, CutBlockHitMarker)> { let decoder = VpxDecoder::builder() .threads(config.threads) .width(config.width) @@ -117,13 +122,16 @@ where .build()?; let HeaderWriter { writer } = writer; - Ok(Self { - writer, - cluster_timestamp: None, - encoder, - decoder, - cut_block_state: CutBlockState::HaventMet, - }) + Ok(( + Self { + writer, + cluster_timestamp: None, + encoder, + decoder, + cut_block_state: CutBlockState::HaventMet, + }, + CutBlockHitMarker, + )) } } @@ -137,6 +145,7 @@ where { #[instrument(skip(self, tag))] pub fn write(&mut self, tag: MatroskaSpec) -> anyhow::Result { + warn!("attempted to write tag: {}", mastroka_spec_name(&tag)); match tag { MatroskaSpec::Timestamp(timestamp) => { self.cluster_timestamp = Some(timestamp); @@ -203,7 +212,9 @@ where let current_block_absolute_time = current_video_block.absolute_timestamp()?; let cluster_relative_timestamp = current_block_absolute_time - cut_block_absolute_time; if self.should_write_new_cluster(current_block_absolute_time) { - self.start_new_cluster(cluster_relative_timestamp)?; + self.start_new_cluster(cluster_relative_timestamp).inspect_err(|e| { + error!("failed to start new cluster: {}", e); + })?; self.cut_block_state = CutBlockState::Met { cut_block_absolute_time, @@ -296,7 +307,8 @@ where false } - pub(crate) fn mark_cut_block_hit(&mut self) { + pub(crate) fn mark_cut_block_hit(&mut self, _marker: CutBlockHitMarker) { + debug!("marking cut block hit"); self.cut_block_state = CutBlockState::AtCutBlock; } } From 043ae0cda5a96eeb49c6dbcc37351628bb698a2f Mon Sep 17 00:00:00 2001 From: irving ou Date: Fri, 10 Jan 2025 12:42:25 -0500 Subject: [PATCH 2/3] clean up --- crates/video-streamer/src/streamer/tag_writers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index a2a0e25b..98d9f8cb 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -145,7 +145,6 @@ where { #[instrument(skip(self, tag))] pub fn write(&mut self, tag: MatroskaSpec) -> anyhow::Result { - warn!("attempted to write tag: {}", mastroka_spec_name(&tag)); match tag { MatroskaSpec::Timestamp(timestamp) => { self.cluster_timestamp = Some(timestamp); From 7d608e36ab6c08f0079e94f7814a32d6ba8bbbb9 Mon Sep 17 00:00:00 2001 From: irving ou Date: Sat, 11 Jan 2025 05:14:28 -0500 Subject: [PATCH 3/3] review fix --- crates/video-streamer/src/streamer/iter.rs | 10 ++++++++-- crates/video-streamer/src/streamer/tag_writers.rs | 15 ++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/crates/video-streamer/src/streamer/iter.rs b/crates/video-streamer/src/streamer/iter.rs index e9cd4e20..7115fd13 100644 --- a/crates/video-streamer/src/streamer/iter.rs +++ b/crates/video-streamer/src/streamer/iter.rs @@ -207,7 +207,7 @@ where let mut file = inner.into_inner(); file.reopen()?; file.seek(std::io::SeekFrom::Start(self.last_tag_position as u64))?; - self.inner = Some(WebmIterator::new(file, &[MatroskaSpec::BlockGroup(Master::Start)])); + self.new_inner(file); self.rollback_record = Some(self.last_tag_position); if self @@ -248,7 +248,7 @@ where file.seek(std::io::SeekFrom::Start(last_key_frame_position as u64))?; self.rollback_record = Some(last_key_frame_position); self.last_tag_position = last_key_frame_position; - self.inner = Some(WebmIterator::new(file, &[MatroskaSpec::BlockGroup(Master::Start)])); + self.new_inner(file); self.last_cluster_position = Some(cluster_start_position); Ok(self.last_key_frame_info) } @@ -315,6 +315,12 @@ where _ => Ok(false), } } + + fn new_inner(&mut self, reader: R) { + let mut inner = WebmIterator::new(reader, &[MatroskaSpec::BlockGroup(Master::Start)]); + inner.emit_master_end_when_eof(false); + self.inner = Some(inner); + } } pub(crate) fn read_vint(buffer: &[u8]) -> anyhow::Result> { diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index 98d9f8cb..bf4e8360 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -50,8 +50,8 @@ where pub(crate) fn into_encoded_writer( self, config: EncodeWriterConfig, - ) -> anyhow::Result<(CutCusterWriter, CutBlockHitMarker)> { - let encoded_writer = CutCusterWriter::new(config, self)?; + ) -> anyhow::Result<(CutClusterWriter, CutBlockHitMarker)> { + let encoded_writer = CutClusterWriter::new(config, self)?; Ok(encoded_writer) } } @@ -85,7 +85,7 @@ enum CutBlockState { }, } -pub(crate) struct CutCusterWriter +pub(crate) struct CutClusterWriter where T: std::io::Write, { @@ -99,7 +99,7 @@ where pub(crate) struct CutBlockHitMarker; -impl CutCusterWriter +impl CutClusterWriter where T: std::io::Write, { @@ -139,7 +139,7 @@ pub(crate) enum WriterResult { Continue, } -impl CutCusterWriter +impl CutClusterWriter where T: std::io::Write, { @@ -211,9 +211,7 @@ where let current_block_absolute_time = current_video_block.absolute_timestamp()?; let cluster_relative_timestamp = current_block_absolute_time - cut_block_absolute_time; if self.should_write_new_cluster(current_block_absolute_time) { - self.start_new_cluster(cluster_relative_timestamp).inspect_err(|e| { - error!("failed to start new cluster: {}", e); - })?; + self.start_new_cluster(cluster_relative_timestamp)?; self.cut_block_state = CutBlockState::Met { cut_block_absolute_time, @@ -307,7 +305,6 @@ where } pub(crate) fn mark_cut_block_hit(&mut self, _marker: CutBlockHitMarker) { - debug!("marking cut block hit"); self.cut_block_state = CutBlockState::AtCutBlock; } }