From 06f05ae5a32c85a496a458b9cf9fa33e34fc0a43 Mon Sep 17 00:00:00 2001 From: J Jeshwanth Reddy Date: Sun, 21 Jul 2024 19:47:10 +0530 Subject: [PATCH] Implemented explicit flushing of buffered samples from sink to file #703 - Added `virtual ROC_ATTR_NODISCARD status::StatusCode flush()=0` pure virtual function in `sndio::ISink`. - Added this method to all ISink implementations. - Implementations that don't use buffereing are no-op. - For SoxSink, flush() sends the buffered samples to disk. - sndio::Pump invokes `flush()` when it exits from `run()`. - Made SoxSink::write to write only when buffer_pos is greater than zero --- .../roc_pipeline/sender_loop.cpp | 6 ++++++ .../roc_pipeline/sender_loop.h | 1 + .../roc_pipeline/sender_sink.cpp | 4 ++++ .../roc_pipeline/sender_sink.h | 3 +++ .../roc_pipeline/transcoder_sink.cpp | 4 ++++ .../roc_pipeline/transcoder_sink.h | 3 +++ src/internal_modules/roc_sndio/isink.h | 4 ++++ src/internal_modules/roc_sndio/pump.cpp | 7 +++++++ .../roc_sndio/pulseaudio_device.cpp | 4 ++++ .../roc_sndio/pulseaudio_device.h | 3 +++ .../target_sndfile/roc_sndio/sndfile_sink.cpp | 4 ++++ .../target_sndfile/roc_sndio/sndfile_sink.h | 3 +++ .../target_sox/roc_sndio/sox_sink.cpp | 18 +++++++++++++++--- .../roc_sndio/target_sox/roc_sndio/sox_sink.h | 3 +++ src/internal_modules/roc_sndio/wav_sink.cpp | 4 ++++ src/internal_modules/roc_sndio/wav_sink.h | 3 +++ .../roc_pipeline/test_helpers/mock_sink.h | 4 ++++ src/tests/roc_sndio/test_helpers/mock_sink.h | 4 ++++ 18 files changed, 79 insertions(+), 3 deletions(-) diff --git a/src/internal_modules/roc_pipeline/sender_loop.cpp b/src/internal_modules/roc_pipeline/sender_loop.cpp index 34bef9128..d62fb2121 100644 --- a/src/internal_modules/roc_pipeline/sender_loop.cpp +++ b/src/internal_modules/roc_pipeline/sender_loop.cpp @@ -221,6 +221,12 @@ status::StatusCode SenderLoop::write(audio::Frame& frame) { return code; } +status::StatusCode SenderLoop::flush() { + core::Mutex::Lock lock(sink_mutex_); + + return sink_.flush(); +} + core::nanoseconds_t SenderLoop::timestamp_imp() const { return core::timestamp(core::ClockMonotonic); } diff --git a/src/internal_modules/roc_pipeline/sender_loop.h b/src/internal_modules/roc_pipeline/sender_loop.h index 45d1082ba..df6ef5e4e 100644 --- a/src/internal_modules/roc_pipeline/sender_loop.h +++ b/src/internal_modules/roc_pipeline/sender_loop.h @@ -152,6 +152,7 @@ class SenderLoop : public PipelineLoop, private sndio::ISink { virtual core::nanoseconds_t latency() const; virtual bool has_clock() const; virtual status::StatusCode write(audio::Frame& frame); + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); // Methods of PipelineLoop virtual core::nanoseconds_t timestamp_imp() const; diff --git a/src/internal_modules/roc_pipeline/sender_sink.cpp b/src/internal_modules/roc_pipeline/sender_sink.cpp index e9a18773c..cd4704e6f 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.cpp +++ b/src/internal_modules/roc_pipeline/sender_sink.cpp @@ -205,5 +205,9 @@ status::StatusCode SenderSink::write(audio::Frame& frame) { return code; } +status::StatusCode SenderSink::flush() { + return status::StatusOK; +} + } // namespace pipeline } // namespace roc diff --git a/src/internal_modules/roc_pipeline/sender_sink.h b/src/internal_modules/roc_pipeline/sender_sink.h index 070a90044..1b1b96847 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.h +++ b/src/internal_modules/roc_pipeline/sender_sink.h @@ -109,6 +109,9 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> { //! Write frame. virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame); + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); + private: SenderSinkConfig sink_config_; diff --git a/src/internal_modules/roc_pipeline/transcoder_sink.cpp b/src/internal_modules/roc_pipeline/transcoder_sink.cpp index 22c585a09..93a261464 100644 --- a/src/internal_modules/roc_pipeline/transcoder_sink.cpp +++ b/src/internal_modules/roc_pipeline/transcoder_sink.cpp @@ -129,5 +129,9 @@ status::StatusCode TranscoderSink::write(audio::Frame& frame) { return frame_writer_->write(frame); } +status::StatusCode TranscoderSink::flush() { + return status::StatusOK; +} + } // namespace pipeline } // namespace roc diff --git a/src/internal_modules/roc_pipeline/transcoder_sink.h b/src/internal_modules/roc_pipeline/transcoder_sink.h index 43bd718df..87104eac5 100644 --- a/src/internal_modules/roc_pipeline/transcoder_sink.h +++ b/src/internal_modules/roc_pipeline/transcoder_sink.h @@ -69,6 +69,9 @@ class TranscoderSink : public sndio::ISink, public core::NonCopyable<> { //! Write frame. virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame); + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); + private: audio::FrameFactory frame_factory_; diff --git a/src/internal_modules/roc_sndio/isink.h b/src/internal_modules/roc_sndio/isink.h index cd36dc1ae..14406bc7f 100644 --- a/src/internal_modules/roc_sndio/isink.h +++ b/src/internal_modules/roc_sndio/isink.h @@ -14,6 +14,7 @@ #include "roc_audio/iframe_writer.h" #include "roc_sndio/idevice.h" +#include "roc_status/status_code.h" namespace roc { namespace sndio { @@ -22,6 +23,9 @@ namespace sndio { class ISink : virtual public IDevice, public audio::IFrameWriter { public: virtual ~ISink(); + + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush() = 0; }; } // namespace sndio diff --git a/src/internal_modules/roc_sndio/pump.cpp b/src/internal_modules/roc_sndio/pump.cpp index ca1b855f6..26f93d72c 100644 --- a/src/internal_modules/roc_sndio/pump.cpp +++ b/src/internal_modules/roc_sndio/pump.cpp @@ -71,6 +71,13 @@ status::StatusCode Pump::run() { if (code == status::StatusEnd) { code = status::StatusOK; // EOF is fine } + if (code == status::StatusOK) { + code = sink_.flush(); + if (code != status::StatusOK) { + roc_log(LogError, "pump: got error when flushing sink: status=%s", + status::code_to_str(code)); + } + } roc_log(LogDebug, "pump: exiting main loop"); diff --git a/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.cpp b/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.cpp index 1a2fea542..33412f991 100644 --- a/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.cpp +++ b/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.cpp @@ -326,6 +326,10 @@ status::StatusCode PulseaudioDevice::write(audio::Frame& frame) { return handle_request_(frame.bytes(), frame.num_bytes()); } +status::StatusCode PulseaudioDevice::flush() { + return status::StatusOK; +} + status::StatusCode PulseaudioDevice::read(audio::Frame& frame, packet::stream_timestamp_t duration, audio::FrameReadMode mode) { diff --git a/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.h b/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.h index dbd513810..a20008281 100644 --- a/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.h +++ b/src/internal_modules/roc_sndio/target_pulseaudio/roc_sndio/pulseaudio_device.h @@ -98,6 +98,9 @@ class PulseaudioDevice : public ISink, public ISource, public core::NonCopyable< packet::stream_timestamp_t duration, audio::FrameReadMode mode); + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); + private: static void context_state_cb_(pa_context* context, void* userdata); diff --git a/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.cpp b/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.cpp index 764eaf6d0..7f199a463 100644 --- a/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.cpp +++ b/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.cpp @@ -244,6 +244,10 @@ status::StatusCode SndfileSink::write(audio::Frame& frame) { return status::StatusOK; } +status::StatusCode SndfileSink::flush() { + return status::StatusOK; +} + status::StatusCode SndfileSink::open_(const char* driver, const char* path) { if (!map_to_sndfile(&driver, path, file_info_)) { roc_log(LogDebug, diff --git a/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.h b/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.h index 7270589d5..0e2d617e9 100644 --- a/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.h +++ b/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_sink.h @@ -66,6 +66,9 @@ class SndfileSink : public ISink, public core::NonCopyable<> { //! Write frame. virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame); + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); + private: status::StatusCode open_(const char* driver, const char* path); void close_(); diff --git a/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.cpp b/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.cpp index 597beb2d0..8244f4082 100644 --- a/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.cpp +++ b/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.cpp @@ -209,9 +209,21 @@ status::StatusCode SoxSink::write(audio::Frame& frame) { } } - const status::StatusCode code = write_(buffer_data, buffer_pos); - if (code != status::StatusOK) { - return code; + if (buffer_pos > 0) { + const status::StatusCode code = write_(buffer_data, buffer_pos); + if (code != status::StatusOK) { + return code; + } + } + + return status::StatusOK; +} + +status::StatusCode SoxSink::flush() { + if (output_ != NULL && driver_type_ == DriverType_File && output_->fp != NULL) { + if (fflush((FILE*)output_->fp) != 0) { + return status::StatusErrFile; + } } return status::StatusOK; diff --git a/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.h b/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.h index 3500f1e33..02365f216 100644 --- a/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.h +++ b/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.h @@ -78,6 +78,9 @@ class SoxSink : public ISink, public core::NonCopyable<> { //! Write frame. virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame); + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); + private: status::StatusCode init_names_(const char* driver, const char* path); status::StatusCode init_buffer_(); diff --git a/src/internal_modules/roc_sndio/wav_sink.cpp b/src/internal_modules/roc_sndio/wav_sink.cpp index 88954b6e9..82450bda5 100644 --- a/src/internal_modules/roc_sndio/wav_sink.cpp +++ b/src/internal_modules/roc_sndio/wav_sink.cpp @@ -138,6 +138,10 @@ status::StatusCode WavSink::write(audio::Frame& frame) { return status::StatusOK; } +status::StatusCode WavSink::flush() { + return status::StatusOK; +} + status::StatusCode WavSink::open_(const char* path) { if (output_file_) { roc_panic("wav sink: already opened"); diff --git a/src/internal_modules/roc_sndio/wav_sink.h b/src/internal_modules/roc_sndio/wav_sink.h index 154023752..aa13e55b5 100644 --- a/src/internal_modules/roc_sndio/wav_sink.h +++ b/src/internal_modules/roc_sndio/wav_sink.h @@ -63,6 +63,9 @@ class WavSink : public ISink, public core::NonCopyable<> { //! Write frame. virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame); + //! Flush buffered data, if any. + virtual ROC_ATTR_NODISCARD status::StatusCode flush(); + private: status::StatusCode open_(const char* path); void close_(); diff --git a/src/tests/roc_pipeline/test_helpers/mock_sink.h b/src/tests/roc_pipeline/test_helpers/mock_sink.h index 1cdc807cb..6a0dbc590 100644 --- a/src/tests/roc_pipeline/test_helpers/mock_sink.h +++ b/src/tests/roc_pipeline/test_helpers/mock_sink.h @@ -75,6 +75,10 @@ class MockSink : public sndio::ISink, public core::NonCopyable<> { return status::StatusOK; } + virtual ROC_ATTR_NODISCARD status::StatusCode flush() { + return status::StatusOK; + } + void expect_frames(size_t total) { UNSIGNED_LONGS_EQUAL(total, n_frames_); } diff --git a/src/tests/roc_sndio/test_helpers/mock_sink.h b/src/tests/roc_sndio/test_helpers/mock_sink.h index 302b2046c..11d68e6eb 100644 --- a/src/tests/roc_sndio/test_helpers/mock_sink.h +++ b/src/tests/roc_sndio/test_helpers/mock_sink.h @@ -82,6 +82,10 @@ class MockSink : public ISink { } } + virtual ROC_ATTR_NODISCARD status::StatusCode flush() { + return status::StatusOK; + } + private: enum { MaxSz = 256 * 1024 };