From 1587c9ab247acd2cf1336e9403ef60ee3f54de59 Mon Sep 17 00:00:00 2001 From: YinZheng-Sun <57469177+YinZheng-Sun@users.noreply.github.com> Date: Tue, 31 Oct 2023 13:44:34 +0800 Subject: [PATCH] [Feature] Add Orc Chunk Writer (#31473) Signed-off-by: YinZhengSun <51255903009@stu.ecnu.edu.cn> Signed-off-by: Moonm3n --- be/src/bench/CMakeLists.txt | 1 + be/src/bench/orc_writer_bench.cpp | 145 +++ be/src/exec/CMakeLists.txt | 1 + be/src/exec/orc_writer.cpp | 102 ++ be/src/exec/orc_writer.h | 71 ++ be/src/formats/CMakeLists.txt | 1 + be/src/formats/orc/orc_chunk_writer.cpp | 736 +++++++++++++++ be/src/formats/orc/orc_chunk_writer.h | 150 +++ be/src/formats/orc/utils.h | 9 +- be/test/CMakeLists.txt | 1 + be/test/formats/orc/orc_chunk_writer_test.cpp | 892 ++++++++++++++++++ 11 files changed, 2108 insertions(+), 1 deletion(-) create mode 100644 be/src/bench/orc_writer_bench.cpp create mode 100644 be/src/exec/orc_writer.cpp create mode 100644 be/src/exec/orc_writer.h create mode 100644 be/src/formats/orc/orc_chunk_writer.cpp create mode 100644 be/src/formats/orc/orc_chunk_writer.h create mode 100644 be/test/formats/orc/orc_chunk_writer_test.cpp diff --git a/be/src/bench/CMakeLists.txt b/be/src/bench/CMakeLists.txt index ea4ae7e71e97e7..d7143122f9a5b2 100644 --- a/be/src/bench/CMakeLists.txt +++ b/be/src/bench/CMakeLists.txt @@ -19,6 +19,7 @@ endif() ADD_BE_BENCH(${SRC_DIR}/bench/chunks_sorter_bench) ADD_BE_BENCH(${SRC_DIR}/bench/runtime_filter_bench) ADD_BE_BENCH(${SRC_DIR}/bench/csv_reader_bench) +ADD_BE_BENCH(${SRC_DIR}/bench/orc_writer_bench) ADD_BE_BENCH(${SRC_DIR}/bench/shuffle_chunk_bench) ADD_BE_BENCH(${SRC_DIR}/bench/parquet_writer_bench) ADD_BE_BENCH(${SRC_DIR}/bench/parquet_writer_array_bench) diff --git a/be/src/bench/orc_writer_bench.cpp b/be/src/bench/orc_writer_bench.cpp new file mode 100644 index 00000000000000..7b22ed29bb2374 --- /dev/null +++ b/be/src/bench/orc_writer_bench.cpp @@ -0,0 +1,145 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include + +#include "bench.h" +#include "column/chunk.h" +#include "common/statusor.h" +#include "formats/orc/orc_chunk_writer.h" +#include "fs/fs.h" +#include "fs/fs_posix.h" +#include "runtime/descriptor_helper.h" +#include "testutil/assert.h" +#include "util/time.h" + +namespace starrocks::orc { + +const int random_seed = 42; + +inline std::shared_ptr make_chunk(int num_rows, int null_percent) { + std::srand(random_seed); + + std::vector values(num_rows); + std::vector is_null(num_rows, 0); + + for (int i = 0; i < num_rows; i++) { + if (std::rand() % 100 < null_percent) { + is_null[i] = 1; + } else { + values[i] = std::rand(); + } + } + + auto chunk = std::make_shared(); + auto data_column = Int32Column::create(); + data_column->append_numbers(values.data(), values.size() * sizeof(int32)); + auto null_column = UInt8Column::create(); + null_column->append_numbers(is_null.data(), is_null.size()); + auto col = NullableColumn::create(data_column, null_column); + + chunk->append_column(col, 0); + return chunk; +} + +inline std::vector _make_type_descs() { + return {TypeDescriptor::from_logical_type(TYPE_INT)}; +} + +inline StatusOr> _make_schema() { + auto type_descs = _make_type_descs(); + std::vector column_names{"INT"}; + return OrcChunkWriter::make_schema(column_names, type_descs); +} + +inline std::unique_ptr make_starrocks_writer(std::unique_ptr file) { + auto writer_options = std::make_shared<::orc::WriterOptions>(); + auto type_descs = _make_type_descs(); + ASSIGN_OR_ABORT(auto schema, _make_schema()); + auto chunk_writer = + std::make_unique(std::move(file), writer_options, type_descs, std::move(schema)); + writer_options->setCompression(orc::CompressionKind::CompressionKind_SNAPPY); + return chunk_writer; +} + +static void Benchmark_OrcChunkWriterArgs(benchmark::internal::Benchmark* b) { + // num rows: 100000, 1000000, 10000000 + std::vector bm_num_chunks = {100, 1000, 10000}; + std::vector bm_null_percent = {0, 5, 25, 50, 75}; // percentage of null values + for (auto& num_chunk : bm_num_chunks) { + for (auto& null_percent : bm_null_percent) { + b->Args({num_chunk, null_percent}); + } + } +} + +static void Benchmark_StarRocksOrcWriter(benchmark::State& state) { + auto fs = new_fs_posix(); + const std::string file_path = "./be/test/exec/test_data/orc_scanner/starrocks_writer.orc"; + fs->delete_file(file_path); + + auto chunk_nums = state.range(0); + auto null_percent = state.range(1); + auto chunk = make_chunk(1000, null_percent); + + for (auto _ : state) { + state.PauseTiming(); + ASSIGN_OR_ABORT(auto file, fs->new_writable_file(file_path)); + auto writer = make_starrocks_writer(std::move(file)); + + state.ResumeTiming(); + for (int i = 0; i < chunk_nums; ++i) { + try { + writer->write(chunk.get()); + } catch (std::exception& e) { + auto s = strings::Substitute("Write ORC File Failed. reason = $0", e.what()); + LOG(WARNING) << s; + return; + } + } + writer->close(); + state.PauseTiming(); + + fs->delete_file(file_path); + } + + { + ASSIGN_OR_ABORT(auto file, fs->new_writable_file(file_path)); + auto writer = make_starrocks_writer(std::move(file)); + for (int i = 0; i < chunk_nums; ++i) { + try { + writer->write(chunk.get()); + } catch (std::exception& e) { + auto s = strings::Substitute("Write ORC File Failed. reason = $0", e.what()); + LOG(WARNING) << s; + return; + } + } + writer->close(); + } +} + +BENCHMARK(Benchmark_StarRocksOrcWriter) + ->Apply(Benchmark_OrcChunkWriterArgs) + ->Unit(benchmark::kMillisecond) + ->MinTime(10); + +} // namespace starrocks::orc + +BENCHMARK_MAIN(); \ No newline at end of file diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 2b099cc2af03c3..05d62db4fa2034 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -79,6 +79,7 @@ set(EXEC_FILES except_node.cpp file_scanner.cpp orc_scanner.cpp + orc_writer.cpp jni_scanner.cpp arrow_to_starrocks_converter.cpp arrow_to_json_converter.cpp diff --git a/be/src/exec/orc_writer.cpp b/be/src/exec/orc_writer.cpp new file mode 100644 index 00000000000000..ee2bc025cda25d --- /dev/null +++ b/be/src/exec/orc_writer.cpp @@ -0,0 +1,102 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/orc_writer.h" + +namespace starrocks { +RollingAsyncOrcWriter::RollingAsyncOrcWriter(const ORCInfo& orc_info, const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile, + std::function commit_func, + RuntimeState* state, int32_t driver_id) + : _orc_info(orc_info), + _output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile), + _commit_func(commit_func), + _state(state), + _driver_id(driver_id) {} + +Status RollingAsyncOrcWriter::init() { + ASSIGN_OR_RETURN(_fs, + FileSystem::CreateUniqueFromString(_orc_info.partition_location, FSOptions(&_orc_info.cloud_conf))) + _schema = _orc_info.schema; + _partition_location = _orc_info.partition_location; + return Status::OK(); +} + +std::string RollingAsyncOrcWriter::_new_file_location() { + _file_cnt += 1; + _outfile_location = _partition_location + + fmt::format("{}_{}_{}.orc", print_id(_state->fragment_instance_id()), _driver_id, _file_cnt); + return _outfile_location; +} + +Status RollingAsyncOrcWriter::_new_chunk_writer() { + std::string new_file_location = _new_file_location(); + WritableFileOptions options{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE}; + ASSIGN_OR_RETURN(auto writable_file, _fs->new_writable_file(options, new_file_location)) + + _writer = std::make_shared(std::move(writable_file), _options, _schema, _output_expr_ctxs, + ExecEnv::GetInstance()->pipeline_sink_io_pool(), _parent_profile); + + return Status::OK(); +} + +Status RollingAsyncOrcWriter::append_chunk(Chunk* chunk, RuntimeState* state) { + if (_writer == nullptr) { + RETURN_IF_ERROR(_new_chunk_writer()); + } + // exceed file size + if (_writer->file_size() > _max_file_size) { + RETURN_IF_ERROR(close_current_writer(state)); + RETURN_IF_ERROR(_new_chunk_writer()); + } + return _writer->write(chunk); +} + +Status RollingAsyncOrcWriter::close_current_writer(RuntimeState* state) { + Status st = _writer->close(state, _commit_func); + if (st.ok()) { + _pending_commits.emplace_back(_writer); + return Status::OK(); + } else { + LOG(WARNING) << "close file error: " << _outfile_location; + return Status::IOError("close file error!"); + } +} + +Status RollingAsyncOrcWriter::close(RuntimeState* state) { + if (_writer != nullptr) { + RETURN_IF_ERROR(close_current_writer(state)); + } + return Status::OK(); +} + +bool RollingAsyncOrcWriter::closed() { + for (auto& writer : _pending_commits) { + if (writer != nullptr && writer->closed()) { + writer = nullptr; + } + if (writer != nullptr && (!writer->closed())) { + return false; + } + } + + if (_writer != nullptr) { + return _writer->closed(); + } + + return true; +} + +} // namespace starrocks \ No newline at end of file diff --git a/be/src/exec/orc_writer.h b/be/src/exec/orc_writer.h new file mode 100644 index 00000000000000..3aba32f3c65d8d --- /dev/null +++ b/be/src/exec/orc_writer.h @@ -0,0 +1,71 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "common/logging.h" +#include "exec/pipeline/fragment_context.h" +#include "formats/orc/orc_chunk_writer.h" +#include "fs/fs.h" +#include "gen_cpp/Types_types.h" +#include "runtime/runtime_state.h" + +namespace starrocks { + +struct ORCInfo { + TCompressionType::type compress_type = TCompressionType::SNAPPY; + std::string partition_location = ""; + std::shared_ptr schema; + TCloudConfiguration cloud_conf; +}; + +class RollingAsyncOrcWriter { +public: + RollingAsyncOrcWriter(const ORCInfo& orc_info, const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile, + std::function commit_func, RuntimeState* state, + int32_t driver_id); + + ~RollingAsyncOrcWriter() = default; + + Status init(); + Status append_chunk(Chunk* chunk, RuntimeState* state); + Status close(RuntimeState* state); + bool writable() const { return _writer == nullptr || _writer->writable(); } + bool closed(); + Status close_current_writer(RuntimeState* state); + +private: + Status _new_chunk_writer(); + std::string _new_file_location(); + + std::unique_ptr _fs; + std::shared_ptr _writer; + std::shared_ptr _options; + std::shared_ptr _schema; + std::string _partition_location; + ORCInfo _orc_info; + int32_t _file_cnt = 0; + + std::string _outfile_location; + std::vector> _pending_commits; + int64_t _max_file_size = 1 * 1024 * 1024 * 1024; // 1GB + + std::vector _output_expr_ctxs; + RuntimeProfile* _parent_profile; + std::function _commit_func; + RuntimeState* _state; + int32_t _driver_id; +}; +} // namespace starrocks \ No newline at end of file diff --git a/be/src/formats/CMakeLists.txt b/be/src/formats/CMakeLists.txt index 31d878fe2f5a05..e4e48c204c1866 100644 --- a/be/src/formats/CMakeLists.txt +++ b/be/src/formats/CMakeLists.txt @@ -39,6 +39,7 @@ add_library(Formats STATIC avro/numeric_column.cpp avro/binary_column.cpp orc/orc_chunk_reader.cpp + orc/orc_chunk_writer.cpp orc/orc_input_stream.cpp orc/orc_mapping.cpp orc/orc_memory_pool.cpp diff --git a/be/src/formats/orc/orc_chunk_writer.cpp b/be/src/formats/orc/orc_chunk_writer.cpp new file mode 100644 index 00000000000000..ec3e81baa627e7 --- /dev/null +++ b/be/src/formats/orc/orc_chunk_writer.cpp @@ -0,0 +1,736 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "formats/orc/orc_chunk_writer.h" + +#include "column/array_column.h" +#include "column/chunk.h" +#include "column/map_column.h" +#include "column/struct_column.h" +#include "gutil/strings/substitute.h" + +namespace starrocks { + +/* +** ORCOutputStream +*/ +OrcOutputStream::OrcOutputStream(std::unique_ptr wfile) : _wfile(std::move(wfile)) {} + +OrcOutputStream::~OrcOutputStream() { + if (!_is_closed) { + close(); + } +} + +uint64_t OrcOutputStream::getLength() const { + return _wfile->size(); +} + +uint64_t OrcOutputStream::getNaturalWriteSize() const { + return config::vector_chunk_size; +} + +const std::string& OrcOutputStream::getName() const { + return _wfile->filename(); +} + +void OrcOutputStream::write(const void* buf, size_t length) { + if (_is_closed) { + throw "The output stream is closed but there are still inputs"; + } + const char* ch = reinterpret_cast(buf); + Status st = _wfile->append(Slice(ch, length)); + if (!st.ok()) { + throw "write to orc failed: " + st.to_string(); + } + return; +} + +void OrcOutputStream::close() { + if (_is_closed) { + throw "The output stream is already closed"; + } + Status st = _wfile->close(); + if (!st.ok()) { + throw "close orc output stream failed: " + st.to_string(); + } + _is_closed = true; + return; +} + +OrcChunkWriter::OrcChunkWriter(std::unique_ptr writable_file, + std::shared_ptr writer_options, std::shared_ptr schema, + const std::vector& output_expr_ctxs) + : _output_stream(std::move(writable_file)) { + _type_descs.reserve(output_expr_ctxs.size()); + for (auto expr : output_expr_ctxs) { + _type_descs.push_back(expr->root()->type()); + } + _writer_options = writer_options; + _schema = schema; +} + +Status OrcChunkWriter::set_compression(const TCompressionType::type& compression_type) { + switch (compression_type) { + case TCompressionType::SNAPPY: { + _writer_options->setCompression(orc::CompressionKind::CompressionKind_SNAPPY); + break; + } + case TCompressionType::ZLIB: { + _writer_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB); + break; + } + case TCompressionType::ZSTD: { + _writer_options->setCompression(orc::CompressionKind::CompressionKind_ZSTD); + break; + } + case TCompressionType::LZ4: { + _writer_options->setCompression(orc::CompressionKind::CompressionKind_LZ4); + break; + } + case TCompressionType::LZO: { + _writer_options->setCompression(orc::CompressionKind::CompressionKind_LZO); + break; + } + case TCompressionType::NO_COMPRESSION: { + _writer_options->setCompression(orc::CompressionKind::CompressionKind_NONE); + break; + } + default: + return Status::NotSupported("The Compression Type is not supported"); + } + return Status::OK(); +} + +StatusOr> OrcChunkWriter::_get_orc_type(const TypeDescriptor& type_desc) { + switch (type_desc.type) { + case TYPE_BOOLEAN: { + return orc::createPrimitiveType(orc::TypeKind::BOOLEAN); + } + case TYPE_TINYINT: + [[fallthrough]]; + case TYPE_UNSIGNED_TINYINT: { + return orc::createPrimitiveType(orc::TypeKind::BYTE); + } + case TYPE_SMALLINT: + [[fallthrough]]; + case TYPE_UNSIGNED_SMALLINT: { + return orc::createPrimitiveType(orc::TypeKind::SHORT); + } + case TYPE_INT: + [[fallthrough]]; + case TYPE_UNSIGNED_INT: { + return orc::createPrimitiveType(orc::TypeKind::INT); + } + case TYPE_BIGINT: + [[fallthrough]]; + case TYPE_UNSIGNED_BIGINT: { + return orc::createPrimitiveType(orc::TypeKind::LONG); + } + case TYPE_FLOAT: { + return orc::createPrimitiveType(orc::TypeKind::FLOAT); + } + case TYPE_DOUBLE: { + return orc::createPrimitiveType(orc::TypeKind::DOUBLE); + } + case TYPE_CHAR: + [[fallthrough]]; + case TYPE_VARCHAR: { + return orc::createPrimitiveType(orc::TypeKind::STRING); + } + case TYPE_DECIMAL: + [[fallthrough]]; + case TYPE_DECIMAL32: + [[fallthrough]]; + case TYPE_DECIMAL64: + [[fallthrough]]; + case TYPE_DECIMAL128: { + return orc::createDecimalType(type_desc.precision, type_desc.scale); + } + case TYPE_DATE: { + return orc::createPrimitiveType(orc::TypeKind::DATE); + } + case TYPE_DATETIME: + [[fallthrough]]; + case TYPE_TIME: { + return orc::createPrimitiveType(orc::TypeKind::TIMESTAMP); + } + case TYPE_STRUCT: { + auto struct_type = orc::createStructType(); + for (size_t i = 0; i < type_desc.children.size(); ++i) { + const TypeDescriptor& child_type = type_desc.children[i]; + ASSIGN_OR_RETURN(std::unique_ptr child_orc_type, OrcChunkWriter::_get_orc_type(child_type)); + struct_type->addStructField(type_desc.field_names[i], std::move(child_orc_type)); + } + return struct_type; + } + case TYPE_MAP: { + const TypeDescriptor& key_type = type_desc.children[0]; + const TypeDescriptor& value_type = type_desc.children[1]; + if (key_type.is_unknown_type() || value_type.is_unknown_type()) { + return Status::InternalError("This data type in MAP is not supported by ORC"); + } + ASSIGN_OR_RETURN(std::unique_ptr key_orc_type, OrcChunkWriter::_get_orc_type(key_type)); + ASSIGN_OR_RETURN(std::unique_ptr value_orc_type, OrcChunkWriter::_get_orc_type(value_type)); + return orc::createMapType(std::move(key_orc_type), std::move(value_orc_type)); + } + case TYPE_ARRAY: { + const TypeDescriptor& child_type = type_desc.children[0]; + ASSIGN_OR_RETURN(std::unique_ptr child_orc_type, OrcChunkWriter::_get_orc_type(child_type)); + return orc::createListType(std::move(child_orc_type)); + } + default: + return Status::InternalError("This data type is not supported by ORC"); + } +} + +Status OrcChunkWriter::write(Chunk* chunk) { + if (!_writer) { + _writer = orc::createWriter(*_schema, &_output_stream, *_writer_options); + } + size_t num_rows = chunk->num_rows(); + size_t num_columns = chunk->num_columns(); + + auto columns = chunk->columns(); + + _batch = _writer->createRowBatch(num_rows); + orc::StructVectorBatch& root = dynamic_cast(*_batch); + + for (size_t i = 0; i < num_columns; ++i) { + RETURN_IF_ERROR(_write_column(*root.fields[i], columns[i], _type_descs[i])); + } + + root.numElements = num_rows; + RETURN_IF_ERROR(_flush_batch()); + return Status::OK(); +} + +Status OrcChunkWriter::_flush_batch() { + if (!_writer) { + return Status::InternalError("ORC Writer is not initialized"); + } + if (!_batch) { + return Status::InternalError("ORC Batch is empty"); + } + _writer->add(*_batch); + return Status::OK(); +} + +Status OrcChunkWriter::_write_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, + const TypeDescriptor& type_desc) { + switch (type_desc.type) { + case TYPE_BOOLEAN: { + _write_number(orc_column, column); + break; + } + case TYPE_TINYINT: { + _write_number(orc_column, column); + break; + } + case TYPE_SMALLINT: { + _write_number(orc_column, column); + break; + } + case TYPE_INT: { + _write_number(orc_column, column); + break; + } + case TYPE_BIGINT: { + _write_number(orc_column, column); + break; + } + case TYPE_FLOAT: { + _write_number(orc_column, column); + } + case TYPE_DOUBLE: { + _write_number(orc_column, column); + } + case TYPE_CHAR: + [[fallthrough]]; + case TYPE_VARCHAR: { + _write_string(orc_column, column); + break; + } + case TYPE_DECIMAL: { + _write_decimal(orc_column, column, type_desc.precision, type_desc.scale); + break; + } + case TYPE_DECIMAL32: { + _write_decimal32or64or128( + orc_column, column, type_desc.precision, type_desc.scale); + break; + } + case TYPE_DECIMAL64: { + _write_decimal32or64or128( + orc_column, column, type_desc.precision, type_desc.scale); + break; + } + case TYPE_DECIMAL128: { + _write_decimal32or64or128( + orc_column, column, type_desc.precision, type_desc.scale); + break; + } + case TYPE_DATE: { + _write_date(orc_column, column); + break; + } + case TYPE_DATETIME: { + _write_datetime(orc_column, column); + break; + } + case TYPE_ARRAY: { + RETURN_IF_ERROR(_write_array_column(orc_column, column, type_desc)); + break; + } + case TYPE_STRUCT: { + RETURN_IF_ERROR(_write_struct_column(orc_column, column, type_desc)); + break; + } + case TYPE_MAP: { + RETURN_IF_ERROR(_write_map_column(orc_column, column, type_desc)); + break; + } + default: + return Status::NotSupported(strings::Substitute("Type $0 not supported", type_desc.type)); + } + return Status::OK(); +} + +template +void OrcChunkWriter::_write_number(orc::ColumnVectorBatch& orc_column, ColumnPtr& column) { + auto& number_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + number_orc_column.resize(column_size); + number_orc_column.notNull.resize(column_size); + + if (column->is_nullable()) { + auto c = ColumnHelper::as_raw_column(column); + auto* nulls = c->null_column()->get_data().data(); + auto* values = ColumnHelper::cast_to_raw(c->data_column())->get_data().data(); + + for (size_t i = 0; i < column_size; ++i) { + // redundant copy for auto-vectorization + number_orc_column.notNull[i] = !nulls[i]; + number_orc_column.data[i] = values[i]; + } + orc_column.hasNulls = true; + } else { + auto* values = ColumnHelper::cast_to_raw(column)->get_data().data(); + for (size_t i = 0; i < column_size; ++i) { + number_orc_column.data[i] = values[i]; + } + memset(number_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + number_orc_column.numElements = column_size; +} + +void OrcChunkWriter::_write_string(orc::ColumnVectorBatch& orc_column, ColumnPtr& column) { + auto& string_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + string_orc_column.resize(column_size); + string_orc_column.notNull.resize(column_size); + + if (column->is_nullable()) { + auto* c = ColumnHelper::as_raw_column(column); + auto* nulls = c->null_column()->get_data().data(); + auto* values = ColumnHelper::cast_to_raw(c->data_column()); + + for (size_t i = 0; i < column_size; ++i) { + if (nulls[i]) { + string_orc_column.notNull[i] = 0; + continue; + } + string_orc_column.notNull[i] = 1; + auto slice = values->get_slice(i); + string_orc_column.data[i] = const_cast(slice.get_data()); + string_orc_column.length[i] = slice.get_size(); + } + orc_column.hasNulls = true; + } else { + auto* str_column = ColumnHelper::cast_to_raw(column); + + for (size_t i = 0; i < column_size; ++i) { + auto slice = str_column->get_slice(i); + + string_orc_column.data[i] = const_cast(slice.get_data()); + string_orc_column.length[i] = string_orc_column.length[i] = slice.get_size(); + } + memset(string_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + string_orc_column.numElements = column_size; +} + +template +void OrcChunkWriter::_write_decimal32or64or128(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, int precision, + int scale) { + auto& decimal_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + using Type = RunTimeCppType; + + decimal_orc_column.resize(column_size); + decimal_orc_column.notNull.resize(column_size); + + decimal_orc_column.precision = precision; + decimal_orc_column.scale = scale; + + if (column->is_nullable()) { + auto c = ColumnHelper::as_raw_column(column); + auto* nulls = c->null_column()->get_data().data(); + auto* values = ColumnHelper::cast_to_raw(c->data_column())->get_data().data(); + + for (size_t i = 0; i < column_size; ++i) { + if (nulls[i]) { + decimal_orc_column.notNull[i] = 0; + continue; + } + decimal_orc_column.notNull[i] = 1; + T value; + DecimalV3Cast::to_decimal_trivial(values[i], &value); + if constexpr (std::is_same_v) { + auto high64Bits = static_cast(value >> 64); + auto low64Bits = static_cast(value); + decimal_orc_column.values[i] = orc::Int128{high64Bits, low64Bits}; + } else { + decimal_orc_column.values[i] = value; + } + } + orc_column.hasNulls = true; + } else { + auto* values = ColumnHelper::cast_to_raw(column)->get_data().data(); + for (size_t i = 0; i < column_size; ++i) { + T value; + DecimalV3Cast::to_decimal_trivial(values[i], &value); + if constexpr (std::is_same_v) { + int64_t high64Bits = static_cast(value >> 64); + uint64_t low64Bits = static_cast(value); + decimal_orc_column.values[i] = orc::Int128{high64Bits, low64Bits}; + } else { + decimal_orc_column.values[i] = value; + } + } + memset(decimal_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + decimal_orc_column.numElements = column_size; +} + +void OrcChunkWriter::_write_decimal(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, int precision, int scale) { + auto& decimal_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + decimal_orc_column.resize(column_size); + decimal_orc_column.notNull.resize(column_size); + + decimal_orc_column.precision = precision; + decimal_orc_column.scale = scale; + + if (column->is_nullable()) { + auto c = ColumnHelper::as_raw_column(column); + auto* nulls = c->null_column()->get_data().data(); + auto* values = + reinterpret_cast(down_cast(c->data_column().get())->get_data().data()); + + for (size_t i = 0; i < column_size; ++i) { + if (nulls[i]) { + decimal_orc_column.notNull[i] = 0; + continue; + } + decimal_orc_column.notNull[i] = 1; + decimal_orc_column.values[i] = orc::Int128(values[i] >> 64, (values[i] << 64) >> 64); + } + orc_column.hasNulls = true; + } else { + auto* values = reinterpret_cast(down_cast(column.get())->get_data().data()); + for (size_t i = 0; i < column_size; ++i) { + decimal_orc_column.values[i] = orc::Int128(values[i] >> 64, (values[i] << 64) >> 64); + } + memset(decimal_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + decimal_orc_column.numElements = column_size; +} + +void OrcChunkWriter::_write_date(orc::ColumnVectorBatch& orc_column, ColumnPtr& column) { + auto& date_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + date_orc_column.resize(column_size); + date_orc_column.notNull.resize(column_size); + + if (column->is_nullable()) { + auto c = ColumnHelper::as_raw_column(column); + auto* nulls = c->null_column()->get_data().data(); + auto* values = ColumnHelper::cast_to_raw(c->data_column())->get_data().data(); + + for (size_t i = 0; i < column_size; ++i) { + if (nulls[i]) { + date_orc_column.notNull[i] = 0; + continue; + } + date_orc_column.notNull[i] = 1; + date_orc_column.data[i] = OrcDateHelper::native_date_to_orc_date(values[i]); + } + orc_column.hasNulls = true; + } else { + auto* values = ColumnHelper::cast_to_raw(column)->get_data().data(); + for (size_t i = 0; i < column_size; ++i) { + date_orc_column.data[i] = OrcDateHelper::native_date_to_orc_date(values[i]); + } + memset(date_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + date_orc_column.numElements = column_size; +} + +void OrcChunkWriter::_write_datetime(orc::ColumnVectorBatch& orc_column, ColumnPtr& column) { + auto& timestamp_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + timestamp_orc_column.resize(column_size); + timestamp_orc_column.notNull.resize(column_size); + + if (column->is_nullable()) { + auto c = ColumnHelper::as_raw_column(column); + auto* nulls = c->null_column()->get_data().data(); + auto* values = ColumnHelper::cast_to_raw(c->data_column())->get_data().data(); + + for (size_t i = 0; i < column_size; ++i) { + if (nulls[i]) { + timestamp_orc_column.notNull[i] = 0; + continue; + } + timestamp_orc_column.notNull[i] = 1; + OrcTimestampHelper::native_ts_to_orc_ts(values[i], timestamp_orc_column.data[i], + timestamp_orc_column.nanoseconds[i]); + } + orc_column.hasNulls = true; + } else { + auto* values = ColumnHelper::cast_to_raw(column)->get_data().data(); + for (size_t i = 0; i < column_size; ++i) { + OrcTimestampHelper::native_ts_to_orc_ts(values[i], timestamp_orc_column.data[i], + timestamp_orc_column.nanoseconds[i]); + } + memset(timestamp_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + timestamp_orc_column.numElements = column_size; +} + +Status OrcChunkWriter::_write_array_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, + const TypeDescriptor& type) { + auto& array_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + array_orc_column.resize(column_size); + array_orc_column.notNull.resize(column_size); + auto& value_orc_column = *array_orc_column.elements; + + if (column->is_nullable()) { + auto* col_nullable = down_cast(column.get()); + auto* array_cols = down_cast(col_nullable->data_column().get()); + uint32_t* offsets = array_cols->offsets_column().get()->get_data().data(); + auto* nulls = col_nullable->null_column()->get_data().data(); + + array_orc_column.offsets[0] = offsets[0]; + for (size_t i = 0; i < column_size; ++i) { + array_orc_column.offsets[i + 1] = offsets[i + 1]; + array_orc_column.notNull[i] = !nulls[i]; + } + + RETURN_IF_ERROR(_write_column(value_orc_column, array_cols->elements_column(), type.children[0])); + array_orc_column.hasNulls = true; + } else { + auto* array_cols = down_cast(column.get()); + uint32_t* offsets = array_cols->offsets_column().get()->get_data().data(); + + array_orc_column.offsets[0] = offsets[0]; + for (size_t i = 0; i < column_size; ++i) { + array_orc_column.offsets[i + 1] = offsets[i + 1]; + } + memset(array_orc_column.notNull.data(), 1, column_size * sizeof(char)); + + RETURN_IF_ERROR(_write_column(value_orc_column, array_cols->elements_column(), type.children[0])); + } + array_orc_column.numElements = column_size; + return Status::OK(); +} + +Status OrcChunkWriter::_write_struct_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, + const TypeDescriptor& type) { + auto& struct_orc_column = dynamic_cast(orc_column); + auto column_size = column->size(); + + struct_orc_column.resize(column_size); + struct_orc_column.notNull.resize(column_size); + + if (column->is_nullable()) { + auto* col_nullable = down_cast(column.get()); + auto* struct_cols = down_cast(col_nullable->data_column().get()); + auto* nulls = col_nullable->null_column()->get_data().data(); + Columns& field_columns = struct_cols->fields_column(); + + for (size_t i = 0; i < column_size; ++i) { + struct_orc_column.notNull[i] = !nulls[i]; + } + + for (size_t i = 0; i < type.children.size(); ++i) { + RETURN_IF_ERROR(_write_column(*struct_orc_column.fields[i], field_columns[i], type.children[i])); + } + struct_orc_column.hasNulls = true; + } else { + auto* struct_cols = down_cast(column.get()); + Columns& field_columns = struct_cols->fields_column(); + + for (size_t i = 0; i < type.children.size(); ++i) { + RETURN_IF_ERROR(_write_column(*struct_orc_column.fields[i], field_columns[i], type.children[i])); + } + memset(struct_orc_column.notNull.data(), 1, column_size * sizeof(char)); + } + struct_orc_column.numElements = column_size; + return Status::OK(); +} + +Status OrcChunkWriter::_write_map_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, + const TypeDescriptor& type) { + auto& map_orc_column = dynamic_cast(orc_column); + size_t column_size = column->size(); + + orc::ColumnVectorBatch& keys_orc_column = *map_orc_column.keys; + orc::ColumnVectorBatch& values_orc_column = *map_orc_column.elements; + + if (column->is_nullable()) { + auto* col_nullable = down_cast(column.get()); + auto* col_map = down_cast(col_nullable->data_column().get()); + uint32_t* offsets = col_map->offsets_column().get()->get_data().data(); + auto* nulls = col_nullable->null_column()->get_data().data(); + + ColumnPtr& keys = col_map->keys_column(); + ColumnPtr& values = col_map->values_column(); + + map_orc_column.resize(column_size); + map_orc_column.offsets[0] = 0; + + for (size_t i = 0; i < column_size; ++i) { + map_orc_column.offsets[i + 1] = offsets[i + 1]; + map_orc_column.notNull[i] = !nulls[i]; + } + + RETURN_IF_ERROR(_write_column(keys_orc_column, keys, type.children[0])); + RETURN_IF_ERROR(_write_column(values_orc_column, values, type.children[1])); + map_orc_column.hasNulls = true; + } else { + auto* col_map = down_cast(column.get()); + uint32_t* offsets = col_map->offsets_column().get()->get_data().data(); + + ColumnPtr& keys = col_map->keys_column(); + ColumnPtr& values = col_map->values_column(); + + for (size_t i = 0; i < column_size; ++i) { + map_orc_column.offsets[i + 1] = offsets[i + 1]; + } + memset(map_orc_column.notNull.data(), 1, column_size * sizeof(char)); + + RETURN_IF_ERROR(_write_column(keys_orc_column, keys, type.children[0])); + RETURN_IF_ERROR(_write_column(values_orc_column, values, type.children[1])); + } + map_orc_column.numElements = column->size(); + return Status::OK(); +} + +void OrcChunkWriter::close() { + _writer->close(); +} + +StatusOr> OrcChunkWriter::make_schema(const std::vector& file_column_names, + const std::vector& type_descs) { + auto schema = orc::createStructType(); + for (size_t i = 0; i < type_descs.size(); ++i) { + ASSIGN_OR_RETURN(std::unique_ptr field_type, OrcChunkWriter::_get_orc_type(type_descs[i])); + schema->addStructField(file_column_names[i], std::move(field_type)); + } + return schema; +} + +/* +** AsyncOrcChunkWriter +*/ +AsyncOrcChunkWriter::AsyncOrcChunkWriter(std::unique_ptr writable_file, + std::shared_ptr writer_options, + std::shared_ptr schema, + const std::vector& output_expr_ctxs, + PriorityThreadPool* executor_pool, RuntimeProfile* parent_profile) + : OrcChunkWriter(std::move(writable_file), writer_options, schema, output_expr_ctxs), + _executor_pool(executor_pool), + _parent_profile(parent_profile) { + _io_timer = ADD_TIMER(_parent_profile, "OrcChunkWriterIoTimer"); +}; + +Status AsyncOrcChunkWriter::_flush_batch() { + { + auto lock = std::unique_lock(_lock); + _batch_closing = true; + } + + bool finish = _executor_pool->try_offer([&]() { + SCOPED_TIMER(_io_timer); + if (_batch != nullptr) { + _writer->add(*_batch); + _batch = nullptr; + } + { + auto lock = std::unique_lock(_lock); + _batch_closing = false; + } + _cv.notify_one(); + }); + + if (!finish) { + { + auto lock = std::unique_lock(_lock); + _batch_closing = false; + } + _cv.notify_one(); + auto st = Status::ResourceBusy("submit flush batch task fails"); + return st; + } + return Status::OK(); +} + +Status AsyncOrcChunkWriter::close(RuntimeState* state, + const std::function& cb) { + bool ret = _executor_pool->try_offer([&, state, cb]() { + SCOPED_TIMER(_io_timer); + { + auto lock = std::unique_lock(_lock); + _cv.wait(lock, [&] { return !_batch_closing; }); + } + _writer->close(); + _batch = nullptr; + if (cb != nullptr) { + cb(this, state); + } + _closed.store(true); + return Status::OK(); + }); + + if (!ret) { + return Status::InternalError("Submit close file error"); + } + return Status::OK(); +} + +bool AsyncOrcChunkWriter::writable() { + auto lock = std::unique_lock(_lock); + return !_batch_closing; +} + +} // namespace starrocks diff --git a/be/src/formats/orc/orc_chunk_writer.h b/be/src/formats/orc/orc_chunk_writer.h new file mode 100644 index 00000000000000..10c4695dcf6ecd --- /dev/null +++ b/be/src/formats/orc/orc_chunk_writer.h @@ -0,0 +1,150 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "column/column_helper.h" +#include "column/vectorized_fwd.h" +#include "exprs/expr.h" +#include "exprs/expr_context.h" +#include "formats/orc/utils.h" +#include "fs/fs.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "runtime/types.h" +#include "util/priority_thread_pool.hpp" + +namespace starrocks { + +class OrcColumnWriter; + +// OrcChunkWriter is a bridge between apache/orc file and chunk, wraps orc::writer +// Write chunks into buffer. Flush on closing. +class OrcOutputStream : public orc::OutputStream { +public: + OrcOutputStream(std::unique_ptr wfile); + + ~OrcOutputStream() override; + + uint64_t getLength() const override; + + uint64_t getNaturalWriteSize() const override; + + void write(const void* buf, size_t length) override; + + void close() override; + + const std::string& getName() const override; + +private: + std::unique_ptr _wfile; + bool _is_closed = false; +}; + +class OrcChunkWriter { +public: + OrcChunkWriter(std::unique_ptr writable_file, std::shared_ptr writer_options, + std::shared_ptr schema, const std::vector& output_expr_ctxs); + + // For UT + OrcChunkWriter(std::unique_ptr writable_file, std::shared_ptr writer_options, + std::vector& type_descs, std::unique_ptr schema) + : _type_descs(type_descs), + _writer_options(writer_options), + _schema(std::move(schema)), + _output_stream(std::move(writable_file)){}; + + Status set_compression(const TCompressionType::type& compression_type); + + Status write(Chunk* chunk); + + void close(); + + size_t file_size() { return _output_stream.getLength(); }; + + static StatusOr> make_schema(const std::vector& file_column_names, + const std::vector& type_descs); + +private: + virtual Status _flush_batch(); + + static StatusOr> _get_orc_type(const TypeDescriptor& type_desc); + + Status _write_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, const TypeDescriptor& type_desc); + + template + void _write_number(orc::ColumnVectorBatch& orc_column, ColumnPtr& column); + + void _write_string(orc::ColumnVectorBatch& orc_column, ColumnPtr& column); + + void _write_decimal(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, int precision, int scale); + + template + void _write_decimal32or64or128(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, int precision, int scale); + + void _write_date(orc::ColumnVectorBatch& orc_column, ColumnPtr& column); + + void _write_datetime(orc::ColumnVectorBatch& orc_column, ColumnPtr& column); + + Status _write_array_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, const TypeDescriptor& type); + + Status _write_struct_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, const TypeDescriptor& type); + + Status _write_map_column(orc::ColumnVectorBatch& orc_column, ColumnPtr& column, const TypeDescriptor& type); + +protected: + std::unique_ptr _writer; + std::vector _type_descs; + std::shared_ptr _writer_options; + std::shared_ptr _schema; + std::unique_ptr _batch = nullptr; + OrcOutputStream _output_stream; +}; + +class AsyncOrcChunkWriter : public OrcChunkWriter { +public: + AsyncOrcChunkWriter(std::unique_ptr writable_file, std::shared_ptr writer_options, + std::shared_ptr schema, const std::vector& output_expr_ctxs, + PriorityThreadPool* executor_pool, RuntimeProfile* parent_profile); + + // For UT + AsyncOrcChunkWriter(std::unique_ptr writable_file, std::shared_ptr writer_options, + std::vector& type_descs, std::unique_ptr schema, + PriorityThreadPool* executor_pool, RuntimeProfile* parent_profile) + : OrcChunkWriter(std::move(writable_file), writer_options, type_descs, std::move(schema)), + _executor_pool(executor_pool), + _parent_profile(parent_profile){}; + + Status close(RuntimeState* state, const std::function& cb = nullptr); + + bool writable(); + + bool closed() { return _closed.load(); } + +private: + Status _flush_batch() override; + + PriorityThreadPool* _executor_pool; + RuntimeProfile* _parent_profile = nullptr; + RuntimeProfile::Counter* _io_timer = nullptr; + + std::atomic _closed = false; + std::condition_variable _cv; + std::mutex _lock; + bool _batch_closing = false; +}; + +} // namespace starrocks \ No newline at end of file diff --git a/be/src/formats/orc/utils.h b/be/src/formats/orc/utils.h index 958fe9d6fccc85..b212f6cf04799f 100644 --- a/be/src/formats/orc/utils.h +++ b/be/src/formats/orc/utils.h @@ -53,7 +53,7 @@ class OrcDateHelper { // orc timestamp is millseconds since unix epoch time. // timestamp conversion is quite tricky, because it involves timezone info, // and it affects how we interpret `value`. according to orc v1 spec -// https://orc.apache.org/specification/ORCv1/ writer timezoe is in stripe footer. +// https://orc.apache.org/specification/ORCv1/ writer timezone is in stripe footer. // time conversion involves two aspects: // 1. timezone (UTC/GMT and local timezone) @@ -97,6 +97,13 @@ class OrcTimestampHelper { } } } + + static void native_ts_to_orc_ts(const TimestampValue& tv, int64_t& seconds, int64_t& nanoseconds) { + Timestamp time = tv._timestamp & TIMESTAMP_BITS_TIME; + uint64_t microseconds = time % USECS_PER_SEC; + seconds = tv.to_unix_second(); + nanoseconds = microseconds * 1000; + } }; } // namespace starrocks \ No newline at end of file diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 6c49f7322b068b..1279d032370e47 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -150,6 +150,7 @@ set(EXEC_FILES ./formats/avro/nullable_column_test.cpp ./formats/orc/orc_chunk_reader_test.cpp ./formats/orc/orc_column_reader_test.cpp + ./formats/orc/orc_chunk_writer_test.cpp ./formats/orc/orc_lazy_load_test.cpp ./formats/parquet/arrow_parquet_writer_test.cpp ./formats/parquet/parquet_schema_test.cpp diff --git a/be/test/formats/orc/orc_chunk_writer_test.cpp b/be/test/formats/orc/orc_chunk_writer_test.cpp new file mode 100644 index 00000000000000..9de8337691b5c3 --- /dev/null +++ b/be/test/formats/orc/orc_chunk_writer_test.cpp @@ -0,0 +1,892 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "formats/orc/orc_chunk_writer.h" + +#include + +#include +#include +#include +#include +#include + +#include "column/array_column.h" +#include "column/map_column.h" +#include "column/struct_column.h" +#include "common/object_pool.h" +#include "formats/orc/orc_chunk_reader.h" +#include "fs/fs_posix.h" +#include "gen_cpp/Exprs_types.h" +#include "gutil/strings/substitute.h" +#include "runtime/descriptor_helper.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "testutil/assert.h" +#include "util/priority_thread_pool.hpp" + +namespace starrocks { + +static void assert_equal_chunk(const Chunk* expected, const Chunk* actual) { + if (expected->debug_columns() != actual->debug_columns()) { + std::cout << expected->debug_columns() << std::endl; + std::cout << actual->debug_columns() << std::endl; + } + ASSERT_EQ(expected->debug_columns(), actual->debug_columns()); + for (size_t i = 0; i < expected->num_columns(); i++) { + const auto& expected_col = expected->get_column_by_index(i); + const auto& actual_col = actual->get_column_by_index(i); + if (expected_col->debug_string() != actual_col->debug_string()) { + std::cout << expected_col->debug_string() << std::endl; + std::cout << actual_col->debug_string() << std::endl; + } + ASSERT_EQ(expected_col->debug_string(), actual_col->debug_string()); + } +} + +class OrcChunkWriterTest : public testing::Test { +public: + void SetUp() override { + TUniqueId fragment_id; + TQueryOptions query_options; + query_options.batch_size = config::vector_chunk_size; + TQueryGlobals query_globals; + _runtime_state = std::make_shared(fragment_id, query_options, query_globals, nullptr); + _runtime_state->init_instance_mem_tracker(); + _fs = new_fs_posix(); + }; + void TearDown() override { _fs->delete_file(_file_path); }; + +protected: + std::vector _make_type_names(const std::vector& type_descs) { + std::vector names; + for (auto& desc : type_descs) { + names.push_back(desc.debug_string()); + } + return names; + } + + void _create_tuple_descriptor(RuntimeState* state, ObjectPool* pool, const std::vector& column_names, + const std::vector& type_descs, TupleDescriptor** tuple_desc, + bool nullable) { + TDescriptorTableBuilder table_desc_builder; + + TTupleDescriptorBuilder tuple_desc_builder; + for (size_t i = 0; i < column_names.size(); i++) { + TSlotDescriptorBuilder b2; + b2.column_name(column_names[i]).type(type_descs[i]).id(i).nullable(nullable); + tuple_desc_builder.add_slot(b2.build()); + } + tuple_desc_builder.build(&table_desc_builder); + + std::vector row_tuples = std::vector{0}; + std::vector nullable_tuples = std::vector{nullable}; + DescriptorTbl* tbl = nullptr; + DescriptorTbl::create(state, pool, table_desc_builder.desc_tbl(), &tbl, config::vector_chunk_size); + + RowDescriptor* row_desc = pool->add(new RowDescriptor(*tbl, row_tuples, nullable_tuples)); + *tuple_desc = row_desc->tuple_descriptors()[0]; + return; + } + + void _create_slot_descriptors(RuntimeState* state, ObjectPool* pool, std::vector* res, + const std::vector& column_names, + const std::vector& type_descs, bool nullable) { + TupleDescriptor* tuple_desc; + _create_tuple_descriptor(state, pool, column_names, type_descs, &tuple_desc, nullable); + *res = tuple_desc->slots(); + return; + } + + Status _write_chunk(const ChunkPtr& chunk, std::vector& type_descs, + std::unique_ptr schema) { + _fs->delete_file(_file_path); + ASSIGN_OR_ABORT(auto file, _fs->new_writable_file(_file_path)); + auto writer_options = std::make_shared(); + + auto chunk_writer = + std::make_shared(std::move(file), writer_options, type_descs, std::move(schema)); + auto st = chunk_writer->write(chunk.get()); + if (!st.ok()) { + std::cout << st.to_string() << std::endl; + return st; + } + chunk_writer->close(); + return st; + } + + Status _write_chunk_with_AsyncWriter(const ChunkPtr& chunk, std::vector& type_descs, + std::unique_ptr schema) { + _fs->delete_file(_file_path); + ASSIGN_OR_ABORT(auto file, _fs->new_writable_file(_file_path)); + RuntimeProfile* profile = _pool.add(new RuntimeProfile("orc_writer_prof", true)); + auto thread_pool = std::make_unique("async_chunk_writer_test", 2, 5); + + auto writer_options = std::make_shared(); + auto chunk_writer = std::make_shared(std::move(file), writer_options, type_descs, + std::move(schema), thread_pool.get(), profile); + auto st = chunk_writer->write(chunk.get()); + if (!st.ok()) { + std::cout << st.to_string() << std::endl; + return st; + } + st = chunk_writer->close(_runtime_state.get(), nullptr); + if (!st.ok()) { + std::cout << st.to_string() << std::endl; + return st; + } + // IO timeout: 2s + std::this_thread::sleep_for(std::chrono::seconds(2)); + + if (!chunk_writer->closed()) { + return Status::InternalError("Async close file error"); + } + return st; + } + + Status _read_chunk(ChunkPtr& chunk, const std::vector& column_names, + const std::vector& type_descs, bool nullable) { + std::vector src_slot_descs; + _create_slot_descriptors(_runtime_state.get(), &_pool, &src_slot_descs, column_names, type_descs, nullable); + OrcChunkReader reader(_runtime_state->chunk_size(), src_slot_descs); + auto input_stream = orc::readLocalFile(_file_path); + auto res = reader.init(std::move(input_stream)); + if (!res.ok()) { + std::cout << res.to_string(); + } + auto st = reader.read_next(); + DCHECK(st.ok()) << st.get_error_msg(); + + auto chunk_read = reader.create_chunk(); + st = reader.fill_chunk(&chunk_read); + DCHECK(st.ok()) << st.get_error_msg(); + + chunk = reader.cast_chunk(&chunk_read); + return Status::OK(); + } + +private: + std::unique_ptr _fs; + std::string _file_path{"./be/test/exec/test_data/orc_scanner/tmp.orc"}; + ObjectPool _pool; + std::shared_ptr _runtime_state; +}; + +TEST_F(OrcChunkWriterTest, TestWriteIntergersNullable) { + std::vector type_descs{ + TypeDescriptor::from_logical_type(TYPE_TINYINT), + TypeDescriptor::from_logical_type(TYPE_SMALLINT), + TypeDescriptor::from_logical_type(TYPE_INT), + TypeDescriptor::from_logical_type(TYPE_BIGINT), + }; + + auto chunk = std::make_shared(); + { + auto col0 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_TINYINT), true); + std::vector int8_nums{INT8_MIN, INT8_MAX, 0, 1}; + auto count = col0->append_numbers(int8_nums.data(), size(int8_nums) * sizeof(int8_t)); + ASSERT_EQ(4, count); + chunk->append_column(col0, chunk->num_columns()); + + auto col1 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_SMALLINT), true); + std::vector int16_nums{INT16_MIN, INT16_MAX, 0, 1}; + count = col1->append_numbers(int16_nums.data(), size(int16_nums) * sizeof(int16_t)); + ASSERT_EQ(4, count); + chunk->append_column(col1, chunk->num_columns()); + + auto col2 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_INT), true); + std::vector int32_nums{INT32_MIN, INT32_MAX, 0, 1}; + count = col2->append_numbers(int32_nums.data(), size(int32_nums) * sizeof(int32_t)); + ASSERT_EQ(4, count); + chunk->append_column(col2, chunk->num_columns()); + + auto col3 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_BIGINT), true); + std::vector int64_nums{INT64_MIN, INT64_MAX, 0, 1}; + count = col3->append_numbers(int64_nums.data(), size(int64_nums) * sizeof(int64_t)); + ASSERT_EQ(4, count); + chunk->append_column(col3, chunk->num_columns()); + } + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteIntergersNotNull) { + std::vector type_descs{ + TypeDescriptor::from_logical_type(TYPE_TINYINT), + TypeDescriptor::from_logical_type(TYPE_SMALLINT), + TypeDescriptor::from_logical_type(TYPE_INT), + TypeDescriptor::from_logical_type(TYPE_BIGINT), + }; + + auto chunk = std::make_shared(); + { + auto col0 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_TINYINT), false); + std::vector int8_nums{INT8_MIN, INT8_MAX, 0, 1}; + auto count = col0->append_numbers(int8_nums.data(), size(int8_nums) * sizeof(int8_t)); + ASSERT_EQ(4, count); + chunk->append_column(col0, chunk->num_columns()); + + auto col1 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_SMALLINT), false); + std::vector int16_nums{INT16_MIN, INT16_MAX, 0, 1}; + count = col1->append_numbers(int16_nums.data(), size(int16_nums) * sizeof(int16_t)); + ASSERT_EQ(4, count); + chunk->append_column(col1, chunk->num_columns()); + + auto col2 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_INT), false); + std::vector int32_nums{INT32_MIN, INT32_MAX, 0, 1}; + count = col2->append_numbers(int32_nums.data(), size(int32_nums) * sizeof(int32_t)); + ASSERT_EQ(4, count); + chunk->append_column(col2, chunk->num_columns()); + + auto col3 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_BIGINT), false); + std::vector int64_nums{INT64_MIN, INT64_MAX, 0, 1}; + count = col3->append_numbers(int64_nums.data(), size(int64_nums) * sizeof(int64_t)); + ASSERT_EQ(4, count); + chunk->append_column(col3, chunk->num_columns()); + } + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, false); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteBooleanNullable) { + auto type_bool = TypeDescriptor::from_logical_type(TYPE_BOOLEAN); + std::vector type_descs{type_bool}; + + auto chunk = std::make_shared(); + { + auto data_column = BooleanColumn::create(); + std::vector values = {0, 1, 1, 0}; + data_column->append_numbers(values.data(), values.size() * sizeof(uint8_t)); + auto null_column = UInt8Column::create(); + std::vector nulls = {1, 0, 1, 0}; + null_column->append_numbers(nulls.data(), nulls.size()); + auto nullable_column = NullableColumn::create(data_column, null_column); + chunk->append_column(nullable_column, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteBooleanNotNull) { + auto type_bool = TypeDescriptor::from_logical_type(TYPE_BOOLEAN); + std::vector type_descs{type_bool}; + + auto chunk = std::make_shared(); + { + auto data_column = BooleanColumn::create(); + std::vector values = {0, 1, 1, 0}; + data_column->append_numbers(values.data(), values.size() * sizeof(uint8_t)); + chunk->append_column(data_column, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, false); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteStringsNullable) { + auto type_varchar = TypeDescriptor::from_logical_type(TYPE_VARCHAR); + auto type_char = TypeDescriptor::from_logical_type(TYPE_CHAR); + std::vector type_descs{type_varchar, type_char}; + + auto chunk = std::make_shared(); + { + auto data_column = BinaryColumn::create(); + data_column->append("hello"); + data_column->append("world"); + data_column->append("starrocks"); + data_column->append("lakehouse"); + + auto null_column = UInt8Column::create(); + std::vector nulls = {1, 0, 1, 0}; + null_column->append_numbers(nulls.data(), nulls.size()); + auto nullable_column = NullableColumn::create(data_column, null_column); + chunk->append_column(nullable_column, chunk->num_columns()); + + auto data_column2 = BinaryColumn::create(); + data_column2->append("hello"); + data_column2->append("world"); + data_column2->append("abc"); + data_column2->append("def"); + + auto null_column2 = UInt8Column::create(); + std::vector nulls2 = {0, 0, 1, 1}; + null_column2->append_numbers(nulls2.data(), nulls2.size()); + auto nullable_column2 = NullableColumn::create(data_column2, null_column2); + chunk->append_column(nullable_column2, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteStringsNotNull) { + auto type_varchar = TypeDescriptor::from_logical_type(TYPE_VARCHAR); + auto type_char = TypeDescriptor::from_logical_type(TYPE_CHAR); + std::vector type_descs{type_varchar, type_char}; + + auto chunk = std::make_shared(); + { + auto data_column = BinaryColumn::create(); + data_column->append("hello"); + data_column->append("world"); + data_column->append("starrocks"); + data_column->append("lakehouse"); + + chunk->append_column(data_column, chunk->num_columns()); + + auto data_column2 = BinaryColumn::create(); + data_column2->append("hello"); + data_column2->append("world"); + data_column2->append("abc"); + data_column2->append("def"); + + chunk->append_column(data_column2, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, false); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteDecimal) { + std::vector type_descs{ + TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL32, 9, 5), + TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL64, 18, 9), + TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL128, 20, 10), + }; + + auto chunk = std::make_shared(); + { + auto col0 = ColumnHelper::create_column(type_descs[0], true); + std::vector int32_nums{INT32_MIN, INT32_MAX, 0, 1}; + auto count = col0->append_numbers(int32_nums.data(), size(int32_nums) * sizeof(int32_t)); + ASSERT_EQ(4, count); + chunk->append_column(col0, chunk->num_columns()); + + auto col1 = ColumnHelper::create_column(type_descs[1], true); + std::vector int64_nums{INT64_MIN, INT64_MAX, 0, 1}; + count = col1->append_numbers(int64_nums.data(), size(int64_nums) * sizeof(int64_t)); + ASSERT_EQ(4, count); + chunk->append_column(col1, chunk->num_columns()); + + auto col2 = ColumnHelper::create_column(type_descs[2], true); + std::vector int128_nums{INT64_MIN, INT64_MAX, 0, 1}; + count = col2->append_numbers(int128_nums.data(), size(int128_nums) * sizeof(int128_t)); + ASSERT_EQ(4, count); + chunk->append_column(col2, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteDecimalNotNull) { + std::vector type_descs{ + TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL32, 9, 5), + TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL64, 18, 9), + TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL128, 20, 10), + }; + + auto chunk = std::make_shared(); + { + auto col1 = ColumnHelper::create_column(type_descs[0], false); + std::vector int32_nums{INT32_MIN, INT32_MAX, 0, 1}; + auto count = col1->append_numbers(int32_nums.data(), size(int32_nums) * sizeof(int32_t)); + ASSERT_EQ(4, count); + chunk->append_column(col1, chunk->num_columns()); + + auto col2 = ColumnHelper::create_column(type_descs[1], false); + std::vector int64_nums{INT64_MIN, INT64_MAX, 0, 1}; + count = col2->append_numbers(int64_nums.data(), size(int64_nums) * sizeof(int64_t)); + ASSERT_EQ(4, count); + chunk->append_column(col2, chunk->num_columns()); + + auto col3 = ColumnHelper::create_column(type_descs[2], false); + std::vector int128_nums{INT64_MIN, INT64_MAX, 0, 1}; + count = col3->append_numbers(int128_nums.data(), size(int128_nums) * sizeof(int128_t)); + ASSERT_EQ(4, count); + chunk->append_column(col3, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, false); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteDate) { + auto type_date = TypeDescriptor::from_logical_type(TYPE_DATE); + std::vector type_descs{type_date}; + + auto chunk = std::make_shared(); + { + auto data_column = DateColumn::create(); + { + Datum datum; + datum.set_date(DateValue::create(1999, 9, 9)); + data_column->append_datum(datum); + datum.set_date(DateValue::create(1999, 9, 10)); + data_column->append_datum(datum); + datum.set_date(DateValue::create(1999, 9, 11)); + data_column->append_datum(datum); + data_column->append_default(); + } + + auto null_column = UInt8Column::create(); + std::vector nulls = {1, 0, 1, 0}; + null_column->append_numbers(nulls.data(), nulls.size()); + auto nullable_column = NullableColumn::create(data_column, null_column); + chunk->append_column(nullable_column, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteDateNotNull) { + auto type_date = TypeDescriptor::from_logical_type(TYPE_DATE); + std::vector type_descs{type_date}; + + auto chunk = std::make_shared(); + { + auto data_column = DateColumn::create(); + { + Datum datum; + datum.set_date(DateValue::create(1999, 9, 9)); + data_column->append_datum(datum); + datum.set_date(DateValue::create(1999, 9, 10)); + data_column->append_datum(datum); + datum.set_date(DateValue::create(1999, 9, 11)); + data_column->append_datum(datum); + data_column->append_default(); + } + + chunk->append_column(data_column, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, false); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteTimestamp) { + auto type_datetime = TypeDescriptor::from_logical_type(TYPE_DATETIME); + std::vector type_descs{type_datetime}; + + auto chunk = std::make_shared(); + { + // not-null column + auto data_column = TimestampColumn::create(); + { + Datum datum; + datum.set_timestamp(TimestampValue::create(1999, 9, 9, 0, 0, 0)); + data_column->append_datum(datum); + datum.set_timestamp(TimestampValue::create(1999, 9, 10, 1, 1, 1)); + data_column->append_datum(datum); + datum.set_timestamp(TimestampValue::create(1999, 9, 11, 2, 2, 2)); + data_column->append_datum(datum); + data_column->append_default(); + } + + auto null_column = UInt8Column::create(); + std::vector nulls = {1, 0, 1, 0}; + null_column->append_numbers(nulls.data(), nulls.size()); + auto nullable_column = NullableColumn::create(data_column, null_column); + chunk->append_column(nullable_column, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // check correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteTimestampNotNull) { + auto type_datetime = TypeDescriptor::from_logical_type(TYPE_DATETIME); + std::vector type_descs{type_datetime}; + + auto chunk = std::make_shared(); + { + // not-null column + auto data_column = TimestampColumn::create(); + { + Datum datum; + datum.set_timestamp(TimestampValue::create(1999, 9, 9, 0, 0, 0)); + data_column->append_datum(datum); + datum.set_timestamp(TimestampValue::create(1999, 9, 10, 1, 1, 1)); + data_column->append_datum(datum); + datum.set_timestamp(TimestampValue::create(1999, 9, 11, 2, 2, 2)); + data_column->append_datum(datum); + data_column->append_default(); + } + + chunk->append_column(data_column, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, false); + ASSERT_OK(st); + + // check correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteStruct) { + // type_descs + std::vector type_descs; + auto type_int_a = TypeDescriptor::from_logical_type(TYPE_SMALLINT); + auto type_int_b = TypeDescriptor::from_logical_type(TYPE_INT); + auto type_int_c = TypeDescriptor::from_logical_type(TYPE_BIGINT); + auto type_int_struct = TypeDescriptor::from_logical_type(TYPE_STRUCT); + type_int_struct.children = {type_int_a, type_int_b, type_int_c}; + type_int_struct.field_names = {"a", "b", "c"}; + type_descs.push_back(type_int_struct); + + auto chunk = std::make_shared(); + { + std::vector nulls{0, 0, 1, 0}; + + auto data_col_a = Int16Column::create(); + std::vector nums_a{1, 2, -99, 3}; + data_col_a->append_numbers(nums_a.data(), sizeof(int16_t) * nums_a.size()); + auto null_col_a = UInt8Column::create(); + null_col_a->append_numbers(nulls.data(), sizeof(uint8_t) * nulls.size()); + auto nullable_col_a = NullableColumn::create(data_col_a, null_col_a); + + auto data_col_b = Int32Column::create(); + std::vector nums_b{1, 2, -99, 3}; + data_col_b->append_numbers(nums_b.data(), sizeof(int32_t) * nums_b.size()); + auto null_col_b = UInt8Column::create(); + null_col_b->append_numbers(nulls.data(), sizeof(uint8_t) * nulls.size()); + auto nullable_col_b = NullableColumn::create(data_col_b, null_col_b); + + auto data_col_c = Int64Column::create(); + std::vector nums_c{1, 2, -99, 3}; + data_col_c->append_numbers(nums_c.data(), sizeof(int64_t) * nums_c.size()); + auto null_col_c = UInt8Column::create(); + null_col_c->append_numbers(nulls.data(), sizeof(uint8_t) * nulls.size()); + auto nullable_col_c = NullableColumn::create(data_col_c, null_col_c); + + Columns fields{nullable_col_a, nullable_col_b, nullable_col_c}; + auto struct_column = StructColumn::create(fields, type_int_struct.field_names); + auto null_column = UInt8Column::create(); + null_column->append_numbers(nulls.data(), sizeof(uint8_t) * nulls.size()); + auto nullable_col = NullableColumn::create(struct_column, null_column); + + chunk->append_column(nullable_col, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteMap) { + // type_descs + std::vector type_descs; + auto type_int_key = TypeDescriptor::from_logical_type(TYPE_INT); + auto type_int_value = TypeDescriptor::from_logical_type(TYPE_INT); + auto type_int_map = TypeDescriptor::from_logical_type(TYPE_MAP); + type_int_map.children.push_back(type_int_key); + type_int_map.children.push_back(type_int_value); + type_descs.push_back(type_int_map); + + // [1 -> 1], NULL, [], [2 -> 2, 3 -> NULL] + auto chunk = std::make_shared(); + { + auto key_data_col = Int32Column::create(); + std::vector key_nums{1, 2, 3, 4}; + key_data_col->append_numbers(key_nums.data(), sizeof(int32_t) * key_nums.size()); + auto key_null_col = UInt8Column::create(); + std::vector key_nulls{0, 0, 0, 0}; + key_null_col->append_numbers(key_nulls.data(), sizeof(uint8_t) * key_nulls.size()); + auto key_col = NullableColumn::create(key_data_col, key_null_col); + + auto value_data_col = Int32Column::create(); + std::vector value_nums{1, 2, -99, 4}; + value_data_col->append_numbers(value_nums.data(), sizeof(int32_t) * value_nums.size()); + auto value_null_col = UInt8Column::create(); + std::vector value_nulls{0, 0, 1, 0}; + value_null_col->append_numbers(value_nulls.data(), sizeof(uint8_t) * value_nulls.size()); + auto value_col = NullableColumn::create(value_data_col, value_null_col); + + auto offsets_col = UInt32Column::create(); + std::vector offsets{0, 1, 1, 1, 4}; + offsets_col->append_numbers(offsets.data(), sizeof(uint32_t) * offsets.size()); + auto map_col = MapColumn::create(key_col, value_col, offsets_col); + + std::vector _nulls{0, 1, 0, 0}; + auto null_col = UInt8Column::create(); + null_col->append_numbers(_nulls.data(), sizeof(uint8_t) * _nulls.size()); + auto nullable_col = NullableColumn::create(map_col, null_col); + + chunk->append_column(nullable_col, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestWriteNestedArray) { + // type_descs + std::vector type_descs; + auto type_int = TypeDescriptor::from_logical_type(TYPE_INT); + auto type_int_array = TypeDescriptor::from_logical_type(TYPE_ARRAY); + auto type_int_array_array = TypeDescriptor::from_logical_type(TYPE_ARRAY); + type_int_array.children.push_back(type_int); + type_int_array_array.children.push_back(type_int_array); + type_descs.push_back(type_int_array_array); + + // [[1], NULL, [], [2, NULL, 3]], [[4, 5], [6]], NULL + auto chunk = std::make_shared(); + { + auto int_data_col = Int32Column::create(); + std::vector nums{1, 2, -99, 3, 4, 5, 6}; + int_data_col->append_numbers(nums.data(), sizeof(int32_t) * nums.size()); + auto int_null_col = UInt8Column::create(); + std::vector nulls{0, 0, 1, 0, 0, 0, 0}; + int_null_col->append_numbers(nulls.data(), sizeof(uint8_t) * nulls.size()); + auto int_col = NullableColumn::create(int_data_col, int_null_col); + + auto offsets_col = UInt32Column::create(); + std::vector offsets{0, 1, 1, 1, 4, 6, 7}; + offsets_col->append_numbers(offsets.data(), sizeof(uint32_t) * offsets.size()); + auto array_data_col = ArrayColumn::create(int_col, offsets_col); + + std::vector _nulls{0, 1, 0, 0, 0, 0}; + auto array_null_col = UInt8Column::create(); + array_null_col->append_numbers(_nulls.data(), sizeof(uint8_t) * _nulls.size()); + auto array_col = NullableColumn::create(array_data_col, array_null_col); + + auto array_array_offsets_col = UInt32Column::create(); + std::vector array_array_offsets{0, 4, 6, 6}; + array_array_offsets_col->append_numbers(array_array_offsets.data(), + sizeof(uint32_t) * array_array_offsets.size()); + auto array_array_data_col = ArrayColumn::create(array_col, array_array_offsets_col); + + std::vector outer_nulls{0, 0, 1}; + auto array_array_null_col = UInt8Column::create(); + array_array_null_col->append_numbers(outer_nulls.data(), sizeof(uint8_t) * outer_nulls.size()); + auto array_array_col = NullableColumn::create(array_array_data_col, array_array_null_col); + + chunk->append_column(array_array_col, chunk->num_columns()); + } + + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +TEST_F(OrcChunkWriterTest, TestAsyncWriteChunk) { + std::vector type_descs{ + TypeDescriptor::from_logical_type(TYPE_INT), + }; + + auto chunk = std::make_shared(); + { + auto col0 = ColumnHelper::create_column(TypeDescriptor::from_logical_type(TYPE_INT), true); + std::vector int32_nums{INT32_MIN, INT32_MAX, 0, 1}; + auto count = col0->append_numbers(int32_nums.data(), size(int32_nums) * sizeof(int32_t)); + ASSERT_EQ(4, count); + chunk->append_column(col0, chunk->num_columns()); + } + auto column_names = _make_type_names(type_descs); + ASSIGN_OR_ABORT(auto schema, OrcChunkWriter::make_schema(column_names, type_descs)); + + // write chunk + auto st = _write_chunk_with_AsyncWriter(chunk, type_descs, std::move(schema)); + ASSERT_OK(st); + + // read chunk + ChunkPtr read_chunk; + st = _read_chunk(read_chunk, column_names, type_descs, true); + ASSERT_OK(st); + + // verify correctness + assert_equal_chunk(chunk.get(), read_chunk.get()); +} + +} // namespace starrocks \ No newline at end of file