Skip to content

Commit

Permalink
chore: add KVPbApi::get_pb_stream() but it is not yet ready for produ…
Browse files Browse the repository at this point in the history
…ction use (#15471)
  • Loading branch information
drmingdrmer committed May 11, 2024
1 parent 6981624 commit edaf42d
Showing 1 changed file with 267 additions and 0 deletions.
267 changes: 267 additions & 0 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,112 @@ pub trait KVPbApi: KVApi {
}
}

/// Same as `get_pb_stream` but does not return keys, only values.
#[deprecated(note = "stream may be closed. The caller must check it")]
fn get_pb_values<K, I>(
&self,
keys: I,
) -> impl Future<
Output = Result<
BoxStream<
'static, //
Result<Option<SeqV<K::ValueType>>, Self::Error>,
>,
Self::Error,
>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K>,
Self::Error: From<PbApiReadError<Self::Error>>,
{
self.get_pb_stream_low(keys)
// This `map()` handles Future result
.map(|r| match r {
Ok(strm) => {
// These two `map_xx()` handles Stream result
Ok(strm.map_ok(|(_k, v)| v).map_err(Self::Error::from).boxed())
}
Err(e) => Err(e),
})
}

/// Get protobuf encoded values by a series of kvapi::Key.
///
/// The key will be converted to string and the returned value is decoded by `FromToProto`.
/// It returns the same error as `KVApi::Error`,
/// thus it requires KVApi::Error can describe a decoding error, i.e., `impl From<PbApiReadError>`.
#[deprecated(note = "stream may be closed. The caller must check it")]
fn get_pb_stream<K, I>(
&self,
keys: I,
) -> impl Future<
Output = Result<
BoxStream<
'static, //
Result<(K, Option<SeqV<K::ValueType>>), Self::Error>,
>,
Self::Error,
>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K>,
Self::Error: From<PbApiReadError<Self::Error>>,
{
self.get_pb_stream_low(keys).map(|r| match r {
Ok(strm) => Ok(strm.map_err(Self::Error::from).boxed()),
Err(e) => Err(e),
})
}

/// Same as `get_pb_stream` but returns [`PbApiReadError`]. No require of `From<PbApiReadError>` for `Self::Error`.
fn get_pb_stream_low<K, I>(
&self,
keys: I,
) -> impl Future<
Output = Result<
BoxStream<
'static, //
Result<(K, Option<SeqV<K::ValueType>>), PbApiReadError<Self::Error>>,
>,
Self::Error,
>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K>,
{
let keys = keys
.into_iter()
.map(|k| kvapi::Key::to_string_key(&k))
.collect::<Vec<_>>();

async move {
let strm = self.get_kv_stream(&keys).await?;

let strm = strm.map(|r: Result<StreamItem, Self::Error>| {
let item = r.map_err(PbApiReadError::KvApiError)?;

let k = K::from_str_key(&item.key).map_err(PbApiReadError::KeyError)?;

let v = if let Some(pb_seqv) = item.value {
let seqv = decode_seqv::<K::ValueType>(SeqV::from(pb_seqv))?;
Some(seqv)
} else {
None
};

Ok((k, v))
});

Ok(strm.boxed())
}
}

/// Same as `list_pb` but does not return values, only keys.
fn list_pb_keys<K>(
&self,
Expand Down Expand Up @@ -216,3 +322,164 @@ pub trait KVPbApi: KVApi {
}

impl<T> KVPbApi for T where T: KVApi + ?Sized {}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use databend_common_meta_app::schema::CatalogIdIdent;
use databend_common_meta_app::schema::CatalogMeta;
use databend_common_meta_app::schema::CatalogOption;
use databend_common_meta_app::schema::HiveCatalogOption;
use databend_common_meta_app::storage::StorageS3Config;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_proto_conv::FromToProto;
use futures::StreamExt;
use futures::TryStreamExt;
use prost::Message;

use crate::kv_pb_api::KVPbApi;

//
struct Foo {
kvs: BTreeMap<String, SeqV>,
}

#[async_trait]
impl KVApi for Foo {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
todo!()
}

async fn get_kv_stream(
&self,
keys: &[String],
) -> Result<KVStream<Self::Error>, Self::Error> {
let mut res = Vec::with_capacity(keys.len());
for key in keys {
let k = key.clone();
let v = self.kvs.get(key).cloned();

let item = StreamItem::new(k, v.map(|v| v.into()));
res.push(Ok(item));
}

let strm = futures::stream::iter(res);
Ok(strm.boxed())
}

async fn list_kv(&self, _prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
todo!()
}

async fn transaction(&self, _txn: TxnRequest) -> Result<TxnReply, Self::Error> {
todo!()
}
}

// TODO: test upsert_kv
// TODO: test upsert_kv
// TODO: test list_kv

#[tokio::test]
async fn test_mget() -> anyhow::Result<()> {
let catalog_meta = CatalogMeta {
catalog_option: CatalogOption::Hive(HiveCatalogOption {
address: "127.0.0.1:10000".to_string(),
storage_params: Some(Box::new(
databend_common_meta_app::storage::StorageParams::S3(StorageS3Config {
endpoint_url: "http://127.0.0.1:9900".to_string(),
region: "hello".to_string(),
bucket: "world".to_string(),
access_key_id: "databend_has_super_power".to_string(),
secret_access_key: "databend_has_super_power".to_string(),
..Default::default()
}),
)),
}),
created_on: DateTime::<Utc>::MIN_UTC,
};
let v = catalog_meta.to_pb()?.encode_to_vec();

let foo = Foo {
kvs: vec![
(s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
(s("__fd_catalog_by_id/2"), SeqV::new(2, v.clone())),
(s("__fd_catalog_by_id/3"), SeqV::new(3, v.clone())),
]
.into_iter()
.collect(),
};

let tenant = Tenant::new_literal("dummy");

// Get key value pairs
{
#[allow(deprecated)]
let strm = foo
.get_pb_stream([
CatalogIdIdent::new(&tenant, 1),
CatalogIdIdent::new(&tenant, 2),
CatalogIdIdent::new(&tenant, 4),
])
.await?;

let got = strm.try_collect::<Vec<_>>().await?;

assert_eq!(CatalogIdIdent::new(&tenant, 1), got[0].0);
assert_eq!(CatalogIdIdent::new(&tenant, 2), got[1].0);
assert_eq!(CatalogIdIdent::new(&tenant, 4), got[2].0);

assert_eq!(1, got[0].1.seq());
assert_eq!(2, got[1].1.seq());
assert_eq!(0, got[2].1.seq());

assert_eq!(Some(&catalog_meta), got[0].1.value());
assert_eq!(Some(&catalog_meta), got[1].1.value());
assert_eq!(None, got[2].1.value());
}

// Get values
{
#[allow(deprecated)]
let strm = foo
.get_pb_values([
CatalogIdIdent::new(&tenant, 1),
CatalogIdIdent::new(&tenant, 2),
CatalogIdIdent::new(&tenant, 4),
])
.await?;

let got = strm.try_collect::<Vec<_>>().await?;

assert_eq!(1, got[0].seq());
assert_eq!(2, got[1].seq());
assert_eq!(0, got[2].seq());

assert_eq!(Some(&catalog_meta), got[0].value());
assert_eq!(Some(&catalog_meta), got[1].value());
assert_eq!(None, got[2].value());
}

Ok(())
}

fn s(x: impl ToString) -> String {
x.to_string()
}
}

0 comments on commit edaf42d

Please sign in to comment.