From f478344c42b9da8931d65415e500a01a37fc34f2 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 2 Sep 2024 18:41:15 +0000 Subject: [PATCH 1/3] Removes min/max/count comparison based on name in aggregate statistics --- datafusion/expr/src/udaf.rs | 26 +++++++++++++ datafusion/functions-aggregate/src/count.rs | 4 ++ datafusion/functions-aggregate/src/min_max.rs | 8 ++++ .../src/aggregate_statistics.rs | 39 +++---------------- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 7b4b3bb95c46..87be2321c590 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -262,6 +262,19 @@ impl AggregateUDF { self.inner.is_descending() } + /// Returns true if the function is min. Used by the optimizer + pub fn is_min(&self) -> bool { + self.inner.is_min() + } + + /// Returns true if the function is max. Used by the optimizer + pub fn is_max(&self) -> bool { + self.inner.is_max() + } + /// Returns true if the function is count. Used by the optimizer + pub fn is_count(&self) -> bool { + self.inner.is_count() + } /// See [`AggregateUDFImpl::default_value`] for more details. pub fn default_value(&self, data_type: &DataType) -> Result { self.inner.default_value(data_type) @@ -575,6 +588,19 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { None } + // Returns true if the function is min. Used by the optimizer + fn is_min(&self) -> bool { + false + } + // Returns true if the function is max. Used by the optimizer + fn is_max(&self) -> bool { + false + } + // Returns true if the function is count. Used by the optimizer + fn is_count(&self) -> bool { + false + } + /// Returns default value of the function given the input is all `null`. 
/// /// Most of the aggregate function return Null if input is Null, diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 417e28e72a71..dd61eb999831 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -291,6 +291,10 @@ impl AggregateUDFImpl for Count { fn default_value(&self, _data_type: &DataType) -> Result { Ok(ScalarValue::Int64(Some(0))) } + + fn is_count(&self) -> bool { + true + } } #[derive(Debug)] diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 961e8639604c..5a69fe9ff36b 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -272,6 +272,10 @@ impl AggregateUDFImpl for Max { fn is_descending(&self) -> Option { Some(true) } + + fn is_max(&self) -> bool { + true + } fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } @@ -1052,6 +1056,10 @@ impl AggregateUDFImpl for Min { Some(false) } + fn is_min(&self) -> bool { + true + } + fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 863c5ab2d288..35b12765f550 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -141,7 +141,7 @@ fn take_optimizable_column_and_table_count( stats: &Statistics, ) -> Option<(ScalarValue, String)> { let col_stats = &stats.column_statistics; - if is_non_distinct_count(agg_expr) { + if agg_expr.fun().is_count() && !agg_expr.is_distinct() { if let Precision::Exact(num_rows) = stats.num_rows { let exprs = agg_expr.expressions(); if exprs.len() == 1 { @@ 
-181,7 +181,7 @@ fn take_optimizable_min( match *num_rows { 0 => { // MIN/MAX with 0 rows is always null - if is_min(agg_expr) { + if agg_expr.fun().is_min() { if let Ok(min_data_type) = ScalarValue::try_from(agg_expr.field().data_type()) { @@ -191,7 +191,7 @@ fn take_optimizable_min( } value if value > 0 => { let col_stats = &stats.column_statistics; - if is_min(agg_expr) { + if agg_expr.fun().is_min() { let exprs = agg_expr.expressions(); if exprs.len() == 1 { // TODO optimize with exprs other than Column @@ -227,7 +227,7 @@ fn take_optimizable_max( match *num_rows { 0 => { // MIN/MAX with 0 rows is always null - if is_max(agg_expr) { + if agg_expr.fun().is_max() { if let Ok(max_data_type) = ScalarValue::try_from(agg_expr.field().data_type()) { @@ -237,7 +237,7 @@ fn take_optimizable_max( } value if value > 0 => { let col_stats = &stats.column_statistics; - if is_max(agg_expr) { + if agg_expr.fun().is_max() { let exprs = agg_expr.expressions(); if exprs.len() == 1 { // TODO optimize with exprs other than Column @@ -263,32 +263,3 @@ fn take_optimizable_max( } None } - -// TODO: Move this check into AggregateUDFImpl -// https://github.com/apache/datafusion/issues/11153 -fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { - return true; - } - false -} - -// TODO: Move this check into AggregateUDFImpl -// https://github.com/apache/datafusion/issues/11153 -fn is_min(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.fun().name().to_lowercase() == "min" { - return true; - } - false -} - -// TODO: Move this check into AggregateUDFImpl -// https://github.com/apache/datafusion/issues/11153 -fn is_max(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.fun().name().to_lowercase() == "max" { - return true; - } - false -} - -// See tests in datafusion/core/tests/physical_optimizer From 751e35a10136c5eb7f3e32f805bb416adf2b1a44 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: 
Fri, 13 Sep 2024 23:17:18 +0000 Subject: [PATCH 2/3] Abstracting away value from statistics --- datafusion/expr/src/udaf.rs | 42 +++--- datafusion/functions-aggregate/src/count.rs | 32 ++++- datafusion/functions-aggregate/src/min_max.rs | 90 +++++++++++- .../src/aggregate_statistics.rs | 132 ++---------------- 4 files changed, 146 insertions(+), 150 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 87be2321c590..9dda0edb421c 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -25,7 +25,8 @@ use std::vec; use arrow::datatypes::{DataType, Field}; -use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue, Statistics}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use crate::expr::AggregateFunction; use crate::function::{ @@ -262,19 +263,16 @@ impl AggregateUDF { self.inner.is_descending() } - /// Returns true if the function is min. Used by the optimizer - pub fn is_min(&self) -> bool { - self.inner.is_min() + pub fn value_from_stats( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + self.inner + .value_from_stats(statistics, &data_type, arguments) } - /// Returns true if the function is max. Used by the optimizer - pub fn is_max(&self) -> bool { - self.inner.is_max() - } - /// Returns true if the function is count. Used by the optimizer - pub fn is_count(&self) -> bool { - self.inner.is_count() - } /// See [`AggregateUDFImpl::default_value`] for more details. pub fn default_value(&self, data_type: &DataType) -> Result { self.inner.default_value(data_type) @@ -587,18 +585,14 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { fn is_descending(&self) -> Option { None } - - // Returns true if the function is min. Used by the optimizer - fn is_min(&self) -> bool { - false - } - // Returns true if the function is max. 
Used by the optimizer - fn is_max(&self) -> bool { - false - } - // Returns true if the function is count. Used by the optimizer - fn is_count(&self) -> bool { - false + /// Returns the value of this aggregate when it can be derived from the supplied statistics and arguments alone; the default implementation returns `None`. + fn value_from_stats( + &self, + _statistics: &Statistics, + _data_type: &DataType, + _arguments: &[Arc], + ) -> Option { + None } /// Returns default value of the function given the input is all `null`. diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index dd61eb999831..e62500d7af12 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,7 +16,9 @@ // under the License. use ahash::RandomState; +use datafusion_common::stats::Precision; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; +use datafusion_physical_expr::expressions; use std::collections::HashSet; use std::ops::BitAnd; use std::{fmt::Debug, sync::Arc}; @@ -54,6 +56,7 @@ use datafusion_functions_aggregate_common::aggregate::count_distinct::{ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_indices; use datafusion_physical_expr_common::binary_map::OutputType; +use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; make_udaf_expr_and_func!( Count, count, @@ -292,8 +295,33 @@ impl AggregateUDFImpl for Count { Ok(ScalarValue::Int64(Some(0))) } - fn is_count(&self) -> bool { - true + fn value_from_stats( + &self, + statistics: &datafusion_common::Statistics, + _data_type: &DataType, + arguments: &[Arc], + ) -> Option { + if let Precision::Exact(num_rows) = statistics.num_rows { + if arguments.len() == 1 { + // TODO optimize with exprs other than Column + if let Some(col_expr) = + arguments[0].as_any().downcast_ref::() + { + let current_val = + &statistics.column_statistics[col_expr.index()].null_count; + if let &Precision::Exact(val) = current_val { + return 
Some(ScalarValue::Int64(Some((num_rows - val) as i64))); + } + } else if let Some(lit_expr) = + arguments[0].as_any().downcast_ref::() + { + if lit_expr.value() == &COUNT_STAR_EXPANSION { + return Some(ScalarValue::Int64(Some(num_rows as i64))); + } + } + } + } + None } } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 5a69fe9ff36b..659fca60447a 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -49,11 +49,15 @@ use arrow::datatypes::{ UInt8Type, }; use arrow_schema::IntervalUnit; +use datafusion_common::stats::Precision; use datafusion_common::{ - downcast_value, exec_err, internal_err, DataFusionError, Result, + downcast_value, exec_err, internal_err, ColumnStatistics, DataFusionError, Result, + Statistics, }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use datafusion_physical_expr::{expressions, PhysicalExpr}; use std::fmt::Debug; +use std::sync::Arc; use arrow::datatypes::i256; use arrow::datatypes::{ @@ -147,6 +151,55 @@ macro_rules! 
instantiate_min_accumulator { }}; } +trait FromColumnStatistics { + fn value_from_column_statistics( + &self, + stats: &ColumnStatistics, + ) -> Option; + + fn value_from_statistics( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + if let Precision::Exact(num_rows) = &statistics.num_rows { + match *num_rows { + 0 => return ScalarValue::try_from(data_type).ok(), + value if value > 0 => { + let col_stats = &statistics.column_statistics; + if arguments.len() == 1 { + // TODO optimize with exprs other than Column + if let Some(col_expr) = + arguments[0].as_any().downcast_ref::() + { + return self.value_from_column_statistics( + &col_stats[col_expr.index()], + ); + } + } + } + _ => {} + } + } + None + } +} + +impl FromColumnStatistics for Max { + fn value_from_column_statistics( + &self, + col_stats: &ColumnStatistics, + ) -> Option { + if let Precision::Exact(ref val) = col_stats.max_value { + if !val.is_null() { + return Some(val.clone()); + } + } + None + } +} + impl AggregateUDFImpl for Max { fn as_any(&self) -> &dyn std::any::Any { self @@ -273,9 +326,6 @@ impl AggregateUDFImpl for Max { Some(true) } - fn is_max(&self) -> bool { - true - } fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } @@ -286,6 +336,14 @@ impl AggregateUDFImpl for Max { fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF { datafusion_expr::ReversedUDAF::Identical } + fn value_from_stats( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + self.value_from_statistics(statistics, data_type, arguments) + } } // Statically-typed version of min/max(array) -> ScalarValue for string types @@ -930,6 +988,20 @@ impl Default for Min { } } +impl FromColumnStatistics for Min { + fn value_from_column_statistics( + &self, + col_stats: &ColumnStatistics, + ) -> Option { + if let Precision::Exact(ref val) = 
col_stats.min_value { + if !val.is_null() { + return Some(val.clone()); + } + } + None + } +} + impl AggregateUDFImpl for Min { fn as_any(&self) -> &dyn std::any::Any { self @@ -1056,10 +1128,14 @@ impl AggregateUDFImpl for Min { Some(false) } - fn is_min(&self) -> bool { - true + fn value_from_stats( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + self.value_from_statistics(statistics, data_type, arguments) } - fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 35b12765f550..fd04f9e2998e 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -61,10 +61,10 @@ impl PhysicalOptimizerRule for AggregateStatistics { take_optimizable_column_and_table_count(expr, &stats) { projections.push((expressions::lit(non_null_rows), name.to_owned())); - } else if let Some((min, name)) = take_optimizable_min(expr, &stats) { - projections.push((expressions::lit(min), name.to_owned())); - } else if let Some((max, name)) = take_optimizable_max(expr, &stats) { - projections.push((expressions::lit(max), name.to_owned())); + } else if let Some((min_or_max, name)) = + take_optimizable_value_from_statistics(expr, &stats) + { + projections.push((expressions::lit(min_or_max), name.to_owned())); } else { // TODO: we need all aggr_expr to be resolved (cf TODO fullres) break; @@ -140,126 +140,24 @@ fn take_optimizable_column_and_table_count( agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { - let col_stats = &stats.column_statistics; - if agg_expr.fun().is_count() && !agg_expr.is_distinct() { - if let Precision::Exact(num_rows) = stats.num_rows { - let exprs = agg_expr.expressions(); - if 
exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - let current_val = &col_stats[col_expr.index()].null_count; - if let &Precision::Exact(val) = current_val { - return Some(( - ScalarValue::Int64(Some((num_rows - val) as i64)), - agg_expr.name().to_string(), - )); - } - } else if let Some(lit_expr) = - exprs[0].as_any().downcast_ref::() - { - if lit_expr.value() == &COUNT_STAR_EXPANSION { - return Some(( - ScalarValue::Int64(Some(num_rows as i64)), - agg_expr.name().to_string(), - )); - } - } - } - } - } - None -} - -/// If this agg_expr is a min that is exactly defined in the statistics, return it. -fn take_optimizable_min( - agg_expr: &AggregateFunctionExpr, - stats: &Statistics, -) -> Option<(ScalarValue, String)> { - if let Precision::Exact(num_rows) = &stats.num_rows { - match *num_rows { - 0 => { - // MIN/MAX with 0 rows is always null - if agg_expr.fun().is_min() { - if let Ok(min_data_type) = - ScalarValue::try_from(agg_expr.field().data_type()) - { - return Some((min_data_type, agg_expr.name().to_string())); - } - } - } - value if value > 0 => { - let col_stats = &stats.column_statistics; - if agg_expr.fun().is_min() { - let exprs = agg_expr.expressions(); - if exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - if let Precision::Exact(val) = - &col_stats[col_expr.index()].min_value - { - if !val.is_null() { - return Some(( - val.clone(), - agg_expr.name().to_string(), - )); - } - } - } - } - } - } - _ => {} + if !agg_expr.is_distinct() { + if let Some((val, name)) = take_optimizable_value_from_statistics(agg_expr, stats) + { + return Some((val, name)); } } None } /// If this agg_expr is a max that is exactly defined in the statistics, return it. 
-fn take_optimizable_max( +fn take_optimizable_value_from_statistics( agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { - if let Precision::Exact(num_rows) = &stats.num_rows { - match *num_rows { - 0 => { - // MIN/MAX with 0 rows is always null - if agg_expr.fun().is_max() { - if let Ok(max_data_type) = - ScalarValue::try_from(agg_expr.field().data_type()) - { - return Some((max_data_type, agg_expr.name().to_string())); - } - } - } - value if value > 0 => { - let col_stats = &stats.column_statistics; - if agg_expr.fun().is_max() { - let exprs = agg_expr.expressions(); - if exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - if let Precision::Exact(val) = - &col_stats[col_expr.index()].max_value - { - if !val.is_null() { - return Some(( - val.clone(), - agg_expr.name().to_string(), - )); - } - } - } - } - } - } - _ => {} - } - } - None + let value = agg_expr.fun().value_from_stats( + &stats, + agg_expr.field().data_type(), + agg_expr.expressions().as_slice(), + ); + value.map(|val| (val, agg_expr.name().to_string())) } From 016decf5d73425ad60a40abbca31a4e437ba8450 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Fri, 13 Sep 2024 23:21:39 +0000 Subject: [PATCH 3/3] Removing imports --- datafusion/physical-optimizer/src/aggregate_statistics.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index fd04f9e2998e..0fcd5bf2c95c 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -26,9 +26,7 @@ use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{expressions, ExecutionPlan, Statistics}; use crate::PhysicalOptimizerRule; -use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, 
TransformedResult, TreeNode}; -use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::udaf::AggregateFunctionExpr;