Skip to content

Commit

Permalink
blosc codec: Eliminate unnecessary memset (and in some cases a copy)
Browse files Browse the repository at this point in the history
This eliminates the unnecessary memset (but not the unnecessary
copy) described here:
sgkit-dev/vcf-zarr-publication#161 (comment)

PiperOrigin-RevId: 691924460
Change-Id: I2cfe01659a6088278228763b7a36953125cf2380
  • Loading branch information
jbms authored and copybara-github committed Oct 31, 2024
1 parent 948d5f3 commit b2316ec
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 133 deletions.
3 changes: 0 additions & 3 deletions tensorstore/driver/zarr3/codec/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,9 @@ tensorstore_cc_library(
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/strings:str_format",
"@com_google_riegeli//riegeli/base:chain",
"@com_google_riegeli//riegeli/bytes:chain_reader",
"@com_google_riegeli//riegeli/bytes:cord_writer",
"@com_google_riegeli//riegeli/bytes:read_all",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:write",
"@com_google_riegeli//riegeli/bytes:writer",
"@org_blosc_cblosc//:blosc",
],
Expand Down
52 changes: 2 additions & 50 deletions tensorstore/driver/zarr3/codec/blosc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@
#include <utility>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include <blosc.h>
#include "riegeli/base/chain.h"
#include "riegeli/bytes/chain_reader.h"
#include "riegeli/bytes/cord_writer.h"
#include "riegeli/bytes/read_all.h"
#include "riegeli/bytes/reader.h"
#include "riegeli/bytes/write.h"
#include "riegeli/bytes/writer.h"
#include "tensorstore/driver/zarr3/codec/codec.h"
#include "tensorstore/driver/zarr3/codec/codec_spec.h"
Expand All @@ -53,63 +47,21 @@ namespace tensorstore {
namespace internal_zarr3 {
namespace {

// Buffers writes to a `std::string`, and then in `Done`, calls `blosc::Encode`
// and forwards the result to another `Writer`.
// Writer adapter that buffers everything written to it in a `Cord`, then on
// `Done` blosc-encodes the accumulated bytes and forwards the compressed
// result to `base_writer_`.
class BloscDeferredWriter : public riegeli::CordWriter<absl::Cord> {
 public:
  explicit BloscDeferredWriter(blosc::Options options,
                               riegeli::Writer& base_writer)
      // Unbounded max block size keeps the buffered data in a single flat
      // block so that `Flatten()` in `Done` is cheap.
      : CordWriter(riegeli::CordWriterBase::Options().set_max_block_size(
            std::numeric_limits<size_t>::max())),
        options_(std::move(options)),
        base_writer_(base_writer) {}

  void Done() override {
    CordWriter::Done();
    // Compress the buffered plaintext all at once.
    auto encoded = blosc::Encode(dest().Flatten(), options_);
    if (!encoded.ok()) {
      Fail(std::move(encoded).status());
      return;
    }
    // Forward the compressed bytes to the underlying writer.
    if (auto write_status = riegeli::Write(*std::move(encoded), base_writer_);
        !write_status.ok()) {
      Fail(std::move(write_status));
    }
  }

 private:
  blosc::Options options_;
  riegeli::Writer& base_writer_;
};

class BloscCodec : public ZarrBytesToBytesCodec {
public:
class State : public ZarrBytesToBytesCodec::PreparedState {
public:
// Returns a writer that buffers the chunk plaintext and blosc-compresses it
// into `encoded_writer` when closed, using this codec's configured options.
Result<std::unique_ptr<riegeli::Writer>> GetEncodeWriter(
    riegeli::Writer& encoded_writer) const final {
  return std::make_unique<blosc::BloscWriter>(
      blosc::Options{codec_->cname.c_str(), codec_->clevel, codec_->shuffle,
                     codec_->blocksize, codec_->typesize},
      encoded_writer);
}

// Returns a reader that decodes the blosc-compressed stream from
// `encoded_reader`. The blosc header is validated eagerly; decompression is
// deferred until data is actually pulled.
Result<std::unique_ptr<riegeli::Reader>> GetDecodeReader(
    riegeli::Reader& encoded_reader) const final {
  return std::make_unique<blosc::BloscReader>(encoded_reader);
}

const BloscCodec* codec_;
Expand Down
17 changes: 9 additions & 8 deletions tensorstore/internal/compression/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@ tensorstore_cc_library(
deps = [
"//tensorstore/util:result",
"//tensorstore/util:str_cat",
"@com_google_absl//absl/functional:function_ref",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/strings:string_view",
"@com_google_absl//absl/types:optional",
"@com_google_riegeli//riegeli/base:types",
"@com_google_riegeli//riegeli/bytes:cord_writer",
"@com_google_riegeli//riegeli/bytes:read_all",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:writer",
"@org_blosc_cblosc//:blosc",
],
)
Expand All @@ -35,15 +44,7 @@ tensorstore_cc_library(
"//tensorstore/util:quote_string",
"//tensorstore/util:str_cat",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
"@com_google_riegeli//riegeli/base:chain",
"@com_google_riegeli//riegeli/bytes:chain_reader",
"@com_google_riegeli//riegeli/bytes:cord_writer",
"@com_google_riegeli//riegeli/bytes:read_all",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:write",
"@com_google_riegeli//riegeli/bytes:writer",
"@org_blosc_cblosc//:blosc",
],
Expand Down
151 changes: 138 additions & 13 deletions tensorstore/internal/compression/blosc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,182 @@

#include "tensorstore/internal/compression/blosc.h"

#include <cassert>
#include <cstddef>
#include <limits>
#include <string>
#include <string_view>
#include <utility>

#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include <blosc.h>
#include "riegeli/bytes/cord_writer.h"
#include "riegeli/bytes/read_all.h"
#include "riegeli/bytes/reader.h"
#include "riegeli/bytes/writer.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/str_cat.h"

namespace tensorstore {
namespace blosc {

// Compresses `input` with blosc per `options`, returning the framed result.
//
// Implemented on top of `EncodeWithCallback`: the callback sizes a string to
// the worst-case output size, and the string is trimmed to the actual
// compressed length afterwards.
Result<std::string> Encode(std::string_view input, const Options& options) {
  std::string compressed;
  auto provide_buffer = [&compressed](size_t max_output_size) {
    compressed.resize(max_output_size);
    return compressed.data();
  };
  auto encoded_size = EncodeWithCallback(input, options, provide_buffer);
  if (!encoded_size.ok()) return encoded_size.status();
  compressed.erase(*encoded_size);
  return compressed;
}

// Compresses `input` with blosc per `options`, writing into a buffer obtained
// from `get_output_buffer`.
//
// `get_output_buffer` is invoked once with the worst-case output size
// (`input.size() + BLOSC_MAX_OVERHEAD`) and must return a buffer of at least
// that size, or `nullptr` to abort (in which case 0 is returned and the
// caller is assumed to have recorded the failure itself).
//
// Returns the number of compressed bytes actually written.
Result<size_t> EncodeWithCallback(
    std::string_view input, const Options& options,
    absl::FunctionRef<char*(size_t)> get_output_buffer) {
  if (input.size() > BLOSC_MAX_BUFFERSIZE) {
    return absl::InvalidArgumentError(tensorstore::StrCat(
        "Blosc compression input of ", input.size(),
        " bytes exceeds maximum size of ", BLOSC_MAX_BUFFERSIZE));
  }
  const size_t output_buffer_size = input.size() + BLOSC_MAX_OVERHEAD;
  char* output_buffer = get_output_buffer(output_buffer_size);
  if (!output_buffer) return 0;
  int shuffle = options.shuffle;
  if (shuffle == -1) {
    // Auto-select: bit shuffle for single-byte elements, byte shuffle
    // otherwise.
    shuffle = options.element_size == 1 ? BLOSC_BITSHUFFLE : BLOSC_SHUFFLE;
  }
  const int n = blosc_compress_ctx(
      options.clevel, shuffle, options.element_size, input.size(), input.data(),
      output_buffer, output_buffer_size, options.compressor, options.blocksize,
      /*numinternalthreads=*/1);
  if (n < 0) {
    return absl::InternalError(
        tensorstore::StrCat("Internal blosc error: ", n));
  }
  return n;
}

// Decompresses blosc-framed `input` into a freshly allocated string.
//
// Implemented on top of `DecodeWithCallback`: the callback sizes the output
// string to the exact decoded size taken from the blosc header.
Result<std::string> Decode(std::string_view input) {
  std::string output;
  auto result = DecodeWithCallback(input, [&](size_t n) {
    output.resize(n);
    return output.data();
  });
  if (!result.ok()) return result.status();
  return output;
}

// Decompresses blosc-framed `input` into a buffer obtained from
// `get_output_buffer`.
//
// `get_output_buffer` is invoked once with the exact decoded size (from the
// validated blosc header) and must return a buffer of at least that size, or
// `nullptr` to abort (in which case 0 is returned and the caller is assumed
// to have recorded the failure itself).
//
// Returns the decoded size.
Result<size_t> DecodeWithCallback(
    std::string_view input,
    absl::FunctionRef<char*(size_t)> get_output_buffer) {
  TENSORSTORE_ASSIGN_OR_RETURN(size_t nbytes, GetDecodedSize(input));
  char* output_buffer = get_output_buffer(nbytes);
  if (!output_buffer) return 0;
  if (nbytes > 0) {
    const int n = blosc_decompress_ctx(input.data(), output_buffer, nbytes,
                                       /*numinternalthreads=*/1);
    if (n <= 0) {
      return absl::InvalidArgumentError(
          tensorstore::StrCat("Blosc error: ", n));
    }
  }
  return nbytes;
}

// Returns the decompressed size recorded in the blosc header of `input`, or
// `InvalidArgumentError` if `input` is not valid blosc-compressed data.
Result<size_t> GetDecodedSize(std::string_view input) {
  size_t decoded_size = 0;
  const int validate_rc =
      blosc_cbuffer_validate(input.data(), input.size(), &decoded_size);
  if (validate_rc != 0) {
    return absl::InvalidArgumentError("Invalid blosc-compressed data");
  }
  return decoded_size;
}

// Buffers all written plaintext in the underlying Cord; compression happens
// once, in `Done()`.
//
// NOTE(review): the unbounded max block size presumably keeps the buffered
// data in a single flat block so `dest().Flatten()` in `Done()` avoids a
// copy — confirm against riegeli::CordWriter documentation.
BloscWriter::BloscWriter(const blosc::Options& options,
                         riegeli::Writer& base_writer)
    : CordWriter(riegeli::CordWriterBase::Options().set_max_block_size(
          std::numeric_limits<size_t>::max())),
      options_(options),
      base_writer_(base_writer) {}

// Compresses everything buffered so far and writes the result to
// `base_writer_`, compressing directly into `base_writer_`'s own buffer to
// avoid an intermediate copy of the encoded output.
void BloscWriter::Done() {
  // Flush any pending writes into `dest()` before reading it back.
  CordWriter::Done();
  auto result = blosc::EncodeWithCallback(dest().Flatten(), options_,
                                          [&](size_t n) -> char* {
                                            // Reserve worst-case output space
                                            // directly in the destination
                                            // writer; nullptr aborts the
                                            // encode after recording failure.
                                            if (!base_writer_.Push(n)) {
                                              Fail(base_writer_.status());
                                              return nullptr;
                                            }
                                            return base_writer_.cursor();
                                          });
  if (!result.ok()) {
    Fail(std::move(result).status());
    return;
  }
  if (!*result) {
    // Already failed when encoding.
    return;
  }
  // Commit only the bytes actually produced by the compressor.
  base_writer_.move_cursor(*result);
  if (!base_writer_.Close()) {
    Fail(base_writer_.status());
    return;
  }
}

// Eagerly reads the entire compressed stream and validates its blosc header
// (recording the decoded size); actual decompression is deferred until data
// is pulled.
BloscReader::BloscReader(riegeli::Reader& base_reader)
    : base_reader_(base_reader) {
  auto read_status = riegeli::ReadAll(base_reader_, encoded_data_);
  if (!read_status.ok()) {
    Fail(std::move(read_status));
    return;
  }
  auto decoded_size = blosc::GetDecodedSize(encoded_data_);
  if (!decoded_size.ok()) {
    Fail(std::move(decoded_size).status());
    return;
  }
  decoded_size_ = *decoded_size;
}

// Reading ahead is harmless: the compressed input is already fully buffered.
bool BloscReader::ToleratesReadingAhead() { return true; }
// The exact decoded size is known up front from the blosc header.
bool BloscReader::SupportsSize() { return true; }
// Decompresses the full payload into an owned buffer on first pull and
// exposes it as the read buffer. Subsequent pulls past the buffer are EOF.
bool BloscReader::PullSlow(size_t min_length, size_t recommended_length) {
  if (decoded_size_ == 0 || start() != nullptr || pos() > 0) {
    // Data was already decoded. The precondition `min_length > available()`
    // for this method implies that `min_length` would exceed EOF.
    return false;
  }
  auto result = blosc::DecodeWithCallback(encoded_data_, [&](size_t n) {
    assert(n == decoded_size_);
    // Allocate the decode buffer, hand ownership to `buffer_`, and install
    // it as this reader's buffer covering the whole decoded payload.
    auto* buffer = new char[n];
    buffer_.reset(buffer);
    set_buffer(buffer, n);
    move_limit_pos(n);
    return buffer;
  });
  if (!result.ok()) {
    Fail(std::move(result).status());
    return false;
  }
  // Pull succeeds only if the request fits within the decoded data.
  return min_length <= decoded_size_;
}

// Fast path for a single full-size read: decompress straight into `dest`,
// skipping the intermediate buffer that `PullSlow` would allocate.
bool BloscReader::ReadSlow(size_t length, char* dest) {
  if (decoded_size_ == 0 || start() != nullptr || pos() > 0 ||
      length < decoded_size_) {
    // Use default implementation which may call `PullSlow`.
    return Reader::ReadSlow(length, dest);
  }
  if (auto result = blosc::DecodeWithCallback(encoded_data_,
                                              [&](size_t n) { return dest; });
      !result.ok()) {
    Fail(std::move(result).status());
    return false;
  }
  // Advance position past the (now fully consumed) decoded payload.
  move_limit_pos(decoded_size_);
  // `length > decoded_size_` is a partial read and thus reports failure.
  return length == decoded_size_;
}

} // namespace blosc
Expand Down
Loading

0 comments on commit b2316ec

Please sign in to comment.