From b4e0a2bde2cdb4b06ffc445a736faffb248d1028 Mon Sep 17 00:00:00 2001 From: Winnie-Hong0927 <136137323+Winnie-Hong0927@users.noreply.github.com> Date: Sat, 14 Sep 2024 00:23:42 +0800 Subject: [PATCH] feat(query): Support access Mysql data from dictionaries via the `dict_get` function. (#16444) * feat: access data from mysql via dict_get * fix * update: operator & transform * update: dict_get mysql * update: cargo & transform * update : add Date & Timestamp * update : Date & Timestamp. * fix: cancel date & timestamp feat: test. * fix * fix. * update:test & mysql_source. * fix: transform dictionary. * fix: cargo * fix: cargo.lock --- Cargo.lock | 419 +++++++++++++++++- src/common/exception/Cargo.toml | 1 + src/common/exception/src/exception_into.rs | 6 + src/query/service/Cargo.toml | 1 + .../transforms/transform_async_function.rs | 6 +- .../transforms/transform_dictionary.rs | 173 +++++--- .../sql/src/planner/binder/ddl/dictionary.rs | 8 +- tests/sqllogictests/Cargo.toml | 3 + tests/sqllogictests/src/main.rs | 18 +- tests/sqllogictests/src/mock_source/mod.rs | 2 + .../src/mock_source/mysql_source.rs | 360 +++++++++++++++ .../functions/02_0077_function_dict_get.test | 52 ++- 12 files changed, 978 insertions(+), 71 deletions(-) create mode 100644 tests/sqllogictests/src/mock_source/mysql_source.rs diff --git a/Cargo.lock b/Cargo.lock index 8d6b7c67eca8..2cae8952e576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1591,6 +1591,9 @@ name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +dependencies = [ + "serde", +] [[package]] name = "bitmaps" @@ -1737,7 +1740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.58", @@ -2761,6 +2764,21 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc16" version = "0.4.0" @@ -3425,6 +3443,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "sqlx", "tantivy", "thiserror", "tonic 0.11.0", @@ -5254,6 +5273,7 @@ dependencies = [ "serde_urlencoded", "sha2", "socket2 0.5.7", + "sqlx", "strength_reduce", "sysinfo", "temp-env", @@ -5304,13 +5324,16 @@ dependencies = [ "databend-common-exception", "env_logger", "futures-util", + "msql-srv", "mysql_async", + "mysql_common 0.32.4", "rand 0.8.5", "regex", "reqwest", "serde", "serde_json", "sqllogictest", + "sqlparser 0.50.0", "thiserror", "tokio", "walkdir", @@ -6174,6 +6197,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -6290,6 +6319,9 @@ name = "either" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -6503,6 +6535,17 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5692dd7b5a1978a5aeb0ce83b7655c58ca8efdcb79d21036ea249da95afec2c6" +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if 1.0.0", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "ethnum" version = "1.5.0" @@ -6812,6 +6855,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8" +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -7068,6 +7122,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.12.3", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -8396,6 +8461,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "hdfs-sys" version = "0.3.0" @@ -9813,6 +9887,17 @@ dependencies = [ "redox_syscall 0.5.3", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libtest-mimic" version = "0.7.3" @@ -10378,6 +10463,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "msql-srv" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b821d09e9a4ed6b61015a889597446b3b6c7721544d0f4b617bcfdacf6ee7877" +dependencies = [ + "byteorder", + "chrono", + "mysql_common 0.31.0", + "nom", + "rustls 0.22.4", +] + [[package]] name = "multer" version = "3.1.0" @@ -10420,6 +10518,24 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2195bf6aa996a481483b29d62a7663eed3fe39600c460e323f8ff41e90bdd89b" +[[package]] +name = "mysql-common-derive" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f" +dependencies = [ + "darling", + "heck 0.4.1", + "num-bigint", + "proc-macro-crate 1.3.1", + "proc-macro-error 1.0.4", + "proc-macro2", + "quote", + "syn 2.0.58", + "termcolor", + "thiserror", +] + [[package]] name = "mysql-common-derive" version = "0.31.1" @@ -10429,7 +10545,7 @@ dependencies = [ "darling", "heck 0.4.1", "num-bigint", - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro-error 1.0.4", "proc-macro2", "quote", @@ -10452,7 +10568,7 @@ dependencies = [ "futures-util", "keyed_priority_queue", "lru", - "mysql_common", + "mysql_common 0.32.4", "native-tls", "pem", "percent-encoding", @@ -10469,6 +10585,46 @@ dependencies = [ "url", ] +[[package]] +name = "mysql_common" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06f19e4cfa0ab5a76b627cec2d81331c49b034988eaf302c3bafeada684eadef" +dependencies = [ + "base64 0.21.7", + "bigdecimal", + "bindgen 0.69.4", + "bitflags 2.6.0", + "bitvec", + "btoi", + "byteorder", + "bytes", + "cc", + "chrono", + "cmake", + "crc32fast", + "flate2", + "frunk", + "lazy_static", + "mysql-common-derive 0.30.2", + "num-bigint", + "num-traits", + "rand 0.8.5", + "regex", + "rust_decimal", + "saturating", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "subprocess", + "thiserror", + "time", + "uuid", + "zstd 0.12.4", +] + [[package]] name = "mysql_common" version = "0.32.4" @@ -10490,7 +10646,7 @@ dependencies = [ "flate2", "frunk", "lazy_static", - "mysql-common-derive", + "mysql-common-derive 0.31.1", "num-bigint", "num-traits", "rand 0.8.5", @@ -10803,7 +10959,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.58", @@ -10993,7 +11149,7 @@ dependencies = [ "async-trait", "byteorder", "chrono", - "mysql_common", + "mysql_common 0.32.4", "nom", "pin-project-lite", "tokio", @@ -11774,7 +11930,7 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a62fea1692d80a000126f9b28d865012a160b80000abb53ccf152b428222c155" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.58", @@ -11952,6 +12108,16 @@ dependencies = [ "elliptic-curve 0.13.8", ] +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit 0.19.15", +] + [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -14103,6 +14269,9 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +dependencies = [ + "serde", +] [[package]] name = "snafu" @@ -14314,6 +14483,15 @@ dependencies = [ "sqlparser_derive", ] +[[package]] +name = "sqlparser" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac" +dependencies = [ + "log", +] + [[package]] name = "sqlparser_derive" version = "0.2.2" @@ -14325,6 +14503,200 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "sqlx" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" +dependencies = [ + "atoi", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener 5.3.1", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.14.5", + "hashlink 0.9.1", + "hex", + "indexmap 2.4.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlformat", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.58", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" +dependencies = [ + "dotenvy", + "either", + "heck 0.5.0", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 2.0.58", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.6.0", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "rsa 0.9.6", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.6.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing", + "url", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -14398,6 +14770,17 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "stringslice" version = "0.2.0" @@ -15673,6 +16056,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ea75f83c0137a9b98608359a5f1af8144876eb67bcb1ce837368e906a9f524" + [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -16022,6 +16411,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasix" version = "0.12.21" @@ -16495,6 +16890,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall 0.5.3", + "wasite", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index 033999b74d0d..84080a4ebe46 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -28,6 +28,7 @@ prost = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +sqlx = "0.8" tantivy = { workspace = true } thiserror = { workspace = true } tonic = { workspace = true } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index c20736633799..cb18b0e451cc 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -441,3 +441,9 @@ impl From for tonic::Status { } } } + +impl From for ErrorCode { + fn from(error: sqlx::Error) -> Self { + ErrorCode::DictionarySourceError(format!("Dictionary Sqlx Error, cause: {}", error)) + } +} diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index e87c5db7bfc0..5f0bd0831a48 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -163,6 +163,7 @@ serde_stacker = { workspace = true } serde_urlencoded = "0.7.1" sha2 = { workspace = true } socket2 = "0.5.3" +sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] } strength_reduce = "0.2.4" sysinfo = "0.30" tempfile = "3.4.0" diff --git a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs index 86a5ab076d11..d5d760478350 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs @@ -26,8 +26,8 @@ use databend_common_meta_app::schema::GetSequenceNextValueReq; use databend_common_meta_app::schema::SequenceIdent; use databend_common_pipeline_transforms::processors::AsyncTransform; use databend_common_storages_fuse::TableContext; -use opendal::Operator; +use crate::pipelines::processors::transforms::transform_dictionary::DictionaryOperator; use crate::sessions::QueryContext; use crate::sql::executor::physical_plans::AsyncFunctionDesc; use crate::sql::plans::AsyncFunctionArgument; @@ -35,7 +35,7 @@ use crate::sql::plans::AsyncFunctionArgument; pub struct TransformAsyncFunction { ctx: Arc, // key is the index of async_func_desc - pub(crate) operators: BTreeMap>, + pub(crate) operators: BTreeMap>, async_func_descs: Vec, } @@ -43,7 +43,7 @@ impl TransformAsyncFunction { pub fn new( ctx: Arc, async_func_descs: Vec, - operators: BTreeMap>, + operators: BTreeMap>, ) -> Self { Self { ctx, diff --git a/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs b/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs index 5a4f41318a44..7082a4429be6 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs @@ -15,9 +15,16 @@ use std::collections::BTreeMap; use std::sync::Arc; +use chrono_tz::Tz; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::date::date_to_string; +use databend_common_expression::types::timestamp::timestamp_to_string; use databend_common_expression::types::DataType; +use databend_common_expression::types::Number; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberScalar; +use databend_common_expression::with_integer_mapped_type; use databend_common_expression::BlockEntry; use databend_common_expression::ColumnBuilder; use databend_common_expression::DataBlock; @@ -27,6 +34,7 @@ use databend_common_expression::Value; use databend_common_storage::build_operator; use opendal::services::Redis; use opendal::Operator; +use sqlx::MySqlPool; use crate::pipelines::processors::transforms::TransformAsyncFunction; use crate::sql::executor::physical_plans::AsyncFunctionDesc; @@ -35,10 +43,101 @@ use crate::sql::plans::DictGetFunctionArgument; use crate::sql::plans::DictionarySource; use crate::sql::IndexType; +pub(crate) enum DictionaryOperator { + Operator(Operator), + Mysql((MySqlPool, String)), +} + +impl DictionaryOperator { + fn format_key(&self, key: ScalarRef<'_>) -> String { + match key { + ScalarRef::String(s) => s.to_string(), + ScalarRef::Date(d) => format!("{}", date_to_string(d as i64, Tz::UTC)), + ScalarRef::Timestamp(t) => format!("{}", timestamp_to_string(t, Tz::UTC)), + _ => format!("{}", key), + } + } + + async fn dict_get(&self, key: ScalarRef<'_>, data_type: &DataType) -> Result> { + if key == ScalarRef::Null { + return Ok(None); + } + match self { + DictionaryOperator::Operator(op) => { + if let ScalarRef::String(key) = key { + let buffer = op.read(key).await; + match buffer { + Ok(res) => { + let value = + unsafe { String::from_utf8_unchecked(res.current().to_vec()) }; + Ok(Some(Scalar::String(value))) + } + Err(e) => { + if e.kind() == opendal::ErrorKind::NotFound { + Ok(None) + } else { + Err(ErrorCode::DictionarySourceError(format!( + "dictionary source error: {e}" + ))) + } + } + } + } else { + Ok(None) + } + } + DictionaryOperator::Mysql((pool, sql)) => match data_type.remove_nullable() { + DataType::Boolean => { + let value: Option = sqlx::query_scalar(sql) + .bind(self.format_key(key)) + .fetch_optional(pool) + .await?; + Ok(value.map(Scalar::Boolean)) + } + DataType::String => { + let value: Option = sqlx::query_scalar(sql) + .bind(self.format_key(key)) + .fetch_optional(pool) + .await?; + Ok(value.map(Scalar::String)) + } + DataType::Number(num_ty) => { + with_integer_mapped_type!(|NUM_TYPE| match num_ty { + NumberDataType::NUM_TYPE => { + let value: Option = sqlx::query_scalar(&sql) + .bind(self.format_key(key)) + .fetch_optional(pool) + .await?; + Ok(value.map(|v| Scalar::Number(NUM_TYPE::upcast_scalar(v)))) + } + NumberDataType::Float32 => { + let value: Option = sqlx::query_scalar(sql) + .bind(self.format_key(key)) + .fetch_optional(pool) + .await?; + Ok(value.map(|v| Scalar::Number(NumberScalar::Float32(v.into())))) + } + NumberDataType::Float64 => { + let value: Option = sqlx::query_scalar(sql) + .bind(self.format_key(key)) + .fetch_optional(pool) + .await?; + Ok(value.map(|v| Scalar::Number(NumberScalar::Float64(v.into())))) + } + }) + } + _ => Err(ErrorCode::DictionarySourceError(format!( + "unsupported value type {data_type}" + ))), + }, + } + } +} + impl TransformAsyncFunction { - pub fn init_operators( + pub(crate) fn init_operators( async_func_descs: &[AsyncFunctionDesc], - ) -> Result>> { + ) -> Result>> { let mut operators = BTreeMap::new(); for (i, async_func_desc) in async_func_descs.iter().enumerate() { if let AsyncFunctionArgument::DictGetFunction(dict_arg) = &async_func_desc.func_arg { @@ -55,10 +154,17 @@ impl TransformAsyncFunction { builder = builder.db(db_index); } let op = build_operator(builder)?; - operators.insert(i, Arc::new(op)); + operators.insert(i, Arc::new(DictionaryOperator::Operator(op))); } - DictionarySource::Mysql(_) => { - return Err(ErrorCode::Unimplemented("Mysql source is unsupported")); + DictionarySource::Mysql(sql_source) => { + let mysql_pool = databend_common_base::runtime::block_on( + sqlx::MySqlPool::connect(&sql_source.connection_url), + )?; + let sql = format!( + "SELECT {} FROM {} WHERE {} = ? LIMIT 1", + &sql_source.value_field, &sql_source.table, &sql_source.key_field + ); + operators.insert(i, Arc::new(DictionaryOperator::Mysql((mysql_pool, sql)))); } } } @@ -75,59 +181,26 @@ impl TransformAsyncFunction { arg_indices: &[IndexType], data_type: &DataType, ) -> Result<()> { - let op = self.operators.get(&i).unwrap().clone(); - + let op: &Arc = self.operators.get(&i).unwrap(); // only support one key field. let arg_index = arg_indices[0]; let entry = data_block.get_by_offset(arg_index); let value = match &entry.value { Value::Scalar(scalar) => { - if let Scalar::String(key) = scalar { - let buffer = op.read(key).await; - match buffer { - Ok(res) => { - let value = - unsafe { String::from_utf8_unchecked(res.current().to_vec()) }; - Value::Scalar(Scalar::String(value)) - } - Err(e) => { - if e.kind() == opendal::ErrorKind::NotFound { - Value::Scalar(dict_arg.default_value.clone()) - } else { - return Err(ErrorCode::DictionarySourceError(format!( - "dictionary source error: {e}" - ))); - } - } - } - } else { - Value::Scalar(dict_arg.default_value.clone()) - } + let value = op + .dict_get(scalar.as_ref(), data_type) + .await? + .unwrap_or(dict_arg.default_value.clone()); + Value::Scalar(value) } Value::Column(column) => { let mut builder = ColumnBuilder::with_capacity(data_type, column.len()); - for scalar in column.iter() { - if let ScalarRef::String(key) = scalar { - let buffer = op.read(key).await; - match buffer { - Ok(res) => { - let value = - unsafe { String::from_utf8_unchecked(res.current().to_vec()) }; - builder.push(ScalarRef::String(value.as_str())); - } - Err(e) => { - if e.kind() == opendal::ErrorKind::NotFound { - builder.push(dict_arg.default_value.as_ref()); - } else { - return Err(ErrorCode::DictionarySourceError(format!( - "dictionary source error: {e}" - ))); - } - } - }; - } else { - builder.push(dict_arg.default_value.as_ref()); - } + for scalar_ref in column.iter() { + let value = op + .dict_get(scalar_ref, data_type) + .await? + .unwrap_or(dict_arg.default_value.clone()); + builder.push(value.as_ref()); } Value::Column(builder.build()) } diff --git a/src/query/sql/src/planner/binder/ddl/dictionary.rs b/src/query/sql/src/planner/binder/ddl/dictionary.rs index 8bbe682c614a..6b54019fd5ee 100644 --- a/src/query/sql/src/planner/binder/ddl/dictionary.rs +++ b/src/query/sql/src/planner/binder/ddl/dictionary.rs @@ -197,14 +197,10 @@ fn validate_mysql_fields(schema: &TableSchema) -> Result<()> { for field in schema.fields() { if !matches!( field.data_type().remove_nullable(), - TableDataType::Boolean - | TableDataType::String - | TableDataType::Number(_) - | TableDataType::Date - | TableDataType::Timestamp + TableDataType::Boolean | TableDataType::String | TableDataType::Number(_) ) { return Err(ErrorCode::BadArguments( - "The type of Mysql field must be in [`boolean`, `string`, `number`, `timestamp`, `date`]", + "The type of Mysql field must be in [`boolean`, `string`, `number`]", )); } } diff --git a/tests/sqllogictests/Cargo.toml b/tests/sqllogictests/Cargo.toml index 23ba001cfae4..d2a52f4c25cb 100644 --- a/tests/sqllogictests/Cargo.toml +++ b/tests/sqllogictests/Cargo.toml @@ -20,13 +20,16 @@ databend-common-base = { workspace = true } databend-common-exception = { workspace = true } env_logger = "0.10.0" futures-util = { workspace = true } +msql-srv = "0.11.0" mysql_async = { workspace = true } +mysql_common = "0.32.4" rand = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } serde = "1.0.150" serde_json = { workspace = true } sqllogictest = "0.21.0" +sqlparser = "0.50.0" thiserror = { workspace = true } tokio = { workspace = true } walkdir = { workspace = true } diff --git a/tests/sqllogictests/src/main.rs b/tests/sqllogictests/src/main.rs index 3bf96392c43a..7345e8cc8039 100644 --- a/tests/sqllogictests/src/main.rs +++ b/tests/sqllogictests/src/main.rs @@ -18,6 +18,7 @@ use std::path::Path; use std::time::Instant; use clap::Parser; +use databend_sqllogictests::mock_source::run_mysql_source; use databend_sqllogictests::mock_source::run_redis_source; use futures_util::stream; use futures_util::StreamExt; @@ -76,10 +77,8 @@ impl sqllogictest::AsyncDB for Databend { pub async fn main() -> Result<()> { env_logger::init(); - // Run a mock Redis server for dictionary tests. - databend_common_base::runtime::spawn(async move { - run_redis_source().await; - }); + // Run mock sources for dictionary test. + run_mock_sources(); let args = SqlLogicTestArgs::parse(); let handlers = match &args.handlers { @@ -103,6 +102,17 @@ pub async fn main() -> Result<()> { Ok(()) } +fn run_mock_sources() { + // Run a mock Redis server. + databend_common_base::runtime::spawn(async move { + run_redis_source().await; + }); + // Run a mock MySQL server. + databend_common_base::runtime::Thread::spawn(move || { + run_mysql_source(); + }); +} + async fn run_mysql_client() -> Result<()> { println!( "MySQL client starts to run with: {:?}", diff --git a/tests/sqllogictests/src/mock_source/mod.rs b/tests/sqllogictests/src/mock_source/mod.rs index 05040f92fa14..e20814f6fe76 100644 --- a/tests/sqllogictests/src/mock_source/mod.rs +++ b/tests/sqllogictests/src/mock_source/mod.rs @@ -12,5 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod mysql_source; mod redis_source; +pub use mysql_source::run_mysql_source; pub use redis_source::run_redis_source; diff --git a/tests/sqllogictests/src/mock_source/mysql_source.rs b/tests/sqllogictests/src/mock_source/mysql_source.rs new file mode 100644 index 000000000000..d8a314c278ff --- /dev/null +++ b/tests/sqllogictests/src/mock_source/mysql_source.rs @@ -0,0 +1,360 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::io; +use std::net::TcpListener; + +use msql_srv::Column; +use msql_srv::ColumnFlags; +use msql_srv::ColumnType; +use msql_srv::MysqlIntermediary; +use msql_srv::MysqlShim; +use msql_srv::QueryResultWriter; +use msql_srv::StatementMetaWriter; +use mysql_common::Value; +use sqlparser::ast::BinaryOperator; +use sqlparser::ast::Expr; +use sqlparser::ast::SelectItem; +use sqlparser::ast::SetExpr; +use sqlparser::ast::Statement; +use sqlparser::ast::TableFactor; +use sqlparser::dialect::MySqlDialect; +use sqlparser::parser::Parser; + +pub fn run_mysql_source() { + // Bind the listener to the address + let listener = TcpListener::bind("0.0.0.0:3106").unwrap(); + + let backend = Backend::create(); + loop { + if let Ok((socket, _)) = listener.accept() { + let backend = backend.clone(); + databend_common_base::runtime::Thread::spawn(move || { + MysqlIntermediary::run_on_tcp(backend, socket).unwrap(); + }); + } + } +} + +// mock MySQL backend with a table `user`. +// +// CREATE TABLE `user`( +// id INT, +// name VARCHAR(100), +// age SMALLINT UNSIGNED, +// salary DOUBLE, +// active BOOL +// ); +// +// +------+-------+------+---------+--------+ +// | id | name | age | salary | active | +// +------+-------+------+---------+--------+ +// | 1 | Alice | 24 | 100 | 1 | +// | 2 | Bob | 35 | 200.1 | 0 | +// | 3 | Lily | 41 | 1000.2 | 1 | +// | 4 | Tom | 55 | 3000.55 | 0 | +// +------+-------+------+---------+--------+ +#[derive(Debug, Clone)] +struct Backend { + table: String, + schema: Vec, + block: Vec>, + + prepared_id: u32, + prepared: HashMap, +} + +impl Backend { + fn create() -> Self { + let table = "user".to_string(); + + let schema = vec![ + Column { + table: "user".to_string(), + column: "id".to_string(), + coltype: ColumnType::MYSQL_TYPE_LONG, + colflags: ColumnFlags::empty(), + }, + Column { + table: "user".to_string(), + column: "name".to_string(), + coltype: ColumnType::MYSQL_TYPE_VAR_STRING, + colflags: ColumnFlags::empty(), + }, + Column { + table: "user".to_string(), + column: "age".to_string(), + coltype: ColumnType::MYSQL_TYPE_SHORT, + colflags: ColumnFlags::UNSIGNED_FLAG, + }, + Column { + table: "user".to_string(), + column: "salary".to_string(), + coltype: ColumnType::MYSQL_TYPE_DOUBLE, + colflags: ColumnFlags::empty(), + }, + Column { + table: "user".to_string(), + column: "active".to_string(), + coltype: ColumnType::MYSQL_TYPE_TINY, + colflags: ColumnFlags::empty(), + }, + ]; + + let block = vec![ + vec![Value::Int(1), Value::Int(2), Value::Int(3), Value::Int(4)], + vec![ + Value::Bytes("Alice".as_bytes().to_vec()), + Value::Bytes("Bob".as_bytes().to_vec()), + Value::Bytes("Lily".as_bytes().to_vec()), + Value::Bytes("Tom".as_bytes().to_vec()), + ], + vec![ + Value::UInt(24), + Value::UInt(35), + Value::UInt(41), + Value::UInt(55), + ], + vec![ + Value::Double(100.0), + Value::Double(200.1), + Value::Double(1000.20), + Value::Double(3000.55), + ], + vec![Value::Int(1), Value::Int(0), Value::Int(1), Value::Int(0)], + ]; + + Self { + table, + schema, + block, + + prepared_id: 0, + prepared: HashMap::new(), + } + } +} + +impl MysqlShim for Backend { + type Error = io::Error; + + fn on_prepare(&mut self, sql: &str, info: StatementMetaWriter) -> io::Result<()> { + let dialect = MySqlDialect {}; + let asts = Parser::parse_sql(&dialect, sql).unwrap(); + + let mut table = None; + let mut key = None; + let mut value = None; + + // Only support simple SQL select one field with an equal filter. + // for example: SELECT name FROM user WHERE id = 1; + if asts.len() == 1 { + if let Statement::Query(query) = &asts[0] { + if let SetExpr::Select(select) = *query.body.clone() { + if let SelectItem::UnnamedExpr(Expr::Identifier(ident)) = &select.projection[0] + { + value = Some(ident.value.clone()); + } + if let TableFactor::Table { name, .. } = &select.from[0].relation { + table = Some(name.0[0].value.clone()); + } + if let Some(Expr::BinaryOp { left, op, .. }) = &select.selection { + if op == &BinaryOperator::Eq { + if let Expr::Identifier(ident) = *left.clone() { + key = Some(ident.value.clone()); + } + } + } + } + } + } + + self.prepared_id += 1; + let prepared_id = self.prepared_id; + + if table.is_some() && key.is_some() && value.is_some() { + let table = table.unwrap(); + let key = key.unwrap(); + let value = value.unwrap(); + + let key_col = &self + .schema + .iter() + .enumerate() + .find(|&(_, f)| f.column == key); + + let value_col = &self + .schema + .iter() + .enumerate() + .find(|&(_, f)| f.column == value); + + if table == self.table && key_col.is_some() && value_col.is_some() { + let (key_idx, key_col) = key_col.unwrap(); + let (value_idx, value_col) = value_col.unwrap(); + + let prepared_idices = (key_idx, value_idx); + self.prepared.insert(prepared_id, prepared_idices); + + // keys are bind as string type. + let mut key_col = key_col.clone(); + key_col.coltype = ColumnType::MYSQL_TYPE_VAR_STRING; + + let key_cols = vec![key_col]; + let value_cols = vec![value_col.clone()]; + + // add key and value columns for execute. + return info.reply(prepared_id, key_cols.as_slice(), value_cols.as_slice()); + } + } + + // ingore other unsupported SQLs. + info.reply(prepared_id, &[], &[]) + } + + fn on_execute( + &mut self, + id: u32, + param_parser: msql_srv::ParamParser, + results: QueryResultWriter, + ) -> io::Result<()> { + let params: Vec<_> = param_parser + .into_iter() + .map(|p| p.value) + .collect::>(); + + // ignore if params are empty. + if params.len() != 1 { + return results.completed(0, 0); + } + let param = params[0]; + + let (key_idx, value_idx) = self.prepared.get(&id).unwrap(); + + let key_field = self.schema[*key_idx].clone(); + let key_column = self.block[*key_idx].clone(); + + let mut row = None; + // find matched row by compare key params. + match key_field.coltype { + ColumnType::MYSQL_TYPE_TINY + | ColumnType::MYSQL_TYPE_SHORT + | ColumnType::MYSQL_TYPE_LONG + | ColumnType::MYSQL_TYPE_LONGLONG => { + let param: &str = param.into(); + let key = param.parse::().unwrap(); + let key_param = Value::Int(key); + for (i, key) in key_column.iter().enumerate() { + if key == &key_param { + row = Some(i); + break; + } + } + } + ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_DOUBLE => { + let param: &str = param.into(); + let key = param.parse::().unwrap(); + let key_param = Value::Double(key); + for (i, key) in key_column.iter().enumerate() { + if key == &key_param { + row = Some(i); + break; + } + } + } + ColumnType::MYSQL_TYPE_VAR_STRING => { + let param: &str = param.into(); + let key = param.as_bytes().to_vec(); + let key_param = Value::Bytes(key); + for (i, key) in key_column.iter().enumerate() { + if key == &key_param { + row = Some(i); + break; + } + } + } + _ => {} + } + + // return NULL if params not matched. + if row.is_none() { + return results.completed(0, 0); + } + let row = row.unwrap(); + + let value_field = self.schema[*value_idx].clone(); + let value_column = self.block[*value_idx].clone(); + let value = value_column[row].clone(); + + let cols = vec![value_field.clone()]; + + let mut rw = results.start(&cols)?; + match value { + Value::Bytes(v) => { + rw.write_col(v)?; + } + Value::Int(v) => match value_field.coltype { + ColumnType::MYSQL_TYPE_TINY => { + rw.write_col(v as i8)?; + } + ColumnType::MYSQL_TYPE_SHORT => { + rw.write_col(v as i16)?; + } + ColumnType::MYSQL_TYPE_LONG => { + rw.write_col(v as i32)?; + } + ColumnType::MYSQL_TYPE_LONGLONG => { + rw.write_col(v)?; + } + _ => { + unreachable!() + } + }, + Value::UInt(v) => match value_field.coltype { + ColumnType::MYSQL_TYPE_TINY => { + rw.write_col(v as u8)?; + } + ColumnType::MYSQL_TYPE_SHORT => { + rw.write_col(v as u16)?; + } + ColumnType::MYSQL_TYPE_LONG => { + rw.write_col(v as u32)?; + } + ColumnType::MYSQL_TYPE_LONGLONG => { + rw.write_col(v)?; + } + _ => { + unreachable!() + } + }, + Value::Float(v) => { + rw.write_col(v)?; + } + Value::Double(v) => { + rw.write_col(v)?; + } + _ => { + rw.write_col("")?; + } + } + rw.finish() + } + + fn on_close(&mut self, _: u32) {} + + fn on_query(&mut self, _sql: &str, results: QueryResultWriter) -> io::Result<()> { + results.completed(0, 0) + } +} diff --git a/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test b/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test index 3e45de55b4ee..42376f086bc9 100644 --- a/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test +++ b/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test @@ -28,4 +28,54 @@ select dict_get(test, 'value', 'b') query T SELECT dict_get(d, 'value', 1) ---- -1_value \ No newline at end of file +1_value + +statement ok +create or replace table t2(id int, name string) + +statement ok +insert into t2 values(1, 'Alice'),(2, 'Bob'),(3, 'Lily'),(4, 'Tom'),(5, 'Tim') + +statement ok +CREATE OR REPLACE DICTIONARY d2(id int, name string, age uint16, salary float, active bool) PRIMARY KEY id SOURCE(mysql(host='localhost' port='3106' username='root' password='123456' db='test' table='user')); + +query TIFT +select dict_get(d2, 'name', 1), dict_get(d2, 'age', 1), dict_get(d2, 'salary', 1), dict_get(d2, 'active', 1) +---- +Alice 24 100.0 1 + +query TIFT +select dict_get(d2, 'name', 5), dict_get(d2, 'age', 5), dict_get(d2, 'salary', 5), dict_get(d2, 'active', 5) +---- +NULL NULL NULL NULL + +query ITIFT +select id, dict_get(d2, 'name', id), dict_get(d2, 'age', id), dict_get(d2, 'salary', id), dict_get(d2, 'active', id) from t2 +---- +1 Alice 24 100.0 1 +2 Bob 35 200.1 0 +3 Lily 41 1000.2 1 +4 Tom 55 3000.55 0 +5 NULL NULL NULL NULL + +statement ok +CREATE OR REPLACE DICTIONARY d3(id int, name string, age uint16, salary float, active bool) PRIMARY KEY name SOURCE(mysql(host='localhost' port='3106' username='root' password='123456' db='test' table='user')); + +query TIFT +select dict_get(d3, 'id', 'Alice'), dict_get(d3, 'age', 'Alice'), dict_get(d3, 'salary', 'Alice'), dict_get(d3, 'active', 'Alice') +---- +1 24 100.0 1 + +query TIFT +select dict_get(d3, 'id', 'Nancy'), dict_get(d3, 'age', 'Nancy'), dict_get(d3, 'salary', 'Nancy'), dict_get(d3, 'active', 'Nancy') +---- +NULL NULL NULL NULL + +query ITIFT +select name, dict_get(d3, 'id', name), dict_get(d3, 'age', name), dict_get(d3, 'salary', name), dict_get(d3, 'active', name) from t2 +---- +Alice 1 24 100.0 1 +Bob 2 35 200.1 0 +Lily 3 41 1000.2 1 +Tom 4 55 3000.55 0 +Tim NULL NULL NULL NULL \ No newline at end of file