From 5cf73559053f869ef62ab61720168703995d6abc Mon Sep 17 00:00:00 2001 From: Leonardo Invernizzi Date: Wed, 10 Jul 2024 18:03:57 +0000 Subject: [PATCH] gh-738: Use status codes in FEC block codecs --- src/internal_modules/roc_fec/block_reader.cpp | 36 +++++---- src/internal_modules/roc_fec/block_reader.h | 4 +- src/internal_modules/roc_fec/block_writer.cpp | 24 +++--- src/internal_modules/roc_fec/block_writer.h | 2 +- src/internal_modules/roc_fec/iblock_decoder.h | 5 +- src/internal_modules/roc_fec/iblock_encoder.h | 9 ++- .../roc_fec/openfec_decoder.cpp | 11 ++- .../target_openfec/roc_fec/openfec_decoder.h | 3 +- .../roc_fec/openfec_encoder.cpp | 13 +++- .../target_openfec/roc_fec/openfec_encoder.h | 3 +- .../roc_fec/test_block_encoder_decoder.cpp | 73 ++++++++++++++----- .../test_block_writer_reader_errors.cpp | 13 +++- 12 files changed, 138 insertions(+), 58 deletions(-) diff --git a/src/internal_modules/roc_fec/block_reader.cpp b/src/internal_modules/roc_fec/block_reader.cpp index e927e835b..c48e76f68 100644 --- a/src/internal_modules/roc_fec/block_reader.cpp +++ b/src/internal_modules/roc_fec/block_reader.cpp @@ -80,6 +80,11 @@ status::StatusCode BlockReader::read(packet::PacketPtr& pp, packet::PacketReadMo const status::StatusCode code = read_(pp, mode); + if (code != status::StatusOK && code != status::StatusDrain) { + pp = NULL; + return code; + } + if (!alive_) { pp = NULL; return status::StatusAbort; @@ -88,10 +93,8 @@ status::StatusCode BlockReader::read(packet::PacketPtr& pp, packet::PacketReadMo if (code == status::StatusOK && mode == packet::ModeFetch) { n_packets_++; } - return code; } - status::StatusCode BlockReader::read_(packet::PacketPtr& pp, packet::PacketReadMode mode) { const status::StatusCode code = fetch_all_packets_(); @@ -159,8 +162,12 @@ status::StatusCode BlockReader::get_next_packet_(packet::PacketPtr& result_pkt, if (pkt) { next_index = head_index_ + 1; } else { - // Try repairing as much as possible and store in block. - try_repair_(); + // try repairing as much as possible and store in block + const status::StatusCode status_code = try_repair_(); + if (status_code != status::StatusOK) { + roc_panic_if(status_code == status::StatusDrain); + return status_code; + } // Find first present packet in block, starting from head. for (next_index = head_index_; next_index < source_block_.size(); @@ -229,24 +236,26 @@ void BlockReader::next_block_() { fill_block_(); } -void BlockReader::try_repair_() { - if (!can_repair_) { - return; - } +bool BlockReader::is_block_resized_() const { + return source_block_resized_ && repair_block_resized_ && payload_resized_; +} - if (!source_block_resized_ || !repair_block_resized_ || !payload_resized_) { - return; +status::StatusCode BlockReader::try_repair_() { + if (!can_repair_ || !is_block_resized_()) { + return status::StatusOK; } - if (!block_decoder_.begin_block(source_block_.size(), repair_block_.size(), - payload_size_)) { + const status::StatusCode status_code = block_decoder_.begin_block( + source_block_.size(), repair_block_.size(), payload_size_); + + if (status_code != status::StatusOK) { roc_log(LogDebug, "fec block reader: can't begin decoder block, shutting down:" " sbl=%lu rbl=%lu payload_size=%lu", (unsigned long)source_block_.size(), (unsigned long)repair_block_.size(), (unsigned long)payload_size_); alive_ = false; - return; + return status_code; } for (size_t n = 0; n < source_block_.size(); n++) { @@ -284,6 +293,7 @@ void BlockReader::try_repair_() { block_decoder_.end_block(); can_repair_ = false; + return status::StatusOK; } packet::PacketPtr diff --git a/src/internal_modules/roc_fec/block_reader.h b/src/internal_modules/roc_fec/block_reader.h index 54458f6d7..fec7d1e79 100644 --- a/src/internal_modules/roc_fec/block_reader.h +++ b/src/internal_modules/roc_fec/block_reader.h @@ -96,7 +96,7 @@ class BlockReader : public packet::IReader, public core::NonCopyable<> { packet::PacketReadMode mode); void next_block_(); - void try_repair_(); + status::StatusCode try_repair_(); packet::PacketPtr parse_repaired_packet_(const core::Slice& buffer); @@ -128,6 +128,8 @@ class BlockReader : public packet::IReader, public core::NonCopyable<> { void update_block_duration_(const packet::PacketPtr& curr_block_pkt); + bool is_block_resized_() const; + IBlockDecoder& block_decoder_; packet::IReader& source_reader_; diff --git a/src/internal_modules/roc_fec/block_writer.cpp b/src/internal_modules/roc_fec/block_writer.cpp index c9b911752..f62658e34 100644 --- a/src/internal_modules/roc_fec/block_writer.cpp +++ b/src/internal_modules/roc_fec/block_writer.cpp @@ -120,8 +120,7 @@ status::StatusCode BlockWriter::write(const packet::PacketPtr& pp) { roc_panic_if(!pp); if (!alive_) { - // TODO(gh-183): return StatusDead - return status::StatusOK; + return status::StatusAbort; } validate_fec_packet_(pp); @@ -131,9 +130,9 @@ status::StatusCode BlockWriter::write(const packet::PacketPtr& pp) { } if (cur_packet_ == 0) { - if (!begin_block_(pp)) { - // TODO(gh-183): return status - return status::StatusOK; + const status::StatusCode status_code = begin_block_(pp); + if (status_code != status::StatusOK) { + return status_code; } } @@ -156,11 +155,13 @@ status::StatusCode BlockWriter::write(const packet::PacketPtr& pp) { return status::StatusOK; } -bool BlockWriter::begin_block_(const packet::PacketPtr& pp) { +status::StatusCode BlockWriter::begin_block_(const packet::PacketPtr& pp) { update_block_duration_(pp); if (!apply_sizes_(next_sblen_, next_rblen_, pp->fec()->payload.size())) { - return false; + roc_log(LogError, + "fec block writer: apply_sizes in begin_block_ failed with StatusNoMem"); + return status::StatusNoMem; } roc_log(LogTrace, @@ -168,15 +169,18 @@ bool BlockWriter::begin_block_(const packet::PacketPtr& pp) { (unsigned long)cur_sbn_, (unsigned long)cur_sblen_, (unsigned long)cur_rblen_, (unsigned long)cur_payload_size_); - if (!block_encoder_.begin_block(cur_sblen_, cur_rblen_, cur_payload_size_)) { + const status::StatusCode status_code = + block_encoder_.begin_block(cur_sblen_, cur_rblen_, cur_payload_size_); + + if (status_code != status::StatusOK) { roc_log(LogError, "fec block writer: can't begin encoder block, shutting down:" " sblen=%lu rblen=%lu", (unsigned long)cur_sblen_, (unsigned long)cur_rblen_); - return (alive_ = false); + alive_ = false; } - return true; + return status_code; } void BlockWriter::end_block_() { diff --git a/src/internal_modules/roc_fec/block_writer.h b/src/internal_modules/roc_fec/block_writer.h index d196e8746..c54980a2a 100644 --- a/src/internal_modules/roc_fec/block_writer.h +++ b/src/internal_modules/roc_fec/block_writer.h @@ -89,7 +89,7 @@ class BlockWriter : public packet::IWriter, public core::NonCopyable<> { virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&); private: - bool begin_block_(const packet::PacketPtr& pp); + status::StatusCode begin_block_(const packet::PacketPtr& pp); void end_block_(); void next_block_(); diff --git a/src/internal_modules/roc_fec/iblock_decoder.h b/src/internal_modules/roc_fec/iblock_decoder.h index de8c56c64..82d466e69 100644 --- a/src/internal_modules/roc_fec/iblock_decoder.h +++ b/src/internal_modules/roc_fec/iblock_decoder.h @@ -34,7 +34,10 @@ class IBlockDecoder { //! @remarks //! Performs an initial setup for a block. Should be called before //! any operations for the block. - virtual bool begin_block(size_t sblen, size_t rblen, size_t payload_size) = 0; + //! @returns status::StatusOK on success, or a specific error code on failure (e.g., + //! status::StatusNoMem if memory allocation fails). + virtual ROC_ATTR_NODISCARD status::StatusCode + begin_block(size_t sblen, size_t rblen, size_t payload_size) = 0; //! Store source or repair packet buffer for current block. //! @pre diff --git a/src/internal_modules/roc_fec/iblock_encoder.h b/src/internal_modules/roc_fec/iblock_encoder.h index ba1126c52..f0269c9a1 100644 --- a/src/internal_modules/roc_fec/iblock_encoder.h +++ b/src/internal_modules/roc_fec/iblock_encoder.h @@ -35,9 +35,12 @@ class IBlockEncoder { //! Start block. //! @remarks - //! Performs an initial setup for a block. Should be called before - //! any operations for the block. - virtual bool begin_block(size_t sblen, size_t rblen, size_t payload_size) = 0; + //! Performs an initial setup for a block. Should be called before any operations for + //! the block. + //! @returns status::StatusOK on success, or a specific error code on failure (e.g., + //! status::StatusNoMem if memory allocation fails). + virtual ROC_ATTR_NODISCARD status::StatusCode + begin_block(size_t sblen, size_t rblen, size_t payload_size) = 0; //! Store source or repair packet buffer for current block. //! @pre diff --git a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.cpp b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.cpp index 5163c6a0b..e17372a01 100644 --- a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.cpp +++ b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.cpp @@ -82,11 +82,16 @@ size_t OpenfecDecoder::max_block_length() const { return max_block_length_; } -bool OpenfecDecoder::begin_block(size_t sblen, size_t rblen, size_t payload_size) { +status::StatusCode +OpenfecDecoder::begin_block(size_t sblen, size_t rblen, size_t payload_size) { roc_panic_if(init_status_ != status::StatusOK); if (!resize_tabs_(sblen + rblen)) { - return false; + roc_log( + LogError, + "openfec decoder: failed to resize tabs in begin_block, sblen=%lu, rblen=%lu", + (unsigned long)sblen, (unsigned long)rblen); + return status::StatusNoMem; } sblen_ = sblen; @@ -97,7 +102,7 @@ bool OpenfecDecoder::begin_block(size_t sblen, size_t rblen, size_t payload_size update_session_params_(sblen, rblen, payload_size); reset_session_(); - return true; + return status::StatusOK; } void OpenfecDecoder::set_buffer(size_t index, const core::Slice& buffer) { diff --git a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.h b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.h index 91870ea44..aa95aba45 100644 --- a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.h +++ b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_decoder.h @@ -53,7 +53,8 @@ class OpenfecDecoder : public IBlockDecoder, public core::NonCopyable<> { virtual size_t max_block_length() const; //! Start block. - virtual bool begin_block(size_t sblen, size_t rblen, size_t payload_size); + virtual ROC_ATTR_NODISCARD status::StatusCode + begin_block(size_t sblen, size_t rblen, size_t payload_size); //! Store source or repair packet buffer for current block. virtual void set_buffer(size_t index, const core::Slice& buffer); diff --git a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.cpp b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.cpp index a37744189..c5fba6063 100644 --- a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.cpp +++ b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.cpp @@ -75,15 +75,20 @@ size_t OpenfecEncoder::buffer_alignment() const { return Alignment; } -bool OpenfecEncoder::begin_block(size_t sblen, size_t rblen, size_t payload_size) { +status::StatusCode +OpenfecEncoder::begin_block(size_t sblen, size_t rblen, size_t payload_size) { roc_panic_if(init_status_ != status::StatusOK); if (sblen_ == sblen && rblen_ == rblen && payload_size_ == payload_size) { - return true; + return status::StatusOK; } if (!resize_tabs_(sblen + rblen)) { - return false; + roc_log( + LogError, + "openfec encoder: failed to resize tabs in begin_block, sblen=%lu, rblen=%lu", + (unsigned long)sblen, (unsigned long)rblen); + return status::StatusNoMem; } sblen_ = sblen; @@ -93,7 +98,7 @@ bool OpenfecEncoder::begin_block(size_t sblen, size_t rblen, size_t payload_size update_session_params_(sblen, rblen, payload_size); reset_session_(); - return true; + return status::StatusOK; } void OpenfecEncoder::set_buffer(size_t index, const core::Slice& buffer) { diff --git a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.h b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.h index 046d86206..e5f8ad99a 100644 --- a/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.h +++ b/src/internal_modules/roc_fec/target_openfec/roc_fec/openfec_encoder.h @@ -56,7 +56,8 @@ class OpenfecEncoder : public IBlockEncoder, public core::NonCopyable<> { virtual size_t buffer_alignment() const; //! Start block. - virtual bool begin_block(size_t sblen, size_t rblen, size_t payload_size); + virtual ROC_ATTR_NODISCARD status::StatusCode + begin_block(size_t sblen, size_t rblen, size_t payload_size); //! Store packet data for current block. virtual void set_buffer(size_t index, const core::Slice& buffer); diff --git a/src/tests/roc_fec/test_block_encoder_decoder.cpp b/src/tests/roc_fec/test_block_encoder_decoder.cpp index f8c022dbf..cd1298173 100644 --- a/src/tests/roc_fec/test_block_encoder_decoder.cpp +++ b/src/tests/roc_fec/test_block_encoder_decoder.cpp @@ -10,10 +10,10 @@ #include "roc_core/array.h" #include "roc_core/fast_random.h" -#include "roc_core/heap_arena.h" #include "roc_core/log.h" #include "roc_core/scoped_ptr.h" #include "roc_fec/codec_map.h" +#include "test_helpers/mock_arena.h" namespace roc { namespace fec { @@ -22,19 +22,22 @@ namespace { const size_t MaxPayloadSize = 1024; -core::HeapArena arena; -packet::PacketFactory packet_factory(arena, MaxPayloadSize); - } // namespace class Codec { public: Codec(const CodecConfig& config) - : encoder_(CodecMap::instance().new_block_encoder(config, packet_factory, arena), - arena) - , decoder_(CodecMap::instance().new_block_decoder(config, packet_factory, arena), - arena) - , buffers_(arena) { + : arena_() + , packet_factory_(arena_, MaxPayloadSize) + , encoder_( + CodecMap::instance().new_block_encoder(config, packet_factory_, arena_), + arena_) + , decoder_( + CodecMap::instance().new_block_decoder(config, packet_factory_, arena_), + arena_) + , buffers_(arena_) { + set_fail(false); + CHECK(encoder_); CHECK(decoder_); @@ -45,7 +48,7 @@ class Codec { void encode(size_t n_source, size_t n_repair, size_t p_size) { CHECK(buffers_.resize(n_source + n_repair)); - CHECK(encoder_->begin_block(n_source, n_repair, p_size)); + LONGS_EQUAL(status::StatusOK, encoder_->begin_block(n_source, n_repair, p_size)); for (size_t i = 0; i < n_source + n_repair; ++i) { buffers_[i] = make_buffer_(p_size); @@ -83,9 +86,13 @@ class Codec { return buffers_[i]; } + void set_fail(bool fail) { + arena_.set_fail(fail); + } + private: core::Slice make_buffer_(size_t p_size) { - core::Slice buf = packet_factory.new_packet_buffer(); + core::Slice buf = packet_factory_.new_packet_buffer(); buf.reslice(0, p_size); for (size_t j = 0; j < buf.size(); ++j) { buf.data()[j] = (uint8_t)core::fast_random_range(0, 0xff); @@ -93,9 +100,10 @@ class Codec { return buf; } + test::MockArena arena_; + packet::PacketFactory packet_factory_; core::ScopedPtr encoder_; core::ScopedPtr decoder_; - core::Array > buffers_; }; @@ -111,7 +119,8 @@ TEST(block_encoder_decoder, without_loss) { Codec code(config); code.encode(NumSourcePackets, NumRepairPackets, PayloadSize); - CHECK( + LONGS_EQUAL( + status::StatusOK, code.decoder().begin_block(NumSourcePackets, NumRepairPackets, PayloadSize)); for (size_t i = 0; i < NumSourcePackets + NumRepairPackets; ++i) { @@ -123,6 +132,32 @@ TEST(block_encoder_decoder, without_loss) { } } +TEST(block_encoder_decoder, no_memory) { + enum { NumSourcePackets = 20, NumRepairPackets = 10, PayloadSize = 251 }; + + for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); n_scheme++) { + CodecConfig config; + config.scheme = CodecMap::instance().nth_scheme(n_scheme); + + { // test encoder + Codec code(config); + code.set_fail(true); + LONGS_EQUAL(status::StatusNoMem, + code.encoder().begin_block(NumSourcePackets, NumRepairPackets, + PayloadSize)); + } + + { // test decoder + Codec code(config); + code.encode(NumSourcePackets, NumRepairPackets, PayloadSize); + code.set_fail(true); + LONGS_EQUAL(status::StatusNoMem, + code.decoder().begin_block(NumSourcePackets, NumRepairPackets, + PayloadSize)); + } + } +} + TEST(block_encoder_decoder, lost_1) { enum { NumSourcePackets = 20, NumRepairPackets = 10, PayloadSize = 251 }; @@ -133,7 +168,8 @@ TEST(block_encoder_decoder, lost_1) { Codec code(config); code.encode(NumSourcePackets, NumRepairPackets, PayloadSize); - CHECK( + LONGS_EQUAL( + status::StatusOK, code.decoder().begin_block(NumSourcePackets, NumRepairPackets, PayloadSize)); for (size_t i = 0; i < NumSourcePackets + NumRepairPackets; ++i) { @@ -172,8 +208,9 @@ TEST(block_encoder_decoder, random_losses) { for (size_t test_num = 0; test_num < NumIterations; ++test_num) { code.encode(NumSourcePackets, NumRepairPackets, PayloadSize); - CHECK(code.decoder().begin_block(NumSourcePackets, NumRepairPackets, - PayloadSize)); + LONGS_EQUAL(status::StatusOK, + code.decoder().begin_block(NumSourcePackets, NumRepairPackets, + PayloadSize)); size_t curr_loss = 0; for (size_t i = 0; i < NumSourcePackets + NumRepairPackets; ++i) { @@ -214,7 +251,9 @@ TEST(block_encoder_decoder, full_repair_payload_sizes) { Codec code(config); code.encode(NumSourcePackets, NumRepairPackets, p_size); - CHECK(code.decoder().begin_block(NumSourcePackets, NumRepairPackets, p_size)); + LONGS_EQUAL( + status::StatusOK, + code.decoder().begin_block(NumSourcePackets, NumRepairPackets, p_size)); for (size_t i = NumSourcePackets; i < NumSourcePackets + NumRepairPackets; ++i) { diff --git a/src/tests/roc_fec/test_block_writer_reader_errors.cpp b/src/tests/roc_fec/test_block_writer_reader_errors.cpp index 4fc569b73..2b49ae0df 100644 --- a/src/tests/roc_fec/test_block_writer_reader_errors.cpp +++ b/src/tests/roc_fec/test_block_writer_reader_errors.cpp @@ -175,7 +175,9 @@ TEST(block_writer_reader_errors, writer_cant_resize_block) { CHECK(writer.resize(NumSourcePackets, BlockSize2)); for (size_t i = 0; i < NumSourcePackets; ++i) { - LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); + const status::StatusCode expected_status = + (i == 0) ? status::StatusNoMem : status::StatusAbort; + LONGS_EQUAL(expected_status, writer.write(generate_packet(sn++))); CHECK(!writer.is_alive()); } @@ -222,7 +224,9 @@ TEST(block_writer_reader_errors, writer_cant_encode_packet) { CHECK(writer.resize(BlockSize2, NumRepairPackets)); for (size_t i = 0; i < BlockSize2; ++i) { - LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); + const status::StatusCode expected_status = + (i == 0) ? status::StatusNoMem : status::StatusAbort; + LONGS_EQUAL(expected_status, writer.write(generate_packet(sn++))); CHECK(!writer.is_alive()); } @@ -384,9 +388,12 @@ TEST(block_writer_reader_errors, reader_cant_decode_packet) { // reader should get an error from arena when trying // to repair lost packet and shut down packet::PacketPtr pp; - LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); + LONGS_EQUAL(status::StatusNoMem, reader.read(pp, packet::ModeFetch)); CHECK(!pp); CHECK(!reader.is_alive()); + + // reader should get an abort error if it is not alive + LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); } TEST(block_writer_reader_errors, reader_cant_read_source_packet) {