Skip to content

Commit

Permalink
Implemented explicit flushing of buffered samples from sink to file r…
Browse files Browse the repository at this point in the history
…oc-streaming#703

- Added `virtual ROC_ATTR_NODISCARD status::StatusCode flush()=0` pure virtual function in `sndio::ISink`.
- Added this method to all ISink implementations.
- Implementations that don't use buffereing are no-op.
- For SoxSink, flush() sends the buffered samples to disk.
- sndio::Pump invokes `flush()` when it exits from `run()`.
- Made SoxSink::write to write only when buffer_pos is greater than zero
  • Loading branch information
jeshwanthreddy13 committed Aug 6, 2024
1 parent ccc6de5 commit 06f05ae
Show file tree
Hide file tree
Showing 18 changed files with 79 additions and 3 deletions.
6 changes: 6 additions & 0 deletions src/internal_modules/roc_pipeline/sender_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ status::StatusCode SenderLoop::write(audio::Frame& frame) {
return code;
}

status::StatusCode SenderLoop::flush() {
core::Mutex::Lock lock(sink_mutex_);

return sink_.flush();
}

core::nanoseconds_t SenderLoop::timestamp_imp() const {
return core::timestamp(core::ClockMonotonic);
}
Expand Down
1 change: 1 addition & 0 deletions src/internal_modules/roc_pipeline/sender_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class SenderLoop : public PipelineLoop, private sndio::ISink {
virtual core::nanoseconds_t latency() const;
virtual bool has_clock() const;
virtual status::StatusCode write(audio::Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

// Methods of PipelineLoop
virtual core::nanoseconds_t timestamp_imp() const;
Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_pipeline/sender_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,9 @@ status::StatusCode SenderSink::write(audio::Frame& frame) {
return code;
}

status::StatusCode SenderSink::flush() {
return status::StatusOK;
}

} // namespace pipeline
} // namespace roc
3 changes: 3 additions & 0 deletions src/internal_modules/roc_pipeline/sender_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> {
//! Write frame.
virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame);

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

private:
SenderSinkConfig sink_config_;

Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_pipeline/transcoder_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,9 @@ status::StatusCode TranscoderSink::write(audio::Frame& frame) {
return frame_writer_->write(frame);
}

status::StatusCode TranscoderSink::flush() {
return status::StatusOK;
}

} // namespace pipeline
} // namespace roc
3 changes: 3 additions & 0 deletions src/internal_modules/roc_pipeline/transcoder_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class TranscoderSink : public sndio::ISink, public core::NonCopyable<> {
//! Write frame.
virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame);

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

private:
audio::FrameFactory frame_factory_;

Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_sndio/isink.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "roc_audio/iframe_writer.h"
#include "roc_sndio/idevice.h"
#include "roc_status/status_code.h"

namespace roc {
namespace sndio {
Expand All @@ -22,6 +23,9 @@ namespace sndio {
class ISink : virtual public IDevice, public audio::IFrameWriter {
public:
virtual ~ISink();

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush() = 0;
};

} // namespace sndio
Expand Down
7 changes: 7 additions & 0 deletions src/internal_modules/roc_sndio/pump.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ status::StatusCode Pump::run() {
if (code == status::StatusEnd) {
code = status::StatusOK; // EOF is fine
}
if (code == status::StatusOK) {
code = sink_.flush();
if (code != status::StatusOK) {
roc_log(LogError, "pump: got error when flushing sink: status=%s",
status::code_to_str(code));
}
}

roc_log(LogDebug, "pump: exiting main loop");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ status::StatusCode PulseaudioDevice::write(audio::Frame& frame) {
return handle_request_(frame.bytes(), frame.num_bytes());
}

status::StatusCode PulseaudioDevice::flush() {
return status::StatusOK;
}

status::StatusCode PulseaudioDevice::read(audio::Frame& frame,
packet::stream_timestamp_t duration,
audio::FrameReadMode mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class PulseaudioDevice : public ISink, public ISource, public core::NonCopyable<
packet::stream_timestamp_t duration,
audio::FrameReadMode mode);

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

private:
static void context_state_cb_(pa_context* context, void* userdata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ status::StatusCode SndfileSink::write(audio::Frame& frame) {
return status::StatusOK;
}

status::StatusCode SndfileSink::flush() {
return status::StatusOK;
}

status::StatusCode SndfileSink::open_(const char* driver, const char* path) {
if (!map_to_sndfile(&driver, path, file_info_)) {
roc_log(LogDebug,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class SndfileSink : public ISink, public core::NonCopyable<> {
//! Write frame.
virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame);

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

private:
status::StatusCode open_(const char* driver, const char* path);
void close_();
Expand Down
18 changes: 15 additions & 3 deletions src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,21 @@ status::StatusCode SoxSink::write(audio::Frame& frame) {
}
}

const status::StatusCode code = write_(buffer_data, buffer_pos);
if (code != status::StatusOK) {
return code;
if (buffer_pos > 0) {
const status::StatusCode code = write_(buffer_data, buffer_pos);
if (code != status::StatusOK) {
return code;
}
}

return status::StatusOK;
}

status::StatusCode SoxSink::flush() {
if (output_ != NULL && driver_type_ == DriverType_File && output_->fp != NULL) {
if (fflush((FILE*)output_->fp) != 0) {
return status::StatusErrFile;
}
}

return status::StatusOK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class SoxSink : public ISink, public core::NonCopyable<> {
//! Write frame.
virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame);

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

private:
status::StatusCode init_names_(const char* driver, const char* path);
status::StatusCode init_buffer_();
Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_sndio/wav_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ status::StatusCode WavSink::write(audio::Frame& frame) {
return status::StatusOK;
}

status::StatusCode WavSink::flush() {
return status::StatusOK;
}

status::StatusCode WavSink::open_(const char* path) {
if (output_file_) {
roc_panic("wav sink: already opened");
Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_sndio/wav_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class WavSink : public ISink, public core::NonCopyable<> {
//! Write frame.
virtual ROC_ATTR_NODISCARD status::StatusCode write(audio::Frame& frame);

//! Flush buffered data, if any.
virtual ROC_ATTR_NODISCARD status::StatusCode flush();

private:
status::StatusCode open_(const char* path);
void close_();
Expand Down
4 changes: 4 additions & 0 deletions src/tests/roc_pipeline/test_helpers/mock_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class MockSink : public sndio::ISink, public core::NonCopyable<> {
return status::StatusOK;
}

virtual ROC_ATTR_NODISCARD status::StatusCode flush() {
return status::StatusOK;
}

void expect_frames(size_t total) {
UNSIGNED_LONGS_EQUAL(total, n_frames_);
}
Expand Down
4 changes: 4 additions & 0 deletions src/tests/roc_sndio/test_helpers/mock_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class MockSink : public ISink {
}
}

virtual ROC_ATTR_NODISCARD status::StatusCode flush() {
return status::StatusOK;
}

private:
enum { MaxSz = 256 * 1024 };

Expand Down

0 comments on commit 06f05ae

Please sign in to comment.