From 654a53a74eb73c667de662216332716874a0f01c Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Mon, 26 Aug 2024 16:30:02 +0300 Subject: [PATCH] Add `ignore_nulls` flag to `AggrFn` --- .../physical-expr/src/aggregate/array_agg.rs | 4 +++ .../src/aggregate/array_agg_distinct.rs | 4 +++ .../proto/src/physical_plan/to_proto.rs | 27 +++++++++++++------ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index 544d94a60fa4..ccfa67f6b7ac 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -79,6 +79,10 @@ impl ArrayAgg { ignore_nulls, } } + + pub fn ignore_nulls(&self) -> bool { + self.ignore_nulls + } } impl AggregateExpr for ArrayAgg { diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 2eb041eb262b..53e6ec8ec7e6 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -67,6 +67,10 @@ impl DistinctArrayAgg { ignore_nulls, } } + + pub fn ignore_nulls(&self) -> bool { + self.ignore_nulls + } } impl AggregateExpr for DistinctArrayAgg { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 520c9e61c515..e30866ab1e18 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -80,8 +80,9 @@ pub fn serialize_physical_aggr_expr( } let AggrFn { - inner: aggr_function, + inner, distinct, + ignore_nulls, } = aggr_expr_to_aggr_fn(aggr_expr.as_ref())?; Ok(protobuf::PhysicalExprNode { @@ -89,13 +90,13 @@ pub fn serialize_physical_aggr_expr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some( physical_aggregate_expr_node::AggregateFunction::AggrFunction( - aggr_function as i32, + inner.into(), ), ), expr: expressions, ordering_req, distinct, - ignore_nulls: false, + ignore_nulls, fun_definition: None, }, )), @@ -124,7 +125,9 @@ fn serialize_physical_window_aggr_expr( (!buf.is_empty()).then_some(buf), )) } else { - let AggrFn { inner, distinct } = aggr_expr_to_aggr_fn(aggr_expr)?; + let AggrFn { + inner, distinct, .. + } = aggr_expr_to_aggr_fn(aggr_expr)?; if distinct { return not_impl_err!( "Distinct aggregate functions not supported in window expressions" @@ -138,7 +141,7 @@ fn serialize_physical_window_aggr_expr( } Ok(( - physical_window_expr_node::WindowFunction::AggrFunction(inner as i32), + physical_window_expr_node::WindowFunction::AggrFunction(inner.into()), None, )) } @@ -260,11 +263,13 @@ pub fn serialize_physical_window_expr( struct AggrFn { inner: protobuf::AggregateFunction, distinct: bool, + ignore_nulls: bool, } fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { let aggr_expr = expr.as_any(); let mut distinct = false; + let mut ignore_nulls = false; let inner = if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Count @@ -293,10 +298,12 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { protobuf::AggregateFunction::Sum } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::ApproxDistinct - } else if aggr_expr.downcast_ref::().is_some() { + } else if let Some(array_agg) = aggr_expr.downcast_ref::() { + ignore_nulls = array_agg.ignore_nulls(); protobuf::AggregateFunction::ArrayAgg - } else if aggr_expr.downcast_ref::().is_some() { + } else if let Some(array_agg) = aggr_expr.downcast_ref::() { distinct = true; + ignore_nulls = array_agg.ignore_nulls(); protobuf::AggregateFunction::ArrayAgg } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::ArrayAgg @@ -343,7 +350,11 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { return not_impl_err!("Aggregate function not supported: {expr:?}"); }; - Ok(AggrFn { inner, distinct }) + Ok(AggrFn { + inner, + distinct, + ignore_nulls, + }) } pub fn serialize_physical_sort_exprs(