Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move delay control to latency monitors #779

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "roc_core/panic.h"
#include "roc_core/stddefs.h"
#include "roc_core/time.h"
#include "roc_packet/delayed_reader.h"
#include "roc_packet/ireader.h"
#include "roc_rtp/link_meter.h"

namespace roc {
Expand All @@ -24,7 +26,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
const SampleSpec& frame_sample_spec)
const SampleSpec& frame_sample_spec,
packet::DelayedReader& delayed_reader)
: tuner_(config, frame_sample_spec)
, frame_reader_(frame_reader)
, incoming_queue_(incoming_queue)
Expand All @@ -36,7 +39,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
, packet_sample_spec_(packet_sample_spec)
, frame_sample_spec_(frame_sample_spec)
, alive_(true)
, valid_(false) {
, valid_(false),
delayed_reader_(delayed_reader) {
if (!tuner_.is_valid()) {
return;
}
Expand Down Expand Up @@ -116,6 +120,12 @@ bool LatencyMonitor::pre_process_(const Frame& frame) {
}
}

if (!delayed_reader_.is_started()) {
if (tuner_.can_start()) {
delayed_reader_.start();
}
}

return true;
}

Expand Down
7 changes: 6 additions & 1 deletion src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_packet/delayed_reader.h"
#include "roc_packet/ilink_meter.h"
#include "roc_packet/ireader.h"
#include "roc_packet/sorted_queue.h"
#include "roc_packet/units.h"

Expand Down Expand Up @@ -67,7 +69,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
ResamplerReader* resampler,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
const SampleSpec& frame_sample_spec);
const SampleSpec& frame_sample_spec,
packet::DelayedReader& delayed_reader);

//! Check if the object was initialized successfully.
bool is_valid() const;
Expand Down Expand Up @@ -123,6 +126,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {

bool alive_;
bool valid_;

packet::DelayedReader& delayed_reader_;
};

} // namespace audio
Expand Down
29 changes: 28 additions & 1 deletion src/internal_modules/roc_audio/latency_tuner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ bool LatencyTuner::update_stream() {
default:
break;
}

if (enable_bounds_) {
if (!check_bounds_(latency)) {
return false;
Expand Down Expand Up @@ -463,5 +463,32 @@ const char* latency_tuner_profile_to_str(LatencyTunerProfile profile) {
return "<invalid>";
}

bool LatencyTuner::can_start() const {
roc_panic_if(!is_valid());

packet::stream_timestamp_diff_t latency = 0;

switch (backend_) {
case audio::LatencyTunerBackend_Niq:
if (!has_niq_latency_) {
return true;
}
latency = niq_latency_;
break;

case audio::LatencyTunerBackend_E2e:
if (!has_e2e_latency_) {
return true;
}
latency = e2e_latency_;
break;

default:
break;
}

return latency >= target_latency_;
}

} // namespace audio
} // namespace roc
2 changes: 2 additions & 0 deletions src/internal_modules/roc_audio/latency_tuner.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class LatencyTuner : public core::NonCopyable<> {
//! Returned value is close to 1.0.
float fetch_scaling();

bool can_start() const;

private:
bool check_bounds_(packet::stream_timestamp_diff_t latency);
void compute_scaling_(packet::stream_timestamp_diff_t latency);
Expand Down
30 changes: 16 additions & 14 deletions src/internal_modules/roc_packet/delayed_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@ namespace roc {
namespace packet {

DelayedReader::DelayedReader(IReader& reader,
core::nanoseconds_t target_delay,
const audio::SampleSpec& sample_spec)
: reader_(reader)
, queue_(0)
, delay_(0)
, started_(false)
, sample_spec_(sample_spec)
, valid_(false) {
if (target_delay > 0) {
delay_ = sample_spec.ns_2_stream_timestamp(target_delay);
}

roc_log(LogDebug, "delayed reader: initializing: delay=%lu(%.3fms)",
(unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_));
roc_log(LogDebug, "delayed reader: initializing: started=%d",
(bool)started_);

valid_ = true;
}
Expand All @@ -41,7 +36,6 @@ bool DelayedReader::is_valid() const {

status::StatusCode DelayedReader::read(PacketPtr& ptr) {
roc_panic_if(!valid_);

if (!started_) {
const status::StatusCode code = fetch_packets_();
if (code != status::StatusOK) {
Expand Down Expand Up @@ -75,14 +69,14 @@ status::StatusCode DelayedReader::fetch_packets_() {
}

const stream_timestamp_t qs = queue_size_();
if (qs < delay_) {
if (!is_started()) {
return status::StatusNoData;
}

roc_log(LogDebug,
"delayed reader: initial queue:"
" delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu",
(unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_),
" started=%d queue=%lu(%.3fms) packets=%lu",
(bool)started_,
(unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs),
(unsigned long)queue_.size());

Expand All @@ -99,7 +93,7 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) {
}

const stream_timestamp_t new_qs = queue_size_();
if (new_qs < delay_) {
if (!is_started()) {
break;
}

Expand All @@ -109,8 +103,8 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) {
if (qs != 0) {
roc_log(LogDebug,
"delayed reader: trimmed queue:"
" delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu",
(unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_),
" started=%dqueue=%lu(%.3fms) packets=%lu",
(bool)started_,
(unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs),
(unsigned long)(queue_.size() + 1));
}
Expand All @@ -136,5 +130,13 @@ stream_timestamp_t DelayedReader::queue_size_() const {
return (stream_timestamp_t)qs;
}

void DelayedReader::start() {
started_ = true;
}

bool DelayedReader::is_started() const {
return started_;
}

} // namespace packet
} // namespace roc
8 changes: 5 additions & 3 deletions src/internal_modules/roc_packet/delayed_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ class DelayedReader : public IReader, public core::NonCopyable<> {
//!
//! @b Parameters
//! - @p reader is used to read packets
//! - @p target_delay is the delay to insert before first packet
//! - @p sample_spec is the specifications of incoming packets
DelayedReader(IReader& reader,
core::nanoseconds_t target_delay,
const audio::SampleSpec& sample_spec);

//! Check if object was constructed successfully.
Expand All @@ -43,6 +41,11 @@ class DelayedReader : public IReader, public core::NonCopyable<> {
//! Read packet.
virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&);

void start();

//! Check if object was constructed successfully.
bool is_started() const;

private:
status::StatusCode fetch_packets_();
status::StatusCode read_queued_packet_(PacketPtr&);
Expand All @@ -52,7 +55,6 @@ class DelayedReader : public IReader, public core::NonCopyable<> {
IReader& reader_;
SortedQueue queue_;

stream_timestamp_t delay_;
bool started_;

const audio::SampleSpec sample_spec_;
Expand Down
5 changes: 3 additions & 2 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config,
pkt_reader = filter_.get();

delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader(
*pkt_reader, session_config.latency.target_latency, pkt_encoding->sample_spec));
*pkt_reader, pkt_encoding->sample_spec));

if (!delayed_reader_ || !delayed_reader_->is_valid()) {
return;
}
Expand Down Expand Up @@ -216,7 +217,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config,
latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor(
*frm_reader, *source_queue_, *depacketizer_, *source_meter_,
resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec,
common_config.output_sample_spec));
common_config.output_sample_spec, *delayed_reader_.get()));
if (!latency_monitor_ || !latency_monitor_->is_valid()) {
return;
}
Expand Down
Loading
Loading