Skip to content

Commit

Permalink
increase buffer size
Browse files Browse the repository at this point in the history
  • Loading branch information
murphyatwork committed Dec 20, 2024
1 parent 8085929 commit 9ec48ae
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
4 changes: 3 additions & 1 deletion be/src/exec/chunks_sorter_full_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status ChunksSorterFullSort::_partial_sort(RuntimeState* state, bool done) {
if (!_staging_unsorted_rows) {
return Status::OK();
}
bool reach_limit = _staging_unsorted_rows >= max_buffered_rows || _staging_unsorted_bytes >= max_buffered_bytes;
bool reach_limit = _staging_unsorted_rows >= kDefaultMinBufferRows &&
(_staging_unsorted_rows >= max_buffered_rows || _staging_unsorted_bytes >= max_buffered_bytes);
if (done || reach_limit) {
_max_num_rows = std::max<int>(_max_num_rows, _staging_unsorted_rows);
_profiler->input_required_memory->update(_staging_unsorted_bytes);
Expand Down Expand Up @@ -147,6 +148,7 @@ Status ChunksSorterFullSort::_partial_sort(RuntimeState* state, bool done) {
Status ChunksSorterFullSort::_merge_sorted(RuntimeState* state) {
SCOPED_TIMER(_merge_timer);
_profiler->num_sorted_runs->set((int64_t)_sorted_chunks.size());
// TODO: introduce an extra merge before cascading merge to handle the case that has a lot of sortruns
// In cascading merging phase, the height of merging tree is ceiling(log2(num_sorted_runs)) + 1,
// so when num_sorted_runs is 1 or 2, the height merging tree is less than 2, the sorted runs just be processed
// in at most one pass. there is no need to enable lazy materialization which eliminates non-order-by output
Expand Down
8 changes: 6 additions & 2 deletions be/src/exec/chunks_sorter_full_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ struct ChunksSorterFullSortProfiler {
};
class ChunksSorterFullSort : public ChunksSorter {
public:
static constexpr size_t kDefaultMaxBufferRows = 2 << 20; // 2097152 rows
static constexpr size_t kDefaultMinBufferRows = 1 << 20; // 1048576 rows
static constexpr size_t kDefaultMaxBufferBytes = 16 << 20; // 16MB

/**
* Constructor.
* @param sort_exprs The order-by columns or columns with expression. This sorter will use but not own the object.
Expand Down Expand Up @@ -91,8 +95,8 @@ class ChunksSorterFullSort : public ChunksSorter {

// Parameters to control the buffering behavior: buffering some chunks before partial-sort to reduce memory random access
// TODO: further tunning the buffer parameter
const size_t max_buffered_rows; // Max buffer 1024000 rows
const size_t max_buffered_bytes; // Max buffer 16MB bytes
const size_t max_buffered_rows; // Max buffer 2097152 rows
const size_t max_buffered_bytes; // Max buffer 16MB
std::set<SlotId> _sort_slots; // Slots participating in the sorting procedure

// only when order-by columns(_sort_exprs) are all ColumnRefs and the cost of eager-materialization of
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/topn_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,10 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> TopNNode::_decompose_to_

OperatorFactoryPtr sink_operator;

int64_t max_buffered_rows = 1024000;
int64_t max_buffered_bytes = 16 * 1024 * 1024;
int64_t max_buffered_rows = ChunksSorterFullSort::kDefaultMaxBufferRows;
int64_t max_buffered_bytes =
std::max<int64_t>(ChunksSorterFullSort::kDefaultMaxBufferBytes, CpuInfo::get_l3_cache_size());

if (_tnode.sort_node.__isset.max_buffered_bytes) {
max_buffered_rows = _tnode.sort_node.max_buffered_rows;
max_buffered_bytes = _tnode.sort_node.max_buffered_bytes;
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/cpu_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class CpuInfo {
return cache_sizes;
}

static long get_l3_cache_size() {
auto& cache_sizes = get_cache_sizes();
return cache_sizes[CacheLevel::L3_CACHE] ? cache_sizes[CacheLevel::L3_CACHE]
: cache_sizes[CacheLevel::L2_CACHE];
}

static std::vector<size_t> get_core_ids();

static bool is_cgroup_with_cpuset() { return is_cgroup_with_cpuset_; }
Expand Down

0 comments on commit 9ec48ae

Please sign in to comment.