feat: tweak auto compaction strategy (databendlabs#13577)
* Add auto_compaction_threshold setting

* fix test

* make lint

* resolve conflict

---------

Co-authored-by: dantengsky <[email protected]>
zhyass and dantengsky authored Dec 20, 2023
1 parent a4e5868 commit 4dc132f
Showing 17 changed files with 131 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -141,6 +141,8 @@ pub trait TableContext: Send + Sync {
    fn set_cacheable(&self, cacheable: bool);
    fn get_can_scan_from_agg_index(&self) -> bool;
    fn set_can_scan_from_agg_index(&self, enable: bool);
    fn set_auto_compact_after_write(&self, enable: bool);
    fn get_auto_compact_after_write(&self) -> bool;

    fn attach_query_str(&self, kind: QueryKind, query: String);
    fn get_query_str(&self) -> String;
7 changes: 3 additions & 4 deletions src/query/ee/tests/it/aggregating_index/index_refresh.rs
@@ -529,10 +529,9 @@ async fn test_sync_agg_index_after_insert() -> Result<()> {

async fn test_sync_agg_index_after_copy_into() -> Result<()> {
     let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;
-    fixture
-        .default_session()
-        .get_settings()
-        .set_enable_refresh_aggregating_index_after_write(true)?;
+    let settings = fixture.default_session().get_settings();
+    settings.set_enable_refresh_aggregating_index_after_write(true)?;
+    settings.set_auto_compaction_threshold(1)?;

     // Create table
     fixture.execute_command(
3 changes: 3 additions & 0 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -74,6 +74,9 @@ async fn do_hook_compact(

    if !pipeline.is_empty() && ctx.get_settings().get_enable_recluster_after_write()? {
        pipeline.set_on_finished(move |err| {
            if !ctx.get_auto_compact_after_write() {
                return Ok(());
            }

            let op_name = &trace_ctx.operation_name;
            metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
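The hunk above is the consumer side of the new strategy: the compact hook still registers its on-finished callback when `enable_recluster_after_write` is on, but the callback now consults the per-query `auto_compact_after_write` flag (set by the snapshot generator further down in this diff) and returns early when the write is not worth compacting. A minimal, self-contained sketch of that gating, with a hypothetical `Ctx` standing in for the real `QueryContext`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for QueryContext; in the commit the flag lives in
// QueryContextShared and defaults to true.
struct Ctx {
    auto_compact_after_write: Arc<AtomicBool>,
}

impl Ctx {
    fn get_auto_compact_after_write(&self) -> bool {
        self.auto_compact_after_write.load(Ordering::Acquire)
    }
}

// Shape of the on-finished callback: bail out before scheduling compaction
// when the preceding write left too few imperfect blocks.
fn on_finished(ctx: &Ctx) -> Result<(), String> {
    if !ctx.get_auto_compact_after_write() {
        return Ok(());
    }
    // ...record metrics and kick off the compaction pipeline here...
    Ok(())
}

fn main() {
    let ctx = Ctx {
        auto_compact_after_write: Arc::new(AtomicBool::new(true)),
    };
    assert!(on_finished(&ctx).is_ok());
}
```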
51 changes: 51 additions & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -14,6 +14,7 @@

use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use databend_common_catalog::table::AppendMode;
use databend_common_catalog::table::TableExt;
@@ -32,7 +33,10 @@ use databend_common_sql::NameResolutionContext;

use crate::interpreters::common::build_update_stream_meta_seq;
use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::hook::hook_compact;
use crate::interpreters::hook::hook_refresh;
use crate::interpreters::hook::CompactHookTraceCtx;
use crate::interpreters::hook::CompactTargetTableDescription;
use crate::interpreters::hook::RefreshDesc;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
@@ -94,6 +98,7 @@ impl Interpreter for InsertInterpreter {

        let mut build_res = PipelineBuildResult::create();

        let start = Instant::now();
        match &self.plan.source {
            InsertInputSource::Stage(_) => {
                unreachable!()
@@ -247,6 +252,29 @@
                    None,
                )?;

                // Compact if 'enable_recluster_after_write' on.
                {
                    let compact_target = CompactTargetTableDescription {
                        catalog: self.plan.catalog.clone(),
                        database: self.plan.database.clone(),
                        table: self.plan.table.clone(),
                    };

                    let trace_ctx = CompactHookTraceCtx {
                        start,
                        operation_name: "insert_into_table".to_owned(),
                    };

                    hook_compact(
                        self.ctx.clone(),
                        &mut build_res.main_pipeline,
                        compact_target,
                        trace_ctx,
                        true,
                    )
                    .await;
                }

                let refresh_desc = RefreshDesc {
                    catalog: self.plan.catalog.clone(),
                    database: self.plan.database.clone(),
@@ -276,6 +304,29 @@
                    append_mode,
                )?;

                // Compact if 'enable_recluster_after_write' on.
                {
                    let compact_target = CompactTargetTableDescription {
                        catalog: self.plan.catalog.clone(),
                        database: self.plan.database.clone(),
                        table: self.plan.table.clone(),
                    };

                    let trace_ctx = CompactHookTraceCtx {
                        start,
                        operation_name: "insert_into_table".to_owned(),
                    };

                    hook_compact(
                        self.ctx.clone(),
                        &mut build_res.main_pipeline,
                        compact_target,
                        trace_ctx,
                        true,
                    )
                    .await;
                }

                let refresh_desc = RefreshDesc {
                    catalog: self.plan.catalog.clone(),
                    database: self.plan.database.clone(),
10 changes: 10 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -464,6 +464,16 @@ impl TableContext for QueryContext {
            .store(enable, Ordering::Release);
    }

    fn get_auto_compact_after_write(&self) -> bool {
        self.shared.auto_compact_after_write.load(Ordering::Acquire)
    }

    fn set_auto_compact_after_write(&self, enable: bool) {
        self.shared
            .auto_compact_after_write
            .store(enable, Ordering::Release);
    }

    fn attach_query_str(&self, kind: QueryKind, query: String) {
        self.shared.attach_query_str(kind, query);
    }
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -101,6 +101,7 @@ pub struct QueryContextShared {
    pub(in crate::sessions) partitions_shas: Arc<RwLock<Vec<String>>>,
    pub(in crate::sessions) cacheable: Arc<AtomicBool>,
    pub(in crate::sessions) can_scan_from_agg_index: Arc<AtomicBool>,
    pub(in crate::sessions) auto_compact_after_write: Arc<AtomicBool>,
    // Status info.
    pub(in crate::sessions) status: Arc<RwLock<String>>,

@@ -148,6 +149,7 @@ impl QueryContextShared {
            partitions_shas: Arc::new(RwLock::new(vec![])),
            cacheable: Arc::new(AtomicBool::new(true)),
            can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),
            auto_compact_after_write: Arc::new(AtomicBool::new(true)),
            status: Arc::new(RwLock::new("null".to_string())),
            user_agent: Arc::new(RwLock::new("null".to_string())),
            materialized_cte_tables: Arc::new(Default::default()),
8 changes: 8 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -709,6 +709,14 @@ impl TableContext for CtxDelegation {
        todo!()
    }

    fn set_auto_compact_after_write(&self, _enable: bool) {
        todo!()
    }

    fn get_auto_compact_after_write(&self) -> bool {
        todo!()
    }

    fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> {
        todo!()
    }
8 changes: 8 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -658,6 +658,14 @@ impl TableContext for CtxDelegation {
        todo!()
    }

    fn set_auto_compact_after_write(&self, _enable: bool) {
        todo!()
    }

    fn get_auto_compact_after_write(&self) -> bool {
        todo!()
    }

    fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> {
        todo!()
    }
@@ -7,6 +7,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
| 'acquire_lock_timeout' | '15' | '15' | 'None' | 'SESSION' | 'Sets the maximum timeout in seconds for acquire a lock.' | 'UInt64' |
| 'aggregate_spilling_bytes_threshold_per_proc' | '0' | '0' | 'None' | 'SESSION' | 'Sets the maximum amount of memory in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
| 'aggregate_spilling_memory_ratio' | '0' | '0' | 'None' | 'SESSION' | 'Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
| 'auto_compaction_threshold' | '1000' | '1000' | 'None' | 'SESSION' | 'Threshold for triggering auto compaction after write(copy/insert).' | 'UInt64' |
| 'collation' | 'binary' | 'binary' | '["binary", "utf8"]' | 'SESSION' | 'Sets the character collation. Available values include "binary" and "utf8".' | 'String' |
| 'create_query_flight_client_with_current_rt' | '1' | '1' | 'None' | 'SESSION' | 'create query flight client with current runtime' | 'UInt64' |
| 'ddl_column_type_nullable' | '1' | '1' | 'None' | 'SESSION' | 'If columns are default nullable when create or alter table' | 'UInt64' |
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -490,6 +490,12 @@ impl DefaultSettings {
                mode: SettingMode::Both,
                range: None,
            }),
            ("auto_compaction_threshold", DefaultSettingValue {
                value: UserSettingValue::UInt64(1000),
                desc: "Threshold for triggering auto compaction after write(copy/insert).",
                mode: SettingMode::Both,
                range: None,
            }),
            ("use_parquet2", DefaultSettingValue {
                value: UserSettingValue::UInt64(0),
                desc: "Use parquet2 instead of parquet_rs when infer_schema().",
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -431,6 +431,14 @@ impl Settings {
        Ok(self.try_get_u64("enable_recluster_after_write")? != 0)
    }

    pub fn get_auto_compaction_threshold(&self) -> Result<u64> {
        self.try_get_u64("auto_compaction_threshold")
    }

    pub fn set_auto_compaction_threshold(&self, val: u64) -> Result<()> {
        self.try_set_u64("auto_compaction_threshold", val)
    }

    pub fn get_use_parquet2(&self) -> Result<bool> {
        Ok(self.try_get_u64("use_parquet2")? != 0)
    }
@@ -421,6 +421,11 @@ impl SnapshotGenerator for AppendGenerator {
            }
        }

        let imperfect_count = new_summary.block_count - new_summary.perfect_block_count;
        let auto_compaction_threshold = self.ctx.get_settings().get_auto_compaction_threshold()?;
        let auto_compact = imperfect_count >= auto_compaction_threshold;
        self.ctx.set_auto_compact_after_write(auto_compact);

        Ok(TableSnapshot::new(
            Uuid::new_v4(),
            &prev_timestamp,
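This hunk is the producer side of the flag: after an append, `AppendGenerator` counts the blocks in the new snapshot summary that are still below the ideal size ("imperfect"), compares that count with `auto_compaction_threshold`, and stores the verdict on the query context for the compact hook to read. A hedged sketch of the decision, extracted into a free function for illustration (in the commit it lives inline, as shown above):

```rust
// Illustrative only: the real counts come from the new snapshot's summary.
fn should_auto_compact(block_count: u64, perfect_block_count: u64, threshold: u64) -> bool {
    // Blocks that never reached the ideal row count/size are "imperfect";
    // compaction is only scheduled once enough of them accumulate.
    let imperfect_count = block_count - perfect_block_count;
    imperfect_count >= threshold
}

fn main() {
    // With the default threshold of 1000, a small write leaving 3 imperfect
    // blocks no longer triggers auto compaction...
    assert!(!should_auto_compact(4, 1, 1000));
    // ...while `SET auto_compaction_threshold = 3`, as in the logic tests
    // below, makes the same write compact.
    assert!(should_auto_compact(4, 1, 3));
}
```

This also explains the expectation changes later in the diff: with the default threshold of 1000, small copies are no longer compacted down to a single block, so the expected `block_count` in several tests moves from 1 to 2.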
@@ -77,6 +77,14 @@ SELECT max(b), count(1) from t3;
----
2021-09-07 21:38:35.000000 3002

query II
SELECT segment_count, block_count, row_count from FUSE_SNAPSHOT('db1', 't3') limit 1;
----
4 4 3002

statement ok
SET auto_compaction_threshold = 3;

statement ok
create table t4(number String);

@@ -94,6 +102,14 @@ select * from t4 where number = '15' limit 1;
----
15

query II
SELECT segment_count, block_count, row_count from FUSE_SNAPSHOT('db1', 't4') limit 2;
----
1 1 300
3 3 300

statement ok
SET auto_compaction_threshold = 1000;

statement ok
DROP TABLE t1
@@ -171,11 +171,10 @@ statement ok
copy into t_query2 from (select $1,$2,$3 from @st_query2 as t2) force = true purge = true;


-# table will be auto compacted after copy into, the volume of data is small, so by default setting, it will be compacted to 1 block.
query I
select block_count from fuse_snapshot('default','t_query2') limit 1;
----
-1
+2

#test cluster key
statement ok
@@ -4,6 +4,9 @@ set enable_distributed_copy_into = 1;
statement ok
set global max_threads = 1;

statement ok
set auto_compaction_threshold = 3;

statement ok
drop table if exists products;

@@ -50,3 +53,6 @@ select block_count from fuse_snapshot('default','products') limit 1;

statement ok
set enable_distributed_copy_into = 0;

statement ok
set auto_compaction_threshold = 1000;
@@ -3,4 +3,4 @@
---row_count
1000
---block_count
-1
+2
@@ -29,7 +29,6 @@ echo "copy /*+ set_var(input_read_buffer_size=100) set_var(max_threads=1) */ in
echo "---row_count"
echo "select count(*) from t1" | $BENDSQL_CLIENT_CONNECT

-# table will be auto compacted after copy into, thus we need to limit the result of fuse_snapshot to 1
echo "---block_count"
echo "select block_count from fuse_snapshot('default','t1') limit 1" | $BENDSQL_CLIENT_CONNECT

