Skip to content

Commit

Permalink
fix(query): fix planner cache cause modify async function does not ta…
Browse files Browse the repository at this point in the history
…ke effect (#16510)

* fix(query): fix planner cache causing modifications to async functions to not take effect

* add tests

* fix

* switch tantivy and ethnum to the datafuse-extras forks
  • Loading branch information
b41sh committed Sep 24, 2024
1 parent dc5501c commit 110177e
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 28 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ serde_with = { version = "3.8.1" }
serfig = "0.1.0"
sled = { version = "0.34", default-features = false }
stream-more = "0.1.3"
tantivy = { git = "https://github.com/b41sh/tantivy", rev = "37aeac0" }
tantivy = "0.22.0"
tantivy-common = "0.7.0"
tantivy-jieba = "0.11.0"
thiserror = { version = "1" }
tikv-jemalloc-ctl = { version = "0.5.0", features = ["use_std"] }
tokio = { version = "1.35.0", features = ["full"] }
Expand Down Expand Up @@ -401,9 +403,12 @@ async-recursion = { git = "https://github.com/zhang2014/async-recursion.git", re
backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "72265be" }
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "57795da" }
ethnum = { git = "https://github.com/ariesdevil/ethnum-rs", rev = "4cb05f1" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" }
recursive = { git = "https://github.com/zhang2014/recursive.git", rev = "6af35a1" }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "37aeac0" }
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "37aeac0", package = "tantivy-common" }
tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "124a8fc" }
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }
9 changes: 9 additions & 0 deletions src/query/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ pub fn is_builtin_function(name: &str) -> bool {
|| ASYNC_FUNCTIONS.contains(&name)
}

/// Reports whether a plan that calls the function `name` may be kept in the planner cache.
///
/// The plan of search function, async function and udf contains some arguments defined in meta,
/// which may be modified by user at any time. Those functions are not suitable for caching, so
/// only names found in the builtin registry, the aggregate function factory, the general window
/// functions or the general lambda functions are reported as cacheable.
pub fn is_cacheable_function(name: &str) -> bool {
    if BUILTIN_FUNCTIONS.contains(name) {
        return true;
    }
    if AggregateFunctionFactory::instance().contains(name) {
        return true;
    }
    // Neither a registered builtin nor an aggregate: fall back to the window and lambda name lists.
    GENERAL_WINDOW_FUNCTIONS.contains(&name) || GENERAL_LAMBDA_FUNCTIONS.contains(&name)
}

#[ctor]
pub static BUILTIN_FUNCTIONS: FunctionRegistry = builtin_functions();

