Push down filter for Unnest plan (apache#10974)
* add unnest

Signed-off-by: jayzhan211 <[email protected]>

* improve doc and test

Signed-off-by: jayzhan211 <[email protected]>

* fix test

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
jayzhan211 committed Jun 19, 2024
1 parent 8fda4a6 commit 4109f58
Showing 2 changed files with 158 additions and 49 deletions.
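
In plan terms, the change lets a filter that only references non-unnested columns be pushed through an `Unnest` into the projection feeding it. A schematic before/after, based on the explain output added in the new sqllogictest file below (the pre-optimization shape is reconstructed here for illustration):

```text
-- before optimization (filter sits above the Unnest)
Projection: unnest(v.column2) AS uc2
  Filter: v.column1 = Int64(2)
    Unnest: lists[unnest(v.column2)] structs[]
      Projection: v.column2 AS unnest(v.column2), v.column1
        TableScan: v

-- after optimization (filter pushed below the Unnest, from the test's explain output)
Projection: unnest(v.column2) AS uc2
  Unnest: lists[unnest(v.column2)] structs[]
    Projection: v.column2 AS unnest(v.column2), v.column1
      Filter: v.column1 = Int64(2)
        TableScan: v projection=[column1, column2]
```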
156 changes: 107 additions & 49 deletions datafusion/optimizer/src/push_down_filter.rs
@@ -35,7 +35,7 @@ use datafusion_expr::logical_plan::{
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::{
and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator,
TableProviderFilterPushDown,
Projection, TableProviderFilterPushDown,
};

use crate::optimizer::ApplyOrder;
@@ -691,58 +691,46 @@ impl OptimizerRule for PushDownFilter {
insert_below(LogicalPlan::SubqueryAlias(subquery_alias), new_filter)
}
LogicalPlan::Projection(projection) => {
// A projection is filter-commutable if it does not contain volatile expressions, or if its
// volatile expressions are not referenced by the filter. However, all predicate expressions
// must be rewritten against the projection's input before they are pushed down.
// Collect the projection's output expressions into volatile and non-volatile maps.
let (volatile_map, non_volatile_map): (HashMap<_, _>, HashMap<_, _>) =
projection
.schema
.iter()
.zip(projection.expr.iter())
.map(|((qualifier, field), expr)| {
// strip alias, as they should not be part of filters
let expr = expr.clone().unalias();

(qualified_name(qualifier, field.name()), expr)
})
.partition(|(_, value)| value.is_volatile().unwrap_or(true));

let mut push_predicates = vec![];
let mut keep_predicates = vec![];
for expr in split_conjunction_owned(filter.predicate.clone()) {
if contain(&expr, &volatile_map) {
keep_predicates.push(expr);
} else {
push_predicates.push(expr);
let (new_projection, keep_predicate) =
rewrite_projection(filter.predicate.clone(), projection)?;
if new_projection.transformed {
match keep_predicate {
None => Ok(new_projection),
Some(keep_predicate) => new_projection.map_data(|child_plan| {
Filter::try_new(keep_predicate, Arc::new(child_plan))
.map(LogicalPlan::Filter)
}),
}
} else {
filter.input = Arc::new(new_projection.data);
Ok(Transformed::no(LogicalPlan::Filter(filter)))
}

match conjunction(push_predicates) {
Some(expr) => {
// re-write all filters based on this projection
// E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
let new_filter = LogicalPlan::Filter(Filter::try_new(
replace_cols_by_name(expr, &non_volatile_map)?,
Arc::clone(&projection.input),
)?);

match conjunction(keep_predicates) {
None => insert_below(
LogicalPlan::Projection(projection),
new_filter,
),
Some(keep_predicate) => insert_below(
LogicalPlan::Projection(projection),
new_filter,
)?
.map_data(|child_plan| {
Filter::try_new(keep_predicate, Arc::new(child_plan))
.map(LogicalPlan::Filter)
}),
}
LogicalPlan::Unnest(mut unnest) => {
// Unnest is built above Projection, so we only take Projection into consideration
match unwrap_arc(unnest.input) {
LogicalPlan::Projection(projection) => {
let (new_projection, keep_predicate) =
rewrite_projection(filter.predicate.clone(), projection)?;
unnest.input = Arc::new(new_projection.data);

if new_projection.transformed {
match keep_predicate {
None => Ok(Transformed::yes(LogicalPlan::Unnest(unnest))),
Some(keep_predicate) => Ok(Transformed::yes(
LogicalPlan::Filter(Filter::try_new(
keep_predicate,
Arc::new(LogicalPlan::Unnest(unnest)),
)?),
)),
}
} else {
filter.input = Arc::new(LogicalPlan::Unnest(unnest));
Ok(Transformed::no(LogicalPlan::Filter(filter)))
}
}
None => {
filter.input = Arc::new(LogicalPlan::Projection(projection));
child => {
filter.input = Arc::new(child);
Ok(Transformed::no(LogicalPlan::Filter(filter)))
}
}
@@ -951,6 +939,76 @@ impl OptimizerRule for PushDownFilter {
}
}

/// Attempts to push `predicate` into a `Filter` below `projection`
///
/// # Returns
/// (plan, remaining_predicate)
///
/// `plan` is a `LogicalPlan` for `projection` with possibly a new `Filter` below it.
/// `remaining_predicate` is any part of the predicate that could not be pushed down.
///
/// # Example
///
/// Pushing a predicate like `foo=5 AND bar=6` with an input plan like this:
///
/// ```text
/// Projection(foo, c+d as bar)
/// ```
///
/// Might result in returning `remaining_predicate` of `bar=6` and a plan like
///
/// ```text
/// Projection(foo, c+d as bar)
/// Filter(foo=5)
/// ...
/// ```
fn rewrite_projection(
predicate: Expr,
projection: Projection,
) -> Result<(Transformed<LogicalPlan>, Option<Expr>)> {
// A projection is filter-commutable if it does not contain volatile expressions, or if its
// volatile expressions are not referenced by the filter. However, all predicate expressions
// must be rewritten against the projection's input before they are pushed down.
// Collect the projection's output expressions into volatile and non-volatile maps.
let (volatile_map, non_volatile_map): (HashMap<_, _>, HashMap<_, _>) = projection
.schema
.iter()
.zip(projection.expr.iter())
.map(|((qualifier, field), expr)| {
// strip alias, as they should not be part of filters
let expr = expr.clone().unalias();

(qualified_name(qualifier, field.name()), expr)
})
.partition(|(_, value)| value.is_volatile().unwrap_or(true));

let mut push_predicates = vec![];
let mut keep_predicates = vec![];
for expr in split_conjunction_owned(predicate) {
if contain(&expr, &volatile_map) {
keep_predicates.push(expr);
} else {
push_predicates.push(expr);
}
}

match conjunction(push_predicates) {
Some(expr) => {
// re-write all filters based on this projection
// E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
let new_filter = LogicalPlan::Filter(Filter::try_new(
replace_cols_by_name(expr, &non_volatile_map)?,
Arc::clone(&projection.input),
)?);

Ok((
insert_below(LogicalPlan::Projection(projection), new_filter)?,
conjunction(keep_predicates),
))
}
None => Ok((Transformed::no(LogicalPlan::Projection(projection)), None)),
}
}

/// Creates a new LogicalPlan::Filter node.
pub fn make_filter(predicate: Expr, input: Arc<LogicalPlan>) -> Result<LogicalPlan> {
Filter::try_new(predicate, input).map(LogicalPlan::Filter)
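
For readers outside the codebase, here is a minimal standalone sketch of the conjunct-splitting idea that `rewrite_projection` relies on, using only the `datafusion-expr` helpers already imported above (`split_conjunction_owned`, `conjunction`). The `split_for_pushdown` helper and its string-based pushability check are hypothetical, for illustration only; the actual rule checks whether a conjunct references a volatile projection expression and rewrites column references against the projection's input via `replace_cols_by_name`.

```rust
use datafusion_expr::utils::{conjunction, split_conjunction_owned};
use datafusion_expr::{col, lit, Expr};

/// Split a conjunctive predicate into the part that may be pushed below a node
/// and the part that must stay above it (hypothetical helper, not in this commit).
fn split_for_pushdown(
    predicate: Expr,
    can_push: impl Fn(&Expr) -> bool,
) -> (Option<Expr>, Option<Expr>) {
    let (push, keep): (Vec<Expr>, Vec<Expr>) = split_conjunction_owned(predicate)
        .into_iter()
        .partition(|conjunct| can_push(conjunct));
    // `conjunction` folds the conjuncts back together with AND; None if the list is empty.
    (conjunction(push), conjunction(keep))
}

fn main() {
    // Mirrors the new test: `column1 = 2 AND uc2 > 3` over the unnest subquery.
    let predicate = col("column1").eq(lit(2_i64)).and(col("uc2").gt(lit(3_i64)));
    // Toy pushability check: only push conjuncts that do not mention the unnested column.
    let (push, keep) = split_for_pushdown(predicate, |e| !format!("{e}").contains("uc2"));
    println!("pushed below: {}", push.map(|e| e.to_string()).unwrap_or_default());
    println!("kept above:   {}", keep.map(|e| e.to_string()).unwrap_or_default());
}
```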
51 changes: 51 additions & 0 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Test push down filter

statement ok
set datafusion.explain.logical_plan_only = true;

statement ok
CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);

query I
select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
----
3
4
5

# test push down filter for unnest with filter on non-unnest column
# filter plan is pushed down into projection plan
query TT
explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
----
logical_plan
01)Projection: unnest(v.column2) AS uc2
02)--Unnest: lists[unnest(v.column2)] structs[]
03)----Projection: v.column2 AS unnest(v.column2), v.column1
04)------Filter: v.column1 = Int64(2)
05)--------TableScan: v projection=[column1, column2]

# TODO: fix the query
query error DataFusion error: External error: Arrow error: Invalid argument error: Invalid comparison operation: List\(Field \{ name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) > Int64
select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;

# test push down filter for unnest with filter on unnest column
# query TT
# explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
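
To poke at the new behavior outside the sqllogictest harness, a small program along these lines should work (a sketch only, assuming the top-level `datafusion` crate and a `tokio` runtime; it is not part of this commit). It creates the same table and prints the plans for the pushed-down query:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Same data as the new sqllogictest file.
    ctx.sql("CREATE TABLE v AS VALUES (1, [1,2,3]), (2, [3,4,5])")
        .await?
        .collect()
        .await?;
    // The filter on the non-unnested column should appear below the Unnest in the plan.
    ctx.sql(
        "EXPLAIN SELECT uc2 FROM (SELECT unnest(column2) AS uc2, column1 FROM v) \
         WHERE column1 = 2",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```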
