Skip to content

Commit

Permalink
chore: simplify txn-op (databendlabs#14048)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer authored Dec 16, 2023
1 parent caad7d2 commit 188426e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 111 deletions.
11 changes: 1 addition & 10 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnPutRequest;
use databend_common_meta_types::TxnRequest;
use futures::TryStreamExt;
use log::as_debug;
Expand Down Expand Up @@ -4176,15 +4175,7 @@ fn build_upsert_table_copied_file_info_conditions(

fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
let expire_at = Some(SeqV::<()>::now_ms() / 1000 + 24 * 60 * 60);
TxnOp {
request: Some(Request::Put(TxnPutRequest {
key: deduplicated_label,
value: 1_i8.to_le_bytes().to_vec(),
prev_value: false,
expire_at,
ttl_ms: None,
})),
}
TxnOp::put_with_expire(deduplicated_label, 1_i8.to_le_bytes().to_vec(), expire_at)
}

fn set_update_expire_operation(
Expand Down
31 changes: 3 additions & 28 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::txn_condition::Target;
use databend_common_meta_types::txn_op::Request;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::InvalidArgument;
use databend_common_meta_types::InvalidReply;
Expand All @@ -62,10 +61,8 @@ use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::Operation;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnDeleteRequest;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnOpResponse;
use databend_common_meta_types::TxnPutRequest;
use databend_common_meta_types::TxnRequest;
use databend_common_proto_conv::FromToProto;
use enumflags2::BitFlags;
Expand Down Expand Up @@ -326,39 +323,17 @@ pub fn txn_cond_seq(key: &impl kvapi::Key, op: ConditionResult, seq: u64) -> Txn

/// Build a txn operation that puts a record.
pub fn txn_op_put(key: &impl kvapi::Key, value: Vec<u8>) -> TxnOp {
TxnOp {
request: Some(Request::Put(TxnPutRequest {
key: key.to_string_key(),
value,
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}
TxnOp::put(key.to_string_key(), value)
}

// TODO: replace it with common_meta_types::with::With
pub fn txn_op_put_with_expire(key: &impl kvapi::Key, value: Vec<u8>, expire_at: u64) -> TxnOp {
TxnOp {
request: Some(Request::Put(TxnPutRequest {
key: key.to_string_key(),
value,
prev_value: true,
expire_at: Some(expire_at),
ttl_ms: None,
})),
}
TxnOp::put_with_expire(key.to_string_key(), value, Some(expire_at))
}

/// Build a txn operation that deletes a record.
pub fn txn_op_del(key: &impl kvapi::Key) -> TxnOp {
TxnOp {
request: Some(Request::Delete(TxnDeleteRequest {
key: key.to_string_key(),
prev_value: true,
match_seq: None,
})),
}
TxnOp::delete(key.to_string_key())
}

/// Return OK if a db_id or db_meta exists by checking the seq.
Expand Down
3 changes: 3 additions & 0 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// - 2023-10-20: since 1.2.176:
/// Meta client: call stream api: kv_read_v1(), revert to 1.1.32 if server < 1.2.163
///
/// - 2023-12-16: since TODO:
/// Meta service: add: ttl to TxnPutRequest and Upsert
///
/// Server feature set:
/// ```yaml
/// server_features:
Expand Down
61 changes: 6 additions & 55 deletions src/meta/kvapi/src/kvapi/test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnOpResponse;
use databend_common_meta_types::TxnPutRequest;
use databend_common_meta_types::TxnPutResponse;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
Expand Down Expand Up @@ -505,15 +504,7 @@ impl kvapi::TestSuite {
target: Some(txn_condition::Target::Seq(0)),
}];

let if_then: Vec<TxnOp> = vec![TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key.clone(),
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];
let if_then: Vec<TxnOp> = vec![TxnOp::put(txn_key.clone(), b("new_v1"))];

let else_then: Vec<TxnOp> = vec![];

Expand Down Expand Up @@ -667,15 +658,7 @@ impl kvapi::TestSuite {
target: Some(txn_condition::Target::Seq(0)),
}];

let if_then: Vec<TxnOp> = vec![TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key.clone(),
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];
let if_then: Vec<TxnOp> = vec![TxnOp::put(txn_key.clone(), b("new_v1"))];

let else_then: Vec<TxnOp> = vec![];
let txn = TxnRequest {
Expand Down Expand Up @@ -720,15 +703,7 @@ impl kvapi::TestSuite {
},
];

let if_then: Vec<TxnOp> = vec![TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key1.clone(),
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];
let if_then: Vec<TxnOp> = vec![TxnOp::put(txn_key1.clone(), b("new_v1"))];

let else_then: Vec<TxnOp> = vec![];
let txn = TxnRequest {
Expand Down Expand Up @@ -776,25 +751,9 @@ impl kvapi::TestSuite {

let if_then: Vec<TxnOp> = vec![
// change k1
TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key1.clone(),
value: val1_new.to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
TxnOp::put(txn_key1.clone(), val1_new.clone()),
// change k2
TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key2.clone(),
value: b"new_v2".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
TxnOp::put(txn_key2.clone(), b("new_v2")),
// get k1
TxnOp {
request: Some(txn_op::Request::Get(TxnGetRequest {
Expand Down Expand Up @@ -896,15 +855,7 @@ impl kvapi::TestSuite {

let if_then: Vec<TxnOp> = vec![
// change k1
TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key1.clone(),
value: val1_new.to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
TxnOp::put(txn_key1.clone(), val1_new.clone()),
// get k1
TxnOp {
request: Some(txn_op::Request::Get(TxnGetRequest {
Expand Down
20 changes: 2 additions & 18 deletions src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnDeleteByPrefixRequest;
use databend_common_meta_types::TxnDeleteRequest;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnPutRequest;
use databend_meta::meta_service::MetaNode;
use log::info;
use test_harness::test;
Expand Down Expand Up @@ -263,22 +261,8 @@ async fn test_watch() -> anyhow::Result<()> {
}];

let if_then: Vec<TxnOp> = vec![
TxnOp {
request: Some(txn_op::Request::Put(TxnPutRequest {
key: txn_key.clone(),
value: txn_val.clone(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
TxnOp {
request: Some(txn_op::Request::Delete(TxnDeleteRequest {
key: delete_key.to_string(),
prev_value: true,
match_seq: None,
})),
},
TxnOp::put(txn_key.clone(), txn_val.clone()),
TxnOp::delete(delete_key),
TxnOp {
request: Some(txn_op::Request::DeleteByPrefix(TxnDeleteByPrefixRequest {
prefix: watch_delete_by_prefix_key.to_string(),
Expand Down

0 comments on commit 188426e

Please sign in to comment.