Skip to content

Commit

Permalink
chore: simplify create_data_mask (#16309)
Browse files Browse the repository at this point in the history
* chore: move types in mod seqv into separate files

* refactor: get_background_job() returns SeqV<T>

This way the caller will be able to update the record against the exact
seq number.

* refactor: add kvapi::Pair

* chore: simplify create_data_mask
  • Loading branch information
drmingdrmer committed Aug 22, 2024
1 parent 4a6ac70 commit 04a4929
Show file tree
Hide file tree
Showing 89 changed files with 357 additions and 291 deletions.
26 changes: 12 additions & 14 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,20 @@ use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use fastrace::func_name;
use log::debug;

use crate::background_api::BackgroundApi;
use crate::deserialize_struct;
use crate::fetch_id;
use crate::get_pb_value;
use crate::get_u64_value;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
Expand Down Expand Up @@ -200,10 +197,10 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {

let name_key = &req.name;

let (id_ident, _, job) =
let (id_ident, seq_joq) =
get_background_job_or_error(self, name_key, format!("get_: {:?}", name_key)).await?;

Ok(GetBackgroundJobReply::new(id_ident, job))
Ok(GetBackgroundJobReply::new(id_ident, seq_joq))
}

#[fastrace::trace]
Expand Down Expand Up @@ -315,14 +312,15 @@ async fn get_background_job_or_error(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &BackgroundJobIdent,
_msg: impl Display,
) -> Result<(BackgroundJobIdIdent, u64, BackgroundJobInfo), KVAppError> {
) -> Result<kvapi::Pair<BackgroundJobIdIdent>, KVAppError> {
let id_ident = get_background_job_id(kv_api, name_ident).await?;

let (id_seq, job_info) = get_pb_value(kv_api, &id_ident).await?;
assert_background_job_exist(id_seq, name_ident)?;
let seq_job = kv_api
.get_pb(&id_ident)
.await?
.ok_or_else(|| unknown_background_job(name_ident))?;

// Safe unwrap(): background_job_seq > 0 implies background_job is not None.
Ok((id_ident, id_seq, job_info.unwrap()))
Ok((id_ident, seq_job))
}

/// Return OK if a db_id or db_meta exists by checking the seq.
Expand Down Expand Up @@ -354,15 +352,15 @@ async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(
mutation: F,
) -> Result<UpdateBackgroundJobReply, KVAppError> {
debug!(req :? =(name); "BackgroundApi: {}", func_name!());
let (id_ident, id_val_seq, mut info) =
let (id_ident, mut seq_job) =
get_background_job_or_error(kv_api, name, "update_background_job").await?;

let should_update = mutation(&mut info);
let should_update = mutation(&mut seq_job.data);
if !should_update {
return Ok(UpdateBackgroundJobReply::new(id_ident.clone()));
}

let req = UpsertPB::update(id_ident.clone(), info).with(MatchSeq::Exact(id_val_seq));
let req = UpsertPB::update_exact(id_ident.clone(), seq_job);
let resp = kv_api.upsert_pb(&req).await?;

assert!(resp.is_changed());
Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/background_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl BackgroundApiTestSuite {
let res = res.unwrap();
assert_eq!(
BackgroundJobState::RUNNING,
res.info.job_status.unwrap().job_state,
res.info.data.job_status.unwrap().job_state,
"first state is started"
);
}
Expand Down Expand Up @@ -320,7 +320,7 @@ impl BackgroundApiTestSuite {
);
assert_eq!(
Some("newid".to_string()),
res.info.job_status.unwrap().last_task_id
res.info.data.job_status.unwrap().last_task_id
)
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/crud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use databend_common_meta_app::tenant_key::resource::TenantResource;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::ValueWithName;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
pub use errors::CrudError;
Expand Down
105 changes: 45 additions & 60 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;
Expand All @@ -54,7 +51,6 @@ use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::txn_cond_seq;
use crate::txn_op_del;
use crate::txn_op_put;

Expand All @@ -78,8 +74,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
let (seq, id) = get_u64_value(self, name_ident).await?;
debug!(seq = seq, id = id, name_key :? =(name_ident); "create_data_mask");

let mut condition = vec![];
let mut if_then = vec![];
let mut txn = TxnRequest::default();

if seq > 0 {
match req.create_option {
Expand All @@ -99,8 +94,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
false,
false,
func_name!(),
&mut condition,
&mut if_then,
&mut txn,
)
.await?;
}
Expand All @@ -126,20 +120,14 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
{
let meta: DatamaskMeta = req.clone().into();
let id_list = MaskpolicyTableIdList::default();
condition.push(txn_cond_seq(name_ident, Eq, seq));
if_then.extend( vec![
txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.extend( vec![
txn_op_put(name_ident, serialize_u64(id)?), // name -> db_id
txn_op_put(&id_ident, serialize_struct(&meta)?), // id -> meta
txn_op_put(&id_list_key, serialize_struct(&id_list)?), /* data mask name -> id_list */
]);

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;
let (succ, _responses) = send_txn(self, txn).await?;

debug!(
name :? =(name_ident),
Expand All @@ -166,25 +154,19 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
loop {
trials.next().unwrap()?.await;

let mut condition = vec![];
let mut if_then = vec![];
let mut txn = TxnRequest::default();

construct_drop_mask_policy_operations(
self,
name_key,
req.if_exists,
true,
func_name!(),
&mut condition,
&mut if_then,
&mut txn,
)
.await?;
let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let (succ, _responses) = send_txn(self, txn_req).await?;

let (succ, _responses) = send_txn(self, txn).await?;

debug!(
succ = succ;
Expand Down Expand Up @@ -253,35 +235,38 @@ pub fn assert_data_mask_exist(
async fn clear_table_column_mask_policy(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &DataMaskNameIdent,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
txn: &mut TxnRequest,
) -> Result<(), KVAppError> {
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());

let (id_list_seq, id_list_opt): (_, Option<MaskpolicyTableIdList>) =
get_pb_value(kv_api, &id_list_key).await?;
if let Some(id_list) = id_list_opt {
condition.push(txn_cond_seq(&id_list_key, Eq, id_list_seq));
if_then.push(txn_op_del(&id_list_key));

// remove mask policy from table meta
for table_id in id_list.id_list.into_iter() {
let tbid = TableId { table_id };

let (tb_meta_seq, table_meta_opt): (_, Option<TableMeta>) =
get_pb_value(kv_api, &tbid).await?;
if let Some(mut table_meta) = table_meta_opt {
if let Some(column_mask_policy) = table_meta.column_mask_policy {
let new_column_mask_policy = column_mask_policy
.into_iter()
.filter(|(_, name)| name != name_ident.name())
.collect();

table_meta.column_mask_policy = Some(new_column_mask_policy);

condition.push(txn_cond_seq(&tbid, Eq, tb_meta_seq));
if_then.push(txn_op_put(&tbid, serialize_struct(&table_meta)?));
}
let seq_id_list = kv_api.get_pb(&id_list_key).await?;

let Some(seq_id_list) = seq_id_list else {
return Ok(());
};

txn.condition
.push(txn_cond_eq_seq(&id_list_key, seq_id_list.seq));
txn.if_then.push(txn_op_del(&id_list_key));

// remove mask policy from table meta
for table_id in seq_id_list.data.id_list.into_iter() {
let tbid = TableId { table_id };

let (tb_meta_seq, table_meta_opt): (_, Option<TableMeta>) =
get_pb_value(kv_api, &tbid).await?;
if let Some(mut table_meta) = table_meta_opt {
if let Some(column_mask_policy) = table_meta.column_mask_policy {
let new_column_mask_policy = column_mask_policy
.into_iter()
.filter(|(_, name)| name != name_ident.name())
.collect();

table_meta.column_mask_policy = Some(new_column_mask_policy);

txn.condition.push(txn_cond_eq_seq(&tbid, tb_meta_seq));
txn.if_then
.push(txn_op_put(&tbid, serialize_struct(&table_meta)?));
}
}
}
Expand All @@ -295,8 +280,7 @@ async fn construct_drop_mask_policy_operations(
drop_if_exists: bool,
if_delete: bool,
ctx: &str,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
txn: &mut TxnRequest,
) -> Result<(), KVAppError> {
let result = get_data_mask_or_err(
kv_api,
Expand All @@ -320,13 +304,14 @@ async fn construct_drop_mask_policy_operations(

let id_ident = DataMaskIdIdent::new(name_key.tenant(), id);

condition.push(txn_cond_eq_seq(&id_ident, data_mask_seq));
if_then.push(txn_op_del(&id_ident));
txn.condition
.push(txn_cond_eq_seq(&id_ident, data_mask_seq));
txn.if_then.push(txn_op_del(&id_ident));

if if_delete {
condition.push(txn_cond_eq_seq(name_key, id_seq));
if_then.push(txn_op_del(name_key));
clear_table_column_mask_policy(kv_api, name_key, condition, if_then).await?;
txn.condition.push(txn_cond_eq_seq(name_key, id_seq));
txn.if_then.push(txn_op_del(name_key));
clear_table_column_mask_policy(kv_api, name_key, txn).await?;
}

debug!(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/kv_pb_api/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::NonEmptyItem;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::Change;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_proto_conv::FromToProto;

use crate::kv_pb_api::errors::NoneValue;
Expand Down
6 changes: 3 additions & 3 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::NonEmptyItem;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::Change;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use futures::future::FutureExt;
Expand Down Expand Up @@ -341,9 +341,9 @@ mod tests {
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_proto_conv::FromToProto;
Expand Down
11 changes: 11 additions & 0 deletions src/meta/api/src/kv_pb_api/upsert_pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::With;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -70,6 +71,16 @@ impl<K: kvapi::Key> UpsertPB<K> {
}
}

/// Update the value only when the seq matches exactly. Note that the meta is not copied.
pub fn update_exact(key: K, value: SeqV<K::ValueType>) -> Self {
Self {
key,
seq: MatchSeq::Exact(value.seq),
value: Operation::Update(value.data),
value_meta: None,
}
}

pub fn delete(key: K) -> Self {
Self {
key,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ use databend_common_meta_app::schema::UpdateVirtualColumnReq;
use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;

use crate::kv_app_error::KVAppError;

Expand Down
6 changes: 3 additions & 3 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,19 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf as pb;
use databend_common_meta_types::seq_value::KVMeta;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::txn_op::Request;
use databend_common_meta_types::txn_op_response::Response;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnGetResponse;
Expand Down
Loading

0 comments on commit 04a4929

Please sign in to comment.