diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index cec9967731fb..3c159e340ca1 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -383,6 +383,10 @@ build_exceptions! { // Share error codes(continue). ErrorShareEndpointCredential(3111), WrongSharePrivileges(3112), + + // dictionary + DictionaryAlreadyExists(3113), + UnknownDictionary(3114), } // Storage errors [3001, 4000]. diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs index b562c9626874..2164316e8104 100644 --- a/src/meta/api/src/schema_api.rs +++ b/src/meta/api/src/schema_api.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogInfo; use databend_common_meta_app::schema::CommitTableMetaReply; use databend_common_meta_app::schema::CommitTableMetaReq; @@ -21,6 +22,8 @@ use databend_common_meta_app::schema::CreateCatalogReply; use databend_common_meta_app::schema::CreateCatalogReq; use databend_common_meta_app::schema::CreateDatabaseReply; use databend_common_meta_app::schema::CreateDatabaseReq; +use databend_common_meta_app::schema::CreateDictionaryReply; +use databend_common_meta_app::schema::CreateDictionaryReq; use databend_common_meta_app::schema::CreateIndexReply; use databend_common_meta_app::schema::CreateIndexReq; use databend_common_meta_app::schema::CreateLockRevReply; @@ -33,6 +36,7 @@ use databend_common_meta_app::schema::CreateVirtualColumnReply; use databend_common_meta_app::schema::CreateVirtualColumnReq; use databend_common_meta_app::schema::DatabaseInfo; use databend_common_meta_app::schema::DeleteLockRevReq; +use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropCatalogReply; use databend_common_meta_app::schema::DropCatalogReq; use databend_common_meta_app::schema::DropDatabaseReply; @@ -50,6 +54,7 @@ use databend_common_meta_app::schema::GcDroppedTableReq; use databend_common_meta_app::schema::GcDroppedTableResp; use databend_common_meta_app::schema::GetCatalogReq; use databend_common_meta_app::schema::GetDatabaseReq; +use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; use databend_common_meta_app::schema::GetLVTReply; @@ -60,6 +65,7 @@ use databend_common_meta_app::schema::GetTableReq; use databend_common_meta_app::schema::IndexMeta; use databend_common_meta_app::schema::ListCatalogReq; use databend_common_meta_app::schema::ListDatabaseReq; +use databend_common_meta_app::schema::ListDictionaryReq; use databend_common_meta_app::schema::ListDroppedTableReq; use databend_common_meta_app::schema::ListDroppedTableResp; use databend_common_meta_app::schema::ListIndexesByIdReq; @@ -87,6 +93,8 @@ use databend_common_meta_app::schema::UndropDatabaseReq; use databend_common_meta_app::schema::UndropTableByIdReq; use databend_common_meta_app::schema::UndropTableReply; use databend_common_meta_app::schema::UndropTableReq; +use databend_common_meta_app::schema::UpdateDictionaryReply; +use databend_common_meta_app::schema::UpdateDictionaryReq; use databend_common_meta_app::schema::UpdateIndexReply; use databend_common_meta_app::schema::UpdateIndexReq; use databend_common_meta_app::schema::UpdateMultiTableMetaReq; @@ -304,4 +312,30 @@ pub trait SchemaApi: Send + Sync { async fn get_table_lvt(&self, req: GetLVTReq) -> Result; fn name(&self) -> String; + + // dictionary + async fn create_dictionary( + &self, + req: CreateDictionaryReq, + ) -> Result; + + async fn update_dictionary( + &self, + req: UpdateDictionaryReq, + ) -> Result; + + async fn drop_dictionary( + &self, + dict_ident: TenantDictionaryIdent, + ) -> Result>, KVAppError>; + + async fn get_dictionary( + &self, + req: TenantDictionaryIdent, + ) -> Result, KVAppError>; + + async fn list_dictionaries( + &self, + req: ListDictionaryReq, + ) -> Result, KVAppError>; } diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 91d6f5fbc3fc..d54aba9785b4 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -32,6 +32,7 @@ use databend_common_meta_app::app_error::CreateDatabaseWithDropTime; use databend_common_meta_app::app_error::CreateIndexWithDropTime; use databend_common_meta_app::app_error::CreateTableWithDropTime; use databend_common_meta_app::app_error::DatabaseAlreadyExists; +use databend_common_meta_app::app_error::DictionaryAlreadyExists; use databend_common_meta_app::app_error::DropDbWithDropTime; use databend_common_meta_app::app_error::DropIndexWithDropTime; use databend_common_meta_app::app_error::DropTableWithDropTime; @@ -53,6 +54,7 @@ use databend_common_meta_app::app_error::UndropTableHasNoHistory; use databend_common_meta_app::app_error::UndropTableWithNoDropTime; use databend_common_meta_app::app_error::UnknownCatalog; use databend_common_meta_app::app_error::UnknownDatabaseId; +use databend_common_meta_app::app_error::UnknownDictionary; use databend_common_meta_app::app_error::UnknownIndex; use databend_common_meta_app::app_error::UnknownStreamId; use databend_common_meta_app::app_error::UnknownTable; @@ -64,6 +66,7 @@ use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw; +use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogIdIdent; use databend_common_meta_app::schema::CatalogIdToNameIdent; use databend_common_meta_app::schema::CatalogInfo; @@ -75,6 +78,8 @@ use databend_common_meta_app::schema::CreateCatalogReply; use databend_common_meta_app::schema::CreateCatalogReq; use databend_common_meta_app::schema::CreateDatabaseReply; use databend_common_meta_app::schema::CreateDatabaseReq; +use databend_common_meta_app::schema::CreateDictionaryReply; +use databend_common_meta_app::schema::CreateDictionaryReq; use databend_common_meta_app::schema::CreateIndexReply; use databend_common_meta_app::schema::CreateIndexReq; use databend_common_meta_app::schema::CreateLockRevReply; @@ -97,6 +102,9 @@ use databend_common_meta_app::schema::DatabaseMeta; use databend_common_meta_app::schema::DatabaseType; use databend_common_meta_app::schema::DbIdList; use databend_common_meta_app::schema::DeleteLockRevReq; +use databend_common_meta_app::schema::DictionaryId; +use databend_common_meta_app::schema::DictionaryIdentity; +use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropCatalogReply; use databend_common_meta_app::schema::DropCatalogReq; use databend_common_meta_app::schema::DropDatabaseReply; @@ -115,6 +123,7 @@ use databend_common_meta_app::schema::GcDroppedTableReq; use databend_common_meta_app::schema::GcDroppedTableResp; use databend_common_meta_app::schema::GetCatalogReq; use databend_common_meta_app::schema::GetDatabaseReq; +use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; use databend_common_meta_app::schema::GetLVTReply; @@ -131,6 +140,7 @@ use databend_common_meta_app::schema::LeastVisibleTime; use databend_common_meta_app::schema::LeastVisibleTimeKey; use databend_common_meta_app::schema::ListCatalogReq; use databend_common_meta_app::schema::ListDatabaseReq; +use databend_common_meta_app::schema::ListDictionaryReq; use databend_common_meta_app::schema::ListDroppedTableReq; use databend_common_meta_app::schema::ListDroppedTableResp; use databend_common_meta_app::schema::ListIndexesByIdReq; @@ -170,6 +180,8 @@ use databend_common_meta_app::schema::UndropDatabaseReq; use databend_common_meta_app::schema::UndropTableByIdReq; use databend_common_meta_app::schema::UndropTableReply; use databend_common_meta_app::schema::UndropTableReq; +use databend_common_meta_app::schema::UpdateDictionaryReply; +use databend_common_meta_app::schema::UpdateDictionaryReq; use databend_common_meta_app::schema::UpdateIndexReply; use databend_common_meta_app::schema::UpdateIndexReq; use databend_common_meta_app::schema::UpdateMultiTableMetaReq; @@ -200,6 +212,7 @@ use databend_common_meta_types::txn_op::Request; use databend_common_meta_types::txn_op_response::Response; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::InvalidReply; +use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; @@ -4246,6 +4259,302 @@ impl + ?Sized> SchemaApi for KV { fn name(&self) -> String { "SchemaApiImpl".to_string() } + + // dictionary + #[logcall::logcall] + #[fastrace::trace] + async fn create_dictionary( + &self, + req: CreateDictionaryReq, + ) -> Result { + debug!(req :? = (&req); "SchemaApi: {}", func_name!()); + let mut trials = txn_backoff(None, func_name!()); + let dictionary_ident = &req.dictionary_ident; + loop { + trials.next().unwrap()?.await; + + let mut condition = vec![]; + let mut if_then = vec![]; + let res = get_dictionary_or_err(self, dictionary_ident).await?; + let (dictionary_id_seq, dictionary_id, _dictionary_meta_seq, _dictionary_meta) = res; + + debug!( + dictionary_id_seq = dictionary_id_seq, + dictionary_id = dictionary_id, + dictionary_ident :? = (dictionary_ident); + "get_dictionary_seq_id" + ); + + if dictionary_id_seq > 0 { + return Err(KVAppError::AppError(AppError::DictionaryAlreadyExists( + DictionaryAlreadyExists::new( + dictionary_ident.dict_name(), + format!( + "create dictionary with tenant: {} db_id: {}", + dictionary_ident.tenant_name(), + dictionary_ident.db_id() + ), + ), + ))); + } + // Create dictionary by inserting these record: + // (tenant, db_id, dict_name) -> dict_id + // (dict_id) -> dict_meta + let dictionary_id = fetch_id(self, IdGenerator::dictionary_id()).await?; + let id_key = DictionaryId { dictionary_id }; + + debug!( + dictionary_id = dictionary_id, + dictionary_key :? = (dictionary_ident); + "new dictionary" + ); + + { + condition.extend(vec![ + txn_cond_seq(dictionary_ident, Eq, 0), + txn_cond_seq(&id_key, Eq, 0), + ]); + if_then.extend(vec![ + txn_op_put(dictionary_ident, serialize_u64(dictionary_id)?), /*(tenant, db_id, dict_name) -> dict_id */ + txn_op_put(&id_key, serialize_struct(&req.dictionary_meta)?), /*(dict_id) -> dict_meta*/ + ]); + + let txn_req = TxnRequest { + condition, + if_then, + else_then: vec![], + }; + + let (succ, _responses) = send_txn(self, txn_req).await?; + + debug!( + dictionary_ident :? = (dictionary_ident), + id :? = (&id_key), + succ = succ; + "create_dictionary" + ); + + if succ { + return Ok(CreateDictionaryReply { dictionary_id }); + } + } + } + } + + #[logcall::logcall] + #[fastrace::trace] + async fn update_dictionary( + &self, + req: UpdateDictionaryReq, + ) -> Result { + debug!(req :? = (&req); "SchemaApi: {}", func_name!()); + let mut trials = txn_backoff(None, func_name!()); + let dictionary_ident = &req.dictionary_ident; + loop { + trials.next().unwrap()?.await; + + let mut condition = vec![]; + let mut if_then = vec![]; + let res = get_dictionary_or_err(self, dictionary_ident).await?; + let (dictionary_id_seq, dictionary_id, dictionary_meta_seq, _dictionary_meta) = res; + + debug!( + dictionary_id_seq = dictionary_id_seq, + dictionary_id = dictionary_id, + dictionary_ident :? = (dictionary_ident); + "get_dictionary_seq_id" + ); + + if dictionary_id_seq == 0 { + return Err(KVAppError::AppError(AppError::UnknownDictionary( + UnknownDictionary::new( + dictionary_ident.dict_name(), + format!( + "update dictionary with tenant: {} db_id: {}", + dictionary_ident.tenant_name(), + dictionary_ident.db_id() + ), + ), + ))); + } + // Update dictionary by update dict_meta: + // (dict_id) -> dict_meta + let id_key = DictionaryId { dictionary_id }; + + debug!( + dictionary_id = dictionary_id, + dictionary_key :? = (dictionary_ident); + "update dictionary" + ); + + { + condition.extend(vec![ + txn_cond_seq(dictionary_ident, Eq, dictionary_id_seq), + txn_cond_seq(&id_key, Eq, dictionary_meta_seq), + ]); + if_then.extend(vec![ + txn_op_put(dictionary_ident, serialize_u64(dictionary_id)?), /*(tenant, db_id, dict_name) -> dict_id */ + txn_op_put(&id_key, serialize_struct(&req.dictionary_meta)?), /*(dict_id) -> dict_meta*/ + ]); + + let txn_req = TxnRequest { + condition, + if_then, + else_then: vec![], + }; + + let (succ, _responses) = send_txn(self, txn_req).await?; + + debug!( + dictionary_ident :? = (dictionary_ident), + id :? = (&id_key), + succ = succ; + "update_dictionary" + ); + + if succ { + return Ok(UpdateDictionaryReply { dictionary_id }); + } + } + } + } + + #[logcall::logcall] + #[fastrace::trace] + async fn drop_dictionary( + &self, + dictionary_ident: TenantDictionaryIdent, + ) -> Result>, KVAppError> { + debug!(dict_ident :? =(&dictionary_ident); "SchemaApi: {}", func_name!()); + + let mut trials = txn_backoff(None, func_name!()); + loop { + trials.next().unwrap()?.await; + + let mut condition = vec![]; + let mut if_then = vec![]; + + let res: (u64, u64, u64, Option) = + get_dictionary_or_err(self, &dictionary_ident).await?; + let (dictionary_id_seq, dictionary_id, dictionary_meta_seq, dictionary_meta) = res; + + if dictionary_id_seq == 0 { + return Ok(None); + } + + // delete dictionary id + // (tenant, db_id, dict_name) -> dict_id + condition.push(txn_cond_seq(&dictionary_ident, Eq, dictionary_id_seq)); + if_then.push(txn_op_del(&dictionary_ident)); + // delete dictionary meta + // (dictionary_id) -> dictionary_meta + let id_key = DictionaryId { dictionary_id }; + condition.push(txn_cond_seq(&id_key, Eq, dictionary_meta_seq)); + if_then.push(txn_op_del(&id_key)); + + let txn_req = TxnRequest { + condition, + if_then, + else_then: vec![], + }; + + let (succ, _responses) = send_txn(self, txn_req).await?; + + debug!( + name :? = (&dictionary_ident), + id :? = (&id_key), + succ = succ; + "drop_dictionary" + ); + + if succ { + let dict_meta_seqv = SeqV { + seq: dictionary_meta_seq, + meta: Some(KVMeta::new(None)), + data: dictionary_meta.unwrap(), + }; + return Ok(Some(dict_meta_seqv)); + } + } + } + + #[logcall::logcall] + #[fastrace::trace] + async fn get_dictionary( + &self, + dictionary_ident: TenantDictionaryIdent, + ) -> Result, KVAppError> { + debug!(dict_ident :? =(&dictionary_ident); "SchemaApi: {}", func_name!()); + + let res = get_dictionary_or_err(self, &dictionary_ident).await?; + let (dictionary_id_seq, dictionary_id, dictionary_meta_seq, dictionary_meta) = res; + + if dictionary_id_seq == 0 { + return Ok(None); + } + + // Safe unwrap(): dictionary_meta_seq > 0 implies dictionary_meta is not None. + let dictionary_meta = dictionary_meta.unwrap(); + + debug!( + dictionary_id = dictionary_id, + name_key :? =(&dictionary_ident); + "get_dictionary" + ); + + Ok(Some(GetDictionaryReply { + dictionary_id, + dictionary_meta, + dictionary_meta_seq, + })) + } + + #[logcall::logcall] + #[fastrace::trace] + async fn list_dictionaries( + &self, + req: ListDictionaryReq, + ) -> Result, KVAppError> { + debug!(req :? =(&req); "SchemaApi: {}", func_name!()); + + // Using a empty dictionary name to to list all + let dictionary_ident = TenantDictionaryIdent::new( + req.tenant.clone(), + DictionaryIdentity::new(req.db_id, "".to_string()), + ); + + let (dict_keys, dict_id_list) = list_u64_value(self, &dictionary_ident).await?; + if dict_id_list.is_empty() { + return Ok(vec![]); + } + let mut dict_metas = vec![]; + let inner_keys: Vec = dict_id_list + .iter() + .map(|dict_id| { + DictionaryId { + dictionary_id: *dict_id, + } + .to_string_key() + }) + .collect(); + let mut dict_id_list_iter = dict_id_list.into_iter(); + let mut dict_key_list_iter = dict_keys.into_iter(); + for c in inner_keys.chunks(DEFAULT_MGET_SIZE) { + let dict_meta_seq_meta_vec: Vec<(u64, Option)> = + mget_pb_values(self, c).await?; + for (dict_meta_seq, dict_meta) in dict_meta_seq_meta_vec { + let dict_id = dict_id_list_iter.next().unwrap(); + let dict_key = dict_key_list_iter.next().unwrap(); + if dict_meta_seq == 0 || dict_meta.is_none() { + error!("list_dictionaries cannot find {:?} dict_meta", dict_id); + continue; + } + let dict_meta = dict_meta.unwrap(); + dict_metas.push((dict_key.dict_name(), dict_meta)); + } + } + Ok(dict_metas) + } } async fn construct_drop_virtual_column_txn_operations( @@ -5185,6 +5494,26 @@ pub(crate) async fn get_index_or_err( Ok((index_id_seq, index_id, index_meta_seq, index_meta)) } +/// Returns (dictionary_id_seq, dictionary_id, dictionary_meta_seq, dictionary_meta) +pub(crate) async fn get_dictionary_or_err( + kv_api: &(impl kvapi::KVApi + ?Sized), + name_key: &TenantDictionaryIdent, +) -> Result<(u64, u64, u64, Option), KVAppError> { + let (dictionary_id_seq, dictionary_id) = get_u64_value(kv_api, name_key).await?; + if dictionary_id_seq == 0 { + return Ok((0, 0, 0, None)); + } + let id_key = DictionaryId { dictionary_id }; + let (dictionary_meta_seq, dictionary_meta) = get_pb_value(kv_api, &id_key).await?; + + Ok(( + dictionary_id_seq, + dictionary_id, + dictionary_meta_seq, + dictionary_meta, + )) +} + async fn gc_dropped_db_by_id( kv_api: &(impl kvapi::KVApi + ?Sized), db_id: u64, diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 1af7b54e0cf1..cf4903e749ec 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -41,6 +41,7 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw; +use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogMeta; use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::CatalogOption; @@ -48,6 +49,7 @@ use databend_common_meta_app::schema::CommitTableMetaReq; use databend_common_meta_app::schema::CreateCatalogReq; use databend_common_meta_app::schema::CreateDatabaseReply; use databend_common_meta_app::schema::CreateDatabaseReq; +use databend_common_meta_app::schema::CreateDictionaryReq; use databend_common_meta_app::schema::CreateIndexReq; use databend_common_meta_app::schema::CreateLockRevReq; use databend_common_meta_app::schema::CreateOption; @@ -63,6 +65,8 @@ use databend_common_meta_app::schema::DatabaseInfo; use databend_common_meta_app::schema::DatabaseMeta; use databend_common_meta_app::schema::DbIdList; use databend_common_meta_app::schema::DeleteLockRevReq; +use databend_common_meta_app::schema::DictionaryIdentity; +use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropCatalogReq; use databend_common_meta_app::schema::DropDatabaseReq; use databend_common_meta_app::schema::DropIndexReq; @@ -91,6 +95,7 @@ use databend_common_meta_app::schema::IndexNameIdentRaw; use databend_common_meta_app::schema::IndexType; use databend_common_meta_app::schema::ListCatalogReq; use databend_common_meta_app::schema::ListDatabaseReq; +use databend_common_meta_app::schema::ListDictionaryReq; use databend_common_meta_app::schema::ListDroppedTableReq; use databend_common_meta_app::schema::ListIndexesByIdReq; use databend_common_meta_app::schema::ListIndexesReq; @@ -119,6 +124,7 @@ use databend_common_meta_app::schema::TableStatistics; use databend_common_meta_app::schema::TruncateTableReq; use databend_common_meta_app::schema::UndropDatabaseReq; use databend_common_meta_app::schema::UndropTableReq; +use databend_common_meta_app::schema::UpdateDictionaryReq; use databend_common_meta_app::schema::UpdateMultiTableMetaReq; use databend_common_meta_app::schema::UpdateTableMetaReq; use databend_common_meta_app::schema::UpdateVirtualColumnReq; @@ -363,6 +369,8 @@ impl SchemaApiTestSuite { suite.get_db_name_by_id(&b.build().await).await?; suite.test_sequence(&b.build().await).await?; + suite.dictionary_create_list_drop(&b.build().await).await?; + Ok(()) } @@ -1385,10 +1393,13 @@ impl SchemaApiTestSuite { #[fastrace::trace] async fn catalog_create_get_list_drop(&self, mt: &MT) -> anyhow::Result<()> { let tenant_name = "tenant1"; - let tenant = Tenant::new_literal(tenant_name); let catalog_name = "catalog1"; + let tenant = Tenant { + tenant: tenant_name.to_string(), + }; + let ident = CatalogNameIdent::new(tenant.clone(), catalog_name); info!("--- create catalog1"); @@ -7349,6 +7360,174 @@ impl SchemaApiTestSuite { Ok(()) } + + #[fastrace::trace] + async fn dictionary_create_list_drop(&self, mt: &MT) -> anyhow::Result<()> + where MT: SchemaApi + kvapi::AsKVApi { + let tenant_name = "tenant1"; + let db_name = "db1"; + let tbl_name = "tb2"; + let dict_name1 = "dict1"; + let dict_name2 = "dict2"; + let dict_tenant = Tenant::new_or_err(tenant_name.to_string(), func_name!())?; + + let mut util = Util::new(mt, tenant_name, db_name, tbl_name, "eng1"); + let dict_id; + + info!("--- prepare db"); + { + util.create_db().await?; + } + + let db_name_ident = + DatabaseNameIdent::new(Tenant::new_literal(tenant_name), db_name.to_string()); + let get_db_req = GetDatabaseReq { + inner: db_name_ident.clone(), + }; + let db_info = mt.get_database(get_db_req).await?; + let db_id = db_info.ident.db_id; + + let schema = || { + Arc::new(TableSchema::new(vec![ + TableField::new("id", TableDataType::Number(NumberDataType::UInt64)), + TableField::new("name", TableDataType::String), + ])) + }; + let options = || { + maplit::btreemap! { + "host".into() => "0.0.0.0".into(), + "port".into() => "3306".into(), + "username".into() => "root".into(), + "password".into() => "1234".into(), + "db".into() => "test".into() + } + }; + + let dictionary_meta = |source: &str| DictionaryMeta { + source: source.to_string(), + schema: schema(), + options: options(), + created_on: Utc::now(), + ..DictionaryMeta::default() + }; + + { + info!("--- list dictionary with no create before"); + let req = ListDictionaryReq::new(dict_tenant.clone(), db_id); + let res = mt.list_dictionaries(req).await?; + assert!(res.is_empty()); + } + + let dict_ident1 = TenantDictionaryIdent::new( + dict_tenant.clone(), + DictionaryIdentity::new(db_id, dict_name1.to_string()), + ); + let dict_ident2 = TenantDictionaryIdent::new( + dict_tenant.clone(), + DictionaryIdentity::new(db_id, dict_name2.to_string()), + ); + + { + info!("--- create dictionary"); + let req = CreateDictionaryReq { + dictionary_ident: dict_ident1.clone(), + dictionary_meta: dictionary_meta("mysql"), + }; + let res = mt.create_dictionary(req).await; + assert!(res.is_ok()); + dict_id = res.unwrap().dictionary_id; + } + + { + info!("--- create dictionary again"); + let req = CreateDictionaryReq { + dictionary_ident: dict_ident1.clone(), + dictionary_meta: dictionary_meta("mysql"), + }; + let res = mt.create_dictionary(req).await; + assert!(res.is_err()); + let status = res.err().unwrap(); + let err_code = ErrorCode::from(status); + + assert_eq!(ErrorCode::DICTIONARY_ALREADY_EXISTS, err_code.code()); + } + + { + info!("--- get dictionary"); + let req = dict_ident1.clone(); + let res = mt.get_dictionary(req).await?; + assert!(res.is_some()); + let dict_reply = res.unwrap(); + assert_eq!(dict_reply.dictionary_id, dict_id); + assert_eq!(dict_reply.dictionary_meta.source, "mysql".to_string()); + + let req = dict_ident2.clone(); + let res = mt.get_dictionary(req).await?; + assert!(res.is_none()); + } + + { + info!("--- update dictionary"); + let req = UpdateDictionaryReq { + dictionary_ident: dict_ident1.clone(), + dictionary_meta: dictionary_meta("postgresql"), + }; + let res = mt.update_dictionary(req).await; + assert!(res.is_ok()); + + let req = dict_ident1.clone(); + let res = mt.get_dictionary(req).await?; + assert!(res.is_some()); + let dict_reply = res.unwrap(); + assert_eq!(dict_reply.dictionary_id, dict_id); + assert_eq!(dict_reply.dictionary_meta.source, "postgresql".to_string()); + } + + { + info!("--- update unknown dictionary"); + let req = UpdateDictionaryReq { + dictionary_ident: dict_ident2.clone(), + dictionary_meta: dictionary_meta("postgresql"), + }; + let res = mt.update_dictionary(req).await; + assert!(res.is_err()); + let status = res.err().unwrap(); + let err_code = ErrorCode::from(status); + + assert_eq!(ErrorCode::UNKNOWN_DICTIONARY, err_code.code()); + } + + { + info!("--- list dictionary"); + let req = ListDictionaryReq::new(dict_tenant.clone(), db_id); + let res = mt.list_dictionaries(req).await?; + + assert_eq!(1, res.len()); + } + + { + info!("--- drop dictionary"); + let req = dict_ident1.clone(); + let res = mt.drop_dictionary(req).await?; + assert!(res.is_some()); + } + + { + info!("--- drop unknown dictionary"); + let req = dict_ident2.clone(); + let res = mt.drop_dictionary(req).await?; + assert!(res.is_none()); + } + + { + info!("--- list dictionary after drop one"); + let req = ListDictionaryReq::new(dict_tenant.clone(), db_id); + let res = mt.list_dictionaries(req).await?; + assert_eq!(0, res.len()); + } + + Ok(()) + } } struct Util<'a, MT> diff --git a/src/meta/app/src/app_error.rs b/src/meta/app/src/app_error.rs index 896fdbd1f753..9b7c52885e30 100644 --- a/src/meta/app/src/app_error.rs +++ b/src/meta/app/src/app_error.rs @@ -1075,6 +1075,38 @@ impl WrongSequenceCount { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)] +#[error("DictionaryAlreadyExists: `{dictionary_name}` while `{context}`")] +pub struct DictionaryAlreadyExists { + dictionary_name: String, + context: String, +} + +impl DictionaryAlreadyExists { + pub fn new(dictionary_name: impl Into, context: impl Into) -> Self { + Self { + dictionary_name: dictionary_name.into(), + context: context.into(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)] +#[error("UnknownDictionary: `{dictionary_name}` while `{context}`")] +pub struct UnknownDictionary { + dictionary_name: String, + context: String, +} + +impl UnknownDictionary { + pub fn new(dictionary_name: impl Into, context: impl Into) -> Self { + Self { + dictionary_name: dictionary_name.into(), + context: context.into(), + } + } +} + /// Application error. /// /// The application does not get expected result but there is nothing wrong with meta-service. @@ -1264,6 +1296,13 @@ pub enum AppError { #[error(transparent)] UpdateStreamMetasFailed(#[from] UpdateStreamMetasFailed), + + // dictionary + #[error(transparent)] + DictionaryAlreadyExists(#[from] DictionaryAlreadyExists), + + #[error(transparent)] + UnknownDictionary(#[from] UnknownDictionary), } #[derive(thiserror::Error, serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] @@ -1707,6 +1746,19 @@ impl AppErrorMessage for SequenceError { } } +// dictionary +impl AppErrorMessage for DictionaryAlreadyExists { + fn message(&self) -> String { + format!("dictionary '{}' already exists", self.dictionary_name) + } +} + +impl AppErrorMessage for UnknownDictionary { + fn message(&self) -> String { + format!("Unknown dictionary '{}'", self.dictionary_name) + } +} + impl From for ErrorCode { fn from(app_err: AppError) -> Self { match app_err { @@ -1817,6 +1869,11 @@ impl From for ErrorCode { } AppError::SequenceError(err) => ErrorCode::SequenceError(err.message()), AppError::UpdateStreamMetasFailed(e) => ErrorCode::UnresolvableConflict(e.message()), + // dictionary + AppError::DictionaryAlreadyExists(err) => { + ErrorCode::DictionaryAlreadyExists(err.message()) + } + AppError::UnknownDictionary(err) => ErrorCode::UnknownDictionary(err.message()), } } } diff --git a/src/meta/app/src/id_generator.rs b/src/meta/app/src/id_generator.rs index 6a0fe4df1692..6d7374ad136e 100644 --- a/src/meta/app/src/id_generator.rs +++ b/src/meta/app/src/id_generator.rs @@ -20,6 +20,7 @@ pub(crate) const ID_GEN_TABLE: &str = "table_id"; pub(crate) const ID_GEN_DATABASE: &str = "database_id"; pub(crate) const ID_GEN_TABLE_LOCK: &str = "table_lock_id"; pub(crate) const ID_GEN_INDEX: &str = "index_id"; +pub(crate) const ID_GEN_DICTIONARY: &str = "dictionary_id"; pub(crate) const ID_GEN_CATALOG: &str = "catalog_id"; @@ -54,6 +55,13 @@ impl IdGenerator { } } + /// Create a key for generating dictionary id with kvapi::KVApi + pub fn dictionary_id() -> Self { + Self { + resource: ID_GEN_DICTIONARY.to_string(), + } + } + /// Create a key for generating share id with kvapi::KVApi pub fn share_id() -> Self { Self { diff --git a/src/meta/app/src/schema/dictionary.rs b/src/meta/app/src/schema/dictionary.rs new file mode 100644 index 000000000000..45510ce65c40 --- /dev/null +++ b/src/meta/app/src/schema/dictionary.rs @@ -0,0 +1,187 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::fmt; +use std::collections::BTreeMap; +use std::fmt::Display; +use std::fmt::Formatter; +use std::sync::Arc; + +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::TableSchema; + +use super::tenant_dictionary_ident::TenantDictionaryIdent; +use crate::tenant::Tenant; +use crate::tenant::ToTenant; + +/// Represents the metadata of a dictionary within the system. +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub struct DictionaryMeta { + /// The source of the dictionary, which specifies where the dictionary data comes from, like `MySQL`. + pub source: String, + /// Specify the configuration related to the data source in the form of key-value pairs. + /// For example, `host='localhost' user='root' password='1234'` + pub options: BTreeMap, + /// Schema refers to an external table that corresponds to the dictionary. + /// This is typically used to understand the layout and types of data within the dictionary. + pub schema: Arc, + /// A set of key-value pairs is used to represent the annotations for each field in the dictionary, the key being column_id. + /// For example, if we have `id, address` fields, then field_comments could be `[ '1=student's number','2=home address']` + pub field_comments: BTreeMap, + /// A list of primary column IDs. + /// For example, vec![1, 2] indicating the first and second columns are the primary keys. + pub primary_column_ids: Vec, + /// A general comment string that can be used to provide additional notes or information about the dictionary. + pub comment: String, + /// The timestamp indicating when the dictionary was created, in Coordinated Universal Time (UTC). + pub created_on: DateTime, + /// if used in CreateDictionaryReq, + /// `updated_on` MUST set to None. + pub updated_on: Option>, +} + +impl Display for DictionaryMeta { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Source: {}={:?}, Schema: {:?}, Primary_Column_Id: {:?}, CreatedOn: {:?}", + self.source, self.options, self.schema, self.primary_column_ids, self.created_on + ) + } +} + +impl Default for DictionaryMeta { + fn default() -> Self { + DictionaryMeta { + source: "".to_string(), + options: BTreeMap::new(), + schema: Arc::new(TableSchema::empty()), + primary_column_ids: Vec::new(), + created_on: Utc::now(), + updated_on: None, + comment: "".to_string(), + field_comments: BTreeMap::new(), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CreateDictionaryReq { + pub dictionary_ident: TenantDictionaryIdent, + pub dictionary_meta: DictionaryMeta, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub struct CreateDictionaryReply { + pub dictionary_id: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GetDictionaryReply { + pub dictionary_id: u64, + pub dictionary_meta: DictionaryMeta, + /// Any change to a dictionary causes the seq to increment + pub dictionary_meta_seq: u64, +} + +#[derive(Clone, Debug, Eq, PartialEq, Default)] +pub struct DictionaryId { + pub dictionary_id: u64, +} + +impl DictionaryId { + pub fn new(dictionary_id: u64) -> DictionaryId { + DictionaryId { dictionary_id } + } +} + +impl Display for DictionaryId { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DictionaryId{{{}}}", self.dictionary_id) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ListDictionaryReq { + pub tenant: Tenant, + pub db_id: u64, +} + +impl ListDictionaryReq { + pub fn new(tenant: impl ToTenant, db_id: u64) -> ListDictionaryReq { + ListDictionaryReq { + tenant: tenant.to_tenant(), + db_id, + } + } + + pub fn db_id(&self) -> u64 { + self.db_id + } + + pub fn tenant(&self) -> String { + self.tenant.tenant_name().to_string() + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct UpdateDictionaryReq { + pub dictionary_meta: DictionaryMeta, + pub dictionary_ident: TenantDictionaryIdent, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct UpdateDictionaryReply { + pub dictionary_id: u64, +} + +mod kvapi_key_impl { + + use databend_common_meta_kvapi::kvapi; + + use super::DictionaryId; + use super::DictionaryMeta; + + impl kvapi::KeyCodec for DictionaryId { + fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { + b.push_u64(self.dictionary_id) + } + + fn decode_key(parser: &mut kvapi::KeyParser) -> Result + where Self: Sized { + let dict_id = parser.next_u64()?; + Ok(Self { + dictionary_id: dict_id, + }) + } + } + + /// "/" + impl kvapi::Key for DictionaryId { + const PREFIX: &'static str = "__fd_dictionary_by_id"; + + type ValueType = DictionaryMeta; + + fn parent(&self) -> Option { + None + } + } + + impl kvapi::Value for DictionaryMeta { + fn dependency_keys(&self) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/app/src/schema/dictionary_identity.rs b/src/meta/app/src/schema/dictionary_identity.rs new file mode 100644 index 000000000000..cd268d3bbb40 --- /dev/null +++ b/src/meta/app/src/schema/dictionary_identity.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Uniquely identifies a dictionary with a db_id and a dict_name +#[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] +pub struct DictionaryIdentity { + pub db_id: u64, + pub dict_name: String, +} + +impl DictionaryIdentity { + pub fn new(db_id: u64, dict_name: impl ToString) -> Self { + Self { + db_id, + dict_name: dict_name.to_string(), + } + } +} + +mod kvapi_key_impl { + + use databend_common_meta_kvapi::kvapi; + + use super::DictionaryIdentity; + + impl kvapi::KeyCodec for DictionaryIdentity { + fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { + b.push_u64(self.db_id).push_str(&self.dict_name) + } + + fn decode_key(parser: &mut kvapi::KeyParser) -> Result + where Self: Sized { + let db_id = parser.next_u64()?; + let dict_name = parser.next_str()?; + Ok(Self { db_id, dict_name }) + } + } +} diff --git a/src/meta/app/src/schema/mod.rs b/src/meta/app/src/schema/mod.rs index f7508a51dba7..0a36a14b4fcc 100644 --- a/src/meta/app/src/schema/mod.rs +++ b/src/meta/app/src/schema/mod.rs @@ -22,10 +22,13 @@ pub mod database_id_history_ident; pub mod database_name_ident; pub mod index_name_ident; pub mod table_lock_ident; +pub mod tenant_dictionary_ident; pub mod virtual_column_ident; mod create_option; mod database; +mod dictionary; +mod dictionary_identity; mod index; mod least_visible_time; mod lock; @@ -60,6 +63,8 @@ pub use database::ShareDbId; pub use database::UndropDatabaseReply; pub use database::UndropDatabaseReq; pub use database_id_history_ident::DatabaseIdHistoryIdent; +pub use dictionary::*; +pub use dictionary_identity::DictionaryIdentity; pub use index::*; pub use index_name_ident::IndexNameIdent; pub use index_name_ident::IndexNameIdentRaw; diff --git a/src/meta/app/src/schema/tenant_dictionary_ident.rs b/src/meta/app/src/schema/tenant_dictionary_ident.rs new file mode 100644 index 000000000000..e133769c094d --- /dev/null +++ b/src/meta/app/src/schema/tenant_dictionary_ident.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::DictionaryIdentity; +use crate::tenant::ToTenant; +use crate::tenant_key::ident::TIdent; +use crate::KeyWithTenant; + +/// A dictionary identity belonging to a tenant. +pub type TenantDictionaryIdent = TIdent; +pub type TenantDictionaryIdentRaw = TIdent; + +pub use kvapi_impl::Resource; + +impl TenantDictionaryIdent { + pub fn new(tenant: impl ToTenant, dictionary: DictionaryIdentity) -> Self { + Self::new_generic(tenant, dictionary) + } + + pub fn dict_name(&self) -> String { + self.name().dict_name.clone() + } + + pub fn db_id(&self) -> u64 { + self.name().db_id + } + + pub fn tenant_name(&self) -> &str { + self.tenant().tenant_name() + } +} + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + + use crate::schema::DictionaryId; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_dictionaries"; + const TYPE: &'static str = "TenantDictionaryIdent"; + const HAS_TENANT: bool = true; + type ValueType = DictionaryId; + } + + impl kvapi::Value for DictionaryId { + fn dependency_keys(&self) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/proto-conv/src/dictionary_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/dictionary_from_to_protobuf_impl.rs new file mode 100644 index 000000000000..b3280585cb1d --- /dev/null +++ b/src/meta/proto-conv/src/dictionary_from_to_protobuf_impl.rs @@ -0,0 +1,73 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression as ex; +use databend_common_meta_app::schema as mt; +use databend_common_protos::pb; + +use crate::reader_check_msg; +use crate::FromToProto; +use crate::Incompatible; +use crate::MIN_READER_VER; +use crate::VER; + +impl FromToProto for mt::DictionaryMeta { + type PB = pb::DictionaryMeta; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::DictionaryMeta) -> Result + where Self: Sized { + reader_check_msg(p.ver, p.min_reader_ver)?; + let schema = p.schema.ok_or_else(|| Incompatible { + reason: "DictionaryMeta.schema can not be None".to_string(), + })?; + let v = Self { + source: p.source, + options: p.options, + schema: Arc::new(ex::TableSchema::from_pb(schema)?), + primary_column_ids: p.primary_column_ids, + comment: p.comment, + created_on: DateTime::::from_pb(p.created_on)?, + updated_on: match p.updated_on { + Some(update_on) => Some(DateTime::::from_pb(update_on)?), + None => None, + }, + field_comments: p.field_comments, + }; + Ok(v) + } + fn to_pb(&self) -> Result { + let p = pb::DictionaryMeta { + ver: VER, + min_reader_ver: MIN_READER_VER, + source: self.source.clone(), + options: self.options.clone(), + primary_column_ids: self.primary_column_ids.clone(), + created_on: self.created_on.to_pb()?, + updated_on: match self.updated_on { + Some(updated_on) => Some(updated_on.to_pb()?), + None => None, + }, + comment: self.comment.clone(), + schema: Some(self.schema.to_pb()?), + field_comments: self.field_comments.clone(), + }; + Ok(p) + } +} diff --git a/src/meta/proto-conv/src/lib.rs b/src/meta/proto-conv/src/lib.rs index c01f66cd453e..a241842e24d1 100644 --- a/src/meta/proto-conv/src/lib.rs +++ b/src/meta/proto-conv/src/lib.rs @@ -68,6 +68,7 @@ mod connection_from_to_protobuf_impl; mod data_mask_from_to_protobuf_impl; mod database_from_to_protobuf_impl; mod datetime_from_to_protobuf_impl; +mod dictionary_from_to_protobuf_impl; mod file_format_from_to_protobuf_impl; mod from_to_protobuf; mod index_from_to_protobuf_impl; diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index b87c1c388046..debcb299c2b1 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -134,6 +134,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (102, "2024-07-11: Add: UserOption add must_change_password, AuthInfo.Password add need_change"), (103, "2024-07-31: Add: ShareMetaV2"), (104, "2024-08-02: Add: add share catalog into Catalog meta"), + (105, "2024-08-05: Add: add Dictionary meta"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 4c1e602b8d3f..6923d893a56a 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -108,3 +108,4 @@ mod v101_database_meta; mod v102_user_must_change_password; mod v103_share_meta_v2; mod v104_share_catalog; +mod v105_dictionary_meta; diff --git a/src/meta/proto-conv/tests/it/v105_dictionary_meta.rs b/src/meta/proto-conv/tests/it/v105_dictionary_meta.rs new file mode 100644 index 000000000000..7eb6028c5c66 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v105_dictionary_meta.rs @@ -0,0 +1,89 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::TimeZone; +use chrono::Utc; +use databend_common_expression as ce; +use databend_common_expression::types::NumberDataType; +use databend_common_meta_app::schema as mt; +use databend_common_meta_types::anyerror::func_name; +use maplit::btreemap; + +use crate::common; + +#[test] +fn test_decode_v105_dictionary_meta() -> anyhow::Result<()> { + let bytes = vec![ + 10, 5, 77, 121, 83, 81, 76, 18, 17, 10, 8, 100, 97, 116, 97, 98, 97, 115, 101, 18, 5, 109, + 121, 95, 100, 98, 18, 17, 10, 4, 104, 111, 115, 116, 18, 9, 108, 111, 99, 97, 108, 104, + 111, 115, 116, 18, 16, 10, 8, 112, 97, 115, 115, 119, 111, 114, 100, 18, 4, 49, 50, 51, 52, + 18, 12, 10, 4, 112, 111, 114, 116, 18, 4, 51, 51, 48, 54, 18, 16, 10, 8, 117, 115, 101, + 114, 110, 97, 109, 101, 18, 4, 114, 111, 111, 116, 26, 123, 10, 43, 10, 7, 117, 115, 101, + 114, 95, 105, 100, 26, 26, 178, 2, 17, 154, 2, 8, 66, 0, 160, 6, 105, 168, 6, 24, 160, 6, + 105, 168, 6, 24, 160, 6, 105, 168, 6, 24, 160, 6, 105, 168, 6, 24, 10, 30, 10, 9, 117, 115, + 101, 114, 95, 110, 97, 109, 101, 26, 9, 146, 2, 0, 160, 6, 105, 168, 6, 24, 32, 1, 160, 6, + 105, 168, 6, 24, 10, 28, 10, 7, 97, 100, 100, 114, 101, 115, 115, 26, 9, 146, 2, 0, 160, 6, + 105, 168, 6, 24, 32, 2, 160, 6, 105, 168, 6, 24, 18, 6, 10, 1, 97, 18, 1, 98, 24, 3, 160, + 6, 105, 168, 6, 24, 34, 15, 18, 13, 117, 115, 101, 114, 39, 115, 32, 110, 117, 109, 98, + 101, 114, 34, 15, 8, 1, 18, 11, 117, 115, 101, 114, 39, 115, 32, 110, 97, 109, 101, 34, 23, + 8, 2, 18, 19, 117, 115, 101, 114, 39, 115, 32, 104, 111, 109, 101, 32, 97, 100, 100, 114, + 101, 115, 115, 42, 1, 0, 50, 15, 99, 111, 109, 109, 101, 110, 116, 95, 101, 120, 97, 109, + 112, 108, 101, 58, 23, 50, 48, 50, 52, 45, 48, 56, 45, 48, 53, 32, 48, 55, 58, 48, 48, 58, + 48, 48, 32, 85, 84, 67, 160, 6, 105, 168, 6, 24, + ]; + + let want = || mt::DictionaryMeta { + source: "MySQL".to_string(), + options: btreemap! { + s("host") => s("localhost"), + s("username") => s("root"), + s("password") => s("1234"), + s("port") => s("3306"), + s("database") => s("my_db"), + }, + schema: Arc::new(ce::TableSchema::new_from( + vec![ + ce::TableField::new( + "user_id", + ce::TableDataType::Nullable(Box::new(ce::TableDataType::Number( + NumberDataType::Int64, + ))), + ), + ce::TableField::new("user_name", ce::TableDataType::String), + ce::TableField::new("address", ce::TableDataType::String), + ], + btreemap! { s("a") => s("b") }, + )), + field_comments: btreemap! { + 0u32 => s("user's number"), + 1u32 => s("user's name"), + 2u32 => s("user's home address"), + }, + primary_column_ids: vec![0], + comment: "comment_example".to_string(), + created_on: Utc.with_ymd_and_hms(2024, 8, 5, 7, 0, 0).unwrap(), + updated_on: None, + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), bytes.as_slice(), 105, want())?; + + Ok(()) +} + +fn s(ss: impl ToString) -> String { + ss.to_string() +} diff --git a/src/meta/protos/proto/dictionary.proto b/src/meta/protos/proto/dictionary.proto new file mode 100644 index 000000000000..236b54b43d6a --- /dev/null +++ b/src/meta/protos/proto/dictionary.proto @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package databend_proto; + +import "metadata.proto"; + +// Describes the metadata of a dictionary +message DictionaryMeta { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + // Dictionary data source, such as MySQL, PostgreSQL, .. + string source = 1; + + // Dictionary configuration options + map options = 2; + + // The schema of a dictionary + DataSchema schema = 3; + + // Comments of each field + map field_comments = 4; + + // ID of the primary key column + repeated uint32 primary_column_ids = 5; + + // Comment about this dictionary. + string comment = 6; + + // The time dictionary created. + string created_on = 7; + + // The time dictionary updated. + optional string updated_on = 8; +}