Skip to content

Commit

Permalink
refactor: SchemaApi::list_databases() (#16485)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 20, 2024
1 parent 5b88e97 commit 356e5cb
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 41 deletions.
39 changes: 39 additions & 0 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,25 @@ pub trait KVPbApi: KVApi {
}
}

/// Same as [`get_pb_values`](Self::get_pb_values) but collect the result in a `Vec` instead of a stream.
fn get_pb_values_vec<K, I>(
&self,
keys: I,
) -> impl Future<Output = Result<Vec<Option<SeqV<K::ValueType>>>, Self::Error>> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K> + Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
async move {
self.get_pb_values(keys)
.await?
.try_collect::<Vec<_>>()
.await
}
}

/// Same as `get_pb_stream` but does not return keys, only values.
///
/// It guaranteed to return the same number of results as the input keys.
Expand Down Expand Up @@ -337,6 +356,26 @@ pub trait KVPbApi: KVApi {
}
}

/// Same as [`list_pb`](Self::list_pb)` but collect the result in a `Vec` instead of a stream.
fn list_pb_vec<K>(
&self,
prefix: &DirName<K>,
) -> impl Future<Output = Result<Vec<(K, SeqV<K::ValueType>)>, Self::Error>> + Send
where
K: kvapi::Key + Send + Sync + 'static,
K::ValueType: FromToProto + Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
async move {
let strm = self.list_pb(prefix).await?;
let kvs = strm
.map_ok(|itm| (itm.key, itm.seqv))
.try_collect::<Vec<_>>()
.await?;
Ok(kvs)
}
}

/// Same as `list_pb` but does not return values, only keys.
fn list_pb_keys<K>(
&self,
Expand Down
62 changes: 26 additions & 36 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::list_keys;
use crate::list_u64_value;
use crate::meta_txn_error::MetaTxnError;
use crate::name_id_value_api::NameIdValueApi;
use crate::name_value_api::NameValueApi;
Expand Down Expand Up @@ -791,48 +790,39 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

// Using a empty db to to list all
let name_key = DatabaseNameIdent::new(req.tenant(), "");

// Pairs of db-name and db_id with seq
let (tenant_dbnames, db_ids) = list_u64_value(self, &name_key).await?;

// Keys for fetching serialized DatabaseMeta from kvapi::KVApi
let mut kv_keys = Vec::with_capacity(db_ids.len());

for db_id in db_ids.iter() {
let k = DatabaseId { db_id: *db_id }.to_string_key();
kv_keys.push(k);
}
let name_key = DatabaseNameIdent::new(req.tenant(), "dummy");
let dir = DirName::new(name_key);

// Batch get all db-metas.
// - A db-meta may be already deleted. It is Ok. Just ignore it.
let name_seq_ids = self.list_pb_vec(&dir).await?;

let seq_metas = self.mget_kv(&kv_keys).await?;
let mut db_infos = Vec::with_capacity(kv_keys.len());
let id_idents = name_seq_ids
.iter()
.map(|(_k, id)| {
let db_id = id.data;
DatabaseId { db_id: *db_id }
})
.collect::<Vec<_>>();

for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?;
let id_metas = self.get_pb_values_vec(id_idents).await?;

let name_id_metas = name_seq_ids
.into_iter()
.zip(id_metas.into_iter())
// Remove values that are not found, may be just removed.
.filter_map(|((name, seq_id), opt_seq_meta)| {
opt_seq_meta.map(|seq_meta| (name, seq_id.data, seq_meta))
})
.map(|(name, db_id, seq_meta)| {
let db_info = DatabaseInfo {
database_id: DatabaseId::new(db_ids[i]),
name_ident: DatabaseNameIdent::new(
name_key.tenant(),
tenant_dbnames[i].database_name(),
),
meta: SeqV::new(seq_meta.seq, db_meta),
database_id: db_id.into_inner(),
name_ident: name,
meta: seq_meta,
};
db_infos.push(Arc::new(db_info));
} else {
debug!(
k = &kv_keys[i];
"db_meta not found, maybe just deleted after listing names and before listing meta"
);
}
}
Arc::new(db_info)
})
.collect::<Vec<_>>();

Ok(db_infos)
Ok(name_id_metas)
}

#[logcall::logcall]
Expand Down
10 changes: 5 additions & 5 deletions src/meta/app/src/schema/database_name_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use crate::tenant_key::ident::TIdent;
use crate::tenant_key::raw::TIdentRaw;

pub type DatabaseNameIdent = TIdent<Resource>;
pub type DatabaseNameIdentRaw = TIdentRaw<Resource>;
pub type DatabaseNameIdent = TIdent<DatabaseNameRsc>;
pub type DatabaseNameIdentRaw = TIdentRaw<DatabaseNameRsc>;

pub use kvapi_impl::Resource;
pub use kvapi_impl::DatabaseNameRsc;

impl DatabaseNameIdent {
pub fn database_name(&self) -> &str {
Expand All @@ -42,8 +42,8 @@ mod kvapi_impl {
use crate::schema::DatabaseId;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
pub struct DatabaseNameRsc;
impl TenantResource for DatabaseNameRsc {
const PREFIX: &'static str = "__fd_database";
const TYPE: &'static str = "DatabaseNameIdent";
const HAS_TENANT: bool = true;
Expand Down

0 comments on commit 356e5cb

Please sign in to comment.