Expand Down
4 changes: 1 addition & 3 deletions src/query/sql/src/planner/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,9 +688,7 @@ impl Binder {
let f = |scalar: &ScalarExpr| {
matches!(
scalar,
ScalarExpr::AggregateFunction(_)
| ScalarExpr::WindowFunction(_)
| ScalarExpr::AsyncFunctionCall(_)
ScalarExpr::AggregateFunction(_) | ScalarExpr::WindowFunction(_)
)
};

Expand Down
5 changes: 0 additions & 5 deletions src/query/sql/src/planner/binder/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ impl Binder {
ScalarExpr::WindowFunction(win) => {
find_replaced_window_function(window_info, win, &item.alias).unwrap()
}
ScalarExpr::AsyncFunctionCall(async_func) => self.create_derived_column_binding(
async_func.display_name.clone(),
async_func.return_type.as_ref().clone(),
Some(item.scalar.clone()),
),
_ => self.create_derived_column_binding(
item.alias.clone(),
item.scalar.data_type()?,
Expand Down
4 changes: 1 addition & 3 deletions src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ impl Binder {
let f = |scalar: &ScalarExpr| {
matches!(
scalar,
ScalarExpr::AggregateFunction(_)
| ScalarExpr::WindowFunction(_)
| ScalarExpr::AsyncFunctionCall(_)
ScalarExpr::AggregateFunction(_) | ScalarExpr::WindowFunction(_)
)
};

Expand Down
6 changes: 4 additions & 2 deletions src/query/sql/src/planner/planner_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_ast::ast::TableReference;
use databend_common_catalog::table_context::TableContext;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchemaRef;
use databend_common_functions::is_cacheable_function;
use databend_common_settings::ChangeValue;
use databend_storages_common_cache::CacheAccessor;
use databend_storages_common_cache::CacheValue;
Expand Down Expand Up @@ -162,8 +163,9 @@ impl TableRefVisitor {
return;
}

// If the function is score, we should not cache the plan
if func.name.name.to_lowercase() == "score" {
let func_name = func.name.name.to_lowercase();
// If the function is not suitable for caching, we should not cache the plan
if !is_cacheable_function(&func_name) || func_name == "score" {
self.cache_miss = true;
}
}
Expand Down
29 changes: 29 additions & 0 deletions src/query/sql/src/planner/semantic/async_function_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,29 @@ impl AsyncFunctionRewriter {
let new_expr = SExpr::create_unary(Arc::new(plan.into()), child_expr);
Ok(new_expr)
}
RelOperator::Filter(mut plan) => {
for scalar in &mut plan.predicates {
self.visit(scalar)?;
}
let child_expr = self.create_async_func_expr(s_expr.children[0].clone());
let new_expr = SExpr::create_unary(Arc::new(plan.into()), child_expr);
Ok(new_expr)
}
RelOperator::Mutation(mut plan) => {
for matched_evaluator in plan.matched_evaluators.iter_mut() {
if let Some(condition) = matched_evaluator.condition.as_mut() {
self.visit(condition)?;
}
if let Some(update) = matched_evaluator.update.as_mut() {
for (_, scalar) in update.iter_mut() {
self.visit(scalar)?;
}
}
}
let child_expr = self.create_async_func_expr(s_expr.children[0].clone());
let new_expr = SExpr::create_unary(Arc::new(plan.into()), child_expr);
Ok(new_expr)
}
_ => Ok(s_expr),
}
}
Expand Down Expand Up @@ -136,6 +159,12 @@ impl<'a> VisitorMut<'a> for AsyncFunctionRewriter {
}

fn visit_async_function_call(&mut self, async_func: &'a mut AsyncFunctionCall) -> Result<()> {
if self
.async_functions_map
.contains_key(&async_func.display_name)
{
return Ok(());
}
for (i, arg) in async_func.arguments.iter_mut().enumerate() {
self.visit(arg)?;

Expand Down
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/semantic/udf_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ impl<'a> VisitorMut<'a> for UdfRewriter {
if !udf.udf_type.match_type(self.script_udf) {
return Ok(());
}
if self.udf_functions_map.contains_key(&udf.display_name) {
return Ok(());
}

let mut udf_arguments = Vec::with_capacity(udf.arguments.len());

Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/common/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ parquet = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = { workspace = true }
tantivy-common = { git = "https://github.com/b41sh/tantivy", rev = "37aeac0", package = "tantivy-common" }
tantivy-common = { workspace = true }
tantivy-fst = "0.5"
thiserror = { workspace = true }
xorfilter-rs = { workspace = true, features = ["cbordata"] }
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ siphasher = "0.3.10"
sys-info = "0.9"
tantivy = { workspace = true }
tantivy-fst = "0.5"
tantivy-jieba = { git = "https://github.com/b41sh/tantivy-jieba", rev = "af84361" }
tantivy-jieba = { workspace = true }
thrift = "0.17.0"
typetag = { workspace = true }
uuid = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ select id, dict_get(d2, 'name', id), dict_get(d2, 'age', id), dict_get(d2, 'sala
4 Tom 55 3000.55 0
5 NULL NULL NULL NULL

query ITI
select id, name, dict_get(d2, 'age', id) as age from t2 where age > 35
----
3 Lily 41
4 Tom 55

statement ok
CREATE OR REPLACE DICTIONARY d3(id int, name string, age uint16, salary float, active bool) PRIMARY KEY name SOURCE(mysql(host='localhost' port='3106' username='root' password='123456' db='test' table='user'));

Expand Down

0 comments on commit 110177e

Please sign in to comment.