Skip to content

Commit

Permalink
[Feature] Add Orc Chunk Writer (StarRocks#31473)
Browse files Browse the repository at this point in the history
Signed-off-by: YinZhengSun <[email protected]>
Signed-off-by: Moonm3n <[email protected]>
  • Loading branch information
YinZheng-Sun authored and Moonm3n committed Oct 31, 2023
1 parent 02af40b commit 1587c9a
Show file tree
Hide file tree
Showing 11 changed files with 2,108 additions and 1 deletion.
1 change: 1 addition & 0 deletions be/src/bench/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ endif()
ADD_BE_BENCH(${SRC_DIR}/bench/chunks_sorter_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/runtime_filter_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/csv_reader_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/orc_writer_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/shuffle_chunk_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/parquet_writer_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/parquet_writer_array_bench)
Expand Down
145 changes: 145 additions & 0 deletions be/src/bench/orc_writer_bench.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <benchmark/benchmark.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include <memory>
#include <vector>

#include "bench.h"
#include "column/chunk.h"
#include "common/statusor.h"
#include "formats/orc/orc_chunk_writer.h"
#include "fs/fs.h"
#include "fs/fs_posix.h"
#include "runtime/descriptor_helper.h"
#include "testutil/assert.h"
#include "util/time.h"

namespace starrocks::orc {

// Fixed seed handed to std::srand() in make_chunk() so the generated data is reproducible.
constexpr int random_seed = 42;

// Builds a chunk holding one nullable INT column with `num_rows` rows.
//
// Each row is marked null when `std::rand() % 100 < null_percent`; otherwise it receives a
// value from `std::rand()`. The generator is re-seeded with `random_seed` on every call, so
// identical arguments produce identical chunks.
inline std::shared_ptr<Chunk> make_chunk(int num_rows, int null_percent) {
    std::srand(random_seed);

    std::vector<int32_t> payload(num_rows);
    std::vector<uint8_t> null_flags(num_rows, 0);
    for (int row = 0; row < num_rows; ++row) {
        const bool make_null = std::rand() % 100 < null_percent;
        if (make_null) {
            null_flags[row] = 1;
            continue;
        }
        // Non-null rows consume a second random number for the value itself.
        payload[row] = std::rand();
    }

    // NOTE(review): the second argument of append_numbers looks like a length in bytes
    // (the data column scales by sizeof(int32)) -- confirm against the column API.
    auto values_col = Int32Column::create();
    values_col->append_numbers(payload.data(), payload.size() * sizeof(int32));
    auto nulls_col = UInt8Column::create();
    nulls_col->append_numbers(null_flags.data(), null_flags.size());
    auto nullable_col = NullableColumn::create(values_col, nulls_col);

    auto result = std::make_shared<Chunk>();
    result->append_column(nullable_col, 0);
    return result;
}

inline std::vector<TypeDescriptor> _make_type_descs() {
return {TypeDescriptor::from_logical_type(TYPE_INT)};
}

// Builds the ORC schema for the benchmark: one column named "INT" whose type comes from
// _make_type_descs(). Returns whatever OrcChunkWriter::make_schema() reports.
inline StatusOr<std::unique_ptr<::orc::Type>> _make_schema() {
    std::vector<std::string> names{"INT"};
    auto descs = _make_type_descs();
    return OrcChunkWriter::make_schema(names, descs);
}

// Creates an OrcChunkWriter that writes the benchmark schema (see _make_schema()) to `file`
// using SNAPPY compression. Aborts if the schema cannot be built.
//
// The writer options are fully configured *before* they are handed to the writer, so the
// compression setting does not depend on when the writer reads the shared options object.
inline std::unique_ptr<OrcChunkWriter> make_starrocks_writer(std::unique_ptr<WritableFile> file) {
    auto writer_options = std::make_shared<::orc::WriterOptions>();
    // Use the globally qualified Apache ORC namespace: this code lives in starrocks::orc,
    // where a bare `orc::` names the enclosing namespace instead of the library.
    writer_options->setCompression(::orc::CompressionKind::CompressionKind_SNAPPY);

    auto type_descs = _make_type_descs();
    ASSIGN_OR_ABORT(auto schema, _make_schema());
    return std::make_unique<OrcChunkWriter>(std::move(file), writer_options, type_descs, std::move(schema));
}

// Registers the benchmark argument matrix: every combination of
// (number of chunks, percentage of null values).
static void Benchmark_OrcChunkWriterArgs(benchmark::internal::Benchmark* b) {
    // Benchmark_StarRocksOrcWriter writes 1000-row chunks, so these chunk counts correspond to
    // 100000, 1000000 and 10000000 rows.
    const std::vector<int64_t> chunk_counts{100, 1000, 10000};
    // Percentage of rows that are null.
    const std::vector<int> null_percents{0, 5, 25, 50, 75};

    for (const int64_t chunk_count : chunk_counts) {
        for (const int percent : null_percents) {
            b->Args({chunk_count, percent});
        }
    }
}

// Measures how long OrcChunkWriter takes to write `state.range(0)` chunks of 1000 rows each,
// where `state.range(1)` percent of the rows are null.
//
// Only the write loop and the final close() are timed; creating the output file and the
// writer, and deleting the file afterwards, happen while the timer is paused.
static void Benchmark_StarRocksOrcWriter(benchmark::State& state) {
    auto fs = new_fs_posix();
    const std::string file_path = "./be/test/exec/test_data/orc_scanner/starrocks_writer.orc";
    // Remove any leftover from a previous run; the result is intentionally ignored because the
    // file may simply not exist.
    fs->delete_file(file_path);

    const auto chunk_nums = state.range(0);
    const auto null_percent = state.range(1);
    auto chunk = make_chunk(1000, static_cast<int>(null_percent));

    for (auto _ : state) {
        state.PauseTiming();
        ASSIGN_OR_ABORT(auto file, fs->new_writable_file(file_path));
        auto writer = make_starrocks_writer(std::move(file));
        state.ResumeTiming();

        for (int64_t i = 0; i < chunk_nums; ++i) {
            try {
                writer->write(chunk.get());
            } catch (const std::exception& e) {
                auto s = strings::Substitute("Write ORC File Failed. reason = $0", e.what());
                LOG(WARNING) << s;
                // Report the failure to the benchmark framework before leaving the loop early,
                // so the run is marked as errored instead of silently returning mid-iteration.
                state.SkipWithError(s.c_str());
                return;
            }
        }
        writer->close();

        // Cleanup is excluded from the measurement. The timer must be running again when the
        // iteration ends: every PauseTiming() needs a matching ResumeTiming().
        state.PauseTiming();
        fs->delete_file(file_path);
        state.ResumeTiming();
    }

    // Write the file once more outside the measured loop; presumably so that an output file
    // is left on disk for inspection after the benchmark finishes.
    {
        ASSIGN_OR_ABORT(auto file, fs->new_writable_file(file_path));
        auto writer = make_starrocks_writer(std::move(file));
        for (int64_t i = 0; i < chunk_nums; ++i) {
            try {
                writer->write(chunk.get());
            } catch (const std::exception& e) {
                auto s = strings::Substitute("Write ORC File Failed. reason = $0", e.what());
                LOG(WARNING) << s;
                return;
            }
        }
        writer->close();
    }
}

// Registers Benchmark_StarRocksOrcWriter with the argument matrix produced by
// Benchmark_OrcChunkWriterArgs, reports results in milliseconds, and passes 10 as the
// minimum running time (Google Benchmark documents MinTime in seconds).
BENCHMARK(Benchmark_StarRocksOrcWriter)
->Apply(Benchmark_OrcChunkWriterArgs)
->Unit(benchmark::kMillisecond)
->MinTime(10);

} // namespace starrocks::orc

// Google Benchmark macro that supplies the program entry point for this benchmark binary.
BENCHMARK_MAIN();
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ set(EXEC_FILES
except_node.cpp
file_scanner.cpp
orc_scanner.cpp
orc_writer.cpp
jni_scanner.cpp
arrow_to_starrocks_converter.cpp
arrow_to_json_converter.cpp
Expand Down
102 changes: 102 additions & 0 deletions be/src/exec/orc_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/orc_writer.h"

namespace starrocks {
// Stores the configuration for a rolling ORC writer. No file system handle or chunk writer is
// created here; that happens in init() and on the first append_chunk().
//
// @param orc_info          ORC settings (partition location, schema, cloud configuration); copied.
// @param output_expr_ctxs  Output expression contexts forwarded to each AsyncOrcChunkWriter; the
//                          vector of pointers is copied, the pointees are not owned.
// @param parent_profile    Profile forwarded to each AsyncOrcChunkWriter; not owned.
// @param commit_func       Callback forwarded to AsyncOrcChunkWriter::close().
// @param state             Runtime state used to derive output file names; not owned.
// @param driver_id         Driver id embedded in output file names.
RollingAsyncOrcWriter::RollingAsyncOrcWriter(const ORCInfo& orc_info, const std::vector<ExprContext*>& output_expr_ctxs,
                                             RuntimeProfile* parent_profile,
                                             std::function<void(AsyncOrcChunkWriter*, RuntimeState*)> commit_func,
                                             RuntimeState* state, int32_t driver_id)
        : _orc_info(orc_info),
          _output_expr_ctxs(output_expr_ctxs),
          _parent_profile(parent_profile),
          // The callback is taken by value, so move it into the member instead of copying again.
          _commit_func(std::move(commit_func)),
          _state(state),
          _driver_id(driver_id) {}

// Creates the file system handle for the configured partition location (using the cloud
// configuration from _orc_info) and caches the partition location and schema.
// Returns the error from FileSystem::CreateUniqueFromString() if the handle cannot be created.
Status RollingAsyncOrcWriter::init() {
    const auto& location = _orc_info.partition_location;
    ASSIGN_OR_RETURN(_fs, FileSystem::CreateUniqueFromString(location, FSOptions(&_orc_info.cloud_conf)));
    _partition_location = location;
    _schema = _orc_info.schema;
    return Status::OK();
}

// Advances the file counter and derives the next output path:
//   <partition location><fragment instance id>_<driver id>_<file count>.orc
// The path is remembered in _outfile_location and also returned.
std::string RollingAsyncOrcWriter::_new_file_location() {
    ++_file_cnt;
    const std::string file_name =
            fmt::format("{}_{}_{}.orc", print_id(_state->fragment_instance_id()), _driver_id, _file_cnt);
    _outfile_location = _partition_location + file_name;
    return _outfile_location;
}

// Opens a fresh output file (truncating any existing one, without sync-on-close) and replaces
// _writer with a new AsyncOrcChunkWriter targeting it.
// Returns the error from new_writable_file() if the file cannot be opened.
Status RollingAsyncOrcWriter::_new_chunk_writer() {
    std::string location = _new_file_location();
    WritableFileOptions file_options{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
    ASSIGN_OR_RETURN(auto out_file, _fs->new_writable_file(file_options, location));

    // NOTE(review): _options is never assigned in this class's visible code, so a null
    // shared_ptr is passed here -- confirm AsyncOrcChunkWriter tolerates that.
    _writer = std::make_shared<AsyncOrcChunkWriter>(std::move(out_file), _options, _schema, _output_expr_ctxs,
                                                    ExecEnv::GetInstance()->pipeline_sink_io_pool(),
                                                    _parent_profile);
    return Status::OK();
}

// Writes `chunk` through the current writer, creating the first writer on demand and rolling
// over to a new file when the current writer's reported size is above _max_file_size.
// Returns the first error from creating, closing or writing.
Status RollingAsyncOrcWriter::append_chunk(Chunk* chunk, RuntimeState* state) {
    const bool needs_first_writer = (_writer == nullptr);
    if (needs_first_writer) {
        RETURN_IF_ERROR(_new_chunk_writer());
    }

    // Size limit exceeded: finish the current file and continue in a new one.
    if (_max_file_size < _writer->file_size()) {
        RETURN_IF_ERROR(close_current_writer(state));
        RETURN_IF_ERROR(_new_chunk_writer());
    }

    return _writer->write(chunk);
}

// Closes the current writer, passing `_commit_func` along, and on success records it in
// _pending_commits so closed() can track it.
//
// Returns OK when there is no current writer or the close request succeeded; otherwise an
// IOError that names the output file and carries the underlying failure reason.
Status RollingAsyncOrcWriter::close_current_writer(RuntimeState* state) {
    // This method is public, so it may be called before any chunk has been appended.
    if (_writer == nullptr) {
        return Status::OK();
    }

    Status st = _writer->close(state, _commit_func);
    if (!st.ok()) {
        // Keep the original reason: a bare "close file error!" is not actionable.
        const std::string reason = st.to_string();
        LOG(WARNING) << "close file error: " << _outfile_location << ", reason: " << reason;
        return Status::IOError(fmt::format("close file error! file: {}, reason: {}", _outfile_location, reason));
    }

    _pending_commits.emplace_back(_writer);
    return Status::OK();
}

// Closes the current writer if one exists; a writer that never received a chunk has nothing
// to close. Returns the error from close_current_writer(), if any.
Status RollingAsyncOrcWriter::close(RuntimeState* state) {
    if (_writer == nullptr) {
        return Status::OK();
    }
    RETURN_IF_ERROR(close_current_writer(state));
    return Status::OK();
}

// Reports whether every writer handed to close_current_writer() and the current writer (if
// any) report closed. Entries in _pending_commits that report closed are released as a side
// effect.
bool RollingAsyncOrcWriter::closed() {
    for (auto& pending : _pending_commits) {
        if (pending == nullptr) {
            continue; // already released by an earlier call
        }
        // Query the state exactly once per writer so the decision is based on a single,
        // consistent observation.
        if (!pending->closed()) {
            return false;
        }
        pending.reset();
    }

    return _writer == nullptr || _writer->closed();
}

} // namespace starrocks
71 changes: 71 additions & 0 deletions be/src/exec/orc_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "common/logging.h"
#include "exec/pipeline/fragment_context.h"
#include "formats/orc/orc_chunk_writer.h"
#include "fs/fs.h"
#include "gen_cpp/Types_types.h"
#include "runtime/runtime_state.h"

namespace starrocks {

// Settings consumed by RollingAsyncOrcWriter.
struct ORCInfo {
    // Requested compression; defaults to SNAPPY.
    TCompressionType::type compress_type{TCompressionType::SNAPPY};
    // Prefix under which output files are created; empty by default.
    std::string partition_location;
    // ORC schema of the files to write.
    std::shared_ptr<orc::Type> schema;
    // Cloud configuration used when creating the file system for partition_location.
    TCloudConfiguration cloud_conf;
};

// Writes chunks to ORC files under a partition location through AsyncOrcChunkWriter
// instances, switching to a new output file once the current writer's reported file size
// exceeds _max_file_size.
class RollingAsyncOrcWriter {
public:
// Stores the arguments; the file system is created in init() and the first chunk writer on
// the first append_chunk().
RollingAsyncOrcWriter(const ORCInfo& orc_info, const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile,
std::function<void(AsyncOrcChunkWriter*, RuntimeState*)> commit_func, RuntimeState* state,
int32_t driver_id);

~RollingAsyncOrcWriter() = default;

// Creates the file system for the partition location and caches schema and location.
Status init();
// Writes `chunk`, creating the first writer on demand and rolling to a new file when the
// size limit is exceeded.
Status append_chunk(Chunk* chunk, RuntimeState* state);
// Closes the current writer, if there is one.
Status close(RuntimeState* state);
// True when no writer exists yet or the current writer reports that it is writable.
bool writable() const { return _writer == nullptr || _writer->writable(); }
// True when all writers recorded in _pending_commits and the current writer report closed.
bool closed();
// Closes the current writer with _commit_func and, on success, records it in _pending_commits.
Status close_current_writer(RuntimeState* state);

private:
// Opens the next output file and replaces _writer with a writer for it.
Status _new_chunk_writer();
// Increments _file_cnt and builds the next output path:
// <partition location><fragment instance id>_<driver id>_<file count>.orc
std::string _new_file_location();

std::unique_ptr<FileSystem> _fs;
// Writer for the file currently being written; null until the first append_chunk().
std::shared_ptr<AsyncOrcChunkWriter> _writer;
// NOTE(review): passed to every AsyncOrcChunkWriter but not assigned anywhere in orc_writer.cpp
// -- confirm a null value is intended.
std::shared_ptr<orc::WriterOptions> _options;
std::shared_ptr<orc::Type> _schema;
std::string _partition_location;
ORCInfo _orc_info;
// Number of output files started so far; part of each file name.
int32_t _file_cnt = 0;

// Path of the most recently started output file.
std::string _outfile_location;
// Writers whose close() request succeeded; closed() releases entries once they report closed.
std::vector<std::shared_ptr<AsyncOrcChunkWriter>> _pending_commits;
int64_t _max_file_size = 1 * 1024 * 1024 * 1024; // 1GB

std::vector<ExprContext*> _output_expr_ctxs;
RuntimeProfile* _parent_profile;
// Callback handed to AsyncOrcChunkWriter::close().
std::function<void(AsyncOrcChunkWriter*, RuntimeState*)> _commit_func;
RuntimeState* _state;
int32_t _driver_id;
};
} // namespace starrocks
1 change: 1 addition & 0 deletions be/src/formats/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ add_library(Formats STATIC
avro/numeric_column.cpp
avro/binary_column.cpp
orc/orc_chunk_reader.cpp
orc/orc_chunk_writer.cpp
orc/orc_input_stream.cpp
orc/orc_mapping.cpp
orc/orc_memory_pool.cpp
Expand Down
Loading

0 comments on commit 1587c9a

Please sign in to comment.