From 356e5cbf5085d21c46a6e6705af421ff34e0df3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 20 Sep 2024 23:26:58 +0800 Subject: [PATCH] refactor: SchemaApi::list_databases() (#16485) --- src/meta/api/src/kv_pb_api/mod.rs | 39 ++++++++++++ src/meta/api/src/schema_api_impl.rs | 62 ++++++++----------- .../app/src/schema/database_name_ident.rs | 10 +-- 3 files changed, 70 insertions(+), 41 deletions(-) diff --git a/src/meta/api/src/kv_pb_api/mod.rs b/src/meta/api/src/kv_pb_api/mod.rs index 0b94960eb675..fc1caae81c9d 100644 --- a/src/meta/api/src/kv_pb_api/mod.rs +++ b/src/meta/api/src/kv_pb_api/mod.rs @@ -188,6 +188,25 @@ pub trait KVPbApi: KVApi { } } + /// Same as [`get_pb_values`](Self::get_pb_values) but collect the result in a `Vec` instead of a stream. + fn get_pb_values_vec( + &self, + keys: I, + ) -> impl Future>>, Self::Error>> + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto + Send + 'static, + I: IntoIterator + Send, + Self::Error: From>, + { + async move { + self.get_pb_values(keys) + .await? + .try_collect::>() + .await + } + } + /// Same as `get_pb_stream` but does not return keys, only values. /// /// It guaranteed to return the same number of results as the input keys. @@ -337,6 +356,26 @@ pub trait KVPbApi: KVApi { } } + /// Same as [`list_pb`](Self::list_pb)` but collect the result in a `Vec` instead of a stream. + fn list_pb_vec( + &self, + prefix: &DirName, + ) -> impl Future)>, Self::Error>> + Send + where + K: kvapi::Key + Send + Sync + 'static, + K::ValueType: FromToProto + Send, + Self::Error: From>, + { + async move { + let strm = self.list_pb(prefix).await?; + let kvs = strm + .map_ok(|itm| (itm.key, itm.seqv)) + .try_collect::>() + .await?; + Ok(kvs) + } + } + /// Same as `list_pb` but does not return values, only keys. fn list_pb_keys( &self, diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 8a4ddc612935..86139849d33b 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -203,7 +203,6 @@ use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; use crate::kv_pb_crud_api::KVPbCrudApi; use crate::list_keys; -use crate::list_u64_value; use crate::meta_txn_error::MetaTxnError; use crate::name_id_value_api::NameIdValueApi; use crate::name_value_api::NameValueApi; @@ -791,48 +790,39 @@ impl + ?Sized> SchemaApi for KV { ) -> Result>, KVAppError> { debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - // Using a empty db to to list all - let name_key = DatabaseNameIdent::new(req.tenant(), ""); - - // Pairs of db-name and db_id with seq - let (tenant_dbnames, db_ids) = list_u64_value(self, &name_key).await?; - - // Keys for fetching serialized DatabaseMeta from kvapi::KVApi - let mut kv_keys = Vec::with_capacity(db_ids.len()); - - for db_id in db_ids.iter() { - let k = DatabaseId { db_id: *db_id }.to_string_key(); - kv_keys.push(k); - } + let name_key = DatabaseNameIdent::new(req.tenant(), "dummy"); + let dir = DirName::new(name_key); - // Batch get all db-metas. - // - A db-meta may be already deleted. It is Ok. Just ignore it. + let name_seq_ids = self.list_pb_vec(&dir).await?; - let seq_metas = self.mget_kv(&kv_keys).await?; - let mut db_infos = Vec::with_capacity(kv_keys.len()); + let id_idents = name_seq_ids + .iter() + .map(|(_k, id)| { + let db_id = id.data; + DatabaseId { db_id: *db_id } + }) + .collect::>(); - for (i, seq_meta_opt) in seq_metas.iter().enumerate() { - if let Some(seq_meta) = seq_meta_opt { - let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?; + let id_metas = self.get_pb_values_vec(id_idents).await?; + let name_id_metas = name_seq_ids + .into_iter() + .zip(id_metas.into_iter()) + // Remove values that are not found, may be just removed. + .filter_map(|((name, seq_id), opt_seq_meta)| { + opt_seq_meta.map(|seq_meta| (name, seq_id.data, seq_meta)) + }) + .map(|(name, db_id, seq_meta)| { let db_info = DatabaseInfo { - database_id: DatabaseId::new(db_ids[i]), - name_ident: DatabaseNameIdent::new( - name_key.tenant(), - tenant_dbnames[i].database_name(), - ), - meta: SeqV::new(seq_meta.seq, db_meta), + database_id: db_id.into_inner(), + name_ident: name, + meta: seq_meta, }; - db_infos.push(Arc::new(db_info)); - } else { - debug!( - k = &kv_keys[i]; - "db_meta not found, maybe just deleted after listing names and before listing meta" - ); - } - } + Arc::new(db_info) + }) + .collect::>(); - Ok(db_infos) + Ok(name_id_metas) } #[logcall::logcall] diff --git a/src/meta/app/src/schema/database_name_ident.rs b/src/meta/app/src/schema/database_name_ident.rs index bbf4b7bad4de..a89a8401ec78 100644 --- a/src/meta/app/src/schema/database_name_ident.rs +++ b/src/meta/app/src/schema/database_name_ident.rs @@ -15,10 +15,10 @@ use crate::tenant_key::ident::TIdent; use crate::tenant_key::raw::TIdentRaw; -pub type DatabaseNameIdent = TIdent; -pub type DatabaseNameIdentRaw = TIdentRaw; +pub type DatabaseNameIdent = TIdent; +pub type DatabaseNameIdentRaw = TIdentRaw; -pub use kvapi_impl::Resource; +pub use kvapi_impl::DatabaseNameRsc; impl DatabaseNameIdent { pub fn database_name(&self) -> &str { @@ -42,8 +42,8 @@ mod kvapi_impl { use crate::schema::DatabaseId; use crate::tenant_key::resource::TenantResource; - pub struct Resource; - impl TenantResource for Resource { + pub struct DatabaseNameRsc; + impl TenantResource for DatabaseNameRsc { const PREFIX: &'static str = "__fd_database"; const TYPE: &'static str = "DatabaseNameIdent"; const HAS_TENANT: bool = true;