From 4e9f2d5260ecaca261076d2e0a626d3f595ff5a5 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Mon, 15 Apr 2024 21:13:07 +0800 Subject: [PATCH 1/4] Move conversion of FIRST/LAST Aggregate function to independent physical optimizer rule (#10061) * move out the ordering ruel Signed-off-by: jayzhan211 * introduce rule Signed-off-by: jayzhan211 * revert test result Signed-off-by: jayzhan211 * pass mulit order test Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * with new childes Signed-off-by: jayzhan211 * revert slt Signed-off-by: jayzhan211 * revert back Signed-off-by: jayzhan211 * rm rewrite in new child Signed-off-by: jayzhan211 * backup Signed-off-by: jayzhan211 * only move conversion to optimizer Signed-off-by: jayzhan211 * find test that do reverse Signed-off-by: jayzhan211 * add test for first and last Signed-off-by: jayzhan211 * pass all test Signed-off-by: jayzhan211 * upd test Signed-off-by: jayzhan211 * upd test Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * add aggregate test Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * final draft Signed-off-by: jayzhan211 * cleanup again Signed-off-by: jayzhan211 * pull out finer ordering code and reuse Signed-off-by: jayzhan211 * clippy Signed-off-by: jayzhan211 * remove finer in optimize rule Signed-off-by: jayzhan211 * add comments and clenaup Signed-off-by: jayzhan211 * rename fun Signed-off-by: jayzhan211 * rename fun Signed-off-by: jayzhan211 * fmt Signed-off-by: jayzhan211 * avoid unnecessary recursion and rename Signed-off-by: jayzhan211 * remove unnecessary rule Signed-off-by: jayzhan211 * fix merge Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- .../physical_optimizer/convert_first_last.rs | 260 ++++++++++++++++++ datafusion/core/src/physical_optimizer/mod.rs | 1 + .../core/src/physical_optimizer/optimizer.rs | 3 + .../core/tests/data/convert_first_last.csv | 11 + 
.../physical-plan/src/aggregates/mod.rs | 117 ++++---- datafusion/physical-plan/src/tree_node.rs | 1 + datafusion/physical-plan/src/windows/mod.rs | 2 +- .../sqllogictest/test_files/aggregate.slt | 42 +++ .../sqllogictest/test_files/explain.slt | 3 + .../sqllogictest/test_files/group_by.slt | 4 +- 10 files changed, 372 insertions(+), 72 deletions(-) create mode 100644 datafusion/core/src/physical_optimizer/convert_first_last.rs create mode 100644 datafusion/core/tests/data/convert_first_last.csv diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs b/datafusion/core/src/physical_optimizer/convert_first_last.rs new file mode 100644 index 000000000000..4102313d3126 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion_common::Result; +use datafusion_common::{ + config::ConfigOptions, + tree_node::{Transformed, TransformedResult, TreeNode}, +}; +use datafusion_physical_expr::expressions::{FirstValue, LastValue}; +use datafusion_physical_expr::{ + equivalence::ProjectionMapping, reverse_order_bys, AggregateExpr, + EquivalenceProperties, PhysicalSortRequirement, +}; +use datafusion_physical_plan::aggregates::concat_slices; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode}, + ExecutionPlan, ExecutionPlanProperties, InputOrderMode, +}; +use std::sync::Arc; + +use datafusion_physical_plan::windows::get_ordered_partition_by_indices; + +use super::PhysicalOptimizerRule; + +/// This optimizer rule checks the ordering requirements of the aggregate expressions +/// and converts between FIRST_VALUE and LAST_VALUE where possible. +/// For example, if the values are in ascending order and we want LastValue with a descending requirement, +/// it is equivalent to FirstValue with the current ascending ordering. +/// +/// As a concrete example, say we have values c1 with [1, 2, 3], which is in ascending order. +/// If we want LastValue(c1 order by desc), that is the last value of the reversed c1 [3, 2, 1], i.e. 1, +/// so we can convert the aggregate expression to FirstValue(c1 order by asc). +/// Since the current ordering already satisfies that requirement, this saves time. 
+#[derive(Default)] +pub struct OptimizeAggregateOrder {} + +impl OptimizeAggregateOrder { + pub fn new() -> Self { + Self::default() + } +} + +impl PhysicalOptimizerRule for OptimizeAggregateOrder { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + plan.transform_up(&get_common_requirement_of_aggregate_input) + .data() + } + + fn name(&self) -> &str { + "OptimizeAggregateOrder" + } + + fn schema_check(&self) -> bool { + true + } +} + +fn get_common_requirement_of_aggregate_input( + plan: Arc, +) -> Result>> { + if let Some(aggr_exec) = plan.as_any().downcast_ref::() { + let input = aggr_exec.input(); + let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec); + let group_by = aggr_exec.group_by(); + let mode = aggr_exec.mode(); + + let input_eq_properties = input.equivalence_properties(); + let groupby_exprs = group_by.input_exprs(); + // If existing ordering satisfies a prefix of the GROUP BY expressions, + // prefix requirements with this section. In this case, aggregation will + // work more efficiently. 
+ let indices = get_ordered_partition_by_indices(&groupby_exprs, input); + let requirement = indices + .iter() + .map(|&idx| PhysicalSortRequirement { + expr: groupby_exprs[idx].clone(), + options: None, + }) + .collect::>(); + + try_convert_first_last_if_better( + &requirement, + &mut aggr_expr, + input_eq_properties, + )?; + + let required_input_ordering = (!requirement.is_empty()).then_some(requirement); + + let input_order_mode = + if indices.len() == groupby_exprs.len() && !indices.is_empty() { + InputOrderMode::Sorted + } else if !indices.is_empty() { + InputOrderMode::PartiallySorted(indices) + } else { + InputOrderMode::Linear + }; + let projection_mapping = + ProjectionMapping::try_new(group_by.expr(), &input.schema())?; + + let cache = AggregateExec::compute_properties( + input, + plan.schema().clone(), + &projection_mapping, + mode, + &input_order_mode, + ); + + let aggr_exec = aggr_exec.new_with_aggr_expr_and_ordering_info( + required_input_ordering, + aggr_expr, + cache, + input_order_mode, + ); + + Ok(Transformed::yes( + Arc::new(aggr_exec) as Arc + )) + } else { + Ok(Transformed::no(plan)) + } +} + +/// In `create_initial_plan` for LogicalPlan::Aggregate, we have a nested AggregateExec where the first layer +/// is in Partial mode and the second layer is in Final or Finalpartitioned mode. +/// If the first layer of aggregate plan is transformed, we need to update the child of the layer with final mode. +/// Therefore, we check it and get the updated aggregate expressions. +/// +/// If AggregateExec is created from elsewhere, we skip the check and return the original aggregate expressions. +fn try_get_updated_aggr_expr_from_child( + aggr_exec: &AggregateExec, +) -> Vec> { + let input = aggr_exec.input(); + if aggr_exec.mode() == &AggregateMode::Final + || aggr_exec.mode() == &AggregateMode::FinalPartitioned + { + // Some aggregators may be modified during initialization for + // optimization purposes. 
For example, a FIRST_VALUE may turn + // into a LAST_VALUE with the reverse ordering requirement. + // To reflect such changes to subsequent stages, use the updated + // `AggregateExpr`/`PhysicalSortExpr` objects. + // + // The bottom up transformation is the mirror of LogicalPlan::Aggregate creation in [create_initial_plan] + if let Some(c_aggr_exec) = input.as_any().downcast_ref::() { + if c_aggr_exec.mode() == &AggregateMode::Partial { + // If the input is an AggregateExec in Partial mode, then the + // input is a CoalescePartitionsExec. In this case, the + // AggregateExec is the second stage of aggregation. The + // requirements of the second stage are the requirements of + // the first stage. + return c_aggr_exec.aggr_expr().to_vec(); + } + } + } + + aggr_exec.aggr_expr().to_vec() +} + +/// Convert FIRST_VALUE/LAST_VALUE aggregate expressions when the existing ordering allows it. +/// +/// # Parameters +/// +/// - `prefix_requirement`: The requirement that is prepended to the ordering +/// requirement of each aggregate expression before it is checked. +/// - `aggr_exprs`: A mutable slice of `Arc` containing all the +/// aggregate expressions; FIRST_VALUE/LAST_VALUE entries are updated in place. +/// - `eq_properties`: A reference to an `EquivalenceProperties` instance +/// representing equivalence properties for ordering. +/// +/// Aggregate expressions other than FIRST_VALUE and LAST_VALUE are left unchanged. +/// +/// # Returns +/// +/// `Ok(())`; the result of the conversion is written back into `aggr_exprs`. +/// Conflicting requirements are not detected by this function. 
+/// +/// Similar to the one in datafusion/physical-plan/src/aggregates/mod.rs, but this +/// function care only the possible conversion between FIRST_VALUE and LAST_VALUE +fn try_convert_first_last_if_better( + prefix_requirement: &[PhysicalSortRequirement], + aggr_exprs: &mut [Arc], + eq_properties: &EquivalenceProperties, +) -> Result<()> { + for aggr_expr in aggr_exprs.iter_mut() { + let aggr_req = aggr_expr.order_bys().unwrap_or(&[]); + let reverse_aggr_req = reverse_order_bys(aggr_req); + let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req); + let reverse_aggr_req = + PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req); + + if let Some(first_value) = aggr_expr.as_any().downcast_ref::() { + let mut first_value = first_value.clone(); + + if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &aggr_req, + )) { + first_value = first_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(first_value) as _; + } else if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &reverse_aggr_req, + )) { + // Converting to LAST_VALUE enables more efficient execution + // given the existing ordering: + let mut last_value = first_value.convert_to_last(); + last_value = last_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(last_value) as _; + } else { + // Requirement is not satisfied with existing ordering. 
+ first_value = first_value.with_requirement_satisfied(false); + *aggr_expr = Arc::new(first_value) as _; + } + continue; + } + if let Some(last_value) = aggr_expr.as_any().downcast_ref::() { + let mut last_value = last_value.clone(); + if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &aggr_req, + )) { + last_value = last_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(last_value) as _; + } else if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &reverse_aggr_req, + )) { + // Converting to FIRST_VALUE enables more efficient execution + // given the existing ordering: + let mut first_value = last_value.convert_to_first(); + first_value = first_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(first_value) as _; + } else { + // Requirement is not satisfied with existing ordering. + last_value = last_value.with_requirement_satisfied(false); + *aggr_expr = Arc::new(last_value) as _; + } + continue; + } + } + + Ok(()) +} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index e990fead610d..c80668c6da74 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -24,6 +24,7 @@ pub mod aggregate_statistics; pub mod coalesce_batches; pub mod combine_partial_final_agg; +mod convert_first_last; pub mod enforce_distribution; pub mod enforce_sorting; pub mod join_selection; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 48da68cb2e37..08cbf68fa617 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use super::convert_first_last::OptimizeAggregateOrder; use super::projection_pushdown::ProjectionPushdown; use crate::config::ConfigOptions; use 
crate::physical_optimizer::aggregate_statistics::AggregateStatistics; @@ -101,6 +102,8 @@ impl PhysicalOptimizer { // Note that one should always run this rule after running the EnforceDistribution rule // as the latter may break local sorting requirements. Arc::new(EnforceSorting::new()), + // Run once after the local sorting requirement is changed + Arc::new(OptimizeAggregateOrder::new()), // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future. Arc::new(ProjectionPushdown::new()), // The CoalesceBatches rule will not influence the distribution and ordering of the diff --git a/datafusion/core/tests/data/convert_first_last.csv b/datafusion/core/tests/data/convert_first_last.csv new file mode 100644 index 000000000000..059b631e5711 --- /dev/null +++ b/datafusion/core/tests/data/convert_first_last.csv @@ -0,0 +1,11 @@ +c1,c2,c3 +1,9,0 +2,8,1 +3,7,2 +4,6,3 +5,5,4 +6,4,5 +7,3,6 +8,2,7 +9,1,8 +10,0,9 \ No newline at end of file diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 98c44e23c6c7..ba9a6b1be0ef 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -39,12 +39,15 @@ use datafusion_common::stats::Precision; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Accumulator; +use datafusion_physical_expr::aggregate::is_order_sensitive; +use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ - aggregate::is_order_sensitive, - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn}, - physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties, - LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortRequirement, + 
equivalence::ProjectionMapping, + expressions::{Column, Max, Min, UnKnownColumn}, + AggregateExpr, LexRequirement, PhysicalExpr, +}; +use datafusion_physical_expr::{ + physical_exprs_contains, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; use itertools::Itertools; @@ -269,6 +272,36 @@ pub struct AggregateExec { } impl AggregateExec { + /// Constructor used by the `OptimizeAggregateOrder` optimizer rule (`convert_first_last.rs`): + /// the arguments replace the corresponding fields, metrics are reset, and the remaining fields are cloned from `self` + pub fn new_with_aggr_expr_and_ordering_info( + &self, + required_input_ordering: Option, + aggr_expr: Vec>, + cache: PlanProperties, + input_order_mode: InputOrderMode, + ) -> Self { + Self { + aggr_expr, + required_input_ordering, + metrics: ExecutionPlanMetricsSet::new(), + input_order_mode, + cache, + // clone the rest of the fields + mode: self.mode, + group_by: self.group_by.clone(), + filter_expr: self.filter_expr.clone(), + limit: self.limit, + input: self.input.clone(), + schema: self.schema.clone(), + input_schema: self.input_schema.clone(), + } + } + + pub fn cache(&self) -> &PlanProperties { + &self.cache + } + /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, @@ -336,8 +369,7 @@ impl AggregateExec { }) .collect::>(); - let req = get_aggregate_exprs_requirement( - &new_requirement, + let req = get_finer_aggregate_exprs_requirement( &mut aggr_expr, &group_by, input_eq_properties, @@ -369,6 +401,7 @@ impl AggregateExec { &mode, &input_order_mode, ); + Ok(AggregateExec { mode, group_by, @@ -507,7 +540,7 @@ impl AggregateExec { } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
- fn compute_properties( + pub fn compute_properties( input: &Arc, schema: SchemaRef, projection_mapping: &ProjectionMapping, @@ -683,9 +716,9 @@ impl ExecutionPlan for AggregateExec { children[0].clone(), self.input_schema.clone(), self.schema.clone(), - //self.original_schema.clone(), )?; me.limit = self.limit; + Ok(Arc::new(me)) } @@ -870,7 +903,7 @@ fn finer_ordering( } /// Concatenates the given slices. -fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { +pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { [lhs, rhs].concat() } @@ -891,8 +924,7 @@ fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. -fn get_aggregate_exprs_requirement( - prefix_requirement: &[PhysicalSortRequirement], +fn get_finer_aggregate_exprs_requirement( aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, @@ -900,60 +932,6 @@ fn get_aggregate_exprs_requirement( ) -> Result { let mut requirement = vec![]; for aggr_expr in aggr_exprs.iter_mut() { - let aggr_req = aggr_expr.order_bys().unwrap_or(&[]); - let reverse_aggr_req = reverse_order_bys(aggr_req); - let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req); - let reverse_aggr_req = - PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req); - - if let Some(first_value) = aggr_expr.as_any().downcast_ref::() { - let mut first_value = first_value.clone(); - if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &aggr_req, - )) { - first_value = first_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(first_value) as _; - } else if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &reverse_aggr_req, - )) { - // Converting to LAST_VALUE enables more efficient execution - // given the existing ordering: - let mut last_value = first_value.convert_to_last(); - 
last_value = last_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(last_value) as _; - } else { - // Requirement is not satisfied with existing ordering. - first_value = first_value.with_requirement_satisfied(false); - *aggr_expr = Arc::new(first_value) as _; - } - continue; - } - if let Some(last_value) = aggr_expr.as_any().downcast_ref::() { - let mut last_value = last_value.clone(); - if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &aggr_req, - )) { - last_value = last_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(last_value) as _; - } else if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &reverse_aggr_req, - )) { - // Converting to FIRST_VALUE enables more efficient execution - // given the existing ordering: - let mut first_value = last_value.convert_to_first(); - first_value = first_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(first_value) as _; - } else { - // Requirement is not satisfied with existing ordering. - last_value = last_value.with_requirement_satisfied(false); - *aggr_expr = Arc::new(last_value) as _; - } - continue; - } if let Some(finer_ordering) = finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode) { @@ -1003,6 +981,7 @@ fn get_aggregate_exprs_requirement( continue; } } + // Neither the existing requirement and current aggregate requirement satisfy the other, this means // requirements are conflicting. Currently, we do not support // conflicting requirements. 
@@ -1010,6 +989,7 @@ fn get_aggregate_exprs_requirement( "Conflicting ordering requirements in aggregate functions is not supported" ); } + Ok(PhysicalSortRequirement::from_sort_exprs(&requirement)) } @@ -1235,7 +1215,7 @@ mod tests { use datafusion_execution::memory_pool::FairSpillPool; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_physical_expr::expressions::{ - lit, ApproxDistinct, Count, LastValue, Median, OrderSensitiveArrayAgg, + lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg, }; use datafusion_physical_expr::{ reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr, @@ -2171,8 +2151,7 @@ mod tests { }) .collect::>(); let group_by = PhysicalGroupBy::new_single(vec![]); - let res = get_aggregate_exprs_requirement( - &[], + let res = get_finer_aggregate_exprs_requirement( &mut aggr_exprs, &group_by, &eq_properties, diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 52a52f81bdaf..46460cbb6684 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -64,6 +64,7 @@ impl PlanContext { pub fn update_plan_from_children(mut self) -> Result { let children_plans = self.children.iter().map(|c| c.plan.clone()).collect(); self.plan = with_new_children_if_necessary(self.plan, children_plans)?; + Ok(self) } } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index c5c845614c7b..e01ee06a12b8 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -373,7 +373,7 @@ pub(crate) fn calc_requirements< /// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used, /// this vector will be [1, 0]. It means that when we iterate b, a columns with the order [1, 0] /// resulting vector (a, b) is a preset of the existing ordering (a, b, c). 
-pub(crate) fn get_ordered_partition_by_indices( +pub fn get_ordered_partition_by_indices( partition_by_exprs: &[Arc], input: &Arc, ) -> Vec { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 30d5c7243f26..3d24fe3888d7 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -3455,3 +3455,45 @@ SELECT LAST_VALUE(column1 ORDER BY column2 DESC) IGNORE NULLS FROM t; statement ok DROP TABLE t; + +# Test Convert FirstLast optimizer rule +statement ok +CREATE EXTERNAL TABLE convert_first_last_table ( +c1 INT NOT NULL, +c2 INT NOT NULL, +c3 INT NOT NULL +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (c1 ASC) +WITH ORDER (c2 DESC) +WITH ORDER (c3 ASC) +LOCATION '../core/tests/data/convert_first_last.csv'; + +# test first to last, the result does not show difference, we need to check the conversion by `explain` +query TT +explain select first_value(c1 order by c3 desc) from convert_first_last_table; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]] +02)--TableScan: convert_first_last_table projection=[c1, c3] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], has_header=true + +# test last to first +query TT +explain select last_value(c1 order by c2 asc) from convert_first_last_table; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) ORDER BY 
[convert_first_last_table.c2 ASC NULLS LAST]]] +02)--TableScan: convert_first_last_table projection=[c1, c2] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], has_header=true diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index c357391e70b5..d8c8dcd41b6a 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -248,6 +248,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE +physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true @@ -304,6 +305,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE +physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements @@ -340,6 +342,7 @@ 
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE +physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 1acdcde9c8ee..5c5bf58dd049 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2805,7 +2805,7 @@ logical_plan 04)------TableScan: sales_global projection=[country, ts, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] 03)----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort @@ -3800,7 +3800,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan 01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] -02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), 
FIRST_VALUE(multiple_ordered_table.c)] +02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] From a165b7f57946c7c4e40259e982a2a0aad3ee456c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 15 Apr 2024 10:57:19 -0400 Subject: [PATCH 2/4] Avoid copies in `CountWildcardRule` via TreeNode API (#10066) * Avoid copies in `CountWildcardRule` via TreeNode API --- .../src/analyzer/count_wildcard_rule.rs | 241 +++++------------- .../src/analyzer/function_rewrite.rs | 4 +- 2 files changed, 66 insertions(+), 179 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index 273766edac34..080ec074d3c3 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -15,23 +15,17 @@ // specific language governing permissions and limitations // under the License. 
-use std::sync::Arc; - use crate::analyzer::AnalyzerRule; +use crate::utils::NamePreserver; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRewriter, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; -use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition, InSubquery}; -use datafusion_expr::expr_rewriter::rewrite_preserving_name; -use datafusion_expr::utils::COUNT_STAR_EXPANSION; -use datafusion_expr::Expr::ScalarSubquery; -use datafusion_expr::{ - aggregate_function, expr, lit, Aggregate, Expr, Filter, LogicalPlan, - LogicalPlanBuilder, Projection, Sort, Subquery, +use datafusion_expr::expr::{ + AggregateFunction, AggregateFunctionDefinition, WindowFunction, }; +use datafusion_expr::utils::COUNT_STAR_EXPANSION; +use datafusion_expr::{lit, Expr, LogicalPlan, WindowFunctionDefinition}; /// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`. /// @@ -47,7 +41,8 @@ impl CountWildcardRule { impl AnalyzerRule for CountWildcardRule { fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_down(&analyze_internal).data() + plan.transform_down_with_subqueries(&analyze_internal) + .data() } fn name(&self) -> &str { @@ -55,173 +50,53 @@ impl AnalyzerRule for CountWildcardRule { } } -fn analyze_internal(plan: LogicalPlan) -> Result> { - let mut rewriter = CountWildcardRewriter {}; - match plan { - LogicalPlan::Window(window) => { - let window_expr = window - .window_expr - .iter() - .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) - .collect::>>()?; - - Ok(Transformed::yes( - LogicalPlanBuilder::from((*window.input).clone()) - .window(window_expr)? 
- .build()?, - )) - } - LogicalPlan::Aggregate(agg) => { - let aggr_expr = agg - .aggr_expr - .iter() - .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) - .collect::>>()?; - - Ok(Transformed::yes(LogicalPlan::Aggregate( - Aggregate::try_new(agg.input.clone(), agg.group_expr, aggr_expr)?, - ))) - } - LogicalPlan::Sort(Sort { expr, input, fetch }) => { - let sort_expr = expr - .iter() - .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) - .collect::>>()?; - Ok(Transformed::yes(LogicalPlan::Sort(Sort { - expr: sort_expr, - input, - fetch, - }))) - } - LogicalPlan::Projection(projection) => { - let projection_expr = projection - .expr - .iter() - .map(|expr| rewrite_preserving_name(expr.clone(), &mut rewriter)) - .collect::>>()?; - Ok(Transformed::yes(LogicalPlan::Projection( - Projection::try_new(projection_expr, projection.input)?, - ))) - } - LogicalPlan::Filter(Filter { - predicate, input, .. - }) => { - let predicate = rewrite_preserving_name(predicate, &mut rewriter)?; - Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new( - predicate, input, - )?))) - } - - _ => Ok(Transformed::no(plan)), - } +fn is_wildcard(expr: &Expr) -> bool { + matches!(expr, Expr::Wildcard { qualifier: None }) } -struct CountWildcardRewriter {} - -impl TreeNodeRewriter for CountWildcardRewriter { - type Node = Expr; - - fn f_up(&mut self, old_expr: Expr) -> Result> { - Ok(match old_expr.clone() { - Expr::WindowFunction(expr::WindowFunction { - fun: - expr::WindowFunctionDefinition::AggregateFunction( - aggregate_function::AggregateFunction::Count, - ), - args, - partition_by, - order_by, - window_frame, - null_treatment, - }) if args.len() == 1 => match args[0] { - Expr::Wildcard { qualifier: None } => { - Transformed::yes(Expr::WindowFunction(expr::WindowFunction { - fun: expr::WindowFunctionDefinition::AggregateFunction( - aggregate_function::AggregateFunction::Count, - ), - args: vec![lit(COUNT_STAR_EXPANSION)], - partition_by, - order_by, - 
window_frame, - null_treatment, - })) - } +fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool { + matches!( + &aggregate_function.func_def, + AggregateFunctionDefinition::BuiltIn( + datafusion_expr::aggregate_function::AggregateFunction::Count, + ) + ) && aggregate_function.args.len() == 1 + && is_wildcard(&aggregate_function.args[0]) +} - _ => Transformed::no(old_expr), - }, - Expr::AggregateFunction(AggregateFunction { - func_def: - AggregateFunctionDefinition::BuiltIn( - aggregate_function::AggregateFunction::Count, - ), - args, - distinct, - filter, - order_by, - null_treatment, - }) if args.len() == 1 => match args[0] { - Expr::Wildcard { qualifier: None } => { - Transformed::yes(Expr::AggregateFunction(AggregateFunction::new( - aggregate_function::AggregateFunction::Count, - vec![lit(COUNT_STAR_EXPANSION)], - distinct, - filter, - order_by, - null_treatment, - ))) - } - _ => Transformed::no(old_expr), - }, +fn is_count_star_window_aggregate(window_function: &WindowFunction) -> bool { + matches!( + &window_function.fun, + WindowFunctionDefinition::AggregateFunction( + datafusion_expr::aggregate_function::AggregateFunction::Count, + ) + ) && window_function.args.len() == 1 + && is_wildcard(&window_function.args[0]) +} - ScalarSubquery(Subquery { - subquery, - outer_ref_columns, - }) => subquery - .as_ref() - .clone() - .transform_down(&analyze_internal)? - .update_data(|new_plan| { - ScalarSubquery(Subquery { - subquery: Arc::new(new_plan), - outer_ref_columns, - }) - }), - Expr::InSubquery(InSubquery { - expr, - subquery, - negated, - }) => subquery - .subquery - .as_ref() - .clone() - .transform_down(&analyze_internal)? 
- .update_data(|new_plan| { - Expr::InSubquery(InSubquery::new( - expr, - Subquery { - subquery: Arc::new(new_plan), - outer_ref_columns: subquery.outer_ref_columns, - }, - negated, - )) - }), - Expr::Exists(expr::Exists { subquery, negated }) => subquery - .subquery - .as_ref() - .clone() - .transform_down(&analyze_internal)? - .update_data(|new_plan| { - Expr::Exists(expr::Exists { - subquery: Subquery { - subquery: Arc::new(new_plan), - outer_ref_columns: subquery.outer_ref_columns, - }, - negated, - }) - }), - _ => Transformed::no(old_expr), - }) - } +fn analyze_internal(plan: LogicalPlan) -> Result> { + let name_preserver = NamePreserver::new(&plan); + plan.map_expressions(|expr| { + let original_name = name_preserver.save(&expr)?; + let transformed_expr = expr.transform_up(&|expr| match expr { + Expr::WindowFunction(mut window_function) + if is_count_star_window_aggregate(&window_function) => + { + window_function.args = vec![lit(COUNT_STAR_EXPANSION)]; + Ok(Transformed::yes(Expr::WindowFunction(window_function))) + } + Expr::AggregateFunction(mut aggregate_function) + if is_count_star_aggregate(&aggregate_function) => + { + aggregate_function.args = vec![lit(COUNT_STAR_EXPANSION)]; + Ok(Transformed::yes(Expr::AggregateFunction( + aggregate_function, + ))) + } + _ => Ok(Transformed::no(expr)), + })?; + transformed_expr.map_data(|data| original_name.restore(data)) + }) } #[cfg(test)] @@ -233,9 +108,10 @@ mod tests { use datafusion_expr::expr::Sort; use datafusion_expr::{ col, count, exists, expr, in_subquery, lit, logical_plan::LogicalPlanBuilder, - max, out_ref_col, scalar_subquery, wildcard, AggregateFunction, Expr, + max, out_ref_col, scalar_subquery, sum, wildcard, AggregateFunction, Expr, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; + use std::sync::Arc; fn assert_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_analyzed_plan_eq_display_indent( @@ -381,6 +257,17 @@ mod tests { assert_plan_eq(&plan, 
expected) } + #[test] + fn test_count_wildcard_on_non_count_aggregate() -> Result<()> { + let table_scan = test_table_scan()?; + let err = LogicalPlanBuilder::from(table_scan) + .aggregate(Vec::::new(), vec![sum(wildcard())]) + .unwrap_err() + .to_string(); + assert!(err.contains("Error during planning: No function matches the given name and argument types 'SUM(Null)'."), "{err}"); + Ok(()) + } + #[test] fn test_count_wildcard_on_nesting() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/optimizer/src/analyzer/function_rewrite.rs b/datafusion/optimizer/src/analyzer/function_rewrite.rs index deb493e09953..4dd3222a32cf 100644 --- a/datafusion/optimizer/src/analyzer/function_rewrite.rs +++ b/datafusion/optimizer/src/analyzer/function_rewrite.rs @@ -64,7 +64,7 @@ impl ApplyFunctionRewrites { let original_name = name_preserver.save(&expr)?; // recursively transform the expression, applying the rewrites at each step - let result = expr.transform_up(&|expr| { + let transformed_expr = expr.transform_up(&|expr| { let mut result = Transformed::no(expr); for rewriter in self.function_rewrites.iter() { result = result.transform_data(|expr| { @@ -74,7 +74,7 @@ impl ApplyFunctionRewrites { Ok(result) })?; - result.map_data(|expr| original_name.restore(expr)) + transformed_expr.map_data(|expr| original_name.restore(expr)) }) } } From 6ca9d104b8092ab7abac5c3603601cdb03fa07fd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 15 Apr 2024 12:03:44 -0700 Subject: [PATCH 3/4] Coerce Dictionary types for scalar functions (#10077) * Coerce Dictionary types for scalar functions * Fix * Fix format * Add test --- datafusion/expr/src/built_in_function.rs | 15 +++++++++++ .../expr/src/type_coercion/functions.rs | 25 +++++++++++++++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 5bfec00ea3b3..7ec544a57edb 100644 --- 
a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -277,4 +277,19 @@ mod tests { .unwrap(); assert_eq!(return_type, DataType::Date32); } + + #[test] + fn test_coalesce_return_types_dictionary() { + let coalesce = BuiltinScalarFunction::Coalesce; + let return_type = coalesce + .return_type(&[ + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + DataType::Utf8, + ]) + .unwrap(); + assert_eq!( + return_type, + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + ); + } } diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 34b607d0884d..37eeb7d464b8 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -314,8 +314,13 @@ fn coerced_from<'a>( // match Dictionary first match (type_into, type_from) { // coerced dictionary first - (cur_type, Dictionary(_, value_type)) | (Dictionary(_, value_type), cur_type) - if coerced_from(cur_type, value_type).is_some() => + (_, Dictionary(_, value_type)) + if coerced_from(type_into, value_type).is_some() => + { + Some(type_into.clone()) + } + (Dictionary(_, value_type), _) + if coerced_from(value_type, type_from).is_some() => { Some(type_into.clone()) } @@ -624,4 +629,20 @@ mod tests { Ok(()) } + + #[test] + fn test_coerced_from_dictionary() { + let type_into = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::UInt32)); + let type_from = DataType::Int64; + assert_eq!(coerced_from(&type_into, &type_from), None); + + let type_from = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::UInt32)); + let type_into = DataType::Int64; + assert_eq!( + coerced_from(&type_into, &type_from), + Some(type_into.clone()) + ); + } } From 6be3be4d0104abf4d8cdfe29d2e189e9c1568df6 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 16 Apr 2024 04:48:15 +0200 Subject: [PATCH 4/4] Refactor 
`UnwrapCastInComparison` to use `rewrite()` (#10087) --- .../src/unwrap_cast_in_comparison.rs | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index fda390f37961..5ede43a05134 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -23,14 +23,14 @@ use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; +use crate::utils::NamePreserver; use arrow::datatypes::{ DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION, }; use arrow::temporal_conversions::{MICROSECONDS, MILLISECONDS, NANOSECONDS}; -use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{internal_err, DFSchema, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast}; -use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::utils::merge_schema; use datafusion_expr::{ binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator, @@ -85,12 +85,32 @@ impl UnwrapCastInComparison { impl OptimizerRule for UnwrapCastInComparison { fn try_optimize( &self, - plan: &LogicalPlan, + _plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { + internal_err!("Should have called UnwrapCastInComparison::rewrite") + } + + fn name(&self) -> &str { + "unwrap_cast_in_comparison" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { let mut schema = merge_schema(plan.inputs()); - if let LogicalPlan::TableScan(ts) = plan { + if let LogicalPlan::TableScan(ts) = &plan { let 
source_schema = DFSchema::try_from_qualified_schema( ts.table_name.clone(), &ts.source.schema(), @@ -104,22 +124,12 @@ impl OptimizerRule for UnwrapCastInComparison { schema: Arc::new(schema), }; - let new_exprs = plan - .expressions() - .into_iter() - .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter)) - .collect::>>()?; - - let inputs = plan.inputs().into_iter().cloned().collect(); - plan.with_new_exprs(new_exprs, inputs).map(Some) - } - - fn name(&self) -> &str { - "unwrap_cast_in_comparison" - } - - fn apply_order(&self) -> Option { - Some(ApplyOrder::BottomUp) + let name_preserver = NamePreserver::new(&plan); + plan.map_expressions(|expr| { + let original_name = name_preserver.save(&expr)?; + expr.rewrite(&mut expr_rewriter)? + .map_data(|expr| original_name.restore(expr)) + }) } }