From 04a49291886e970d20b32d12eb37f175d2f995f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 23 Aug 2024 00:28:04 +0800 Subject: [PATCH] chore: simplify create_data_mask (#16309) * chore: move types in mod seqv into separate files * refactor: get_background_job() returns SeqV This way the caller will be able to update the record against the exact seq number. * refactor: add kvapi::Pair * chore: simplify create_data_mask --- src/meta/api/src/background_api_impl.rs | 26 ++-- src/meta/api/src/background_api_test_suite.rs | 4 +- src/meta/api/src/crud/mod.rs | 4 +- src/meta/api/src/data_mask_api_impl.rs | 105 +++++++--------- src/meta/api/src/kv_pb_api/codec.rs | 2 +- src/meta/api/src/kv_pb_api/mod.rs | 6 +- src/meta/api/src/kv_pb_api/upsert_pb.rs | 11 ++ src/meta/api/src/schema_api.rs | 2 +- src/meta/api/src/schema_api_impl.rs | 6 +- src/meta/api/src/util.rs | 4 +- src/meta/app/src/background/background_job.rs | 5 +- src/meta/app/src/schema/database.rs | 2 +- src/meta/kvapi/src/kvapi/api.rs | 2 +- src/meta/kvapi/src/kvapi/item.rs | 2 +- src/meta/kvapi/src/kvapi/message.rs | 2 +- src/meta/kvapi/src/kvapi/mod.rs | 2 + src/meta/kvapi/src/kvapi/pair.rs | 20 ++++ src/meta/kvapi/src/kvapi/test_suite.rs | 4 +- src/meta/process/src/kv_processor.rs | 2 +- src/meta/raft-store/src/applier.rs | 4 +- src/meta/raft-store/src/key_spaces.rs | 2 +- .../src/leveled_store/db_exporter.rs | 2 +- .../src/leveled_store/db_map_api_ro_test.rs | 2 +- .../raft-store/src/leveled_store/level.rs | 2 +- .../leveled_map/leveled_map_test.rs | 2 +- .../leveled_store/leveled_map/map_api_impl.rs | 2 +- .../raft-store/src/leveled_store/map_api.rs | 2 +- .../src/leveled_store/rotbl_seq_mark_impl.rs | 2 +- src/meta/raft-store/src/marked/marked_test.rs | 6 +- src/meta/raft-store/src/marked/mod.rs | 8 +- .../sm_v003/compact_immutable_levels_test.rs | 2 +- .../src/sm_v003/compact_with_db_test.rs | 2 +- src/meta/raft-store/src/sm_v003/sm_v003.rs | 4 +- .../raft-store/src/sm_v003/sm_v003_test.rs | 4 +- src/meta/raft-store/src/state_machine/sm.rs | 8 +- .../src/state_machine/sm_kv_api_impl.rs | 2 +- .../raft-store/tests/it/state_machine/mod.rs | 6 +- src/meta/service/src/api/grpc/grpc_service.rs | 2 +- .../service/src/meta_service/meta_leader.rs | 2 +- .../service/tests/it/grpc/metasrv_grpc_api.rs | 2 +- .../tests/it/grpc/metasrv_grpc_kv_read_v1.rs | 2 +- .../it/meta_node/meta_node_kv_api_expire.rs | 4 +- .../it/meta_node/meta_node_replication.rs | 2 +- src/meta/sled-store/src/sled_serde_impl.rs | 2 +- src/meta/sled-store/src/sled_tree.rs | 2 +- src/meta/sled-store/tests/it/sled_tree.rs | 2 +- .../tests/it/testing/fake_key_spaces.rs | 2 +- src/meta/types/src/change.rs | 4 +- src/meta/types/src/cmd/meta_spec.rs | 2 +- src/meta/types/src/lib.rs | 5 +- src/meta/types/src/match_seq.rs | 4 +- src/meta/types/src/proto_ext/seq_v_ext.rs | 4 +- .../types/src/proto_ext/stream_item_ext.rs | 2 +- src/meta/types/src/proto_ext/txn_ext.rs | 2 +- src/meta/types/src/seq_value/kv_meta.rs | 53 +++++++++ src/meta/types/src/seq_value/mod.rs | 21 ++++ .../types/src/seq_value/seq_value_trait.rs | 47 ++++++++ .../src/{seq_value.rs => seq_value/seqv.rs} | 112 +++--------------- .../background_service_handler.rs | 3 +- .../src/background_service/compaction_job.rs | 7 +- src/query/ee/src/background_service/job.rs | 3 +- .../it/background_service/job_scheduler.rs | 23 ++-- .../management/src/cluster/cluster_mgr.rs | 2 +- src/query/management/src/quota/quota_api.rs | 2 +- src/query/management/src/quota/quota_mgr.rs | 2 +- src/query/management/src/role/role_api.rs | 2 +- src/query/management/src/role/role_mgr.rs | 2 +- src/query/management/src/serde/pb_serde.rs | 4 +- .../management/src/setting/setting_api.rs | 2 +- .../management/src/setting/setting_mgr.rs | 7 +- src/query/management/src/udf/udf_mgr.rs | 2 +- src/query/management/src/user/user_api.rs | 2 +- src/query/management/src/user/user_mgr.rs | 2 +- src/query/management/tests/it/cluster.rs | 2 +- src/query/management/tests/it/setting.rs | 2 +- src/query/management/tests/it/stage.rs | 2 +- src/query/management/tests/it/udf.rs | 2 +- src/query/management/tests/it/user.rs | 2 +- .../src/catalogs/default/mutable_catalog.rs | 2 +- .../src/catalogs/share/share_catalog.rs | 2 +- .../information_schema_database.rs | 2 +- .../src/databases/system/system_database.rs | 2 +- .../interpreters/access/privilege_access.rs | 2 +- .../tests/it/sql/exec/get_table_bind_test.rs | 2 +- .../it/storages/fuse/operations/commit.rs | 2 +- src/query/storages/iceberg/src/catalog.rs | 2 +- src/query/storages/iceberg/src/database.rs | 2 +- .../storages/result_cache/src/meta_manager.rs | 2 +- .../storages/result_cache/src/write/sink.rs | 2 +- 89 files changed, 357 insertions(+), 291 deletions(-) create mode 100644 src/meta/kvapi/src/kvapi/pair.rs create mode 100644 src/meta/types/src/seq_value/kv_meta.rs create mode 100644 src/meta/types/src/seq_value/mod.rs create mode 100644 src/meta/types/src/seq_value/seq_value_trait.rs rename src/meta/types/src/{seq_value.rs => seq_value/seqv.rs} (64%) diff --git a/src/meta/api/src/background_api_impl.rs b/src/meta/api/src/background_api_impl.rs index 7b133a5a8f91..2e181639764b 100644 --- a/src/meta/api/src/background_api_impl.rs +++ b/src/meta/api/src/background_api_impl.rs @@ -43,23 +43,20 @@ use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::ConditionResult::Eq; use databend_common_meta_types::InvalidReply; -use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeq::Any; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::TxnRequest; -use databend_common_meta_types::With; use fastrace::func_name; use log::debug; use crate::background_api::BackgroundApi; use crate::deserialize_struct; use crate::fetch_id; -use crate::get_pb_value; use crate::get_u64_value; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; @@ -200,10 +197,10 @@ impl> BackgroundApi for KV { let name_key = &req.name; - let (id_ident, _, job) = + let (id_ident, seq_joq) = get_background_job_or_error(self, name_key, format!("get_: {:?}", name_key)).await?; - Ok(GetBackgroundJobReply::new(id_ident, job)) + Ok(GetBackgroundJobReply::new(id_ident, seq_joq)) } #[fastrace::trace] @@ -315,14 +312,15 @@ async fn get_background_job_or_error( kv_api: &(impl kvapi::KVApi + ?Sized), name_ident: &BackgroundJobIdent, _msg: impl Display, -) -> Result<(BackgroundJobIdIdent, u64, BackgroundJobInfo), KVAppError> { +) -> Result, KVAppError> { let id_ident = get_background_job_id(kv_api, name_ident).await?; - let (id_seq, job_info) = get_pb_value(kv_api, &id_ident).await?; - assert_background_job_exist(id_seq, name_ident)?; + let seq_job = kv_api + .get_pb(&id_ident) + .await? + .ok_or_else(|| unknown_background_job(name_ident))?; - // Safe unwrap(): background_job_seq > 0 implies background_job is not None. - Ok((id_ident, id_seq, job_info.unwrap())) + Ok((id_ident, seq_job)) } /// Return OK if a db_id or db_meta exists by checking the seq. @@ -354,15 +352,15 @@ async fn update_background_job bool>( mutation: F, ) -> Result { debug!(req :? =(name); "BackgroundApi: {}", func_name!()); - let (id_ident, id_val_seq, mut info) = + let (id_ident, mut seq_job) = get_background_job_or_error(kv_api, name, "update_background_job").await?; - let should_update = mutation(&mut info); + let should_update = mutation(&mut seq_job.data); if !should_update { return Ok(UpdateBackgroundJobReply::new(id_ident.clone())); } - let req = UpsertPB::update(id_ident.clone(), info).with(MatchSeq::Exact(id_val_seq)); + let req = UpsertPB::update_exact(id_ident.clone(), seq_job); let resp = kv_api.upsert_pb(&req).await?; assert!(resp.is_changed()); diff --git a/src/meta/api/src/background_api_test_suite.rs b/src/meta/api/src/background_api_test_suite.rs index bd6f05bca1a9..5326b340f270 100644 --- a/src/meta/api/src/background_api_test_suite.rs +++ b/src/meta/api/src/background_api_test_suite.rs @@ -231,7 +231,7 @@ impl BackgroundApiTestSuite { let res = res.unwrap(); assert_eq!( BackgroundJobState::RUNNING, - res.info.job_status.unwrap().job_state, + res.info.data.job_status.unwrap().job_state, "first state is started" ); } @@ -320,7 +320,7 @@ impl BackgroundApiTestSuite { ); assert_eq!( Some("newid".to_string()), - res.info.job_status.unwrap().last_task_id + res.info.data.job_status.unwrap().last_task_id ) } diff --git a/src/meta/api/src/crud/mod.rs b/src/meta/api/src/crud/mod.rs index df31c372cd5d..5bb8b1883abd 100644 --- a/src/meta/api/src/crud/mod.rs +++ b/src/meta/api/src/crud/mod.rs @@ -28,11 +28,11 @@ use databend_common_meta_app::tenant_key::resource::TenantResource; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::ValueWithName; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::With; use databend_common_proto_conv::FromToProto; pub use errors::CrudError; diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 8cbf32298498..d7eba333ff7e 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -34,11 +34,8 @@ use databend_common_meta_app::schema::TableId; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; -use databend_common_meta_types::ConditionResult::Eq; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqValue; -use databend_common_meta_types::TxnCondition; -use databend_common_meta_types::TxnOp; use databend_common_meta_types::TxnRequest; use fastrace::func_name; use log::debug; @@ -54,7 +51,6 @@ use crate::serialize_struct; use crate::serialize_u64; use crate::txn_backoff::txn_backoff; use crate::txn_cond_eq_seq; -use crate::txn_cond_seq; use crate::txn_op_del; use crate::txn_op_put; @@ -78,8 +74,7 @@ impl> DatamaskApi for KV { let (seq, id) = get_u64_value(self, name_ident).await?; debug!(seq = seq, id = id, name_key :? =(name_ident); "create_data_mask"); - let mut condition = vec![]; - let mut if_then = vec![]; + let mut txn = TxnRequest::default(); if seq > 0 { match req.create_option { @@ -99,8 +94,7 @@ impl> DatamaskApi for KV { false, false, func_name!(), - &mut condition, - &mut if_then, + &mut txn, ) .await?; } @@ -126,20 +120,14 @@ impl> DatamaskApi for KV { { let meta: DatamaskMeta = req.clone().into(); let id_list = MaskpolicyTableIdList::default(); - condition.push(txn_cond_seq(name_ident, Eq, seq)); - if_then.extend( vec![ + txn.condition.push(txn_cond_eq_seq(name_ident, seq)); + txn.if_then.extend( vec![ txn_op_put(name_ident, serialize_u64(id)?), // name -> db_id txn_op_put(&id_ident, serialize_struct(&meta)?), // id -> meta txn_op_put(&id_list_key, serialize_struct(&id_list)?), /* data mask name -> id_list */ ]); - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; - - let (succ, _responses) = send_txn(self, txn_req).await?; + let (succ, _responses) = send_txn(self, txn).await?; debug!( name :? =(name_ident), @@ -166,8 +154,7 @@ impl> DatamaskApi for KV { loop { trials.next().unwrap()?.await; - let mut condition = vec![]; - let mut if_then = vec![]; + let mut txn = TxnRequest::default(); construct_drop_mask_policy_operations( self, @@ -175,16 +162,11 @@ impl> DatamaskApi for KV { req.if_exists, true, func_name!(), - &mut condition, - &mut if_then, + &mut txn, ) .await?; - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; - let (succ, _responses) = send_txn(self, txn_req).await?; + + let (succ, _responses) = send_txn(self, txn).await?; debug!( succ = succ; @@ -253,35 +235,38 @@ pub fn assert_data_mask_exist( async fn clear_table_column_mask_policy( kv_api: &(impl kvapi::KVApi + ?Sized), name_ident: &DataMaskNameIdent, - condition: &mut Vec, - if_then: &mut Vec, + txn: &mut TxnRequest, ) -> Result<(), KVAppError> { let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); - let (id_list_seq, id_list_opt): (_, Option) = - get_pb_value(kv_api, &id_list_key).await?; - if let Some(id_list) = id_list_opt { - condition.push(txn_cond_seq(&id_list_key, Eq, id_list_seq)); - if_then.push(txn_op_del(&id_list_key)); - - // remove mask policy from table meta - for table_id in id_list.id_list.into_iter() { - let tbid = TableId { table_id }; - - let (tb_meta_seq, table_meta_opt): (_, Option) = - get_pb_value(kv_api, &tbid).await?; - if let Some(mut table_meta) = table_meta_opt { - if let Some(column_mask_policy) = table_meta.column_mask_policy { - let new_column_mask_policy = column_mask_policy - .into_iter() - .filter(|(_, name)| name != name_ident.name()) - .collect(); - - table_meta.column_mask_policy = Some(new_column_mask_policy); - - condition.push(txn_cond_seq(&tbid, Eq, tb_meta_seq)); - if_then.push(txn_op_put(&tbid, serialize_struct(&table_meta)?)); - } + let seq_id_list = kv_api.get_pb(&id_list_key).await?; + + let Some(seq_id_list) = seq_id_list else { + return Ok(()); + }; + + txn.condition + .push(txn_cond_eq_seq(&id_list_key, seq_id_list.seq)); + txn.if_then.push(txn_op_del(&id_list_key)); + + // remove mask policy from table meta + for table_id in seq_id_list.data.id_list.into_iter() { + let tbid = TableId { table_id }; + + let (tb_meta_seq, table_meta_opt): (_, Option) = + get_pb_value(kv_api, &tbid).await?; + if let Some(mut table_meta) = table_meta_opt { + if let Some(column_mask_policy) = table_meta.column_mask_policy { + let new_column_mask_policy = column_mask_policy + .into_iter() + .filter(|(_, name)| name != name_ident.name()) + .collect(); + + table_meta.column_mask_policy = Some(new_column_mask_policy); + + txn.condition.push(txn_cond_eq_seq(&tbid, tb_meta_seq)); + txn.if_then + .push(txn_op_put(&tbid, serialize_struct(&table_meta)?)); } } } @@ -295,8 +280,7 @@ async fn construct_drop_mask_policy_operations( drop_if_exists: bool, if_delete: bool, ctx: &str, - condition: &mut Vec, - if_then: &mut Vec, + txn: &mut TxnRequest, ) -> Result<(), KVAppError> { let result = get_data_mask_or_err( kv_api, @@ -320,13 +304,14 @@ async fn construct_drop_mask_policy_operations( let id_ident = DataMaskIdIdent::new(name_key.tenant(), id); - condition.push(txn_cond_eq_seq(&id_ident, data_mask_seq)); - if_then.push(txn_op_del(&id_ident)); + txn.condition + .push(txn_cond_eq_seq(&id_ident, data_mask_seq)); + txn.if_then.push(txn_op_del(&id_ident)); if if_delete { - condition.push(txn_cond_eq_seq(name_key, id_seq)); - if_then.push(txn_op_del(name_key)); - clear_table_column_mask_policy(kv_api, name_key, condition, if_then).await?; + txn.condition.push(txn_cond_eq_seq(name_key, id_seq)); + txn.if_then.push(txn_op_del(name_key)); + clear_table_column_mask_policy(kv_api, name_key, txn).await?; } debug!( diff --git a/src/meta/api/src/kv_pb_api/codec.rs b/src/meta/api/src/kv_pb_api/codec.rs index c1485acaeb6f..0eb00bacb35e 100644 --- a/src/meta/api/src/kv_pb_api/codec.rs +++ b/src/meta/api/src/kv_pb_api/codec.rs @@ -15,9 +15,9 @@ use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::NonEmptyItem; use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Change; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use databend_common_proto_conv::FromToProto; use crate::kv_pb_api::errors::NoneValue; diff --git a/src/meta/api/src/kv_pb_api/mod.rs b/src/meta/api/src/kv_pb_api/mod.rs index 98e92736fa5c..e86a41bee6af 100644 --- a/src/meta/api/src/kv_pb_api/mod.rs +++ b/src/meta/api/src/kv_pb_api/mod.rs @@ -26,8 +26,8 @@ use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_kvapi::kvapi::NonEmptyItem; use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Change; -use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_proto_conv::FromToProto; use futures::future::FutureExt; @@ -341,9 +341,9 @@ mod tests { use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf::StreamItem; + use databend_common_meta_types::seq_value::SeqV; + use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::MetaError; - use databend_common_meta_types::SeqV; - use databend_common_meta_types::SeqValue; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use databend_common_proto_conv::FromToProto; diff --git a/src/meta/api/src/kv_pb_api/upsert_pb.rs b/src/meta/api/src/kv_pb_api/upsert_pb.rs index b3723cc499d8..cb7614f02450 100644 --- a/src/meta/api/src/kv_pb_api/upsert_pb.rs +++ b/src/meta/api/src/kv_pb_api/upsert_pb.rs @@ -18,6 +18,7 @@ use databend_common_meta_kvapi::kvapi; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; +use databend_common_meta_types::SeqV; use databend_common_meta_types::With; #[derive(Clone, Debug)] @@ -70,6 +71,16 @@ impl UpsertPB { } } + /// Update the value only when the seq matches exactly. Note that the meta is not copied. + pub fn update_exact(key: K, value: SeqV) -> Self { + Self { + key, + seq: MatchSeq::Exact(value.seq), + value: Operation::Update(value.data), + value_meta: None, + } + } + pub fn delete(key: K) -> Self { Self { key, diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs index 2164316e8104..08a8b72d9bbe 100644 --- a/src/meta/api/src/schema_api.rs +++ b/src/meta/api/src/schema_api.rs @@ -104,9 +104,9 @@ use databend_common_meta_app::schema::UpdateVirtualColumnReq; use databend_common_meta_app::schema::UpsertTableOptionReply; use databend_common_meta_app::schema::UpsertTableOptionReq; use databend_common_meta_app::schema::VirtualColumnMeta; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaId; -use databend_common_meta_types::SeqV; use crate::kv_app_error::KVAppError; diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index f0816286d62d..66ca966ffbc1 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -207,19 +207,19 @@ use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::seq_value::KVMeta; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::txn_op::Request; use databend_common_meta_types::txn_op_response::Response; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::InvalidReply; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaId; use databend_common_meta_types::MetaNetworkError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnGetRequest; use databend_common_meta_types::TxnGetResponse; diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index 3c70c607a1e0..661a9a40cafc 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -75,6 +75,8 @@ use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::txn_condition::Target; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::InvalidArgument; @@ -83,8 +85,6 @@ use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaNetworkError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnGetResponse; use databend_common_meta_types::TxnOp; diff --git a/src/meta/app/src/background/background_job.rs b/src/meta/app/src/background/background_job.rs index ab7ddd519dae..7cad94931c14 100644 --- a/src/meta/app/src/background/background_job.rs +++ b/src/meta/app/src/background/background_job.rs @@ -20,6 +20,7 @@ use std::str::FromStr; use chrono::DateTime; use chrono::Utc; use cron::Schedule; +use databend_common_meta_types::SeqV; use crate::background::BackgroundJobIdIdent; use crate::background::BackgroundJobIdent; @@ -256,11 +257,11 @@ impl Display for GetBackgroundJobReq { #[derive(Clone, Debug, PartialEq, Eq)] pub struct GetBackgroundJobReply { pub id_ident: BackgroundJobIdIdent, - pub info: BackgroundJobInfo, + pub info: SeqV, } impl GetBackgroundJobReply { - pub fn new(id_ident: BackgroundJobIdIdent, info: BackgroundJobInfo) -> Self { + pub fn new(id_ident: BackgroundJobIdIdent, info: SeqV) -> Self { Self { id_ident, info } } } diff --git a/src/meta/app/src/schema/database.rs b/src/meta/app/src/schema/database.rs index 296bb03faff3..3b95512ce27c 100644 --- a/src/meta/app/src/schema/database.rs +++ b/src/meta/app/src/schema/database.rs @@ -21,7 +21,7 @@ use std::ops::Deref; use chrono::DateTime; use chrono::Utc; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use super::CreateOption; use crate::schema::database_id::DatabaseId; diff --git a/src/meta/kvapi/src/kvapi/api.rs b/src/meta/kvapi/src/kvapi/api.rs index b2e857c6a138..4370defe4186 100644 --- a/src/meta/kvapi/src/kvapi/api.rs +++ b/src/meta/kvapi/src/kvapi/api.rs @@ -17,7 +17,7 @@ use std::ops::Deref; use async_trait::async_trait; use databend_common_meta_types::errors; use databend_common_meta_types::protobuf::StreamItem; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use futures_util::stream::BoxStream; diff --git a/src/meta/kvapi/src/kvapi/item.rs b/src/meta/kvapi/src/kvapi/item.rs index 6f01b1cfd8e7..8495a929ed57 100644 --- a/src/meta/kvapi/src/kvapi/item.rs +++ b/src/meta/kvapi/src/kvapi/item.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use crate::kvapi::Key; diff --git a/src/meta/kvapi/src/kvapi/message.rs b/src/meta/kvapi/src/kvapi/message.rs index 1760504066f7..8f37e5cb3cf9 100644 --- a/src/meta/kvapi/src/kvapi/message.rs +++ b/src/meta/kvapi/src/kvapi/message.rs @@ -15,8 +15,8 @@ use std::fmt; use std::fmt::Formatter; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Change; -use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::VecDisplay; diff --git a/src/meta/kvapi/src/kvapi/mod.rs b/src/meta/kvapi/src/kvapi/mod.rs index ef709944f0dc..4c8838d7a33e 100644 --- a/src/meta/kvapi/src/kvapi/mod.rs +++ b/src/meta/kvapi/src/kvapi/mod.rs @@ -21,6 +21,7 @@ mod key_builder; mod key_codec; mod key_parser; mod message; +mod pair; mod prefix; mod test_suite; mod value; @@ -48,6 +49,7 @@ pub use message::MGetKVReply; pub use message::MGetKVReq; pub use message::UpsertKVReply; pub use message::UpsertKVReq; +pub use pair::Pair; pub use prefix::prefix_to_range; pub use test_suite::TestSuite; pub use value::Value; diff --git a/src/meta/kvapi/src/kvapi/pair.rs b/src/meta/kvapi/src/kvapi/pair.rs new file mode 100644 index 000000000000..305e89542475 --- /dev/null +++ b/src/meta/kvapi/src/kvapi/pair.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_types::SeqV; + +use crate::kvapi; + +/// A Key-Value pair for type Key. The value has a seq number. +pub type Pair = (K, SeqV<::ValueType>); diff --git a/src/meta/kvapi/src/kvapi/test_suite.rs b/src/meta/kvapi/src/kvapi/test_suite.rs index b3d265ec17d0..7e10ea5d1577 100644 --- a/src/meta/kvapi/src/kvapi/test_suite.rs +++ b/src/meta/kvapi/src/kvapi/test_suite.rs @@ -15,15 +15,15 @@ use std::time::Duration; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::seq_value::KVMeta; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::txn_condition; use databend_common_meta_types::txn_op; use databend_common_meta_types::txn_op_response; use databend_common_meta_types::ConditionResult; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnDeleteByPrefixRequest; use databend_common_meta_types::TxnDeleteByPrefixResponse; diff --git a/src/meta/process/src/kv_processor.rs b/src/meta/process/src/kv_processor.rs index 37f5789096ea..47e7c2e80b90 100644 --- a/src/meta/process/src/kv_processor.rs +++ b/src/meta/process/src/kv_processor.rs @@ -14,13 +14,13 @@ use anyhow::Error; use databend_common_meta_raft_store::key_spaces::RaftStoreEntry; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::txn_condition::Target; use databend_common_meta_types::txn_op::Request; use databend_common_meta_types::Cmd; use databend_common_meta_types::Entry; use databend_common_meta_types::LogEntry; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnOp; use databend_common_meta_types::TxnPutRequest; diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index 4aac71323817..5a985d99115b 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -17,6 +17,8 @@ use std::time::Duration; use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::txn_condition; use databend_common_meta_types::txn_op; use databend_common_meta_types::txn_op_response; @@ -31,8 +33,6 @@ use databend_common_meta_types::Interval; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Node; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::StoredMembership; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnDeleteByPrefixRequest; diff --git a/src/meta/raft-store/src/key_spaces.rs b/src/meta/raft-store/src/key_spaces.rs index a163bfa43e06..eb957026f466 100644 --- a/src/meta/raft-store/src/key_spaces.rs +++ b/src/meta/raft-store/src/key_spaces.rs @@ -20,13 +20,13 @@ use databend_common_meta_sled_store::SledKeySpace; use databend_common_meta_sled_store::SledOrderedSerde; use databend_common_meta_sled_store::SledSerde; use databend_common_meta_stoerr::MetaStorageError; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Entry; use databend_common_meta_types::LogId; use databend_common_meta_types::LogIndex; use databend_common_meta_types::Node; use databend_common_meta_types::NodeId; use databend_common_meta_types::SeqNum; -use databend_common_meta_types::SeqV; use serde::Deserialize; use serde::Serialize; diff --git a/src/meta/raft-store/src/leveled_store/db_exporter.rs b/src/meta/raft-store/src/leveled_store/db_exporter.rs index c926dc749e12..acd59e7b1c16 100644 --- a/src/meta/raft-store/src/leveled_store/db_exporter.rs +++ b/src/meta/raft-store/src/leveled_store/db_exporter.rs @@ -17,9 +17,9 @@ use std::future; use std::io; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::snapshot_db::DB; use databend_common_meta_types::SeqNum; -use databend_common_meta_types::SeqV; use futures_util::StreamExt; use futures_util::TryStreamExt; use log::info; diff --git a/src/meta/raft-store/src/leveled_store/db_map_api_ro_test.rs b/src/meta/raft-store/src/leveled_store/db_map_api_ro_test.rs index eb42b9ee8b42..6021826fa1bd 100644 --- a/src/meta/raft-store/src/leveled_store/db_map_api_ro_test.rs +++ b/src/meta/raft-store/src/leveled_store/db_map_api_ro_test.rs @@ -14,7 +14,7 @@ //! Test for db_map_api_ro_impl. -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::UpsertKV; use futures_util::TryStreamExt; diff --git a/src/meta/raft-store/src/leveled_store/level.rs b/src/meta/raft-store/src/leveled_store/level.rs index 953de9d30a23..c02c17bc9f15 100644 --- a/src/meta/raft-store/src/leveled_store/level.rs +++ b/src/meta/raft-store/src/leveled_store/level.rs @@ -17,8 +17,8 @@ use std::collections::BTreeMap; use std::io; use std::ops::RangeBounds; +use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::sys_data::SysData; -use databend_common_meta_types::KVMeta; use futures_util::StreamExt; use log::warn; diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs b/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs index 0d6720c4d9bc..e9739d9c0ef2 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::seq_value::KVMeta; use futures_util::TryStreamExt; use crate::leveled_store::leveled_map::LeveledMap; diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/map_api_impl.rs b/src/meta/raft-store/src/leveled_store/leveled_map/map_api_impl.rs index 814a34faa827..1c39618a95ac 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/map_api_impl.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/map_api_impl.rs @@ -17,8 +17,8 @@ use std::fmt; use std::io; use std::ops::RangeBounds; +use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::snapshot_db::DB; -use databend_common_meta_types::KVMeta; use crate::leveled_store::immutable::Immutable; use crate::leveled_store::level::Level; diff --git a/src/meta/raft-store/src/leveled_store/map_api.rs b/src/meta/raft-store/src/leveled_store/map_api.rs index dc89c0475276..dfd5d875328d 100644 --- a/src/meta/raft-store/src/leveled_store/map_api.rs +++ b/src/meta/raft-store/src/leveled_store/map_api.rs @@ -22,7 +22,7 @@ use std::fmt::Write; use std::io; use std::ops::RangeBounds; -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::seq_value::KVMeta; use futures::stream::StreamExt; use futures_util::stream::BoxStream; use stream_more::KMerge; diff --git a/src/meta/raft-store/src/leveled_store/rotbl_seq_mark_impl.rs b/src/meta/raft-store/src/leveled_store/rotbl_seq_mark_impl.rs index 760194d199e2..78f863fb10ad 100644 --- a/src/meta/raft-store/src/leveled_store/rotbl_seq_mark_impl.rs +++ b/src/meta/raft-store/src/leveled_store/rotbl_seq_mark_impl.rs @@ -16,7 +16,7 @@ use std::io; -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::seq_value::KVMeta; use rotbl::v001::SeqMarked; use crate::marked::Marked; diff --git a/src/meta/raft-store/src/marked/marked_test.rs b/src/meta/raft-store/src/marked/marked_test.rs index 8d9f8fd85ea7..007282e6d43c 100644 --- a/src/meta/raft-store/src/marked/marked_test.rs +++ b/src/meta/raft-store/src/marked/marked_test.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_types::KVMeta; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; +use databend_common_meta_types::seq_value::KVMeta; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use crate::marked::InternalSeq; use crate::marked::Marked; diff --git a/src/meta/raft-store/src/marked/mod.rs b/src/meta/raft-store/src/marked/mod.rs index 531209db3033..44c52a99df2c 100644 --- a/src/meta/raft-store/src/marked/mod.rs +++ b/src/meta/raft-store/src/marked/mod.rs @@ -19,9 +19,9 @@ mod internal_seq; mod marked_impl; -use databend_common_meta_types::KVMeta; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; +use databend_common_meta_types::seq_value::KVMeta; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; pub(crate) use internal_seq::InternalSeq; use crate::state_machine::ExpireValue; @@ -243,7 +243,7 @@ impl From> for Option { #[cfg(test)] mod tests { - use databend_common_meta_types::KVMeta; + use databend_common_meta_types::seq_value::KVMeta; use super::Marked; diff --git a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs index b1e216ff23be..a911829bd5e4 100644 --- a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::Endpoint; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::Membership; use databend_common_meta_types::Node; use databend_common_meta_types::StoredMembership; diff --git a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs index 46caf5b5861c..2468b9cef06d 100644 --- a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs @@ -14,8 +14,8 @@ use std::io; +use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::Endpoint; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::Membership; use databend_common_meta_types::Node; use databend_common_meta_types::StoredMembership; diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index 7edebfb7630b..9b30d7ca72fa 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -21,6 +21,8 @@ use databend_common_meta_kvapi::kvapi::KVStream; use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::snapshot_db::DB; use databend_common_meta_types::sys_data::SysData; use databend_common_meta_types::AppliedState; @@ -29,8 +31,6 @@ use databend_common_meta_types::Entry; use databend_common_meta_types::EvalExpireTime; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::StorageError; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; diff --git a/src/meta/raft-store/src/sm_v003/sm_v003_test.rs b/src/meta/raft-store/src/sm_v003/sm_v003_test.rs index e6688120a3c5..c9418521e87c 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003_test.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003_test.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::UpsertKV; use futures_util::TryStreamExt; use pretty_assertions::assert_eq; diff --git a/src/meta/raft-store/src/state_machine/sm.rs b/src/meta/raft-store/src/state_machine/sm.rs index 06b7687c508b..75d1672d62a4 100644 --- a/src/meta/raft-store/src/state_machine/sm.rs +++ b/src/meta/raft-store/src/state_machine/sm.rs @@ -28,6 +28,8 @@ use databend_common_meta_sled_store::Store; use databend_common_meta_sled_store::TransactionSledTree; use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::txn_condition; use databend_common_meta_types::txn_op; use databend_common_meta_types::txn_op_response; @@ -46,8 +48,6 @@ use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Node; use databend_common_meta_types::NodeId; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::StoredMembership; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnDeleteByPrefixRequest; @@ -1054,8 +1054,8 @@ impl StateMachine { #[cfg(test)] mod tests { - use databend_common_meta_types::KVMeta; - use databend_common_meta_types::SeqV; + use databend_common_meta_types::seq_value::KVMeta; + use databend_common_meta_types::seq_value::SeqV; use crate::state_machine::StateMachine; diff --git a/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs b/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs index cd1d5b3b5e89..61466bcb3e7e 100644 --- a/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs +++ b/src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs @@ -17,10 +17,10 @@ use databend_common_meta_kvapi::kvapi::KVStream; use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Cmd; use databend_common_meta_types::MetaError; -use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use databend_common_meta_types::UpsertKV; diff --git a/src/meta/raft-store/tests/it/state_machine/mod.rs b/src/meta/raft-store/tests/it/state_machine/mod.rs index 5546c748e43f..797dbdd5f250 100644 --- a/src/meta/raft-store/tests/it/state_machine/mod.rs +++ b/src/meta/raft-store/tests/it/state_machine/mod.rs @@ -19,6 +19,9 @@ use std::time::UNIX_EPOCH; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_raft_store::state_machine::StateMachine; use databend_common_meta_types::new_log_id; +use databend_common_meta_types::seq_value::KVMeta; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Change; use databend_common_meta_types::Cmd; @@ -26,14 +29,11 @@ use databend_common_meta_types::CmdContext; use databend_common_meta_types::Endpoint; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::LogEntry; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Node; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use log::info; diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 4fe30a782ab4..387e4dcd83fa 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -41,12 +41,12 @@ use databend_common_meta_types::protobuf::RaftRequest; use databend_common_meta_types::protobuf::StreamItem; use databend_common_meta_types::protobuf::WatchRequest; use databend_common_meta_types::protobuf::WatchResponse; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Cmd; use databend_common_meta_types::Endpoint; use databend_common_meta_types::GrpcHelper; use databend_common_meta_types::LogEntry; -use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use databend_common_metrics::count::Count; diff --git a/src/meta/service/src/meta_service/meta_leader.rs b/src/meta/service/src/meta_service/meta_leader.rs index 9432ae3dcbbb..39350e2c9016 100644 --- a/src/meta/service/src/meta_service/meta_leader.rs +++ b/src/meta/service/src/meta_service/meta_leader.rs @@ -22,6 +22,7 @@ use databend_common_meta_raft_store::sm_v003::SMV003; use databend_common_meta_sled_store::openraft::ChangeMembers; use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::AppliedState; use databend_common_meta_types::ClientWriteError; use databend_common_meta_types::Cmd; @@ -33,7 +34,6 @@ use databend_common_meta_types::MetaOperationError; use databend_common_meta_types::Node; use databend_common_meta_types::NodeId; use databend_common_meta_types::RaftError; -use databend_common_meta_types::SeqV; use databend_common_metrics::count::Count; use futures::StreamExt; use log::debug; diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs index 55eb5eb3b156..b121cb784812 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs @@ -20,7 +20,7 @@ use databend_common_base::base::Stoppable; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use log::debug; use log::info; use pretty_assertions::assert_eq; diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs index 61664cb3aa9d..f510c5563cd4 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs @@ -25,8 +25,8 @@ use databend_common_meta_kvapi::kvapi::MGetKVReq; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf as pb; use databend_common_meta_types::protobuf::KvMeta; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaSpec; -use databend_common_meta_types::SeqV; use databend_common_meta_types::With; use futures::stream::StreamExt; use futures::TryStreamExt; diff --git a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs index 810a454c32d7..38874a91e6da 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs @@ -16,12 +16,12 @@ use std::time::Duration; use databend_common_base::base::tokio::time::sleep; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_types::seq_value::KVMeta; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Cmd; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::LogEntry; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; -use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use log::info; diff --git a/src/meta/service/tests/it/meta_node/meta_node_replication.rs b/src/meta/service/tests/it/meta_node/meta_node_replication.rs index d8cfbe25f3d1..4dac4d55b083 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_replication.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_replication.rs @@ -24,13 +24,13 @@ use databend_common_meta_sled_store::openraft::LogIdOptionExt; use databend_common_meta_sled_store::openraft::ServerState; use databend_common_meta_types::protobuf::SnapshotChunkRequest; use databend_common_meta_types::protobuf::SnapshotChunkRequestV003; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::sys_data::SysData; use databend_common_meta_types::Cmd; use databend_common_meta_types::InstallSnapshotError; use databend_common_meta_types::InstallSnapshotRequest; use databend_common_meta_types::LogEntry; use databend_common_meta_types::RaftError; -use databend_common_meta_types::SeqV; use databend_common_meta_types::SnapshotMeta; use databend_common_meta_types::SnapshotResponse; use databend_common_meta_types::StoredMembership; diff --git a/src/meta/sled-store/src/sled_serde_impl.rs b/src/meta/sled-store/src/sled_serde_impl.rs index dde06376165b..08825465223b 100644 --- a/src/meta/sled-store/src/sled_serde_impl.rs +++ b/src/meta/sled-store/src/sled_serde_impl.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; use databend_common_meta_types::LogId; use databend_common_meta_types::Membership; use databend_common_meta_types::Node; use databend_common_meta_types::SeqNum; -use databend_common_meta_types::SeqV; use databend_common_meta_types::SnapshotMeta; use databend_common_meta_types::StoredMembership; use databend_common_meta_types::Vote; diff --git a/src/meta/sled-store/src/sled_tree.rs b/src/meta/sled-store/src/sled_tree.rs index 8842790b1924..b64c4bbd4674 100644 --- a/src/meta/sled-store/src/sled_tree.rs +++ b/src/meta/sled-store/src/sled_tree.rs @@ -21,8 +21,8 @@ use std::time::Duration; use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::anyerror::AnyError; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Change; -use databend_common_meta_types::SeqV; use fastrace::func_name; use log::debug; use log::warn; diff --git a/src/meta/sled-store/tests/it/sled_tree.rs b/src/meta/sled-store/tests/it/sled_tree.rs index 9bb7ed918fe5..11ee7a6a6ed1 100644 --- a/src/meta/sled-store/tests/it/sled_tree.rs +++ b/src/meta/sled-store/tests/it/sled_tree.rs @@ -14,12 +14,12 @@ use databend_common_meta_sled_store::SledTree; use databend_common_meta_types::new_log_id; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Cmd; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; use databend_common_meta_types::LogEntry; use databend_common_meta_types::LogIndex; -use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use test_harness::test; diff --git a/src/meta/sled-store/tests/it/testing/fake_key_spaces.rs b/src/meta/sled-store/tests/it/testing/fake_key_spaces.rs index 529c40ebc619..6d69d02f8524 100644 --- a/src/meta/sled-store/tests/it/testing/fake_key_spaces.rs +++ b/src/meta/sled-store/tests/it/testing/fake_key_spaces.rs @@ -13,11 +13,11 @@ // limitations under the License. use databend_common_meta_sled_store::SledKeySpace; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::Entry; use databend_common_meta_types::LogIndex; use databend_common_meta_types::Node; use databend_common_meta_types::NodeId; -use databend_common_meta_types::SeqV; use crate::testing::fake_state_machine_meta::StateMachineMetaKey; use crate::testing::fake_state_machine_meta::StateMachineMetaValue; diff --git a/src/meta/types/src/change.rs b/src/meta/types/src/change.rs index 142edfc10eae..b6d3e83c3324 100644 --- a/src/meta/types/src/change.rs +++ b/src/meta/types/src/change.rs @@ -19,8 +19,8 @@ use std::fmt::Formatter; use serde::Deserialize; use serde::Serialize; -use crate::SeqV; -use crate::SeqValue; +use crate::seq_value::SeqV; +use crate::seq_value::SeqValue; /// `Change` describes a state transition: the states before and after an operation. /// diff --git a/src/meta/types/src/cmd/meta_spec.rs b/src/meta/types/src/cmd/meta_spec.rs index 7d4402a3f9a3..9bc454793406 100644 --- a/src/meta/types/src/cmd/meta_spec.rs +++ b/src/meta/types/src/cmd/meta_spec.rs @@ -88,7 +88,7 @@ mod tests { use super::MetaSpec; use crate::cmd::CmdContext; - use crate::KVMeta; + use crate::seq_value::KVMeta; use crate::Time; #[test] diff --git a/src/meta/types/src/lib.rs b/src/meta/types/src/lib.rs index e07578583f72..11af3e85e5f3 100644 --- a/src/meta/types/src/lib.rs +++ b/src/meta/types/src/lib.rs @@ -31,10 +31,8 @@ mod non_empty; mod operation; mod raft_snapshot_data; mod raft_txid; -pub mod raft_types; mod seq_errors; mod seq_num; -mod seq_value; mod time; mod with; @@ -44,6 +42,8 @@ mod proto_ext; pub mod cmd; pub mod config; pub mod errors; +pub mod raft_types; +pub mod seq_value; pub mod snapshot_db; pub mod sys_data; @@ -106,7 +106,6 @@ pub use protobuf::TxnRequest; pub use raft_txid::RaftTxId; pub use seq_errors::ConflictSeq; pub use seq_num::SeqNum; -pub use seq_value::IntoSeqV; pub use seq_value::KVMeta; pub use seq_value::SeqV; pub use seq_value::SeqValue; diff --git a/src/meta/types/src/match_seq.rs b/src/meta/types/src/match_seq.rs index 6dcd6b233201..ea514378e063 100644 --- a/src/meta/types/src/match_seq.rs +++ b/src/meta/types/src/match_seq.rs @@ -18,8 +18,8 @@ use std::fmt::Formatter; use serde::Deserialize; use serde::Serialize; +use crate::seq_value::SeqV; use crate::ConflictSeq; -use crate::SeqV; /// Describes what `seq` an operation must match to take effect. /// Every value written to meta data has a unique `seq` bound. @@ -92,10 +92,10 @@ impl MatchSeqExt for MatchSeq { #[cfg(test)] mod tests { + use crate::seq_value::SeqV; use crate::ConflictSeq; use crate::MatchSeq; use crate::MatchSeqExt; - use crate::SeqV; #[derive(serde::Serialize)] struct Foo { diff --git a/src/meta/types/src/proto_ext/seq_v_ext.rs b/src/meta/types/src/proto_ext/seq_v_ext.rs index 093e725c23c3..1f6d57a44193 100644 --- a/src/meta/types/src/proto_ext/seq_v_ext.rs +++ b/src/meta/types/src/proto_ext/seq_v_ext.rs @@ -13,8 +13,8 @@ // limitations under the License. use crate::protobuf as pb; -use crate::KVMeta; -use crate::SeqV; +use crate::seq_value::KVMeta; +use crate::seq_value::SeqV; impl From for pb::KvMeta { fn from(m: KVMeta) -> Self { diff --git a/src/meta/types/src/proto_ext/stream_item_ext.rs b/src/meta/types/src/proto_ext/stream_item_ext.rs index 81b9c34dc8f5..212ce3a32dc1 100644 --- a/src/meta/types/src/proto_ext/stream_item_ext.rs +++ b/src/meta/types/src/proto_ext/stream_item_ext.rs @@ -14,7 +14,7 @@ use crate::protobuf as pb; use crate::protobuf::StreamItem; -use crate::SeqV; +use crate::seq_value::SeqV; impl StreamItem { pub fn new(key: String, value: Option) -> Self { diff --git a/src/meta/types/src/proto_ext/txn_ext.rs b/src/meta/types/src/proto_ext/txn_ext.rs index d0a89b4c646d..6ec24398182a 100644 --- a/src/meta/types/src/proto_ext/txn_ext.rs +++ b/src/meta/types/src/proto_ext/txn_ext.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::protobuf as pb; -use crate::SeqV; +use crate::seq_value::SeqV; use crate::TxnRequest; impl TxnRequest { diff --git a/src/meta/types/src/seq_value/kv_meta.rs b/src/meta/types/src/seq_value/kv_meta.rs new file mode 100644 index 000000000000..340739d41b09 --- /dev/null +++ b/src/meta/types/src/seq_value/kv_meta.rs @@ -0,0 +1,53 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::Deserialize; +use serde::Serialize; + +use crate::EvalExpireTime; + +/// The meta data of a record in kv +#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)] +pub struct KVMeta { + /// expiration time in second since 1970 + pub(crate) expire_at: Option, +} + +impl KVMeta { + /// Create a new KVMeta + pub fn new(expire_at: Option) -> Self { + Self { expire_at } + } + + /// Create a KVMeta with a absolute expiration time in second since 1970-01-01. + pub fn new_expire(expire_at: u64) -> Self { + Self { + expire_at: Some(expire_at), + } + } + + /// Returns expire time in millisecond since 1970. + pub fn get_expire_at_ms(&self) -> Option { + self.expire_at.map(|t| t * 1000) + } +} + +impl EvalExpireTime for KVMeta { + fn eval_expire_at_ms(&self) -> u64 { + match self.expire_at { + None => u64::MAX, + Some(exp_at_sec) => exp_at_sec * 1000, + } + } +} diff --git a/src/meta/types/src/seq_value/mod.rs b/src/meta/types/src/seq_value/mod.rs new file mode 100644 index 000000000000..71ea39b847dc --- /dev/null +++ b/src/meta/types/src/seq_value/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod kv_meta; +mod seq_value_trait; +mod seqv; + +pub use kv_meta::KVMeta; +pub use seq_value_trait::SeqValue; +pub use seqv::SeqV; diff --git a/src/meta/types/src/seq_value/seq_value_trait.rs b/src/meta/types/src/seq_value/seq_value_trait.rs new file mode 100644 index 000000000000..16e3a06b0d99 --- /dev/null +++ b/src/meta/types/src/seq_value/seq_value_trait.rs @@ -0,0 +1,47 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::seq_value::kv_meta::KVMeta; +use crate::EvalExpireTime; + +pub trait SeqValue> { + fn seq(&self) -> u64; + fn value(&self) -> Option<&V>; + fn into_value(self) -> Option; + fn meta(&self) -> Option<&KVMeta>; + + fn unpack(self) -> (u64, Option) + where Self: Sized { + (self.seq(), self.into_value()) + } + + /// Return the expire time in millisecond since 1970. + fn get_expire_at_ms(&self) -> Option { + if let Some(meta) = self.meta() { + meta.get_expire_at_ms() + } else { + None + } + } + + /// Evaluate and returns the absolute expire time in millisecond since 1970. + fn eval_expire_at_ms(&self) -> u64 { + self.meta().eval_expire_at_ms() + } + + /// Return true if the record is expired. + fn is_expired(&self, now_ms: u64) -> bool { + self.eval_expire_at_ms() < now_ms + } +} diff --git a/src/meta/types/src/seq_value.rs b/src/meta/types/src/seq_value/seqv.rs similarity index 64% rename from src/meta/types/src/seq_value.rs rename to src/meta/types/src/seq_value/seqv.rs index 3b0bfea65e93..037b866ea7f3 100644 --- a/src/meta/types/src/seq_value.rs +++ b/src/meta/types/src/seq_value/seqv.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::convert::TryInto; +use std::fmt; use std::fmt::Formatter; use std::ops::Deref; use std::ops::DerefMut; @@ -22,68 +22,8 @@ use std::time::UNIX_EPOCH; use serde::Deserialize; use serde::Serialize; -use crate::EvalExpireTime; - -pub trait SeqValue> { - fn seq(&self) -> u64; - fn value(&self) -> Option<&V>; - fn into_value(self) -> Option; - fn meta(&self) -> Option<&KVMeta>; - - /// Return the expire time in millisecond since 1970. - fn get_expire_at_ms(&self) -> Option { - if let Some(meta) = self.meta() { - meta.get_expire_at_ms() - } else { - None - } - } - - /// Evaluate and returns the absolute expire time in millisecond since 1970. - fn eval_expire_at_ms(&self) -> u64 { - self.meta().eval_expire_at_ms() - } - - /// Return true if the record is expired. - fn is_expired(&self, now_ms: u64) -> bool { - self.eval_expire_at_ms() < now_ms - } -} - -/// The meta data of a record in kv -#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)] -pub struct KVMeta { - /// expiration time in second since 1970 - pub(crate) expire_at: Option, -} - -impl KVMeta { - /// Create a new KVMeta - pub fn new(expire_at: Option) -> Self { - Self { expire_at } - } - - /// Create a KVMeta with a absolute expiration time in second since 1970-01-01. - pub fn new_expire(expire_at: u64) -> Self { - Self { - expire_at: Some(expire_at), - } - } - - /// Returns expire time in millisecond since 1970. - pub fn get_expire_at_ms(&self) -> Option { - self.expire_at.map(|t| t * 1000) - } -} - -impl EvalExpireTime for KVMeta { - fn eval_expire_at_ms(&self) -> u64 { - match self.expire_at { - None => u64::MAX, - Some(exp_at_sec) => exp_at_sec * 1000, - } - } -} +use crate::KVMeta; +use crate::SeqValue; /// Some value bound with a seq number. /// @@ -152,8 +92,8 @@ impl SeqValue for Option> { } } -impl std::fmt::Debug for SeqV { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +impl fmt::Debug for SeqV { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { let mut de = f.debug_struct("SeqV"); de.field("seq", &self.seq); de.field("meta", &self.meta); @@ -163,25 +103,6 @@ impl std::fmt::Debug for SeqV { } } -pub trait IntoSeqV { - type Error; - fn into_seqv(self) -> Result, Self::Error>; -} - -impl IntoSeqV for SeqV -where V: TryInto -{ - type Error = >::Error; - - fn into_seqv(self) -> Result, Self::Error> { - Ok(SeqV { - seq: self.seq, - meta: self.meta, - data: self.data.try_into()?, - }) - } -} - impl From<(u64, T)> for SeqV { fn from((seq, data): (u64, T)) -> Self { Self { @@ -192,16 +113,6 @@ impl From<(u64, T)> for SeqV { } } -impl SeqV> { - pub const fn empty() -> Self { - Self { - seq: 0, - meta: None, - data: None, - } - } -} - impl SeqV { pub fn new(seq: u64, data: T) -> Self { Self { @@ -257,6 +168,7 @@ impl SeqV { self } + /// Convert data to type U and leave seq and meta unchanged. pub fn map(self, f: impl FnOnce(T) -> U) -> SeqV { SeqV { seq: self.seq, @@ -264,6 +176,14 @@ impl SeqV { data: f(self.data), } } -} -// TODO(1): test SeqValue for SeqV and Option + /// Try to convert data to type U and leave seq and meta unchanged. + /// `f` returns an error if the conversion fails. + pub fn try_map(self, f: impl FnOnce(T) -> Result) -> Result, E> { + Ok(SeqV { + seq: self.seq, + meta: self.meta, + data: f(self.data)?, + }) + } +} diff --git a/src/query/ee/src/background_service/background_service_handler.rs b/src/query/ee/src/background_service/background_service_handler.rs index 352f45c57477..6bd2df0ded50 100644 --- a/src/query/ee/src/background_service/background_service_handler.rs +++ b/src/query/ee/src/background_service/background_service_handler.rs @@ -37,6 +37,7 @@ use databend_common_meta_app::background::UpdateBackgroundJobStatusReq; use databend_common_meta_app::principal::UserIdentity; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::SeqV; use databend_common_users::UserApiProvider; use databend_enterprise_background_service::background_service::BackgroundServiceHandlerWrapper; use databend_enterprise_background_service::BackgroundServiceHandler; @@ -229,7 +230,7 @@ impl RealBackgroundService { meta: Arc, id: &BackgroundJobIdent, suspend: bool, - ) -> Result { + ) -> Result> { // create job if not exist let info = meta .get_background_job(GetBackgroundJobReq { name: id.clone() }) diff --git a/src/query/ee/src/background_service/compaction_job.rs b/src/query/ee/src/background_service/compaction_job.rs index 42bb84b9a6fb..4940d38f9c2a 100644 --- a/src/query/ee/src/background_service/compaction_job.rs +++ b/src/query/ee/src/background_service/compaction_job.rs @@ -44,6 +44,7 @@ use databend_common_meta_app::background::UpdateBackgroundTaskReq; use databend_common_meta_app::schema::TableStatistics; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::SeqV; use databend_common_users::UserApiProvider; use databend_query::sessions::QueryContext; use databend_query::sessions::Session; @@ -79,7 +80,7 @@ impl Job for CompactionJob { .expect("failed to do compaction job"); } - async fn get_info(&self) -> Result { + async fn get_info(&self) -> Result> { let job = self .meta_api .get_background_job(GetBackgroundJobReq { @@ -165,7 +166,7 @@ impl CompactionJob { let ctx = session.create_query_context().await?; let job_info = self.get_info().await?; - let (params, manual) = Self::sync_compact_params(&job_info).await; + let (params, manual) = Self::sync_compact_params(&job_info.data).await; // guarantee at least once for maunal job self.update_job_params(params).await?; @@ -308,7 +309,7 @@ impl CompactionJob { let job_info = self.get_info().await?; let id = Uuid::new_v4().to_string(); - let status = self.sync_compact_status(id.clone(), &job_info).await?; + let status = self.sync_compact_status(id.clone(), &job_info.data).await?; if status.is_none() { return Ok(()); } diff --git a/src/query/ee/src/background_service/job.rs b/src/query/ee/src/background_service/job.rs index 66c0c3f5fd72..7d25cb63fbc6 100644 --- a/src/query/ee/src/background_service/job.rs +++ b/src/query/ee/src/background_service/job.rs @@ -18,6 +18,7 @@ use databend_common_meta_app::background::BackgroundJobIdent; use databend_common_meta_app::background::BackgroundJobInfo; use databend_common_meta_app::background::BackgroundJobParams; use databend_common_meta_app::background::BackgroundJobStatus; +use databend_common_meta_types::SeqV; /// A trait for implementing a background job /// @@ -44,7 +45,7 @@ pub trait Job: JobClone { /// Runs the job async fn run(&mut self); fn get_name(&self) -> BackgroundJobIdent; - async fn get_info(&self) -> Result; + async fn get_info(&self) -> Result>; async fn update_job_status(&mut self, status: BackgroundJobStatus) -> Result<()>; async fn update_job_params(&mut self, param: BackgroundJobParams) -> Result<()>; } diff --git a/src/query/ee/tests/it/background_service/job_scheduler.rs b/src/query/ee/tests/it/background_service/job_scheduler.rs index ac27af20e767..0d29d3efbb98 100644 --- a/src/query/ee/tests/it/background_service/job_scheduler.rs +++ b/src/query/ee/tests/it/background_service/job_scheduler.rs @@ -29,13 +29,14 @@ use databend_common_meta_app::background::BackgroundJobParams; use databend_common_meta_app::background::BackgroundJobStatus; use databend_common_meta_app::principal::UserIdentity; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_types::SeqV; use databend_enterprise_query::background_service::Job; use databend_enterprise_query::background_service::JobScheduler; #[derive(Clone)] struct TestJob { counter: Arc, - info: BackgroundJobInfo, + info: SeqV, finish_tx: Arc>>, } @@ -60,7 +61,7 @@ impl Job for TestJob { let _ = self.finish_tx.clone().lock().await.send(1).await; } - async fn get_info(&self) -> Result { + async fn get_info(&self) -> Result> { Ok(self.info.clone()) } @@ -83,9 +84,12 @@ async fn test_one_shot_job() -> Result<()> { let counter = Arc::new(AtomicUsize::new(0)); let job = TestJob { counter: counter.clone(), - info: BackgroundJobInfo::new_compactor_job( - BackgroundJobParams::new_one_shot_job(), - UserIdentity::default(), + info: SeqV::new( + 0, + BackgroundJobInfo::new_compactor_job( + BackgroundJobParams::new_one_shot_job(), + UserIdentity::default(), + ), ), finish_tx: scheduler.finish_tx.clone(), }; @@ -106,9 +110,12 @@ async fn test_interval_job() -> Result<()> { let counter = Arc::new(AtomicUsize::new(0)); let job = TestJob { counter: counter.clone(), - info: BackgroundJobInfo::new_compactor_job( - BackgroundJobParams::new_interval_job(Duration::from_millis(10)), - UserIdentity::default(), + info: SeqV::new( + 0, + BackgroundJobInfo::new_compactor_job( + BackgroundJobParams::new_interval_job(Duration::from_millis(10)), + UserIdentity::default(), + ), ), finish_tx: scheduler.finish_tx.clone(), }; diff --git a/src/query/management/src/cluster/cluster_mgr.rs b/src/query/management/src/cluster/cluster_mgr.rs index 612ca6a12250..3e2f5bb3a790 100644 --- a/src/query/management/src/cluster/cluster_mgr.rs +++ b/src/query/management/src/cluster/cluster_mgr.rs @@ -22,11 +22,11 @@ use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::NodeInfo; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use crate::cluster::ClusterApi; diff --git a/src/query/management/src/quota/quota_api.rs b/src/query/management/src/quota/quota_api.rs index 8d7a8330c5c2..477049b200ed 100644 --- a/src/query/management/src/quota/quota_api.rs +++ b/src/query/management/src/quota/quota_api.rs @@ -14,8 +14,8 @@ use databend_common_exception::Result; use databend_common_meta_app::tenant::TenantQuota; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait QuotaApi: Sync + Send { diff --git a/src/query/management/src/quota/quota_mgr.rs b/src/query/management/src/quota/quota_mgr.rs index fe5ba1e041e8..f8630830c62e 100644 --- a/src/query/management/src/quota/quota_mgr.rs +++ b/src/query/management/src/quota/quota_mgr.rs @@ -23,10 +23,10 @@ use databend_common_meta_app::tenant::TenantQuota; use databend_common_meta_app::tenant::TenantQuotaIdent; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; -use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use fastrace::func_name; diff --git a/src/query/management/src/role/role_api.rs b/src/query/management/src/role/role_api.rs index 7f8277027a26..6b46adfcdf94 100644 --- a/src/query/management/src/role/role_api.rs +++ b/src/query/management/src/role/role_api.rs @@ -17,8 +17,8 @@ use databend_common_meta_app::principal::OwnershipInfo; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::principal::RoleInfo; use databend_common_meta_kvapi::kvapi::ListKVReply; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait RoleApi: Sync + Send { diff --git a/src/query/management/src/role/role_mgr.rs b/src/query/management/src/role/role_mgr.rs index 91a3f2f81f07..a83c7d17f237 100644 --- a/src/query/management/src/role/role_mgr.rs +++ b/src/query/management/src/role/role_mgr.rs @@ -35,12 +35,12 @@ use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::ListKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::ConditionResult::Eq; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnRequest; use enumflags2::make_bitflags; use fastrace::func_name; diff --git a/src/query/management/src/serde/pb_serde.rs b/src/query/management/src/serde/pb_serde.rs index 1b48bfe6c09f..08031a571b3f 100644 --- a/src/query/management/src/serde/pb_serde.rs +++ b/src/query/management/src/serde/pb_serde.rs @@ -19,13 +19,13 @@ use databend_common_exception::Result; use databend_common_exception::ToErrorCode; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::InvalidReply; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaNetworkError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use databend_common_proto_conv::FromToProto; use crate::serde::Quota; diff --git a/src/query/management/src/setting/setting_api.rs b/src/query/management/src/setting/setting_api.rs index a7f8d6e51be4..b16de19b589c 100644 --- a/src/query/management/src/setting/setting_api.rs +++ b/src/query/management/src/setting/setting_api.rs @@ -14,8 +14,8 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::UserSetting; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait SettingApi: Sync + Send { diff --git a/src/query/management/src/setting/setting_mgr.rs b/src/query/management/src/setting/setting_mgr.rs index 8863b0ff213e..6d310c2bf636 100644 --- a/src/query/management/src/setting/setting_mgr.rs +++ b/src/query/management/src/setting/setting_mgr.rs @@ -22,13 +22,12 @@ use databend_common_meta_app::tenant::Tenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; -use databend_common_meta_types::IntoSeqV; +use databend_common_meta_types::seq_value::SeqV; +use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; use crate::setting::SettingApi; @@ -102,7 +101,7 @@ impl SettingApi for SettingMgr { })?; match seq.match_seq(&seq_value) { - Ok(_) => Ok(seq_value.into_seqv()?), + Ok(_) => Ok(seq_value.try_map(|d| d.try_into())?), Err(_) => Err(ErrorCode::UnknownVariable(format!( "Setting '{}' does not exist.", name diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index b333bd1a1158..683f8aef157d 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -25,9 +25,9 @@ use databend_common_meta_app::tenant::Tenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; -use databend_common_meta_types::SeqV; use databend_common_meta_types::With; use futures::TryStreamExt; diff --git a/src/query/management/src/user/user_api.rs b/src/query/management/src/user/user_api.rs index 8ecc2bc039ac..f9de4fa09e39 100644 --- a/src/query/management/src/user/user_api.rs +++ b/src/query/management/src/user/user_api.rs @@ -17,8 +17,8 @@ use databend_common_meta_app::principal::UserIdentity; use databend_common_meta_app::principal::UserInfo; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi::ListKVReply; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait UserApi: Sync + Send { diff --git a/src/query/management/src/user/user_mgr.rs b/src/query/management/src/user/user_mgr.rs index c5d735fb8ca3..f58e073a9165 100644 --- a/src/query/management/src/user/user_mgr.rs +++ b/src/query/management/src/user/user_mgr.rs @@ -26,11 +26,11 @@ use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::ListKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use crate::serde::deserialize_struct; use crate::serde::serialize_struct; diff --git a/src/query/management/tests/it/cluster.rs b/src/query/management/tests/it/cluster.rs index f0ad4dadf678..db43eaf45b8c 100644 --- a/src/query/management/tests/it/cluster.rs +++ b/src/query/management/tests/it/cluster.rs @@ -21,9 +21,9 @@ use databend_common_management::*; use databend_common_meta_embedded::MetaEmbedded; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::NodeInfo; -use databend_common_meta_types::SeqV; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_successfully_add_node() -> Result<()> { diff --git a/src/query/management/tests/it/setting.rs b/src/query/management/tests/it/setting.rs index dd9340046139..2d42269c87f8 100644 --- a/src/query/management/tests/it/setting.rs +++ b/src/query/management/tests/it/setting.rs @@ -22,8 +22,8 @@ use databend_common_meta_app::principal::UserSettingValue; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_embedded::MetaEmbedded; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; use fastrace::func_name; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/src/query/management/tests/it/stage.rs b/src/query/management/tests/it/stage.rs index 3893ed02f069..1f39f98f616b 100644 --- a/src/query/management/tests/it/stage.rs +++ b/src/query/management/tests/it/stage.rs @@ -27,7 +27,7 @@ use databend_common_meta_app::storage::StorageS3Config; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_embedded::MetaEmbedded; use databend_common_meta_kvapi::kvapi::KVApi; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use fastrace::func_name; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] diff --git a/src/query/management/tests/it/udf.rs b/src/query/management/tests/it/udf.rs index 007dd0ef3e06..253204f80c3c 100644 --- a/src/query/management/tests/it/udf.rs +++ b/src/query/management/tests/it/udf.rs @@ -26,8 +26,8 @@ use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_embedded::MetaEmbedded; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_add_udf() -> Result<()> { diff --git a/src/query/management/tests/it/user.rs b/src/query/management/tests/it/user.rs index 994ea4036b12..0346b92e293e 100644 --- a/src/query/management/tests/it/user.rs +++ b/src/query/management/tests/it/user.rs @@ -29,10 +29,10 @@ use databend_common_meta_kvapi::kvapi::ListKVReply; use databend_common_meta_kvapi::kvapi::MGetKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use mockall::predicate::*; diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 1133f696b442..966a0375bb66 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -117,8 +117,8 @@ use databend_common_meta_app::schema::VirtualColumnMeta; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_store::MetaStoreProvider; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaId; -use databend_common_meta_types::SeqV; use fastrace::func_name; use log::info; diff --git a/src/query/service/src/catalogs/share/share_catalog.rs b/src/query/service/src/catalogs/share/share_catalog.rs index 2416b2b60639..9a18b8f7c4ad 100644 --- a/src/query/service/src/catalogs/share/share_catalog.rs +++ b/src/query/service/src/catalogs/share/share_catalog.rs @@ -118,8 +118,8 @@ use databend_common_meta_app::share::ShareDatabaseSpec; use databend_common_meta_app::share::ShareSpec; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaId; -use databend_common_meta_types::SeqV; use databend_common_sharing::ShareEndpointClient; use databend_common_storages_factory::StorageFactory; use databend_common_users::UserApiProvider; diff --git a/src/query/service/src/databases/information_schema/information_schema_database.rs b/src/query/service/src/databases/information_schema/information_schema_database.rs index 2b18237229ca..fef701e3a3c2 100644 --- a/src/query/service/src/databases/information_schema/information_schema_database.rs +++ b/src/query/service/src/databases/information_schema/information_schema_database.rs @@ -19,7 +19,7 @@ use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::DatabaseInfo; use databend_common_meta_app::schema::DatabaseMeta; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use databend_common_storages_information_schema::ColumnsTable; use databend_common_storages_information_schema::KeyColumnUsageTable; use databend_common_storages_information_schema::KeywordsTable; diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs index 37c7ade365e5..7eb10ea42a4f 100644 --- a/src/query/service/src/databases/system/system_database.rs +++ b/src/query/service/src/databases/system/system_database.rs @@ -21,7 +21,7 @@ use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::DatabaseInfo; use databend_common_meta_app::schema::DatabaseMeta; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use databend_common_storages_system::BackgroundJobTable; use databend_common_storages_system::BackgroundTaskTable; use databend_common_storages_system::BacktraceTable; diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index f0bda370e562..c24e31a6c7f2 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -30,7 +30,7 @@ use databend_common_meta_app::principal::UserGrantSet; use databend_common_meta_app::principal::UserPrivilegeSet; use databend_common_meta_app::principal::UserPrivilegeType; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use databend_common_sql::binder::MutationType; use databend_common_sql::optimizer::get_udf_names; use databend_common_sql::plans::InsertInputSource; diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index d56799dbaf56..d60c81043795 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -131,8 +131,8 @@ use databend_common_meta_app::schema::UpsertTableOptionReply; use databend_common_meta_app::schema::UpsertTableOptionReq; use databend_common_meta_app::schema::VirtualColumnMeta; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaId; -use databend_common_meta_types::SeqV; use databend_common_pipeline_core::InputError; use databend_common_pipeline_core::LockGuard; use databend_common_pipeline_core::PlanProfile; diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 7f754ac48591..f5b82412bf62 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -132,8 +132,8 @@ use databend_common_meta_app::schema::UpsertTableOptionReply; use databend_common_meta_app::schema::UpsertTableOptionReq; use databend_common_meta_app::schema::VirtualColumnMeta; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaId; -use databend_common_meta_types::SeqV; use databend_common_pipeline_core::InputError; use databend_common_pipeline_core::LockGuard; use databend_common_pipeline_core::PlanProfile; diff --git a/src/query/storages/iceberg/src/catalog.rs b/src/query/storages/iceberg/src/catalog.rs index 338d316de089..30ef93445f49 100644 --- a/src/query/storages/iceberg/src/catalog.rs +++ b/src/query/storages/iceberg/src/catalog.rs @@ -106,8 +106,8 @@ use databend_common_meta_app::schema::UpsertTableOptionReq; use databend_common_meta_app::schema::VirtualColumnMeta; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MetaId; -use databend_common_meta_types::SeqV; use iceberg_catalog_hms::HmsCatalog; use iceberg_catalog_hms::HmsCatalogConfig; use iceberg_catalog_hms::HmsThriftTransport; diff --git a/src/query/storages/iceberg/src/database.rs b/src/query/storages/iceberg/src/database.rs index 4e47278d93e7..0d439740ad4b 100644 --- a/src/query/storages/iceberg/src/database.rs +++ b/src/query/storages/iceberg/src/database.rs @@ -26,7 +26,7 @@ use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::DatabaseInfo; use databend_common_meta_app::schema::DatabaseMeta; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_types::SeqV; +use databend_common_meta_types::seq_value::SeqV; use crate::table::IcebergTable; use crate::IcebergCatalog; diff --git a/src/query/storages/result_cache/src/meta_manager.rs b/src/query/storages/result_cache/src/meta_manager.rs index bc0c984c203c..eec8ddb26dbb 100644 --- a/src/query/storages/result_cache/src/meta_manager.rs +++ b/src/query/storages/result_cache/src/meta_manager.rs @@ -18,10 +18,10 @@ use std::time::Duration; use databend_common_exception::Result; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; -use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use crate::common::ResultCacheValue; diff --git a/src/query/storages/result_cache/src/write/sink.rs b/src/query/storages/result_cache/src/write/sink.rs index 458fbc6e9c0e..5282c3e6393a 100644 --- a/src/query/storages/result_cache/src/write/sink.rs +++ b/src/query/storages/result_cache/src/write/sink.rs @@ -20,8 +20,8 @@ use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::TableSchemaRef; use databend_common_meta_store::MetaStore; +use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::SeqV; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_sinks::AsyncMpscSink;