Skip to content

Commit

Permalink
fix(dgw): fix streamer failure for RDM
Browse files Browse the repository at this point in the history
  • Loading branch information
irvingoujAtDevolution committed Jan 10, 2025
1 parent e4f8ee3 commit 6b0e511
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
21 changes: 15 additions & 6 deletions crates/video-streamer/src/streamer/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use webm_iterable::{
WebmIterator,
};

use crate::reopenable::Reopenable;
use crate::{debug::mastroka_spec_name, reopenable::Reopenable};

#[derive(Debug, Clone, Copy)]
pub(crate) enum LastKeyFrameInfo {
Expand Down Expand Up @@ -40,8 +40,6 @@ pub(crate) struct WebmPositionedIterator<R: std::io::Read + Seek + Reopenable> {
// So we need to keep track of weather we hit the cluster start and rolled back
// if so, we need to emit the cluster end tag manually
rolled_back_between_cluster: bool,

should_emit_cache: Option<MatroskaSpec>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -70,7 +68,6 @@ where
last_cluster_position: None,
rollback_record: None,
rolled_back_between_cluster: false,
should_emit_cache: None,
last_key_frame_info: LastKeyFrameInfo::NotMet {
cluster_timestamp: None,
cluster_start_position: None,
Expand Down Expand Up @@ -100,6 +97,14 @@ where
self.last_tag_position = record + inner.last_emitted_tag_offset();
}

warn!(
tag = mastroka_spec_name(tag),
absolute_postion = self.last_tag_position,
last_emmited_tag_offset = inner.last_emitted_tag_offset(),
record = record,
"First hand inner tag"
);

if matches!(tag, MatroskaSpec::BlockGroup(Master::Full(_))) {
// we check if the tag is BlockGroup Full,
// If so, we need to correct for the last tag position
Expand Down Expand Up @@ -186,7 +191,6 @@ where
};

if self.rolled_back_between_cluster {
self.should_emit_cache = Some(MatroskaSpec::Cluster(Master::Start));
self.rolled_back_between_cluster = false;
return Some(Ok(MatroskaSpec::Cluster(Master::End)));
} else {
Expand All @@ -199,7 +203,7 @@ where
}

pub(crate) fn rollback_to_last_successful_tag(&mut self) -> anyhow::Result<()> {
debug!(
warn!(
last_tag_position = self.last_tag_position,
"Rolling back to last successful tag"
);
Expand All @@ -223,13 +227,18 @@ where

pub(crate) fn skip(&mut self, number: u32) -> anyhow::Result<()> {
for _ in 0..number {
warn!(last_tag_position = self.last_tag_position, "Skipping tag");
let _ = self.next().context("failed to skip tag")??;
}

Ok(())
}

pub(crate) fn rollback_to_last_key_frame(&mut self) -> Result<LastKeyFrameInfo, IteratorError> {
warn!(
last_key_frame_info =?self.last_key_frame_info,
"Rolling back to last key frame"
);
let LastKeyFrameInfo::Met {
position: last_key_frame_position,
cluster_start_position,
Expand Down
8 changes: 5 additions & 3 deletions crates/video-streamer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ pub fn webm_stream(
header_writer.write(header)?;
}

let mut encode_writer = header_writer.into_encoded_writer(encode_writer_config)?;

let (mut encode_writer, cut_block_hit_marker) = header_writer.into_encoded_writer(encode_writer_config)?;
let mut cut_block_hit_marker = Some(cut_block_hit_marker);
// Start muxing from the last key frame.
// The WebM project requires the muxer to ensure the first Block/SimpleBlock is a keyframe.
// However, the WebM file emitted by the CaptureStream API in Chrome does not adhere to this requirement.
Expand Down Expand Up @@ -119,7 +119,9 @@ pub fn webm_stream(
}
Some(Ok(tag)) => {
if webm_itr.last_tag_position() == cut_block_position {
encode_writer.mark_cut_block_hit();
if let Some(cut_block_hit_marker) = cut_block_hit_marker.take() {
encode_writer.mark_cut_block_hit(cut_block_hit_marker);
}
}

match encode_writer.write(tag) {
Expand Down
1 change: 1 addition & 0 deletions crates/video-streamer/src/streamer/reopenable_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl Seek for ReOpenableFile {

impl Reopenable for ReOpenableFile {
fn reopen(&mut self) -> io::Result<()> {
error!("reopen file: {:?}", self.file_path);
let mut open_option = std::fs::OpenOptions::new();
open_option.read(true);

Expand Down
31 changes: 20 additions & 11 deletions crates/video-streamer/src/streamer/tag_writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
Ok(())
}

pub(crate) fn into_encoded_writer(self, config: EncodeWriterConfig) -> anyhow::Result<CutCusterWriter<T>> {
pub(crate) fn into_encoded_writer(self, config: EncodeWriterConfig) -> anyhow::Result<(CutCusterWriter<T>, CutBlockHitMarker)> {
let encoded_writer = CutCusterWriter::new(config, self)?;
Ok(encoded_writer)
}
Expand Down Expand Up @@ -94,11 +94,13 @@ where
cut_block_state: CutBlockState,
}

pub(crate) struct CutBlockHitMarker;

impl<T> CutCusterWriter<T>
where
T: std::io::Write,
{
fn new(config: EncodeWriterConfig, writer: HeaderWriter<T>) -> anyhow::Result<Self> {
fn new(config: EncodeWriterConfig, writer: HeaderWriter<T>) -> anyhow::Result<(Self, CutBlockHitMarker)> {
let decoder = VpxDecoder::builder()
.threads(config.threads)
.width(config.width)
Expand All @@ -117,13 +119,16 @@ where
.build()?;

let HeaderWriter { writer } = writer;
Ok(Self {
writer,
cluster_timestamp: None,
encoder,
decoder,
cut_block_state: CutBlockState::HaventMet,
})
Ok((
Self {
writer,
cluster_timestamp: None,
encoder,
decoder,
cut_block_state: CutBlockState::HaventMet,
},
CutBlockHitMarker,
))
}
}

Expand All @@ -137,6 +142,7 @@ where
{
#[instrument(skip(self, tag))]
pub fn write(&mut self, tag: MatroskaSpec) -> anyhow::Result<WriterResult> {
warn!("attempted to write tag: {}", mastroka_spec_name(&tag));
match tag {
MatroskaSpec::Timestamp(timestamp) => {
self.cluster_timestamp = Some(timestamp);
Expand Down Expand Up @@ -203,7 +209,9 @@ where
let current_block_absolute_time = current_video_block.absolute_timestamp()?;
let cluster_relative_timestamp = current_block_absolute_time - cut_block_absolute_time;
if self.should_write_new_cluster(current_block_absolute_time) {
self.start_new_cluster(cluster_relative_timestamp)?;
self.start_new_cluster(cluster_relative_timestamp).inspect_err(|e| {
error!("failed to start new cluster: {}", e);
})?;

self.cut_block_state = CutBlockState::Met {
cut_block_absolute_time,
Expand Down Expand Up @@ -296,7 +304,8 @@ where
false
}

pub(crate) fn mark_cut_block_hit(&mut self) {
pub(crate) fn mark_cut_block_hit(&mut self, _marker: CutBlockHitMarker) {
debug!("marking cut block hit");
self.cut_block_state = CutBlockState::AtCutBlock;
}
}
Expand Down

0 comments on commit 6b0e511

Please sign in to comment.