From 485d7d571e82967a2cf414d086fb40bb9c62ad9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 28 Sep 2023 14:47:39 +0200 Subject: [PATCH] Don't add filters to projection in TableScan (#213) --- datafusion/core/src/dataframe.rs | 9 +- datafusion/core/src/datasource/view.rs | 27 +-- datafusion/expr/src/logical_plan/builder.rs | 11 +- datafusion/optimizer/src/push_down_filter.rs | 175 ++++++------------ .../optimizer/src/push_down_projection.rs | 34 +++- .../simplify_expressions/simplify_exprs.rs | 3 +- .../src/unwrap_cast_in_comparison.rs | 8 +- 7 files changed, 127 insertions(+), 140 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 209fb11571b0..c56ad0478546 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1115,15 +1115,16 @@ impl TableProvider for DataFrameTableProvider { limit: Option, ) -> Result> { let mut expr = LogicalPlanBuilder::from(self.plan.clone()); - if let Some(p) = projection { - expr = expr.select(p.iter().copied())? - } - // Add filter when given let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); if let Some(filter) = filter { expr = expr.filter(filter)? } + + if let Some(p) = projection { + expr = expr.select(p.iter().copied())? + } + // add a limit if given if let Some(l) = limit { expr = expr.limit(0, Some(l))? 
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 210acad18c93..d58284d1bac5 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -108,12 +108,20 @@ impl TableProvider for ViewTable { filters: &[Expr], limit: Option, ) -> Result> { - let plan = if let Some(projection) = projection { + let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); + let plan = self.logical_plan().clone(); + let mut plan = LogicalPlanBuilder::from(plan); + + if let Some(filter) = filter { + plan = plan.filter(filter)?; + } + + let mut plan = if let Some(projection) = projection { // avoiding adding a redundant projection (e.g. SELECT * FROM view) let current_projection = - (0..self.logical_plan.schema().fields().len()).collect::>(); + (0..plan.schema().fields().len()).collect::>(); if projection == &current_projection { - self.logical_plan().clone() + plan } else { let fields: Vec = projection .iter() @@ -123,19 +131,11 @@ impl TableProvider for ViewTable { ) }) .collect(); - LogicalPlanBuilder::from(self.logical_plan.clone()) - .project(fields)? - .build()? + plan.project(fields)? 
} } else { - self.logical_plan().clone() + plan }; - let mut plan = LogicalPlanBuilder::from(plan); - let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); - - if let Some(filter) = filter { - plan = plan.filter(filter)?; - } if let Some(limit) = limit { plan = plan.limit(0, Some(limit))?; @@ -439,6 +439,7 @@ mod tests { .select_columns(&["bool_col", "int_col"])?; let plan = df.explain(false, false)?.collect().await?; + // Filters all the way to Parquet let formatted = arrow::util::pretty::pretty_format_batches(&plan) .unwrap() diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 07437b8248dd..3166c1e6f57f 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -25,7 +25,9 @@ use crate::expr_rewriter::{ }; use crate::type_coercion::binary::comparison_coercion; use crate::utils::{columnize_expr, compare_sort_expr, exprlist_to_fields}; -use crate::{and, binary_expr, DmlStatement, Operator, WriteOp}; +use crate::{ + and, binary_expr, DmlStatement, Operator, TableProviderFilterPushDown, WriteOp, +}; use crate::{ logical_plan::{ Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join, @@ -1369,6 +1371,13 @@ impl TableSource for LogicalTableSource { fn schema(&self) -> SchemaRef { self.table_schema.clone() } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } } /// Create an unnest plan. 
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 115b539614ce..494323da7b38 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -952,8 +952,7 @@ mod tests { // filter is before projection let expected = "\ Projection: test.a, test.b\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -980,9 +979,7 @@ mod tests { let plan = LogicalPlanBuilder::from(table_scan) .filter(lit(0i64).eq(lit(1i64)))? .build()?; - let expected = "\ - Filter: Int64(0) = Int64(1)\ - \n TableScan: test"; + let expected = "TableScan: test, full_filters=[Int64(0) = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -998,8 +995,7 @@ mod tests { let expected = "\ Projection: test.c, test.b\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1013,8 +1009,7 @@ mod tests { // filter of key aggregation is commutative let expected = "\ Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS total_salary]]\ - \n Filter: test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1039,8 +1034,7 @@ mod tests { .build()?; let expected = "Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ - \n Filter: test.b + test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.b + test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1070,8 +1064,7 @@ mod tests { // filter is before projection let expected = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) 
} @@ -1115,8 +1108,7 @@ mod tests { // filter is before projection let expected = "\ Projection: test.a * Int32(2) + test.c AS b, test.c\ - \n Filter: test.a * Int32(2) + test.c = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a * Int32(2) + test.c = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1148,8 +1140,7 @@ mod tests { let expected = "\ Projection: b * Int32(3) AS a, test.c\ \n Projection: test.a * Int32(2) + test.c AS b, test.c\ - \n Filter: (test.a * Int32(2) + test.c) * Int32(3) = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[(test.a * Int32(2) + test.c) * Int32(3) = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1212,8 +1203,7 @@ mod tests { // Push filter below NoopPlan let expected = "\ NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected)?; let custom_plan = LogicalPlan::Extension(Extension { @@ -1230,8 +1220,7 @@ mod tests { let expected = "\ Filter: test.c = Int64(2)\ \n NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected)?; let custom_plan = LogicalPlan::Extension(Extension { @@ -1247,10 +1236,8 @@ mod tests { // Push filter below NoopPlan for each child branch let expected = "\ NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]\ + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected)?; let custom_plan = LogicalPlan::Extension(Extension { @@ -1267,10 +1254,8 @@ mod tests { let expected = "\ Filter: test.c = Int64(2)\ \n NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]\ + 
\n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1303,8 +1288,7 @@ mod tests { Filter: SUM(test.c) > Int64(10)\ \n Aggregate: groupBy=[[b]], aggr=[[SUM(test.c)]]\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1338,8 +1322,7 @@ mod tests { Filter: SUM(test.c) > Int64(10) AND SUM(test.c) < Int64(20)\ \n Aggregate: groupBy=[[b]], aggr=[[SUM(test.c)]]\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1375,10 +1358,8 @@ mod tests { .build()?; // filter appears below Union let expected = "Union\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Filter: test2.a = Int64(1)\ - \n TableScan: test2"; + \n TableScan: test, full_filters=[test.a = Int64(1)]\ + \n TableScan: test2, full_filters=[test2.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1398,12 +1379,10 @@ mod tests { // filter appears below Union let expected = "Union\n SubqueryAlias: test2\ \n Projection: test.a AS b\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a = Int64(1)]\ \n SubqueryAlias: test2\ \n Projection: test.a AS b\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1431,11 +1410,9 @@ mod tests { let expected = "Projection: test.a, test1.d\ \n CrossJoin:\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a = Int32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a = Int32(1)]\ \n Projection: test1.d, test1.e, test1.f\ - \n Filter: test1.d > Int32(2)\ - \n TableScan: test1"; + \n TableScan: test1, full_filters=[test1.d > Int32(2)]"; 
assert_optimized_plan_eq(&plan, expected) } @@ -1460,11 +1437,9 @@ mod tests { let expected = "Projection: test.a, test1.a\ \n CrossJoin:\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a = Int32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a = Int32(1)]\ \n Projection: test1.a, test1.b, test1.c\ - \n Filter: test1.a > Int32(2)\ - \n TableScan: test1"; + \n TableScan: test1, full_filters=[test1.a > Int32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1497,8 +1472,7 @@ mod tests { \n Filter: test.a >= Int64(1)\ \n Limit: skip=0, fetch=1\ \n Projection: test.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1552,6 +1526,10 @@ mod tests { // not part of the test assert_eq!(format!("{plan:?}"), expected); + let expected = "\ + TestUserDefined\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]"; + assert_optimized_plan_eq(&plan, expected) } @@ -1587,11 +1565,9 @@ mod tests { // filter sent to side before the join let expected = "\ Inner Join: test.a = test2.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]\ \n Projection: test2.a\ - \n Filter: test2.a <= Int64(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1626,11 +1602,9 @@ mod tests { // filter sent to side before the join let expected = "\ Inner Join: Using test.a = test2.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]\ \n Projection: test2.a\ - \n Filter: test2.a <= Int64(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1712,8 +1686,7 @@ mod tests { let expected = "\ Inner Join: test.a = test2.a\ \n Projection: test.a, test.b\ - \n Filter: test.b <= Int64(1)\ 
- \n TableScan: test\ + \n TableScan: test, full_filters=[test.b <= Int64(1)]\ \n Projection: test2.a, test2.c\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) @@ -1828,8 +1801,7 @@ mod tests { // filter sent to left side of the join, not the right let expected = "\ Left Join: Using test.a = test2.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]\ \n Projection: test2.a\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) @@ -1869,8 +1841,7 @@ mod tests { Right Join: Using test.a = test2.a\ \n TableScan: test\ \n Projection: test2.a\ - \n Filter: test2.a <= Int64(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1911,11 +1882,9 @@ mod tests { let expected = "\ Inner Join: test.a = test2.a Filter: test.b < test2.b\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.c > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.c > UInt32(1)]\ \n Projection: test2.a, test2.b, test2.c\ - \n Filter: test2.c > UInt32(4)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.c > UInt32(4)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1955,11 +1924,9 @@ mod tests { let expected = "\ Inner Join: test.a = test2.a\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.b > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.b > UInt32(1)]\ \n Projection: test2.a, test2.b, test2.c\ - \n Filter: test2.c > UInt32(4)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.c > UInt32(4)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1997,11 +1964,9 @@ mod tests { let expected = "\ Inner Join: test.a = test2.b\ \n Projection: test.a\ - \n Filter: test.a > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > UInt32(1)]\ \n Projection: test2.b\ - \n Filter: test2.b > UInt32(1)\ - \n TableScan: test2"; + \n 
TableScan: test2, full_filters=[test2.b > UInt32(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2044,8 +2009,7 @@ mod tests { \n Projection: test.a, test.b, test.c\ \n TableScan: test\ \n Projection: test2.a, test2.b, test2.c\ - \n Filter: test2.c > UInt32(4)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.c > UInt32(4)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2086,8 +2050,7 @@ mod tests { let expected = "\ Right Join: test.a = test2.a Filter: test.b < test2.b AND test2.c > UInt32(4)\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > UInt32(1)]\ \n Projection: test2.a, test2.b, test2.c\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) @@ -2313,8 +2276,7 @@ Projection: a, b // rewrite filter col b to test.a let expected = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10) AND test.c > Int64(10)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > Int64(10), test.c > Int64(10)]\ "; assert_optimized_plan_eq(&plan, expected) @@ -2346,8 +2308,7 @@ Projection: a, b let expected = "\ Projection: b, test.c\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10) AND test.c > Int64(10)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > Int64(10), test.c > Int64(10)]\ "; assert_optimized_plan_eq(&plan, expected) @@ -2373,9 +2334,7 @@ Projection: a, b // rewrite filter col b to test.a, col d to test.c let expected = "\ Projection: test.a AS b, test.c AS d\ - \n Filter: test.a > Int64(10) AND test.c > Int64(10)\ - \n TableScan: test\ - "; + \n TableScan: test, full_filters=[test.a > Int64(10), test.c > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2414,11 +2373,9 @@ Projection: a, b let expected = "\ Inner Join: c = d\ \n Projection: test.a AS c\ - \n Filter: test.a > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > UInt32(1)]\ \n 
Projection: test2.b AS d\ - \n Filter: test2.b > UInt32(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2446,9 +2403,7 @@ Projection: a, b // rewrite filter col b to test.a let expected = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ - \n TableScan: test\ - "; + \n TableScan: test, full_filters=[test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])]"; assert_optimized_plan_eq(&plan, expected) } @@ -2480,9 +2435,7 @@ Projection: a, b let expected = "\ Projection: b, test.c\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ - \n TableScan: test\ - "; + \n TableScan: test, full_filters=[test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])]"; assert_optimized_plan_eq(&plan, expected) } @@ -2516,11 +2469,10 @@ Projection: a, b // rewrite filter col b to test.a let expected_after = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a IN ()\ + \n TableScan: test, full_filters=[test.a IN ()]\ \n Subquery:\ \n Projection: sq.c\ - \n TableScan: sq\ - \n TableScan: test"; + \n TableScan: sq"; assert_optimized_plan_eq(&plan, expected_after) } @@ -2581,8 +2533,7 @@ Projection: a, b Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\ \n CrossJoin:\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.b > UInt32(1) OR test.c < UInt32(10)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\ \n Projection: test1.a AS d, test1.a AS e\ \n TableScan: test1"; assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?; @@ -2630,11 +2581,9 @@ Projection: a, b // Both side will be pushed down. 
let expected = "\ LeftSemi Join: test1.a = test2.a\ - \n Filter: test1.b > UInt32(1)\ - \n TableScan: test1\ + \n TableScan: test1, full_filters=[test1.b > UInt32(1)]\ \n Projection: test2.a, test2.b\ - \n Filter: test2.b > UInt32(2)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2673,11 +2622,9 @@ Projection: a, b // Both side will be pushed down. let expected = "\ RightSemi Join: test1.a = test2.a\ - \n Filter: test1.b > UInt32(1)\ - \n TableScan: test1\ + \n TableScan: test1, full_filters=[test1.b > UInt32(1)]\ \n Projection: test2.a, test2.b\ - \n Filter: test2.b > UInt32(2)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2723,8 +2670,7 @@ Projection: a, b \n Projection: test1.a, test1.b\ \n TableScan: test1\ \n Projection: test2.a, test2.b\ - \n Filter: test2.b > UInt32(2)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2767,8 +2713,7 @@ Projection: a, b // For right anti, filter of the left side can be pushed down. let expected = "RightAnti Join: test1.a = test2.a Filter: test2.b > UInt32(2)\ \n Projection: test1.a, test1.b\ - \n Filter: test1.b > UInt32(1)\ - \n TableScan: test1\ + \n TableScan: test1, full_filters=[test1.b > UInt32(1)]\ \n Projection: test2.a, test2.b\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index eb9ae3c981d9..daa2569bc51c 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -147,10 +147,6 @@ impl OptimizerRule for PushDownProjection { if !scan.projected_schema.fields().is_empty() => { let mut used_columns: HashSet = HashSet::new(); - // filter expr may not exist in expr in projection. 
- // like: TableScan: t1 projection=[bool_col, int_col], full_filters=[t1.id = Int32(1)] - // projection=[bool_col, int_col] don't contain `ti.id`. - exprlist_to_columns(&scan.filters, &mut used_columns)?; if projection_is_empty { used_columns .insert(scan.projected_schema.fields()[0].qualified_column()); @@ -570,6 +566,7 @@ mod tests { use crate::OptimizerContext; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::DFSchema; + use datafusion_expr::builder::table_scan_with_filters; use datafusion_expr::expr; use datafusion_expr::expr::Cast; use datafusion_expr::WindowFrame; @@ -979,6 +976,35 @@ mod tests { assert_optimized_plan_eq(&plan, expected) } + #[test] + fn table_full_filter_pushdown() -> Result<()> { + let schema = Schema::new(test_table_scan_fields()); + + let table_scan = table_scan_with_filters( + Some("test"), + &schema, + None, + vec![col("b").eq(lit(1))], + )? + .build()?; + assert_eq!(3, table_scan.schema().fields().len()); + assert_fields_eq(&table_scan, vec!["a", "b", "c"]); + + // there is no need for the first projection + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("b")])? + .project(vec![lit(1).alias("a")])? 
+ .build()?; + + assert_fields_eq(&plan, vec!["a"]); + + let expected = "\ + Projection: Int32(1) AS a\ + \n TableScan: test projection=[a], full_filters=[b = Int32(1)]"; + + assert_optimized_plan_eq(&plan, expected) + } + /// tests that optimizing twice yields same plan #[test] fn test_double_optimization() -> Result<()> { diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index c65768bb8b11..d48607ee56b7 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -833,8 +833,7 @@ mod tests { // before simplify: t.g = power(t.f, 1.0) // after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)" - let expected = - "TableScan: test, unsupported_filters=[g = f AS g = power(f,Float64(1))]"; + let expected = "TableScan: test, full_filters=[g = f AS g = power(f,Float64(1))]"; assert_optimized_plan_eq(&plan, expected) } diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index daa695f77144..dd175c0aca94 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{ }; use arrow::temporal_conversions::{MICROSECONDS, MILLISECONDS, NANOSECONDS}; use datafusion_common::tree_node::{RewriteRecursion, TreeNodeRewriter}; -use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue}; +use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue}; use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast}; use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::utils::from_plan; @@ -90,6 +90,12 @@ impl OptimizerRule for UnwrapCastInComparison { ) -> Result> { let mut schema = merge_schema(plan.inputs()); + if let LogicalPlan::TableScan(ts) = plan { + let 
source_schema = + DFSchema::try_from_qualified_schema(&ts.table_name, &ts.source.schema())?; + schema.merge(&source_schema); + } + schema.merge(plan.schema()); let mut expr_rewriter = UnwrapCastExprRewriter {