Skip to content

Commit

Permalink
fix: wrong order by result (#16422)
Browse files Browse the repository at this point in the history
* fix: bind order by at last

* update

* fix

* fix m cte

* fix decorrate subquery

* fix join order

* fix project set

* fix lateral & lazy materialization

* fix native tests

* fix agg index

* fix

* fix

* fix ut

* fix cluster test
  • Loading branch information
xudong963 committed Sep 18, 2024
1 parent 1a904c0 commit cbbd4e3
Show file tree
Hide file tree
Showing 30 changed files with 754 additions and 597 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRefExt;
use databend_common_meta_app::schema::CreateOption;
use databend_common_sql::optimizer::agg_index;
use databend_common_sql::optimizer::OptimizerContext;
use databend_common_sql::optimizer::RecursiveOptimizer;
use databend_common_sql::optimizer::RuleID;
use databend_common_sql::optimizer::SExpr;
use databend_common_sql::optimizer::DEFAULT_REWRITE_RULES;
use databend_common_sql::plans::AggIndexInfo;
Expand Down Expand Up @@ -516,28 +516,32 @@ async fn test_query_rewrite_impl(format: &str) -> Result<()> {
let _ = interpreter.execute(ctx.clone()).await?;

let test_suites = get_test_suites();
for suite in test_suites {
let (query, _, metadata) = plan_sql(ctx.clone(), suite.query, true).await?;
for suite in test_suites.into_iter() {
let (index, _, _) = plan_sql(ctx.clone(), suite.index, false).await?;
let meta = metadata.read();
let base_columns = meta.columns_by_table_index(0);
let result = agg_index::try_rewrite(0, "t", &base_columns, &query, &[(
0,
suite.index.to_string(),
index,
)])?;
let (mut query, _, metadata) = plan_sql(ctx.clone(), suite.query, true).await?;
{
let mut metadata = metadata.write();
metadata.add_agg_indexes("default.default.t".to_string(), vec![(
0,
suite.index.to_string(),
index,
)]);
}
query.clear_applied_rules();
let result = RecursiveOptimizer::new(
&[RuleID::TryApplyAggIndex],
&OptimizerContext::new(ctx.clone(), metadata.clone()),
)
.run(&query)?;
let agg_index = find_push_down_index_info(&result)?;
assert_eq!(
suite.is_matched,
result.is_some(),
agg_index.is_some(),
"query: {}, index: {}",
suite.query,
suite.index
);
if let Some(result) = result {
let agg_index = find_push_down_index_info(&result)?;
assert!(agg_index.is_some());
let agg_index = agg_index.as_ref().unwrap();

if let Some(agg_index) = agg_index {
let selection = format_selection(agg_index);
assert_eq!(
suite.index_selection, selection,
Expand Down
12 changes: 3 additions & 9 deletions src/query/sql/src/planner/binder/bind_query/bind_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,12 @@ impl Binder {
)?;
}

s_expr = self.bind_projection(&mut from_context, &projections, &scalar_items, s_expr)?;

if !order_by.is_empty() {
s_expr = self.bind_order_by(
&from_context,
order_items,
&select_list,
&mut scalar_items,
s_expr,
)?;
s_expr = self.bind_order_by(&from_context, order_items, &select_list, s_expr)?;
}

s_expr = self.bind_projection(&mut from_context, &projections, &scalar_items, s_expr)?;

if from_context.have_async_func {
// rewrite async function to async function plan
let mut async_func_rewriter = AsyncFunctionRewriter::new(self.metadata.clone());
Expand Down
3 changes: 2 additions & 1 deletion src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ impl Binder {
bind_context.planning_agg_index = true;
let plan = if let Statement::Query(_) = &stmt {
let select_plan = self.bind_statement(bind_context, &stmt).await?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
.with_planning_agg_index();
Ok(optimize(opt_ctx, select_plan).await?)
} else {
Err(ErrorCode::UnsupportedIndex("statement is not query"))
Expand Down
32 changes: 1 addition & 31 deletions src/query/sql/src/planner/binder/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -33,7 +32,6 @@ use crate::optimizer::SExpr;
use crate::planner::semantic::GroupingChecker;
use crate::plans::BoundColumnRef;
use crate::plans::CastExpr;
use crate::plans::EvalScalar;
use crate::plans::FunctionCall;
use crate::plans::LambdaFunc;
use crate::plans::ScalarExpr;
Expand All @@ -44,7 +42,6 @@ use crate::plans::UDFCall;
use crate::plans::VisitorMut as _;
use crate::BindContext;
use crate::IndexType;
use crate::WindowChecker;

#[derive(Debug)]
pub struct OrderItems {
Expand Down Expand Up @@ -186,12 +183,9 @@ impl Binder {
from_context: &BindContext,
order_by: OrderItems,
select_list: &SelectList<'_>,
scalar_items: &mut HashMap<IndexType, ScalarItem>,
child: SExpr,
) -> Result<SExpr> {
let mut order_by_items = Vec::with_capacity(order_by.items.len());
let mut scalars = vec![];

for order in order_by.items {
if from_context.in_grouping {
let mut group_checker = GroupingChecker::new(from_context);
Expand All @@ -206,23 +200,6 @@ impl Binder {
}
}

if let Entry::Occupied(entry) = scalar_items.entry(order.index) {
let need_eval = !matches!(entry.get().scalar, ScalarExpr::BoundColumnRef(_));
if need_eval {
// Remove the entry to avoid bind again in later process (bind_projection).
let (index, item) = entry.remove_entry();
let mut scalar = item.scalar;
if from_context.in_grouping {
let mut group_checker = GroupingChecker::new(from_context);
group_checker.visit(&mut scalar)?;
} else if !from_context.windows.window_functions.is_empty() {
let mut window_checker = WindowChecker::new(from_context);
window_checker.visit(&mut scalar)?;
}
scalars.push(ScalarItem { scalar, index });
}
}

let order_by_item = SortItem {
index: order.index,
asc: order.asc,
Expand All @@ -232,21 +209,14 @@ impl Binder {
order_by_items.push(order_by_item);
}

let mut new_expr = if !scalars.is_empty() {
let eval_scalar = EvalScalar { items: scalars };
SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(child))
} else {
child
};

let sort_plan = Sort {
items: order_by_items,
limit: None,
after_exchange: None,
pre_projection: None,
window_partition: vec![],
};
new_expr = SExpr::create_unary(Arc::new(sort_plan.into()), Arc::new(new_expr));
let new_expr = SExpr::create_unary(Arc::new(sort_plan.into()), Arc::new(child));
Ok(new_expr)
}

Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ impl Dataframe {
&self.bind_context,
order_items,
&select_list,
&mut scalar_items,
self.s_expr,
)?;

Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/planner/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ impl Metadata {
self.agg_indexes.get(table).map(|v| v.as_slice())
}

pub fn has_agg_indexes(&self) -> bool {
!self.agg_indexes.is_empty()
}

#[allow(clippy::too_many_arguments)]
pub fn add_table(
&mut self,
Expand Down
13 changes: 11 additions & 2 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct OptimizerContext {
enable_distributed_optimization: bool,
enable_join_reorder: bool,
enable_dphyp: bool,
planning_agg_index: bool,
#[educe(Debug(ignore))]
sample_executor: Option<Arc<dyn QuerySampleExecutor>>,
}
Expand All @@ -81,6 +82,7 @@ impl OptimizerContext {
enable_join_reorder: true,
enable_dphyp: true,
sample_executor: None,
planning_agg_index: false,
}
}

Expand All @@ -106,6 +108,11 @@ impl OptimizerContext {
self.sample_executor = sample_executor;
self
}

pub fn with_planning_agg_index(mut self) -> Self {
self.planning_agg_index = true;
self
}
}

/// A recursive optimizer that will apply the given rules recursively.
Expand Down Expand Up @@ -410,8 +417,10 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) -
}
};

s_expr =
RecursiveOptimizer::new([RuleID::EliminateEvalScalar].as_slice(), opt_ctx).run(&s_expr)?;
if !opt_ctx.planning_agg_index {
s_expr = RecursiveOptimizer::new([RuleID::EliminateEvalScalar].as_slice(), opt_ctx)
.run(&s_expr)?;
}

Ok(s_expr)
}
Expand Down
6 changes: 5 additions & 1 deletion src/query/sql/src/planner/optimizer/rule/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::rewrite::RulePushDownFilterWindow;
use super::rewrite::RulePushDownLimitAggregate;
use super::rewrite::RulePushDownLimitEvalScalar;
use super::rewrite::RulePushDownPrewhere;
use super::rewrite::RulePushDownSortEvalScalar;
use super::rewrite::RuleTryApplyAggIndex;
use crate::optimizer::rule::rewrite::RuleEliminateFilter;
use crate::optimizer::rule::rewrite::RuleEliminateSort;
Expand Down Expand Up @@ -57,7 +58,7 @@ pub const MAX_PUSH_DOWN_LIMIT: usize = 10000;
impl RuleFactory {
pub fn create_rule(id: RuleID, metadata: MetadataRef) -> Result<RulePtr> {
match id {
RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new())),
RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new(metadata))),
RuleID::PushDownFilterUnion => Ok(Box::new(RulePushDownFilterUnion::new())),
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
RuleID::PushDownFilterJoin => Ok(Box::new(RulePushDownFilterJoin::new(metadata))),
Expand All @@ -68,6 +69,9 @@ impl RuleFactory {
RuleID::PushDownLimitUnion => Ok(Box::new(RulePushDownLimitUnion::new())),
RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new())),
RuleID::PushDownSortScan => Ok(Box::new(RulePushDownSortScan::new())),
RuleID::PushDownSortEvalScalar => {
Ok(Box::new(RulePushDownSortEvalScalar::new(metadata)))
}
RuleID::PushDownLimitOuterJoin => Ok(Box::new(RulePushDownLimitOuterJoin::new())),
RuleID::PushDownLimitEvalScalar => Ok(Box::new(RulePushDownLimitEvalScalar::new())),
RuleID::PushDownLimitSort => {
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod rule_push_down_limit_sort;
mod rule_push_down_limit_union;
mod rule_push_down_limit_window;
mod rule_push_down_prewhere;
mod rule_push_down_sort_expression;
mod rule_push_down_sort_scan;
mod rule_semi_to_inner_join;
mod rule_split_aggregate;
Expand Down Expand Up @@ -70,6 +71,7 @@ pub use rule_push_down_limit_sort::RulePushDownLimitSort;
pub use rule_push_down_limit_union::RulePushDownLimitUnion;
pub use rule_push_down_limit_window::RulePushDownLimitWindow;
pub use rule_push_down_prewhere::RulePushDownPrewhere;
pub use rule_push_down_sort_expression::RulePushDownSortEvalScalar;
pub use rule_push_down_sort_scan::RulePushDownSortScan;
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
pub use rule_split_aggregate::RuleSplitAggregate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@ use crate::optimizer::extract::Matcher;
use crate::optimizer::rule::Rule;
use crate::optimizer::rule::RuleID;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::plans::EvalScalar;
use crate::plans::Operator;
use crate::plans::RelOp;
use crate::ColumnSet;
use crate::MetadataRef;

pub struct RuleEliminateEvalScalar {
id: RuleID,
matchers: Vec<Matcher>,
metadata: MetadataRef,
}

impl RuleEliminateEvalScalar {
pub fn new() -> Self {
pub fn new(metadata: MetadataRef) -> Self {
Self {
id: RuleID::EliminateEvalScalar,
// EvalScalar
Expand All @@ -38,6 +43,7 @@ impl RuleEliminateEvalScalar {
op_type: RelOp::EvalScalar,
children: vec![Matcher::Leaf],
}],
metadata,
}
}
}
Expand All @@ -48,13 +54,29 @@ impl Rule for RuleEliminateEvalScalar {
}

fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
let eval_scalar: EvalScalar = s_expr.plan().clone().try_into()?;

// Eliminate empty EvalScalar
let eval_scalar: EvalScalar = s_expr.plan().clone().try_into()?;
if eval_scalar.items.is_empty() {
state.add_result(s_expr.child(0)?.clone());
return Ok(());
}

if self.metadata.read().has_agg_indexes() {
return Ok(());
}

let child = s_expr.child(0)?;
let child_output_cols = child
.plan()
.derive_relational_prop(&RelExpr::with_s_expr(child))?
.output_columns
.clone();
let eval_scalar_output_cols: ColumnSet =
eval_scalar.items.iter().map(|x| x.index).collect();
if eval_scalar_output_cols.is_subset(&child_output_cols) {
state.add_result(s_expr.child(0)?.clone());
return Ok(());
}
Ok(())
}

Expand Down
Loading

0 comments on commit cbbd4e3

Please sign in to comment.