From edaf42d35f88b7a624a52120af76a142f023dab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 11 May 2024 20:57:37 +0800 Subject: [PATCH] chore: add KVPbApi::get_pb_stream() but it is not yet ready for production use (#15471) --- src/meta/api/src/kv_pb_api/mod.rs | 267 ++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) diff --git a/src/meta/api/src/kv_pb_api/mod.rs b/src/meta/api/src/kv_pb_api/mod.rs index f07842ed6b3b..98e92736fa5c 100644 --- a/src/meta/api/src/kv_pb_api/mod.rs +++ b/src/meta/api/src/kv_pb_api/mod.rs @@ -127,6 +127,112 @@ pub trait KVPbApi: KVApi { } } + /// Same as `get_pb_stream` but does not return keys, only values. + #[deprecated(note = "stream may be closed. The caller must check it")] + fn get_pb_values( + &self, + keys: I, + ) -> impl Future< + Output = Result< + BoxStream< + 'static, // + Result>, Self::Error>, + >, + Self::Error, + >, + > + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto + Send + 'static, + I: IntoIterator, + Self::Error: From>, + { + self.get_pb_stream_low(keys) + // This `map()` handles Future result + .map(|r| match r { + Ok(strm) => { + // These two `map_xx()` handles Stream result + Ok(strm.map_ok(|(_k, v)| v).map_err(Self::Error::from).boxed()) + } + Err(e) => Err(e), + }) + } + + /// Get protobuf encoded values by a series of kvapi::Key. + /// + /// The key will be converted to string and the returned value is decoded by `FromToProto`. + /// It returns the same error as `KVApi::Error`, + /// thus it requires KVApi::Error can describe a decoding error, i.e., `impl From`. + #[deprecated(note = "stream may be closed. The caller must check it")] + fn get_pb_stream( + &self, + keys: I, + ) -> impl Future< + Output = Result< + BoxStream< + 'static, // + Result<(K, Option>), Self::Error>, + >, + Self::Error, + >, + > + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto + Send + 'static, + I: IntoIterator, + Self::Error: From>, + { + self.get_pb_stream_low(keys).map(|r| match r { + Ok(strm) => Ok(strm.map_err(Self::Error::from).boxed()), + Err(e) => Err(e), + }) + } + + /// Same as `get_pb_stream` but returns [`PbApiReadError`]. No require of `From` for `Self::Error`. + fn get_pb_stream_low( + &self, + keys: I, + ) -> impl Future< + Output = Result< + BoxStream< + 'static, // + Result<(K, Option>), PbApiReadError>, + >, + Self::Error, + >, + > + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto + Send + 'static, + I: IntoIterator, + { + let keys = keys + .into_iter() + .map(|k| kvapi::Key::to_string_key(&k)) + .collect::>(); + + async move { + let strm = self.get_kv_stream(&keys).await?; + + let strm = strm.map(|r: Result| { + let item = r.map_err(PbApiReadError::KvApiError)?; + + let k = K::from_str_key(&item.key).map_err(PbApiReadError::KeyError)?; + + let v = if let Some(pb_seqv) = item.value { + let seqv = decode_seqv::(SeqV::from(pb_seqv))?; + Some(seqv) + } else { + None + }; + + Ok((k, v)) + }); + + Ok(strm.boxed()) + } + } + /// Same as `list_pb` but does not return values, only keys. fn list_pb_keys( &self, @@ -216,3 +322,164 @@ pub trait KVPbApi: KVApi { } impl KVPbApi for T where T: KVApi + ?Sized {} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use async_trait::async_trait; + use chrono::DateTime; + use chrono::Utc; + use databend_common_meta_app::schema::CatalogIdIdent; + use databend_common_meta_app::schema::CatalogMeta; + use databend_common_meta_app::schema::CatalogOption; + use databend_common_meta_app::schema::HiveCatalogOption; + use databend_common_meta_app::storage::StorageS3Config; + use databend_common_meta_app::tenant::Tenant; + use databend_common_meta_kvapi::kvapi::KVApi; + use databend_common_meta_kvapi::kvapi::KVStream; + use databend_common_meta_kvapi::kvapi::UpsertKVReply; + use databend_common_meta_kvapi::kvapi::UpsertKVReq; + use databend_common_meta_types::protobuf::StreamItem; + use databend_common_meta_types::MetaError; + use databend_common_meta_types::SeqV; + use databend_common_meta_types::SeqValue; + use databend_common_meta_types::TxnReply; + use databend_common_meta_types::TxnRequest; + use databend_common_proto_conv::FromToProto; + use futures::StreamExt; + use futures::TryStreamExt; + use prost::Message; + + use crate::kv_pb_api::KVPbApi; + + // + struct Foo { + kvs: BTreeMap, + } + + #[async_trait] + impl KVApi for Foo { + type Error = MetaError; + + async fn upsert_kv(&self, _req: UpsertKVReq) -> Result { + todo!() + } + + async fn get_kv_stream( + &self, + keys: &[String], + ) -> Result, Self::Error> { + let mut res = Vec::with_capacity(keys.len()); + for key in keys { + let k = key.clone(); + let v = self.kvs.get(key).cloned(); + + let item = StreamItem::new(k, v.map(|v| v.into())); + res.push(Ok(item)); + } + + let strm = futures::stream::iter(res); + Ok(strm.boxed()) + } + + async fn list_kv(&self, _prefix: &str) -> Result, Self::Error> { + todo!() + } + + async fn transaction(&self, _txn: TxnRequest) -> Result { + todo!() + } + } + + // TODO: test upsert_kv + // TODO: test upsert_kv + // TODO: test list_kv + + #[tokio::test] + async fn test_mget() -> anyhow::Result<()> { + let catalog_meta = CatalogMeta { + catalog_option: CatalogOption::Hive(HiveCatalogOption { + address: "127.0.0.1:10000".to_string(), + storage_params: Some(Box::new( + databend_common_meta_app::storage::StorageParams::S3(StorageS3Config { + endpoint_url: "http://127.0.0.1:9900".to_string(), + region: "hello".to_string(), + bucket: "world".to_string(), + access_key_id: "databend_has_super_power".to_string(), + secret_access_key: "databend_has_super_power".to_string(), + ..Default::default() + }), + )), + }), + created_on: DateTime::::MIN_UTC, + }; + let v = catalog_meta.to_pb()?.encode_to_vec(); + + let foo = Foo { + kvs: vec![ + (s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())), + (s("__fd_catalog_by_id/2"), SeqV::new(2, v.clone())), + (s("__fd_catalog_by_id/3"), SeqV::new(3, v.clone())), + ] + .into_iter() + .collect(), + }; + + let tenant = Tenant::new_literal("dummy"); + + // Get key value pairs + { + #[allow(deprecated)] + let strm = foo + .get_pb_stream([ + CatalogIdIdent::new(&tenant, 1), + CatalogIdIdent::new(&tenant, 2), + CatalogIdIdent::new(&tenant, 4), + ]) + .await?; + + let got = strm.try_collect::>().await?; + + assert_eq!(CatalogIdIdent::new(&tenant, 1), got[0].0); + assert_eq!(CatalogIdIdent::new(&tenant, 2), got[1].0); + assert_eq!(CatalogIdIdent::new(&tenant, 4), got[2].0); + + assert_eq!(1, got[0].1.seq()); + assert_eq!(2, got[1].1.seq()); + assert_eq!(0, got[2].1.seq()); + + assert_eq!(Some(&catalog_meta), got[0].1.value()); + assert_eq!(Some(&catalog_meta), got[1].1.value()); + assert_eq!(None, got[2].1.value()); + } + + // Get values + { + #[allow(deprecated)] + let strm = foo + .get_pb_values([ + CatalogIdIdent::new(&tenant, 1), + CatalogIdIdent::new(&tenant, 2), + CatalogIdIdent::new(&tenant, 4), + ]) + .await?; + + let got = strm.try_collect::>().await?; + + assert_eq!(1, got[0].seq()); + assert_eq!(2, got[1].seq()); + assert_eq!(0, got[2].seq()); + + assert_eq!(Some(&catalog_meta), got[0].value()); + assert_eq!(Some(&catalog_meta), got[1].value()); + assert_eq!(None, got[2].value()); + } + + Ok(()) + } + + fn s(x: impl ToString) -> String { + x.to_string() + } +}