Skip to content

Commit

Permalink
Add ignore_nulls flag to AggrFn
Browse files Browse the repository at this point in the history
  • Loading branch information
joroKr21 committed Aug 26, 2024
1 parent 7a27881 commit 654a53a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl ArrayAgg {
ignore_nulls,
}
}

pub fn ignore_nulls(&self) -> bool {
self.ignore_nulls
}
}

impl AggregateExpr for ArrayAgg {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl DistinctArrayAgg {
ignore_nulls,
}
}

pub fn ignore_nulls(&self) -> bool {
self.ignore_nulls
}
}

impl AggregateExpr for DistinctArrayAgg {
Expand Down
27 changes: 19 additions & 8 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,23 @@ pub fn serialize_physical_aggr_expr(
}

let AggrFn {
inner: aggr_function,
inner,
distinct,
ignore_nulls,
} = aggr_expr_to_aggr_fn(aggr_expr.as_ref())?;

Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
protobuf::PhysicalAggregateExprNode {
aggregate_function: Some(
physical_aggregate_expr_node::AggregateFunction::AggrFunction(
aggr_function as i32,
inner.into(),
),
),
expr: expressions,
ordering_req,
distinct,
ignore_nulls: false,
ignore_nulls,
fun_definition: None,
},
)),
Expand Down Expand Up @@ -124,7 +125,9 @@ fn serialize_physical_window_aggr_expr(
(!buf.is_empty()).then_some(buf),
))
} else {
let AggrFn { inner, distinct } = aggr_expr_to_aggr_fn(aggr_expr)?;
let AggrFn {
inner, distinct, ..
} = aggr_expr_to_aggr_fn(aggr_expr)?;
if distinct {
return not_impl_err!(
"Distinct aggregate functions not supported in window expressions"
Expand All @@ -138,7 +141,7 @@ fn serialize_physical_window_aggr_expr(
}

Ok((
physical_window_expr_node::WindowFunction::AggrFunction(inner as i32),
physical_window_expr_node::WindowFunction::AggrFunction(inner.into()),
None,
))
}
Expand Down Expand Up @@ -260,11 +263,13 @@ pub fn serialize_physical_window_expr(
struct AggrFn {
inner: protobuf::AggregateFunction,
distinct: bool,
ignore_nulls: bool,
}

fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
let aggr_expr = expr.as_any();
let mut distinct = false;
let mut ignore_nulls = false;

let inner = if aggr_expr.downcast_ref::<Count>().is_some() {
protobuf::AggregateFunction::Count
Expand Down Expand Up @@ -293,10 +298,12 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
protobuf::AggregateFunction::Sum
} else if aggr_expr.downcast_ref::<ApproxDistinct>().is_some() {
protobuf::AggregateFunction::ApproxDistinct
} else if aggr_expr.downcast_ref::<ArrayAgg>().is_some() {
} else if let Some(array_agg) = aggr_expr.downcast_ref::<ArrayAgg>() {
ignore_nulls = array_agg.ignore_nulls();
protobuf::AggregateFunction::ArrayAgg
} else if aggr_expr.downcast_ref::<DistinctArrayAgg>().is_some() {
} else if let Some(array_agg) = aggr_expr.downcast_ref::<DistinctArrayAgg>() {
distinct = true;
ignore_nulls = array_agg.ignore_nulls();
protobuf::AggregateFunction::ArrayAgg
} else if aggr_expr.downcast_ref::<OrderSensitiveArrayAgg>().is_some() {
protobuf::AggregateFunction::ArrayAgg
Expand Down Expand Up @@ -343,7 +350,11 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
return not_impl_err!("Aggregate function not supported: {expr:?}");
};

Ok(AggrFn { inner, distinct })
Ok(AggrFn {
inner,
distinct,
ignore_nulls,
})
}

pub fn serialize_physical_sort_exprs<I>(
Expand Down

0 comments on commit 654a53a

Please sign in to comment.