From 4f1a202f92fe8737f221a1697709d5d07e6ec6fe Mon Sep 17 00:00:00 2001 From: "zihe.liu" Date: Fri, 20 Sep 2024 10:07:48 +0800 Subject: [PATCH] [BugFix] Capture resource group for scan task (#51121) Signed-off-by: zihe.liu (cherry picked from commit 3317f49811a181e4697041e5359484549cb4271b) # Conflicts: # be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp # be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp # be/src/exec/spill/executor.h # be/src/exec/spill/spiller.hpp # be/src/exec/workgroup/scan_task_queue.cpp # be/src/exec/workgroup/scan_task_queue.h # be/src/udf/java/utils.cpp --- .../exchange/mem_limited_chunk_queue.cpp | 567 ++++++++++++++++++ .../spillable_hash_join_probe_operator.cpp | 560 +++++++++++++++++ be/src/exec/pipeline/scan/scan_operator.cpp | 2 +- be/src/exec/spill/executor.h | 182 ++++++ be/src/exec/spill/spiller.hpp | 357 +++++++++++ be/src/exec/workgroup/scan_task_queue.cpp | 30 + be/src/exec/workgroup/scan_task_queue.h | 16 + be/src/udf/java/utils.cpp | 6 + 8 files changed, 1719 insertions(+), 1 deletion(-) create mode 100644 be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp create mode 100644 be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp create mode 100644 be/src/exec/spill/executor.h create mode 100644 be/src/exec/spill/spiller.hpp diff --git a/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp b/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp new file mode 100644 index 0000000000000..40a46d233be58 --- /dev/null +++ b/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp @@ -0,0 +1,567 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "exec/pipeline/exchange/mem_limited_chunk_queue.h" + +#include +#include + +#include "common/logging.h" +#include "exec/pipeline/exchange/multi_cast_local_exchange.h" +#include "exec/pipeline/exchange/multi_cast_local_exchange_sink_operator.h" +#include "exec/spill/block_manager.h" +#include "exec/spill/data_stream.h" +#include "exec/spill/dir_manager.h" +#include "exec/spill/executor.h" +#include "exec/spill/mem_table.h" +#include "exec/spill/options.h" +#include "fmt/format.h" +#include "fs/fs.h" +#include "serde/column_array_serde.h" +#include "serde/protobuf_serde.h" +#include "testutil/sync_point.h" +#include "util/defer_op.h" +#include "util/raw_container.h" +#include "util/runtime_profile.h" + +namespace starrocks::pipeline { + +MemLimitedChunkQueue::MemLimitedChunkQueue(RuntimeState* state, int32_t consumer_number, Options opts) + : _state(state), + _consumer_number(consumer_number), + _opened_source_opcount(consumer_number), + _consumer_progress(consumer_number), + _opts(std::move(opts)) { + Block* block = new Block(consumer_number); + _head = block; + _tail = block; + _next_flush_block = block; + // push a dummy Cell + Cell dummy; + dummy.used_count = consumer_number; + _tail->cells.emplace_back(dummy); + + for (int32_t i = 0; i < consumer_number; i++) { + // init as dummy cell + _consumer_progress[i] = std::make_unique(_head, 0); + _opened_source_opcount[i] = 0; + } + + _iterator.block = _head; + _iterator.index = 0; +} + +MemLimitedChunkQueue::~MemLimitedChunkQueue() { + while (_head) { + Block* next = _head->next; + delete _head; + _head = next; + } +} + +Status MemLimitedChunkQueue::init_metrics(RuntimeProfile* parent) { + _peak_memory_bytes_counter = parent->AddHighWaterMarkCounter( + "ExchangerPeakMemoryUsage", TUnit::BYTES, + RuntimeProfile::Counter::create_strategy(TUnit::BYTES, TCounterMergeType::SKIP_FIRST_MERGE)); + _peak_memory_rows_counter = parent->AddHighWaterMarkCounter( + "ExchangerPeakBufferRowSize", TUnit::UNIT, + RuntimeProfile::Counter::create_strategy(TUnit::UNIT, TCounterMergeType::SKIP_FIRST_MERGE)); + + _flush_io_timer = ADD_TIMER(parent, "FlushIOTime"); + _flush_io_count = ADD_COUNTER(parent, "FlushIOCount", TUnit::UNIT); + _flush_io_bytes = ADD_COUNTER(parent, "FlushIOBytes", TUnit::BYTES); + _read_io_timer = ADD_TIMER(parent, "ReadIOTime"); + _read_io_count = ADD_COUNTER(parent, "ReadIOCount", TUnit::UNIT); + _read_io_bytes = ADD_COUNTER(parent, "ReadIOBytes", TUnit::BYTES); + return Status::OK(); +} + +Status MemLimitedChunkQueue::push(const ChunkPtr& chunk) { + RETURN_IF_ERROR(_get_io_task_status()); + std::unique_lock l(_mutex); + if (UNLIKELY(_chunk_builder == nullptr)) { + _chunk_builder = chunk->clone_empty(); + } + int32_t closed_source_number = _consumer_number - _opened_source_number; + + DCHECK(_tail != nullptr) << "tail won't be null"; + DCHECK(_tail->next == nullptr) << "tail should be the last block"; + + // create a new block + if (_tail->memory_usage >= _opts.block_size) { + Block* block = new Block(_consumer_number); + block->next = nullptr; + _tail->next = block; + _tail = block; + } + + _total_accumulated_rows += chunk->num_rows(); + _total_accumulated_bytes += chunk->memory_usage(); + + Cell cell; + cell.chunk = chunk; + cell.used_count = closed_source_number; + cell.accumulated_rows = _total_accumulated_rows; + cell.accumulated_bytes = _total_accumulated_bytes; + _tail->cells.emplace_back(cell); + _tail->memory_usage += chunk->memory_usage(); + +#ifndef BE_TEST + size_t in_memory_rows = _total_accumulated_rows - 
_flushed_accumulated_rows + _current_load_rows; + size_t in_memory_bytes = _total_accumulated_bytes - _flushed_accumulated_bytes + _current_load_bytes; + _peak_memory_rows_counter->set(in_memory_rows); + _peak_memory_bytes_counter->set(in_memory_bytes); +#endif + return Status::OK(); +} + +bool MemLimitedChunkQueue::can_push() { + std::shared_lock l(_mutex); + size_t unconsumed_bytes = _total_accumulated_bytes - _fastest_accumulated_bytes; + // if the fastest consumer still has a lot of data to consume, it will no longer accept new input. + if (unconsumed_bytes >= _opts.max_unconsumed_bytes) { + return false; + } + + size_t in_memory_bytes = _total_accumulated_bytes - _flushed_accumulated_bytes; + if (in_memory_bytes >= _opts.memory_limit && _next_flush_block->next != nullptr) { + if (bool expected = false; _has_flush_io_task.compare_exchange_strong(expected, true)) { + TEST_SYNC_POINT("MemLimitedChunkQueue::can_push::before_submit_flush_task"); + auto status = _submit_flush_task(); + if (!status.ok()) { + _update_io_task_status(status); + return true; + } + } + return false; + } + + return true; +} + +StatusOr MemLimitedChunkQueue::pop(int32_t consumer_index) { + TEST_SYNC_POINT("MemLimitedChunkQueue::pop::before_pop"); + DCHECK(consumer_index <= _consumer_number); + RETURN_IF_ERROR(_get_io_task_status()); + std::unique_lock l(_mutex); + + DCHECK(_consumer_progress[consumer_index] != nullptr); + auto iter = _consumer_progress[consumer_index].get(); + if (!iter->has_next()) { + if (_opened_sink_number == 0) { + return Status::EndOfFile("no more data"); + } + return Status::InternalError("unreachable path"); + } + // if the block is flushed forcely between can_pop and pop, + // we should return null and trigger load io task in next can_pop + if (!iter->next().get_block()->in_mem) { + return nullptr; + } + iter->move_to_next(); + DCHECK(iter->get_block()->in_mem); + Cell* cell = iter->get_cell(); + DCHECK(cell->chunk != nullptr); + + cell->used_count += 1; + VLOG_ROW << fmt::format( + "[MemLimitedChunkQueue] pop chunk for consumer[{}] from block[{}], accumulated_rows[{}], " + "accumulated_bytes[{}], used_count[{}]", + consumer_index, (void*)(iter->get_block()), cell->accumulated_rows, cell->accumulated_bytes, + cell->used_count); + iter->get_block()->remove_pending_reader(consumer_index); + auto result = cell->chunk; + + _update_progress(iter); + return result; +} + +void MemLimitedChunkQueue::_evict_loaded_block() { + if (_loaded_blocks.size() < _consumer_number) { + return; + } + // because each consumer can only read one block at a time, + // we can definitely find a block that no one is reading + Block* block = nullptr; + for (auto& loaded_block : _loaded_blocks) { + if (!loaded_block->has_pending_reader()) { + block = loaded_block; + break; + } + } + DCHECK(block != nullptr) << "can't find evicted block"; + _loaded_blocks.erase(block); + _current_load_rows -= block->flush_rows; + _current_load_bytes -= block->flush_bytes; + for (auto& cell : block->cells) { + if (cell.chunk != nullptr) { + cell.chunk.reset(); + } + } + block->in_mem = false; + VLOG_ROW << fmt::format("[MemLimitedChunkQueue] evict block [{}]", (void*)block); +} + +bool MemLimitedChunkQueue::can_pop(int32_t consumer_index) { + DCHECK(consumer_index < _consumer_number); + std::shared_lock l(_mutex); + DCHECK(_consumer_progress[consumer_index] != nullptr); + + auto iter = _consumer_progress[consumer_index].get(); + if (iter->has_next()) { + auto next_iter = iter->next(); + if (next_iter.block->in_mem) { + 
next_iter.block->add_pending_reader(consumer_index); + TEST_SYNC_POINT("MemLimitedChunkQueue::can_pop::return_true::1"); + return true; + } + TEST_SYNC_POINT_CALLBACK("MemLimitedChunkQueue::can_pop::before_submit_load_task", next_iter.block); + if (bool expected = false; next_iter.block->has_load_task.compare_exchange_strong(expected, true)) { + VLOG_ROW << fmt::format("[MemLimitedChunkQueue] submit load task for block [{}]", (void*)next_iter.block); + auto status = _submit_load_task(next_iter.block); + if (!status.ok()) { + _update_io_task_status(status); + return true; + } + } + return false; + } else if (_opened_sink_number == 0) { + TEST_SYNC_POINT("MemLimitedChunkQueue::can_pop::return_true::2"); + return true; + } + + return false; +} + +void MemLimitedChunkQueue::open_consumer(int32_t consumer_index) { + std::unique_lock l(_mutex); + if (_opened_source_opcount[consumer_index] == 0) { + _opened_source_number++; + } + _opened_source_opcount[consumer_index]++; +} + +void MemLimitedChunkQueue::close_consumer(int32_t consumer_index) { + std::unique_lock l(_mutex); + _opened_source_opcount[consumer_index] -= 1; + if (_opened_source_opcount[consumer_index] == 0) { + _opened_source_number--; + _close_consumer(consumer_index); + } +} + +void MemLimitedChunkQueue::open_producer() { + std::unique_lock l(_mutex); + _opened_sink_number++; +} + +void MemLimitedChunkQueue::close_producer() { + std::unique_lock l(_mutex); + _opened_sink_number--; +} + +void MemLimitedChunkQueue::_update_progress(Iterator* iter) { + if (iter != nullptr) { + Cell* cell = iter->get_cell(); + _fastest_accumulated_rows = std::max(_fastest_accumulated_rows, cell->accumulated_rows); + _fastest_accumulated_bytes = std::max(_fastest_accumulated_bytes, cell->accumulated_bytes); + } else { + // nullptr means one consumer is closed, should update + _fastest_accumulated_rows = 0; + _fastest_accumulated_bytes = 0; + for (int32_t i = 0; i < _consumer_number; i++) { + auto iter = _consumer_progress[i].get(); + if (iter == nullptr) { + continue; + } + if (!iter->has_next()) { + // all data is consumed + _fastest_accumulated_rows = _total_accumulated_rows; + _fastest_accumulated_bytes = _total_accumulated_bytes; + break; + } + Cell* cell = iter->get_cell(); + _fastest_accumulated_rows = std::max(_fastest_accumulated_rows, cell->accumulated_rows); + _fastest_accumulated_bytes = std::max(_fastest_accumulated_bytes, cell->accumulated_bytes); + } + } + + while (_iterator.valid()) { + Cell* cell = _iterator.get_cell(); + if (cell->used_count != _consumer_number) { + break; + } + _head_accumulated_rows = cell->accumulated_rows; + _head_accumulated_bytes = cell->accumulated_bytes; + // advance flushed position + _flushed_accumulated_rows = std::max(_flushed_accumulated_rows, _head_accumulated_rows); + _flushed_accumulated_bytes = std::max(_flushed_accumulated_bytes, _head_accumulated_bytes); + VLOG_ROW << fmt::format("release chunk, current head_accumulated_rows[{}], head_accumulated_bytes[{}]", + _head_accumulated_rows, _head_accumulated_bytes); + cell->chunk.reset(); + if (!_iterator.has_next()) { + break; + } + _iterator.move_to_next(); + } +} + +void MemLimitedChunkQueue::_close_consumer(int32_t consumer_index) { + DCHECK(_consumer_progress[consumer_index] != nullptr); + auto iter = _consumer_progress[consumer_index].get(); + if (iter->has_next()) { + do { + iter->move_to_next(); + Cell* cell = iter->get_cell(); + cell->used_count += 1; + } while (iter->has_next()); + } + _consumer_progress[consumer_index].reset(); + 
_update_progress(); +} + +Status MemLimitedChunkQueue::_flush() { + Block* block = nullptr; + std::vector chunks; + { + std::unique_lock l(_mutex); + + std::vector flush_idx; + // 1. find block to flush + while (_next_flush_block) { + // don't flush the last block + if (_next_flush_block->next == nullptr) { + break; + } + if (_next_flush_block->block) { + // already flushed, should skip it + _next_flush_block = _next_flush_block->next; + } else { + flush_idx.clear(); + for (size_t i = 0; i < _next_flush_block->cells.size(); i++) { + auto& cell = _next_flush_block->cells[i]; + if (cell.chunk == nullptr || cell.used_count == _consumer_number) { + continue; + } + flush_idx.push_back(i); + } + if (!flush_idx.empty()) { + break; + } + _next_flush_block = _next_flush_block->next; + } + } + block = _next_flush_block; + DCHECK(block != nullptr) << "block can't be null"; + TEST_SYNC_POINT_CALLBACK("MemLimitedChunkQueue::flush::after_find_block_to_flush", block); + if (block->next == nullptr) { + return Status::OK(); + } + if (block->block != nullptr) { + return Status::OK(); + } + // If a consumer is about to read this block, we should avoid flushing it. + // However, there is one exception: when the fatest consumer has no data to consume, + // in order to avoid blocking the producer, we must flush it. + // In this case, if the consumer reads this block, we will return null and trigger a new load task on the next can_pop + if (block->has_pending_reader()) { + size_t unconsumed_bytes = _total_accumulated_bytes - _fastest_accumulated_bytes; + if (unconsumed_bytes > 0) { + VLOG_ROW << fmt::format("block [{}] has pending reader, should skip flush it", (void*)block); + return Status::OK(); + } + VLOG_ROW << fmt::format("force flush block [{}] to avoid blocking producer", (void*)block); + } + + for (const auto idx : flush_idx) { + auto& cell = block->cells[idx]; + cell.flushed = true; + chunks.emplace_back(cell.chunk); + } + } + + DCHECK(!chunks.empty()); + + size_t max_serialize_size = 0; + size_t flushed_rows = 0, flushed_bytes = 0; + for (auto& chunk : chunks) { + for (const auto& column : chunk->columns()) { + max_serialize_size += serde::ColumnArraySerde::max_serialized_size(*column, _opts.encode_level); + } + flushed_rows += chunk->num_rows(); + flushed_bytes += chunk->memory_usage(); + } + std::shared_ptr spill_block; + + TEST_SYNC_POINT_CALLBACK("MemLimitedChunkQueue::after_calculate_max_serialize_size", &max_serialize_size); + raw::RawString serialize_buffer; + serialize_buffer.resize(max_serialize_size); + uint8_t* buf = reinterpret_cast(serialize_buffer.data()); + uint8_t* begin = buf; + // 2. serialize data + for (auto& chunk : chunks) { + for (const auto& column : chunk->columns()) { + buf = serde::ColumnArraySerde::serialize(*column, buf, false, _opts.encode_level); + RETURN_IF(buf == nullptr, Status::InternalError("serialize data error")); + } + } + size_t content_length = buf - begin; + // 3. acquire block + spill::AcquireBlockOptions options; + options.block_size = content_length; + options.direct_io = false; + options.query_id = _state->query_id(); + options.fragment_instance_id = _state->fragment_instance_id(); + options.name = "mcast_local_exchange"; + options.plan_node_id = _opts.plan_node_id; + ASSIGN_OR_RETURN(spill_block, _opts.block_manager->acquire_block(options)); + + // 4. 
flush serialized data + std::vector data; + data.emplace_back(serialize_buffer.data(), content_length); + { + SCOPED_TIMER(_flush_io_timer); + RETURN_IF_ERROR(spill_block->append(data)); + RETURN_IF_ERROR(spill_block->flush()); + COUNTER_UPDATE(_flush_io_count, 1); + COUNTER_UPDATE(_flush_io_bytes, content_length); + } + + { + std::unique_lock l(_mutex); + block->block = spill_block; + block->in_mem = false; + // 5. clear flushed data in memory + for (auto& cell : block->cells) { + if (cell.flushed) { + cell.chunk.reset(); + block->flush_chunks++; + } + } + block->flush_rows = flushed_rows; + block->flush_bytes = flushed_bytes; + _flushed_accumulated_rows = block->cells.back().accumulated_rows; + _flushed_accumulated_bytes = block->cells.back().accumulated_bytes; + _next_flush_block = block->next; + VLOG_ROW << fmt::format("flush block [{}], rows[{}], bytes[{}], flushed bytes[{}]", (void*)block, + _flushed_accumulated_rows, _flushed_accumulated_bytes, content_length); + TEST_SYNC_POINT_CALLBACK("MemLimitedChunkQueue::after_flush_block", block); + } + return Status::OK(); +} + +Status MemLimitedChunkQueue::_submit_flush_task() { + auto flush_task = [this, guard = RESOURCE_TLS_MEMTRACER_GUARD(_state)](auto& yield_ctx) { + TEST_SYNC_POINT("MemLimitedChunkQueue::before_execute_flush_task"); + RETURN_IF(!guard.scoped_begin(), (void)0); + DEFER_GUARD_END(guard); + auto defer = DeferOp([&]() { + _has_flush_io_task.store(false); + TEST_SYNC_POINT("MemLimitedChunkQueue::after_execute_flush_task"); + }); + + auto status = _flush(); + if (!status.ok()) { + _update_io_task_status(status); + return; + } + }; + + auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(flush_task)); + RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task))); + return Status::OK(); +} + +// reload block from disk +Status MemLimitedChunkQueue::_load(Block* block) { + std::shared_ptr block_reader; + raw::RawString buffer; + size_t block_len; + size_t flush_chunks; + // 1. read serialize data + { + std::shared_lock l(_mutex); + DCHECK(!block->in_mem) << "block should not be in memory, " << (void*)(block); + DCHECK(block->block != nullptr) << "block must have spill block"; + + spill::BlockReaderOptions options; + options.enable_buffer_read = true; + options.read_io_timer = _read_io_timer; + options.read_io_count = _read_io_count; + options.read_io_bytes = _read_io_bytes; + block_reader = block->block->get_reader(options); + block_len = block->block->size(); + flush_chunks = block->flush_chunks; + } + VLOG_ROW << fmt::format("load block begin, block[{}], flushed_bytes[{}], flushed_chunks[{}]", (void*)block, + block_len, flush_chunks); + buffer.resize(block_len); + RETURN_IF_ERROR(block_reader->read_fully(buffer.data(), block_len)); + + // 2. 
deserialize data + uint8_t* buf = reinterpret_cast(buffer.data()); + const uint8_t* read_cursor = buf; + std::vector chunks(flush_chunks); + for (auto& chunk : chunks) { + chunk = _chunk_builder->clone_empty(); + for (auto& column : chunk->columns()) { + read_cursor = serde::ColumnArraySerde::deserialize(read_cursor, column.get(), false, _opts.encode_level); + RETURN_IF(read_cursor == nullptr, Status::InternalError("deserialize failed")); + } + } + { + std::unique_lock l(_mutex); + for (size_t i = 0, j = 0; i < chunks.size() && j < block->cells.size(); j++) { + if (block->cells[j].flushed) { + block->cells[j].chunk = chunks[i++]; + } + } + block->in_mem = true; + block->has_load_task = false; + _evict_loaded_block(); + _loaded_blocks.insert(block); + _current_load_rows += block->flush_rows; + _current_load_bytes += block->flush_bytes; + VLOG_ROW << fmt::format("load block done, block[{}], current load rows[{}], current load bytes[{}]", + (void*)block, _current_load_rows, _current_load_bytes); + } + + return Status::OK(); +} + +Status MemLimitedChunkQueue::_submit_load_task(Block* block) { + auto load_task = [this, block, guard = RESOURCE_TLS_MEMTRACER_GUARD(_state)](auto& yield_ctx) { + TEST_SYNC_POINT_CALLBACK("MemLimitedChunkQueue::before_execute_load_task", block); + RETURN_IF(!guard.scoped_begin(), (void)0); + DEFER_GUARD_END(guard); + auto status = _load(block); + if (!status.ok()) { + _update_io_task_status(status); + } + }; + auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(load_task)); + RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task))); + return Status::OK(); +} + +const size_t memory_limit_per_producer = 1L * 1024 * 1024; + +void MemLimitedChunkQueue::enter_release_memory_mode() { + std::unique_lock l(_mutex); + size_t new_memory_limit = memory_limit_per_producer * _opened_sink_number; + VLOG_ROW << fmt::format("change memory limit from [{}] to [{}]", _opts.memory_limit, new_memory_limit); + _opts.memory_limit = new_memory_limit; +} + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp b/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp new file mode 100644 index 0000000000000..99e1889caa279 --- /dev/null +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp @@ -0,0 +1,560 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "exec/pipeline/hashjoin/spillable_hash_join_probe_operator.h" + +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "exec/hash_joiner.h" +#include "exec/pipeline/hashjoin/hash_join_probe_operator.h" +#include "exec/pipeline/hashjoin/hash_joiner_factory.h" +#include "exec/pipeline/query_context.h" +#include "exec/spill/executor.h" +#include "exec/spill/partition.h" +#include "exec/spill/spill_components.h" +#include "exec/spill/spiller.h" +#include "exec/spill/spiller.hpp" +#include "gen_cpp/PlanNodes_types.h" +#include "gutil/casts.h" +#include "runtime/current_thread.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" + +namespace starrocks::pipeline { +Status SpillableHashJoinProbeOperator::prepare(RuntimeState* state) { + RETURN_IF_ERROR(HashJoinProbeOperator::prepare(state)); + _need_post_probe = has_post_probe(_join_prober->join_type()); + _probe_spiller->set_metrics(spill::SpillProcessMetrics(_unique_metrics.get(), state->mutable_total_spill_bytes())); + metrics.hash_partitions = ADD_COUNTER(_unique_metrics.get(), "SpillPartitions", TUnit::UNIT); + metrics.build_partition_peak_memory_usage = _unique_metrics->AddHighWaterMarkCounter( + "SpillBuildPartitionPeakMemoryUsage", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES)); + metrics.prober_peak_memory_usage = _unique_metrics->AddHighWaterMarkCounter( + "SpillProberPeakMemoryUsage", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES)); + RETURN_IF_ERROR(_probe_spiller->prepare(state)); + auto wg = state->fragment_ctx()->workgroup(); + return Status::OK(); +} + +void SpillableHashJoinProbeOperator::close(RuntimeState* state) { + HashJoinProbeOperator::close(state); +} + +bool SpillableHashJoinProbeOperator::has_output() const { + if (!is_ready()) { + DCHECK(false) << "is_ready() must be true before call has_output"; + return false; + } + if (!spilled()) { + return HashJoinProbeOperator::has_output(); + } + + // if any partition hash_table is loading. just return false + if (!_latch.ready()) { + return false; + } + + if (!_status().ok()) { + return true; + } + + if (_processing_partitions.empty()) { + as_mutable()->_acquire_next_partitions(); + _update_status(as_mutable()->_load_all_partition_build_side(runtime_state())); + return false; + } + + // if any hash_join_prober has data. + for (auto prober : _probers) { + if (!prober->probe_chunk_empty()) { + return true; + } + } + + // + if (_probe_spiller->is_full()) { + return false; + } + + if (_is_finishing) { + if (_all_partition_finished()) { + return false; + } + + // reader is empty. 
+ // need to call pull_chunk to acquire next partitions + if (_current_reader.empty()) { + return true; + } + + for (size_t i = 0; i < _probers.size(); ++i) { + if (_current_reader[i]->has_output_data()) { + return true; + } else if (!_current_reader[i]->has_restore_task()) { + // if trigger_restore returns error, should record this status and return it in pull_chunk + _update_status(_current_reader[i]->trigger_restore( + runtime_state(), + RESOURCE_TLS_MEMTRACER_GUARD(runtime_state(), std::weak_ptr(_current_reader[i])))); + if (!_status().ok()) { + return true; + } + } + } + } + + return false; +} + +bool SpillableHashJoinProbeOperator::need_input() const { + if (!is_ready()) { + DCHECK(false) << "is_ready() must be true before call has_output"; + return false; + } + if (!spilled()) { + return HashJoinProbeOperator::need_input(); + } + + if (!_latch.ready()) { + return false; + } + + if (_processing_partitions.empty()) { + as_mutable()->_acquire_next_partitions(); + _update_status(as_mutable()->_load_all_partition_build_side(runtime_state())); + return false; + } + + if (_probe_spiller->is_full()) { + return false; + } + + for (auto prober : _probers) { + if (!prober->probe_chunk_empty()) { + return false; + } + } + + return true; +} + +bool SpillableHashJoinProbeOperator::is_finished() const { + if (!spilled()) { + return HashJoinProbeOperator::is_finished(); + } + + if (_is_finished) { + return true; + } + + if (_is_finishing && _all_partition_finished()) { + return true; + } + + return false; +} + +Status SpillableHashJoinProbeOperator::set_finishing(RuntimeState* state) { + if (!spilled()) { + return HashJoinProbeOperator::set_finishing(state); + } + if (state->is_cancelled()) { + _probe_spiller->cancel(); + } + _is_finishing = true; + return Status::OK(); +} + +Status SpillableHashJoinProbeOperator::set_finished(RuntimeState* state) { + _is_finished = true; + return HashJoinProbeOperator::set_finished(state); +} + +Status SpillableHashJoinProbeOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) { + RETURN_IF_ERROR(_status()); + if (!spilled()) { + return HashJoinProbeOperator::push_chunk(state, chunk); + } + + RETURN_IF_ERROR(_push_probe_chunk(state, chunk)); + + return Status::OK(); +} + +Status SpillableHashJoinProbeOperator::_push_probe_chunk(RuntimeState* state, const ChunkPtr& chunk) { + // compute hash + size_t num_rows = chunk->num_rows(); + auto hash_column = spill::SpillHashColumn::create(num_rows); + auto& hash_values = hash_column->get_data(); + + // TODO: use another hash function + for (auto& expr_ctx : _join_prober->probe_expr_ctxs()) { + ASSIGN_OR_RETURN(auto res, expr_ctx->evaluate(chunk.get())); + res->fnv_hash(hash_values.data(), 0, num_rows); + } + + auto partition_processer = [&chunk, this, state, &hash_values](spill::SpilledPartition* probe_partition, + const std::vector& selection, int32_t from, + int32_t size) { + // nothing to do for empty partition + if (could_short_circuit(_join_prober->join_type())) { + // For left semi join and inner join we can just skip the empty partition + auto build_partition_iter = _pid_to_build_partition.find(probe_partition->partition_id); + if (build_partition_iter != _pid_to_build_partition.end()) { + if (build_partition_iter->second->empty()) { + return; + } + } + } + + for (size_t i = from; i < from + size; ++i) { + DCHECK_EQ(hash_values[selection[i]] & probe_partition->mask(), + probe_partition->partition_id & probe_partition->mask()); + } + + auto iter = _pid_to_process_id.find(probe_partition->partition_id); + 
if (iter == _pid_to_process_id.end()) { + auto mem_table = probe_partition->spill_writer->mem_table(); + (void)mem_table->append_selective(*chunk, selection.data(), from, size); + } else { + // maybe has some small chunk problem + // TODO: add chunk accumulator here + auto partitioned_chunk = chunk->clone_empty(); + (void)partitioned_chunk->append_selective(*chunk, selection.data(), from, size); + (void)_probers[iter->second]->push_probe_chunk(state, std::move(partitioned_chunk)); + } + probe_partition->num_rows += size; + }; + RETURN_IF_ERROR(_probe_spiller->partitioned_spill(state, chunk, hash_column.get(), partition_processer, + TRACKER_WITH_SPILLER_GUARD(state, _probe_spiller))); + + return Status::OK(); +} + +Status SpillableHashJoinProbeOperator::_load_partition_build_side(workgroup::YieldContext& ctx, RuntimeState* state, + const std::shared_ptr& reader, + size_t idx) { + using SyncTaskExecutor = spill::SyncTaskExecutor; + using MemTrackerGuard = spill::MemTrackerGuard; + TRY_CATCH_ALLOC_SCOPE_START() + SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(state->instance_mem_tracker()); + auto builder = _builders[idx]; + auto prober = _probers[idx]; + bool finish = false; + int64_t hash_table_mem_usage = builder->ht_mem_usage(); + enum SpillLoadPartitionStage { BEGIN = 0, FINISH = 1 }; + ctx.total_yield_point_cnt = FINISH; + auto wg = ctx.wg; + while (!finish && !_is_finished) { + BREAK_IF_YIELD(wg, &ctx.need_yield, ctx.time_spent_ns); + { + SCOPED_RAW_TIMER(&ctx.time_spent_ns); + if (state->is_cancelled()) { + return Status::Cancelled("cancelled"); + } + + RETURN_IF_ERROR(reader->trigger_restore(state, MemTrackerGuard(tls_mem_tracker))); + auto chunk_st = reader->restore(state, MemTrackerGuard(tls_mem_tracker)); + + if (chunk_st.ok() && chunk_st.value() != nullptr && !chunk_st.value()->is_empty()) { + int64_t old_mem_usage = hash_table_mem_usage; + RETURN_IF_ERROR(builder->append_chunk(std::move(chunk_st.value()))); + hash_table_mem_usage = builder->ht_mem_usage(); + COUNTER_ADD(metrics.build_partition_peak_memory_usage, hash_table_mem_usage - old_mem_usage); + } else if (chunk_st.status().is_end_of_file()) { + RETURN_IF_ERROR(builder->build(state)); + prober->attach(builder, _join_prober->probe_metrics()); + finish = true; + } else if (!chunk_st.ok()) { + return chunk_st.status(); + } + } + } + if (finish) { + DCHECK_EQ(builder->hash_table_row_count(), _processing_partitions[idx]->num_rows); + } + TRY_CATCH_ALLOC_SCOPE_END() + return Status::OK(); +} + +Status SpillableHashJoinProbeOperator::_load_all_partition_build_side(RuntimeState* state) { + auto spill_readers = _join_builder->spiller()->get_partition_spill_readers(_processing_partitions); + _latch.reset(_processing_partitions.size()); + int32_t driver_id = CurrentThread::current().get_driver_id(); + auto query_ctx = state->query_ctx()->weak_from_this(); + for (size_t i = 0; i < _processing_partitions.size(); ++i) { + std::shared_ptr reader = std::move(spill_readers[i]); + auto task = [this, state, reader, i, query_ctx, driver_id](auto& yield_ctx) { + if (auto acquired = query_ctx.lock()) { + SCOPED_SET_TRACE_INFO(driver_id, state->query_id(), state->fragment_instance_id()); + auto defer = CancelableDefer([&]() { + _latch.count_down(); + yield_ctx.set_finished(); + }); + if (!yield_ctx.task_context_data.has_value()) { + yield_ctx.task_context_data = std::make_shared(); + } + yield_ctx.time_spent_ns = 0; + yield_ctx.need_yield = false; + _update_status(_load_partition_build_side(yield_ctx, state, reader, i)); + if (yield_ctx.need_yield) { + 
defer.cancel(); + } + } + }; + auto yield_func = [&](workgroup::ScanTask&& task) { spill::IOTaskExecutor::force_submit(std::move(task)); }; + auto io_task = + workgroup::ScanTask(_join_builder->spiller()->options().wg, std::move(task), std::move(yield_func)); + RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task))); + } + return Status::OK(); +} + +void SpillableHashJoinProbeOperator::_update_status(Status&& status) const { + if (!status.ok()) { + std::lock_guard guard(_mutex); + _operator_status = std::move(status); + } +} + +Status SpillableHashJoinProbeOperator::_status() const { + std::lock_guard guard(_mutex); + return _operator_status; +} + +void SpillableHashJoinProbeOperator::_check_partitions() { + if (_is_finishing) { +#ifndef NDEBUG + auto partitioned_writer = down_cast(_probe_spiller->writer().get()); + size_t build_rows = 0; + for (const auto& [level, partitions] : partitioned_writer->level_to_partitions()) { + auto writer = down_cast(_join_builder->spiller()->writer().get()); + auto& build_partitions = writer->level_to_partitions().find(level)->second; + DCHECK_EQ(build_partitions.size(), partitions.size()); + for (size_t i = 0; i < partitions.size(); ++i) { + build_rows += build_partitions[i]->num_rows; + } + // CHECK if left table is the same as right table + // for (size_t i = 0; i < partitions.size(); ++i) { + // DCHECK_EQ(partitions[i]->num_rows, build_partitions[i]->num_rows); + // } + } + DCHECK_EQ(build_rows, _join_builder->spiller()->spilled_append_rows()); +#endif + } +} + +Status SpillableHashJoinProbeOperator::_restore_probe_partition(RuntimeState* state) { + for (size_t i = 0; i < _probers.size(); ++i) { + // probe partition has been processed + if (_probe_read_eofs[i]) continue; + if (!_current_reader[i]->has_restore_task()) { + RETURN_IF_ERROR(_current_reader[i]->trigger_restore( + state, RESOURCE_TLS_MEMTRACER_GUARD(state, std::weak_ptr(_current_reader[i])))); + } + if (_current_reader[i]->has_output_data()) { + auto chunk_st = _current_reader[i]->restore( + state, RESOURCE_TLS_MEMTRACER_GUARD(state, std::weak_ptr(_current_reader[i]))); + if (chunk_st.ok() && chunk_st.value() && !chunk_st.value()->is_empty()) { + RETURN_IF_ERROR(_probers[i]->push_probe_chunk(state, std::move(chunk_st.value()))); + } else if (chunk_st.status().is_end_of_file()) { + _probe_read_eofs[i] = true; + } else if (!chunk_st.ok()) { + return chunk_st.status(); + } + } + } + return Status::OK(); +} + +StatusOr SpillableHashJoinProbeOperator::pull_chunk(RuntimeState* state) { + RETURN_IF_ERROR(_status()); + if (!spilled()) { + return HashJoinProbeOperator::pull_chunk(state); + } + + _check_partitions(); + + auto all_probe_partition_is_empty = [this]() { + for (auto& _prober : _probers) { + if (!_prober->probe_chunk_empty()) { + return false; + } + } + return true; + }; + + bool probe_has_no_output = all_probe_partition_is_empty() && !_has_probe_remain; + + if (_current_reader.empty() && _is_finishing && probe_has_no_output) { + // init spill reader + _current_reader = _probe_spiller->get_partition_spill_readers(_processing_partitions); + _probe_read_eofs.assign(_current_reader.size(), false); + _probe_post_eofs.assign(_current_reader.size(), false); + _has_probe_remain = true; + } + + // restore chunk from spilled partition then push it to hash join prober + if (!_current_reader.empty() && all_probe_partition_is_empty()) { + RETURN_IF_ERROR(_restore_probe_partition(state)); + } + + // probe chunk + for (size_t i = 0; i < _probers.size(); ++i) { + if 
(!_probers[i]->probe_chunk_empty()) { + ASSIGN_OR_RETURN(auto res, _probers[i]->probe_chunk(state)); + return res; + } + } + + size_t eofs = std::accumulate(_probe_read_eofs.begin(), _probe_read_eofs.end(), 0); + if (_need_post_probe && _has_probe_remain) { + if (_is_finishing) { + bool has_remain = false; + for (size_t i = 0; i < _probers.size(); ++i) { + if (!_probe_post_eofs[i] && _probe_read_eofs[i]) { + bool has_remain = false; + ASSIGN_OR_RETURN(auto res, _probers[i]->probe_remain(state, &has_remain)); + _probe_post_eofs[i] = !has_remain; + if (res && !res->is_empty()) { + return res; + } + } + has_remain |= !_probe_post_eofs[i]; + } + _has_probe_remain = has_remain; + } + } else { + _has_probe_remain = false; + } + + // processing partitions + if (_is_finishing && eofs == _processing_partitions.size() && !_has_probe_remain) { + DCHECK(all_probe_partition_is_empty()); + // current partition is finished + for (auto* partition : _processing_partitions) { + _processed_partitions.emplace(partition->partition_id); + } + _processing_partitions.clear(); + _current_reader.clear(); + _has_probe_remain = false; + _builders.clear(); + COUNTER_SET(metrics.build_partition_peak_memory_usage, 0); + } + + return nullptr; +} + +bool SpillableHashJoinProbeOperator::spilled() const { + return _join_builder->spiller()->spilled(); +} + +void SpillableHashJoinProbeOperator::_acquire_next_partitions() { + // get all spill partition + if (_build_partitions.empty()) { + _join_builder->spiller()->get_all_partitions(&_build_partitions); + for (const auto* partition : _build_partitions) { + _pid_to_build_partition[partition->partition_id] = partition; + } + + _probe_spiller->set_partition(_build_partitions); + COUNTER_SET(metrics.hash_partitions, (int64_t)_build_partitions.size()); + } + + size_t bytes_usage = 0; + // process the partition in memory firstly + if (_processing_partitions.empty()) { + for (auto partition : _build_partitions) { + if (partition->in_mem && !_processed_partitions.count(partition->partition_id)) { + _processing_partitions.emplace_back(partition); + bytes_usage += partition->bytes; + _pid_to_process_id.emplace(partition->partition_id, _processing_partitions.size() - 1); + } + } + } + + size_t avaliable_bytes = _mem_resource_manager.operator_avaliable_memory_bytes(); + // process the partition could be hold in memory + if (_processing_partitions.empty()) { + for (const auto* partition : _build_partitions) { + if (!partition->in_mem && !_processed_partitions.count(partition->partition_id)) { + if ((partition->bytes + bytes_usage < avaliable_bytes || _processing_partitions.empty()) && + std::find(_processing_partitions.begin(), _processing_partitions.end(), partition) == + _processing_partitions.end()) { + _processing_partitions.emplace_back(partition); + bytes_usage += partition->bytes; + _pid_to_process_id.emplace(partition->partition_id, _processing_partitions.size() - 1); + } + } + } + } + _component_pool.clear(); + size_t process_partition_nums = _processing_partitions.size(); + _probers.resize(process_partition_nums); + _builders.resize(process_partition_nums); + for (size_t i = 0; i < process_partition_nums; ++i) { + _probers[i] = _join_prober->new_prober(&_component_pool); + _builders[i] = _join_builder->new_builder(&_component_pool); + _builders[i]->create(_join_builder->hash_table_param()); + _probe_read_eofs.assign(process_partition_nums, true); + _probe_post_eofs.assign(process_partition_nums, false); + } +} + +bool 
SpillableHashJoinProbeOperator::_all_loaded_partition_data_ready() { + // check all loaded partition data ready + return std::all_of(_builders.begin(), _builders.end(), [](const auto* builder) { return builder->ready(); }); +} + +bool SpillableHashJoinProbeOperator::_all_partition_finished() const { + // In some cases has_output may be skipped. + // So we call build_partitions.empty() first to make sure the parition loads + return !_build_partitions.empty() && _processed_partitions.size() == _build_partitions.size(); +} + +Status SpillableHashJoinProbeOperatorFactory::prepare(RuntimeState* state) { + RETURN_IF_ERROR(HashJoinProbeOperatorFactory::prepare(state)); + + _spill_options = std::make_shared(config::spill_init_partition, false); + _spill_options->spill_mem_table_bytes_size = state->spill_mem_table_size(); + _spill_options->mem_table_pool_size = state->spill_mem_table_num(); + _spill_options->spill_type = spill::SpillFormaterType::SPILL_BY_COLUMN; + _spill_options->block_manager = state->query_ctx()->spill_manager()->block_manager(); + _spill_options->name = "hash-join-probe"; + _spill_options->plan_node_id = _plan_node_id; + _spill_options->encode_level = state->spill_encode_level(); + _spill_options->wg = state->fragment_ctx()->workgroup(); + _spill_options->enable_buffer_read = state->enable_spill_buffer_read(); + _spill_options->max_read_buffer_bytes = state->max_spill_read_buffer_bytes_per_driver(); + + return Status::OK(); +} + +OperatorPtr SpillableHashJoinProbeOperatorFactory::create(int32_t degree_of_parallelism, int32_t driver_sequence) { + auto spiller = _spill_factory->create(*_spill_options); + + auto prober = std::make_shared( + this, _id, "spillable_hash_join_probe", _plan_node_id, driver_sequence, + _hash_joiner_factory->create_prober(degree_of_parallelism, driver_sequence), + _hash_joiner_factory->get_builder(degree_of_parallelism, driver_sequence)); + + prober->set_probe_spiller(spiller); + + return prober; +} + +} // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index f00361b122290..493e2f47e08a3 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -353,7 +353,7 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in int32_t driver_id = CurrentThread::current().get_driver_id(); workgroup::ScanTask task; - task.workgroup = _workgroup.get(); + task.workgroup = _workgroup; // TODO: consider more factors, such as scan bytes and i/o time. task.priority = vectorized::OlapScanNode::compute_priority(_submit_task_counter->value()); const auto io_task_start_nano = MonotonicNanos(); diff --git a/be/src/exec/spill/executor.h b/be/src/exec/spill/executor.h new file mode 100644 index 0000000000000..5ca23b5415f17 --- /dev/null +++ b/be/src/exec/spill/executor.h @@ -0,0 +1,182 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include "common/compiler_util.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/query_context.h" +#include "exec/workgroup/scan_executor.h" +#include "exec/workgroup/scan_task_queue.h" +#include "exec/workgroup/work_group_fwd.h" +#include "gen_cpp/Types_types.h" +#include "runtime/current_thread.h" +#include "runtime/mem_tracker.h" +#include "util/priority_thread_pool.hpp" + +namespace starrocks::spill { +struct TraceInfo { + TraceInfo(RuntimeState* state) : query_id(state->query_id()), fragment_id(state->fragment_instance_id()) {} + TUniqueId query_id; + TUniqueId fragment_id; +}; + +struct EmptyMemGuard { + bool scoped_begin() const { return true; } + void scoped_end() const {} +}; + +struct MemTrackerGuard { + MemTrackerGuard(MemTracker* scope_tracker_) : scope_tracker(scope_tracker_) {} + bool scoped_begin() const { + old_tracker = tls_thread_status.set_mem_tracker(scope_tracker); + return true; + } + void scoped_end() const { tls_thread_status.set_mem_tracker(old_tracker); } + MemTracker* scope_tracker; + mutable MemTracker* old_tracker = nullptr; +}; + +template +struct ResourceMemTrackerGuard { + ResourceMemTrackerGuard(MemTracker* scope_tracker_, WeakPtrs&&... args) + : scope_tracker(scope_tracker_), resources(std::make_tuple(args...)) {} + + bool scoped_begin() const { + auto res = capture(resources); + if (!res.has_value()) { + return false; + } + captured = std::move(res.value()); + old_tracker = tls_thread_status.set_mem_tracker(scope_tracker); + return true; + } + + void scoped_end() const { + tls_thread_status.set_mem_tracker(old_tracker); + captured = {}; + } + +private: + auto capture(const std::tuple& weak_tup) const + -> std::optional...>> { + auto shared_ptrs = std::make_tuple(std::get(weak_tup).lock()...); + bool all_locked = ((std::get(weak_tup).lock() != nullptr) && ...); + if (all_locked) { + return shared_ptrs; + } else { + return std::nullopt; + } + } + + MemTracker* scope_tracker; + std::tuple resources; + + mutable std::tuple...> captured; + mutable MemTracker* old_tracker = nullptr; +}; + +struct SpillIOTaskContext { + bool use_local_io_executor = true; +}; +using SpillIOTaskContextPtr = std::shared_ptr; + +struct IOTaskExecutor { + static Status submit(workgroup::ScanTask task) { + const auto& task_ctx = task.get_work_context(); + bool use_local_io_executor = true; + if (task_ctx.task_context_data.has_value()) { + auto io_ctx = std::any_cast(task_ctx.task_context_data); + use_local_io_executor = io_ctx->use_local_io_executor; + } + auto* pool = get_executor(task.workgroup.get(), use_local_io_executor); + if (pool->submit(std::move(task))) { + return Status::OK(); + } else { + return Status::InternalError("offer task failed"); + } + } + static void force_submit(workgroup::ScanTask task) { + const auto& task_ctx = task.get_work_context(); + auto io_ctx = std::any_cast(task_ctx.task_context_data); + auto* pool = get_executor(task.workgroup.get(), io_ctx->use_local_io_executor); + pool->force_submit(std::move(task)); + } + +private: + inline static workgroup::ScanExecutor* get_executor(workgroup::WorkGroup* wg, bool use_local_io_executor) { + return use_local_io_executor ? 
wg->executors()->scan_executor() : wg->executors()->connector_scan_executor(); + } +}; + +struct SyncTaskExecutor { + static Status submit(workgroup::ScanTask task) { + do { + task.run(); + } while (!task.is_finished()); + return Status::OK(); + } + + static void force_submit(workgroup::ScanTask task) { (void)submit(std::move(task)); } +}; + +#define BREAK_IF_YIELD(wg, yield, time_spent_ns) \ + if (time_spent_ns >= workgroup::WorkGroup::YIELD_MAX_TIME_SPENT) { \ + *yield = true; \ + break; \ + } \ + if (wg != nullptr && time_spent_ns >= workgroup::WorkGroup::YIELD_PREEMPT_MAX_TIME_SPENT && \ + wg->scan_sched_entity()->in_queue()->should_yield(wg, time_spent_ns)) { \ + *yield = true; \ + break; \ + } + +#define RETURN_OK_IF_NEED_YIELD(wg, yield, time_spent_ns) \ + if (time_spent_ns >= workgroup::WorkGroup::YIELD_MAX_TIME_SPENT) { \ + *yield = true; \ + return Status::OK(); \ + } \ + if (wg != nullptr && time_spent_ns >= workgroup::WorkGroup::YIELD_PREEMPT_MAX_TIME_SPENT && \ + wg->scan_sched_entity()->in_queue()->should_yield(wg, time_spent_ns)) { \ + *yield = true; \ + return Status::OK(); \ + } +#define RETURN_IF_ERROR_EXCEPT_YIELD(stmt) \ + do { \ + auto&& status__ = (stmt); \ + if (UNLIKELY(!status__.ok() && !status__.is_yield())) { \ + return to_status(status__).clone_and_append_context(__FILE__, __LINE__, AS_STRING(stmt)); \ + } \ + } while (false) + +#define RETURN_IF_YIELD(yield) \ + if (yield) { \ + return Status::OK(); \ + } + +#define DEFER_GUARD_END(guard) auto VARNAME_LINENUM(defer) = DeferOp([&]() { guard.scoped_end(); }); + +#define RESOURCE_TLS_MEMTRACER_GUARD(state, ...) \ + spill::ResourceMemTrackerGuard(tls_mem_tracker, state->query_ctx()->weak_from_this(), ##__VA_ARGS__) + +#define TRACKER_WITH_SPILLER_GUARD(state, spiller) RESOURCE_TLS_MEMTRACER_GUARD(state, spiller->weak_from_this()) + +#define TRACKER_WITH_SPILLER_READER_GUARD(state, spiller) \ + RESOURCE_TLS_MEMTRACER_GUARD(state, spiller->weak_from_this(), std::weak_ptr((spiller)->reader())) + +} // namespace starrocks::spill \ No newline at end of file diff --git a/be/src/exec/spill/spiller.hpp b/be/src/exec/spill/spiller.hpp new file mode 100644 index 0000000000000..ab7695183c4b4 --- /dev/null +++ b/be/src/exec/spill/spiller.hpp @@ -0,0 +1,357 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include "column/chunk.h" +#include "column/vectorized_fwd.h" +#include "common/logging.h" +#include "common/status.h" +#include "exec/spill/common.h" +#include "exec/spill/executor.h" +#include "exec/spill/input_stream.h" +#include "exec/spill/serde.h" +#include "exec/spill/spill_components.h" +#include "exec/spill/spiller.h" +#include "exec/workgroup/work_group_fwd.h" +#include "storage/chunk_helper.h" +#include "util/defer_op.h" +#include "util/runtime_profile.h" + +namespace starrocks::spill { +template +Status Spiller::spill(RuntimeState* state, const ChunkPtr& chunk, MemGuard&& guard) { + SCOPED_TIMER(_metrics.append_data_timer); + RETURN_IF_ERROR(task_status()); + DCHECK(!chunk->is_empty()); + DCHECK(!is_full()); + + COUNTER_UPDATE(_metrics.spill_rows, chunk->num_rows()); + _spilled_append_rows += chunk->num_rows(); + TRACE_SPILL_LOG << "spilled rows:" << chunk->num_rows() << ",cumulative:" << _spilled_append_rows + << ",spiller:" << this; + + if (_chunk_builder.chunk_schema()->empty()) { + _chunk_builder.chunk_schema()->set_schema(chunk); + RETURN_IF_ERROR(_serde->prepare()); + _init_max_block_nums(); + } + + if (_opts.init_partition_nums > 0) { + return _writer->as()->spill(state, chunk, guard); + } else { + return _writer->as()->spill(state, chunk, guard); + } +} + +template +Status Spiller::partitioned_spill(RuntimeState* state, const ChunkPtr& chunk, SpillHashColumn* hash_column, + Processer&& processer, MemGuard&& guard) { + SCOPED_TIMER(_metrics.append_data_timer); + RETURN_IF_ERROR(task_status()); + DCHECK(!chunk->is_empty()); + COUNTER_UPDATE(_metrics.spill_rows, chunk->num_rows()); + DCHECK_GT(_opts.init_partition_nums, 0); + + if (_chunk_builder.chunk_schema()->empty()) { + _chunk_builder.chunk_schema()->set_schema(chunk); + RETURN_IF_ERROR(_serde->prepare()); + _init_max_block_nums(); + } + + std::vector indexs; + auto writer = _writer->as(); + { + SCOPED_TIMER(_metrics.shuffle_timer); + writer->shuffle(indexs, hash_column); + writer->process_partition_data(chunk, indexs, std::forward(processer)); + } + COUNTER_SET(_metrics.partition_writer_peak_memory_usage, writer->mem_consumption()); + RETURN_IF_ERROR(writer->flush_if_full(state, guard)); + return Status::OK(); +} + +template +Status Spiller::flush(RuntimeState* state, MemGuard&& guard) { + RETURN_IF_ERROR(task_status()); + if (_opts.init_partition_nums > 0) { + return _writer->as()->flush(state, true, guard); + } else { + return _writer->as()->flush(state, guard); + } +} + +template +StatusOr Spiller::restore(RuntimeState* state, MemGuard&& guard) { + RETURN_IF_ERROR(task_status()); + + ASSIGN_OR_RETURN(auto chunk, _reader->restore(state, guard)); + chunk->check_or_die(); + _restore_read_rows += chunk->num_rows(); + + RETURN_IF_ERROR(trigger_restore(state, std::forward(guard))); + return chunk; +} + +template +Status Spiller::trigger_restore(RuntimeState* state, MemGuard&& guard) { + return _reader->trigger_restore(state, guard); +} + +template +Status RawSpillerWriter::spill(RuntimeState* state, const ChunkPtr& chunk, MemGuard&& guard) { + if (_mem_table == nullptr) { + _mem_table = _acquire_mem_table_from_pool(); + DCHECK(_mem_table != nullptr); + } + + RETURN_IF_ERROR(_mem_table->append(chunk)); + + if (_mem_table->is_full()) { + return flush(state, std::forward(guard)); + } + + return Status::OK(); +} + +template +Status RawSpillerWriter::flush(RuntimeState* state, MemGuard&& guard) { + MemTablePtr captured_mem_table; + { + std::lock_guard l(_mutex); + 
captured_mem_table = std::move(_mem_table); + } + auto defer = DeferOp([&]() { + if (captured_mem_table) { + std::lock_guard _(_mutex); + _mem_table_pool.emplace(std::move(captured_mem_table)); + } + }); + + if (captured_mem_table == nullptr) { + return Status::OK(); + } + RETURN_IF_ERROR(captured_mem_table->done()); + + _running_flush_tasks++; + auto task = [this, state, guard = guard, mem_table = std::move(captured_mem_table), + trace = TraceInfo(state)](auto& yield_ctx) { + SCOPED_SET_TRACE_INFO({}, trace.query_id, trace.fragment_id); + RETURN_IF(!guard.scoped_begin(), Status::Cancelled("cancelled")); + DEFER_GUARD_END(guard); + SCOPED_TIMER(_spiller->metrics().flush_timer); + DCHECK_GT(_running_flush_tasks, 0); + DCHECK(has_pending_data()); + // + if (!yield_ctx.task_context_data.has_value()) { + yield_ctx.task_context_data = SpillIOTaskContextPtr(std::make_shared()); + } + auto defer = CancelableDefer([&]() { + { + std::lock_guard _(_mutex); + _mem_table_pool.emplace(std::move(mem_table)); + } + _spiller->update_spilled_task_status(_decrease_running_flush_tasks()); + yield_ctx.set_finished(); + }); + + if (_spiller->is_cancel() || !_spiller->task_status().ok()) { + return Status::OK(); + } + + yield_ctx.time_spent_ns = 0; + yield_ctx.need_yield = false; + + _spiller->update_spilled_task_status(yieldable_flush_task(yield_ctx, state, mem_table)); + if (yield_ctx.need_yield && !yield_ctx.is_finished()) { + COUNTER_UPDATE(_spiller->metrics().flush_task_yield_times, 1); + defer.cancel(); + } + + return Status::OK(); + }; + + auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); }; + auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func)); + RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task))); + COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1); + COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks); + return Status::OK(); +} + +template +StatusOr SpillerReader::restore(RuntimeState* state, MemGuard&& guard) { + SCOPED_TIMER(_spiller->metrics().restore_from_buffer_timer); + workgroup::YieldContext mock_ctx; + ASSIGN_OR_RETURN(auto chunk, _stream->get_next(mock_ctx, _spill_read_ctx)); + RETURN_IF_ERROR(trigger_restore(state, std::forward(guard))); + _read_rows += chunk->num_rows(); + COUNTER_UPDATE(_spiller->metrics().restore_rows, chunk->num_rows()); + TRACE_SPILL_LOG << "restore rows: " << chunk->num_rows() << ", total restored: " << _read_rows << ", " << this; + return chunk; +} + +template +Status SpillerReader::trigger_restore(RuntimeState* state, MemGuard&& guard) { + if (_stream == nullptr) { + return Status::OK(); + } + // if all is well and input stream enable prefetch and not eof + if (!_stream->eof()) { + // make sure _running_restore_tasks < io_tasks_per_scan_operator to avoid scan overloaded + if (_stream->is_ready() && _running_restore_tasks >= config::io_tasks_per_scan_operator) { + return Status::OK(); + } + _running_restore_tasks++; + auto restore_task = [this, guard, trace = TraceInfo(state), _stream = _stream](auto& yield_ctx) { + SCOPED_SET_TRACE_INFO({}, trace.query_id, trace.fragment_id); + RETURN_IF(!guard.scoped_begin(), (void)0); + DEFER_GUARD_END(guard); + { + auto defer = CancelableDefer([&]() { + _running_restore_tasks--; + yield_ctx.set_finished(); + }); + Status res; + SerdeContext serd_ctx; + if (!yield_ctx.task_context_data.has_value()) { + yield_ctx.task_context_data = std::make_shared(); + } + + auto ctx = 
std::any_cast(yield_ctx.task_context_data); + yield_ctx.time_spent_ns = 0; + yield_ctx.need_yield = false; + + YieldableRestoreTask task(_stream); + res = task.do_read(yield_ctx, serd_ctx); + + if (yield_ctx.need_yield && !yield_ctx.is_finished()) { + COUNTER_UPDATE(_spiller->metrics().restore_task_yield_times, 1); + defer.cancel(); + } + + if (!res.is_ok_or_eof()) { + _spiller->update_spilled_task_status(std::move(res)); + } + _finished_restore_tasks += !res.ok(); + }; + }; + auto yield_func = [&](workgroup::ScanTask&& task) { + auto ctx = std::any_cast(task.get_work_context().task_context_data); + TaskExecutor::force_submit(std::move(task)); + }; + auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(restore_task), std::move(yield_func)); + RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task))); + COUNTER_UPDATE(_spiller->metrics().restore_io_task_count, 1); + COUNTER_SET(_spiller->metrics().peak_restore_io_task_count, _running_restore_tasks); + } + return Status::OK(); +} + +template +Status PartitionedSpillerWriter::spill(RuntimeState* state, const ChunkPtr& chunk, MemGuard&& guard) { + DCHECK(!chunk->is_empty()); + DCHECK(!is_full()); + + // the last column was hash column + auto hash_column = chunk->columns().back(); + + { + SCOPED_TIMER(_spiller->metrics().shuffle_timer); + std::vector shuffle_result; + shuffle(shuffle_result, down_cast(hash_column.get())); + process_partition_data(chunk, shuffle_result, + [&chunk](SpilledPartition* partition, const std::vector& selection, + int32_t from, int32_t size) { + auto mem_table = partition->spill_writer->mem_table(); + (void)mem_table->append_selective(*chunk, selection.data(), from, size); + partition->mem_size = mem_table->mem_usage(); + partition->num_rows += size; + }); + } + + DCHECK_EQ(_spiller->spilled_append_rows(), _partition_rows()); + + RETURN_IF_ERROR(flush_if_full(state, guard)); + + return Status::OK(); +} + +template +Status PartitionedSpillerWriter::flush_if_full(RuntimeState* state, MemGuard&& guard) { + if (_mem_tracker->consumption() > options().spill_mem_table_bytes_size) { + return flush(state, false, guard); + } + return Status::OK(); +} + +template +Status PartitionedSpillerWriter::flush(RuntimeState* state, bool is_final_flush, MemGuard&& guard) { + std::vector splitting_partitions, spilling_partitions; + RETURN_IF_ERROR(_choose_partitions_to_flush(is_final_flush, splitting_partitions, spilling_partitions)); + + if (spilling_partitions.empty() && splitting_partitions.empty()) { + return Status::OK(); + } + + if (is_final_flush && _running_flush_tasks > 0) { + _need_final_flush = true; + return Status::OK(); + } + DCHECK_EQ(_running_flush_tasks, 0); + _running_flush_tasks++; + + auto task = [this, guard = guard, splitting_partitions = std::move(splitting_partitions), + spilling_partitions = std::move(spilling_partitions), trace = TraceInfo(state)](auto& yield_ctx) { + SCOPED_SET_TRACE_INFO({}, trace.query_id, trace.fragment_id); + RETURN_IF(!guard.scoped_begin(), Status::Cancelled("cancelled")); + DEFER_GUARD_END(guard); + // concurrency test + RACE_DETECT(detect_flush); + auto defer = CancelableDefer([&]() { + _spiller->update_spilled_task_status(_decrease_running_flush_tasks()); + yield_ctx.set_finished(); + }); + + if (_spiller->is_cancel() || !_spiller->task_status().ok()) { + return Status::OK(); + } + yield_ctx.time_spent_ns = 0; + yield_ctx.need_yield = false; + if (!yield_ctx.task_context_data.has_value()) { + yield_ctx.task_context_data = SpillIOTaskContextPtr(std::make_shared()); + } + 
+        _spiller->update_spilled_task_status(
+                yieldable_flush_task(yield_ctx, splitting_partitions, spilling_partitions));
+
+        if (yield_ctx.need_yield && !yield_ctx.is_finished()) {
+            COUNTER_UPDATE(_spiller->metrics().flush_task_yield_times, 1);
+            defer.cancel();
+        }
+        return Status::OK();
+    };
+    auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
+    auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
+    RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
+    COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
+    COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
+
+    return Status::OK();
+}
+
+} // namespace starrocks::spill
\ No newline at end of file
diff --git a/be/src/exec/workgroup/scan_task_queue.cpp b/be/src/exec/workgroup/scan_task_queue.cpp
index aa39e91a5726b..bd066882feb40 100644
--- a/be/src/exec/workgroup/scan_task_queue.cpp
+++ b/be/src/exec/workgroup/scan_task_queue.cpp
@@ -85,7 +85,15 @@ StatusOr WorkGroupScanTaskQueue::take() {
 
 bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
     std::lock_guard lock(_global_mutex);
 
+<<<<<<< HEAD
     auto* wg_entity = _sched_entity(task.workgroup);
+=======
+    if (task.peak_scan_task_queue_size_counter != nullptr) {
+        task.peak_scan_task_queue_size_counter->set(_num_tasks);
+    }
+
+    auto* wg_entity = _sched_entity(task.workgroup.get());
+>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
     wg_entity->set_in_queue(this);
     RETURN_IF_UNLIKELY(!wg_entity->queue()->try_offer(std::move(task)), false);
@@ -101,6 +109,28 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
 
 void WorkGroupScanTaskQueue::update_statistics(WorkGroup* wg, int64_t runtime_ns) {
     std::lock_guard lock(_global_mutex);
+<<<<<<< HEAD
+=======
+    if (task.peak_scan_task_queue_size_counter != nullptr) {
+        task.peak_scan_task_queue_size_counter->set(_num_tasks);
+    }
+
+    auto* wg_entity = _sched_entity(task.workgroup.get());
+    wg_entity->set_in_queue(this);
+    wg_entity->queue()->force_put(std::move(task));
+
+    if (_wg_entities.find(wg_entity) == _wg_entities.end()) {
+        _enqueue_workgroup(wg_entity);
+    }
+
+    _num_tasks++;
+    _cv.notify_one();
+}
+
+void WorkGroupScanTaskQueue::update_statistics(ScanTask& task, int64_t runtime_ns) {
+    std::lock_guard lock(_global_mutex);
+    auto* wg = task.workgroup.get();
+>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
     auto* wg_entity = _sched_entity(wg);
 
     // Update bandwidth control information.
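The two hunks above carry the core of the fix: ScanTask::workgroup changes from a raw WorkGroup* to a WorkGroupPtr (a shared pointer), try_offer() resolves the scheduling entity through task.workgroup.get(), and update_statistics() now takes the task itself so the group is read from the captured reference. The following standalone sketch uses simplified, illustrative types (not the real StarRocks classes) to show why an owning pointer in a queued task matters: the resource group can be dropped while its tasks are still waiting, and a raw pointer would dangle.

// Simplified sketch, not StarRocks code: a queued task that owns its resource group.
#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <string>

struct WorkGroup {
    std::string name;
    long scan_runtime_ns = 0;
};
using WorkGroupPtr = std::shared_ptr<WorkGroup>;

struct ScanTask {
    WorkGroupPtr workgroup;               // owning reference: keeps the group alive while the task waits
    std::function<void()> work_function;
};

int main() {
    std::queue<ScanTask> pending;
    {
        auto wg = std::make_shared<WorkGroup>();
        wg->name = "wg_default";
        pending.push(ScanTask{wg, [] { std::cout << "run scan\n"; }});
        // wg leaves scope here; with a raw WorkGroup* the queued task would now point at freed memory.
    }
    ScanTask task = std::move(pending.front());
    pending.pop();
    task.work_function();
    task.workgroup->scan_runtime_ns += 1000; // statistics are still attributable to the captured group
    std::cout << task.workgroup->name << " runtime_ns=" << task.workgroup->scan_runtime_ns << "\n";
    return 0;
}
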
diff --git a/be/src/exec/workgroup/scan_task_queue.h b/be/src/exec/workgroup/scan_task_queue.h
index b9bb5b6b8b2d9..744ec58e58df7 100644
--- a/be/src/exec/workgroup/scan_task_queue.h
+++ b/be/src/exec/workgroup/scan_task_queue.h
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include
 
 #include "common/statusor.h"
 #include "exec/workgroup/work_group_fwd.h"
@@ -20,9 +21,19 @@ struct ScanTask {
     using WorkFunction = std::function;
 
     ScanTask() : ScanTask(nullptr, nullptr) {}
+<<<<<<< HEAD
     explicit ScanTask(WorkFunction work_function) : workgroup(nullptr), work_function(std::move(work_function)) {}
     ScanTask(WorkGroup* workgroup, WorkFunction work_function)
             : workgroup(workgroup), work_function(std::move(work_function)) {}
+=======
+    explicit ScanTask(WorkFunction work_function) : ScanTask(nullptr, std::move(work_function)) {}
+    ScanTask(WorkGroupPtr workgroup, WorkFunction work_function)
+            : workgroup(std::move(workgroup)), work_function(std::move(work_function)) {}
+    ScanTask(WorkGroupPtr workgroup, WorkFunction work_function, YieldFunction yield_function)
+            : workgroup(std::move(workgroup)),
+              work_function(std::move(work_function)),
+              yield_function(std::move(yield_function)) {}
+>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
     ~ScanTask() = default;
 
     DISALLOW_COPY(ScanTask);
@@ -37,7 +48,12 @@ struct ScanTask {
     }
 
 public:
+<<<<<<< HEAD
     WorkGroup* workgroup;
+=======
+    WorkGroupPtr workgroup;
+    YieldContext work_context;
+>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
     WorkFunction work_function;
     int priority = 0;
 };
diff --git a/be/src/udf/java/utils.cpp b/be/src/udf/java/utils.cpp
index 0fe3c80b0bea9..9e3dc983a8017 100644
--- a/be/src/udf/java/utils.cpp
+++ b/be/src/udf/java/utils.cpp
@@ -37,8 +37,14 @@ PromiseStatusPtr call_function_in_pthread(RuntimeState* state, const std::functi
 PromiseStatusPtr call_hdfs_scan_function_in_pthread(const std::function<Status()>& func) {
     PromiseStatusPtr ms = std::make_unique<PromiseStatus>();
     if (bthread_self()) {
+<<<<<<< HEAD
         ExecEnv::GetInstance()->connector_scan_executor_without_workgroup()->submit(
                 workgroup::ScanTask([promise = ms.get(), func]() { promise->set_value(func()); }));
+=======
+        ExecEnv::GetInstance()->connector_scan_executor()->submit(workgroup::ScanTask(
+                ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup(),
+                [promise = ms.get(), func](workgroup::YieldContext&) { promise->set_value(func()); }));
+>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
     } else {
         ms->set_value(func());
     }
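
The utils.cpp hunk shows the call-site pattern after the change: a ScanTask is always built with an explicit resource group (here the default workgroup) and a work function that accepts a workgroup::YieldContext&. The sketch below is a minimal, self-contained illustration of the yield-and-resubmit flow that the spill flush and restore tasks above rely on; the type and function names are invented for the example and are not the StarRocks executor API.

// Simplified sketch, not StarRocks code: a work function that yields is handed back
// to its yield function for resubmission, and its YieldContext travels with the task.
#include <any>
#include <functional>
#include <iostream>

struct YieldContext {
    bool need_yield = false;
    bool finished = false;
    std::any task_context_data;   // per-task state preserved across resubmissions
};

struct ScanTask {
    std::function<void(YieldContext&)> work_function;
    std::function<void(ScanTask&&)> yield_function;
    YieldContext work_context;
};

// Toy executor step: run the work once; if it asked to yield and is not finished,
// pass the whole task (including its context) back through yield_function.
void run_once(ScanTask&& task) {
    task.work_function(task.work_context);
    if (task.work_context.need_yield && !task.work_context.finished) {
        auto yield = task.yield_function;   // copy before moving the task it belongs to
        yield(std::move(task));
    }
}

int main() {
    int steps = 0;
    ScanTask task;
    task.work_function = [&steps](YieldContext& ctx) {
        ctx.need_yield = (++steps < 3);     // pretend the time budget runs out twice
        ctx.finished = !ctx.need_yield;
        std::cout << "step " << steps << (ctx.finished ? " (done)\n" : " (yield)\n");
    };
    task.yield_function = [](ScanTask&& t) { run_once(std::move(t)); }; // resubmit
    run_once(std::move(task));
    return 0;
}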