diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 2cc50c7f82b6..8dc71b70e22b 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -35,7 +35,7 @@ use datafusion_expr::logical_plan::{ use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned}; use datafusion_expr::{ and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, - TableProviderFilterPushDown, + Projection, TableProviderFilterPushDown, }; use crate::optimizer::ApplyOrder; @@ -691,58 +691,46 @@ impl OptimizerRule for PushDownFilter { insert_below(LogicalPlan::SubqueryAlias(subquery_alias), new_filter) } LogicalPlan::Projection(projection) => { - // A projection is filter-commutable if it do not contain volatile predicates or contain volatile - // predicates that are not used in the filter. However, we should re-writes all predicate expressions. - // collect projection. - let (volatile_map, non_volatile_map): (HashMap<_, _>, HashMap<_, _>) = - projection - .schema - .iter() - .zip(projection.expr.iter()) - .map(|((qualifier, field), expr)| { - // strip alias, as they should not be part of filters - let expr = expr.clone().unalias(); - - (qualified_name(qualifier, field.name()), expr) - }) - .partition(|(_, value)| value.is_volatile().unwrap_or(true)); - - let mut push_predicates = vec![]; - let mut keep_predicates = vec![]; - for expr in split_conjunction_owned(filter.predicate.clone()) { - if contain(&expr, &volatile_map) { - keep_predicates.push(expr); - } else { - push_predicates.push(expr); + let (new_projection, keep_predicate) = + rewrite_projection(filter.predicate.clone(), projection)?; + if new_projection.transformed { + match keep_predicate { + None => Ok(new_projection), + Some(keep_predicate) => new_projection.map_data(|child_plan| { + Filter::try_new(keep_predicate, Arc::new(child_plan)) + .map(LogicalPlan::Filter) + }), } + } else { + filter.input = Arc::new(new_projection.data); + Ok(Transformed::no(LogicalPlan::Filter(filter))) } - - match conjunction(push_predicates) { - Some(expr) => { - // re-write all filters based on this projection - // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1" - let new_filter = LogicalPlan::Filter(Filter::try_new( - replace_cols_by_name(expr, &non_volatile_map)?, - Arc::clone(&projection.input), - )?); - - match conjunction(keep_predicates) { - None => insert_below( - LogicalPlan::Projection(projection), - new_filter, - ), - Some(keep_predicate) => insert_below( - LogicalPlan::Projection(projection), - new_filter, - )? - .map_data(|child_plan| { - Filter::try_new(keep_predicate, Arc::new(child_plan)) - .map(LogicalPlan::Filter) - }), + } + LogicalPlan::Unnest(mut unnest) => { + // Unnest is built above Projection, so we only take Projection into consideration + match unwrap_arc(unnest.input) { + LogicalPlan::Projection(projection) => { + let (new_projection, keep_predicate) = + rewrite_projection(filter.predicate.clone(), projection)?; + unnest.input = Arc::new(new_projection.data); + + if new_projection.transformed { + match keep_predicate { + None => Ok(Transformed::yes(LogicalPlan::Unnest(unnest))), + Some(keep_predicate) => Ok(Transformed::yes( + LogicalPlan::Filter(Filter::try_new( + keep_predicate, + Arc::new(LogicalPlan::Unnest(unnest)), + )?), + )), + } + } else { + filter.input = Arc::new(LogicalPlan::Unnest(unnest)); + Ok(Transformed::no(LogicalPlan::Filter(filter))) } } - None => { - filter.input = Arc::new(LogicalPlan::Projection(projection)); + child => { + filter.input = Arc::new(child); Ok(Transformed::no(LogicalPlan::Filter(filter))) } } @@ -951,6 +939,76 @@ impl OptimizerRule for PushDownFilter { } } +/// Attempts to push `predicate` into a `FilterExec` below `projection +/// +/// # Returns +/// (plan, remaining_predicate) +/// +/// `plan` is a LogicalPlan for `projection` with possibly a new FilterExec below it. +/// `remaining_predicate` is any part of the predicate that could not be pushed down +/// +/// # Example +/// +/// Pushing a predicate like `foo=5 AND bar=6` with an input plan like this: +/// +/// ```text +/// Projection(foo, c+d as bar) +/// ``` +/// +/// Might result in returning `remaining_predicate` of `bar=6` and a plan like +/// +/// ```text +/// Projection(foo, c+d as bar) +/// Filter(foo=5) +/// ... +/// ``` +fn rewrite_projection( + predicate: Expr, + projection: Projection, +) -> Result<(Transformed, Option)> { + // A projection is filter-commutable if it do not contain volatile predicates or contain volatile + // predicates that are not used in the filter. However, we should re-writes all predicate expressions. + // collect projection. + let (volatile_map, non_volatile_map): (HashMap<_, _>, HashMap<_, _>) = projection + .schema + .iter() + .zip(projection.expr.iter()) + .map(|((qualifier, field), expr)| { + // strip alias, as they should not be part of filters + let expr = expr.clone().unalias(); + + (qualified_name(qualifier, field.name()), expr) + }) + .partition(|(_, value)| value.is_volatile().unwrap_or(true)); + + let mut push_predicates = vec![]; + let mut keep_predicates = vec![]; + for expr in split_conjunction_owned(predicate) { + if contain(&expr, &volatile_map) { + keep_predicates.push(expr); + } else { + push_predicates.push(expr); + } + } + + match conjunction(push_predicates) { + Some(expr) => { + // re-write all filters based on this projection + // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1" + let new_filter = LogicalPlan::Filter(Filter::try_new( + replace_cols_by_name(expr, &non_volatile_map)?, + Arc::clone(&projection.input), + )?); + + Ok(( + insert_below(LogicalPlan::Projection(projection), new_filter)?, + conjunction(keep_predicates), + )) + } + None => Ok((Transformed::no(LogicalPlan::Projection(projection)), None)), + } +} + /// Creates a new LogicalPlan::Filter node. pub fn make_filter(predicate: Expr, input: Arc) -> Result { Filter::try_new(predicate, input).map(LogicalPlan::Filter) diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt new file mode 100644 index 000000000000..1d72e11f4352 --- /dev/null +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test push down filter + +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]); + +query I +select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2; +---- +3 +4 +5 + +# test push down filter for unnest with filter on non-unnest column +# filter plan is pushed down into projection plan +query TT +explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2; +---- +logical_plan +01)Projection: unnest(v.column2) AS uc2 +02)--Unnest: lists[unnest(v.column2)] structs[] +03)----Projection: v.column2 AS unnest(v.column2), v.column1 +04)------Filter: v.column1 = Int64(2) +05)--------TableScan: v projection=[column1, column2] + +# TODO: fix the query +query error DataFusion error: External error: Arrow error: Invalid argument error: Invalid comparison operation: List\(Field \{ name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) > Int64 +select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3; + +# test push down filter for unnest with filter on unnest column +# query TT +# explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;