From 58075e2329e989ebe3cede1088ea8849f9eb25bd Mon Sep 17 00:00:00 2001 From: HuSen Date: Fri, 16 Aug 2024 16:43:13 +0800 Subject: [PATCH 1/8] Fix: support NULL input for like operations (#12025) --- .../expr-common/src/type_coercion/binary.rs | 1 + datafusion/sqllogictest/test_files/regexp.slt | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index fd97f9af1328..6d2fb660f669 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -1045,6 +1045,7 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Date: Fri, 16 Aug 2024 04:43:55 -0400 Subject: [PATCH 2/8] Minor: Add error tests for min/max with 2 arguments (#12024) --- datafusion/sqllogictest/test_files/aggregate.slt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 322ddcdb047b..462acaa266ae 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1881,6 +1881,12 @@ SELECT MIN(c1), MIN(c2) FROM test ---- 0 1 +query error min/max was called with 2 arguments. It requires only 1. +SELECT MIN(c1, c2) FROM test + +query error min/max was called with 2 arguments. It requires only 1. +SELECT MAX(c1, c2) FROM test + # aggregate_grouped query II SELECT c1, SUM(c2) FROM test GROUP BY c1 order by c1 From 5db036e90fd76ebeb3ccf7aab747613895c01abe Mon Sep 17 00:00:00 2001 From: Jonah Gao Date: Fri, 16 Aug 2024 16:44:41 +0800 Subject: [PATCH 3/8] fix: incorrect aggregation result of `bool_and` (#12017) --- .../aggregate/groups_accumulator/bool_op.rs | 13 ++++-- .../functions-aggregate/src/bool_and_or.rs | 9 ++-- .../sqllogictest/test_files/aggregate.slt | 45 +++++++++++++++++++ 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index f4b4c0c93215..149312e5a9c0 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -47,17 +47,22 @@ where /// Function that computes the output bool_fn: F, + + /// The identity element for the boolean operation. + /// Any value combined with this returns the original value. + identity: bool, } impl BooleanGroupsAccumulator where F: Fn(bool, bool) -> bool + Send + Sync, { - pub fn new(bitop_fn: F) -> Self { + pub fn new(bool_fn: F, identity: bool) -> Self { Self { values: BooleanBufferBuilder::new(0), null_state: NullState::new(), - bool_fn: bitop_fn, + bool_fn, + identity, } } } @@ -78,7 +83,9 @@ where if self.values.len() < total_num_groups { let new_groups = total_num_groups - self.values.len(); - self.values.append_n(new_groups, Default::default()); + // Fill with the identity element, so that when the first non-null value is encountered, + // it will combine with the identity and the result will be the first non-null value itself. 
+ self.values.append_n(new_groups, self.identity); } // NullState dispatches / handles tracking nulls and groups that saw no values diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs b/datafusion/functions-aggregate/src/bool_and_or.rs index b993b2a4979c..7cc7d9ff7fec 100644 --- a/datafusion/functions-aggregate/src/bool_and_or.rs +++ b/datafusion/functions-aggregate/src/bool_and_or.rs @@ -151,7 +151,7 @@ impl AggregateUDFImpl for BoolAnd { ) -> Result> { match args.return_type { DataType::Boolean => { - Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y))) + Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y, true))) } _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", @@ -270,9 +270,10 @@ impl AggregateUDFImpl for BoolOr { args: AccumulatorArgs, ) -> Result> { match args.return_type { - DataType::Boolean => { - Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y))) - } + DataType::Boolean => Ok(Box::new(BooleanGroupsAccumulator::new( + |x, y| x || y, + false, + ))), _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", args.name, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 462acaa266ae..0cda24d6ff5e 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -3730,6 +3730,51 @@ SELECT bool_or(distinct c1), bool_or(distinct c2), bool_or(distinct c3), bool_or ---- true true true false true true false NULL +# Test issue: https://github.com/apache/datafusion/issues/11846 +statement ok +create table t1(v1 int, v2 boolean); + +statement ok +insert into t1 values (1, true), (1, true); + +statement ok +insert into t1 values (3, null), (3, true); + +statement ok +insert into t1 values (2, false), (2, true); + +statement ok +insert into t1 values (6, false), (6, false); + +statement ok +insert into t1 values (4, null), (4, null); + +statement ok +insert into t1 values (5, false), (5, null); + +query IB +select v1, bool_and(v2) from t1 group by v1 order by v1; +---- +1 true +2 false +3 true +4 NULL +5 false +6 false + +query IB +select v1, bool_or(v2) from t1 group by v1 order by v1; +---- +1 true +2 true +3 true +4 NULL +5 false +6 false + +statement ok +drop table t1; + # All supported timestamp types # "nanos" --> TimestampNanosecondArray From bb921812fa523717774122c8639b654d04bac705 Mon Sep 17 00:00:00 2001 From: Tai Le Manh <49281946+tlm365@users.noreply.github.com> Date: Fri, 16 Aug 2024 17:21:21 +0700 Subject: [PATCH 4/8] Improve performance of REPEAT functions (#12015) * Improve performance of REPEAT functions Signed-off-by: Tai Le Manh * Improve performance of REPEAT functions Signed-off-by: Tai Le Manh * Fix cargo fmt Signed-off-by: Tai Le Manh --------- Signed-off-by: Tai Le Manh --- datafusion/functions/Cargo.toml | 5 + datafusion/functions/benches/repeat.rs | 136 ++++++++++++++++++++++ datafusion/functions/src/string/common.rs | 21 +++- datafusion/functions/src/string/repeat.rs | 84 ++++++------- datafusion/functions/src/unicode/lpad.rs | 19 +-- 5 files changed, 207 insertions(+), 58 deletions(-) create mode 100644 datafusion/functions/benches/repeat.rs diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 688563baecfa..2b3f80fc930b 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -151,3 +151,8 @@ required-features = ["string_expressions"] harness = false name = "pad" required-features = ["unicode_expressions"] + +[[bench]] 
+harness = false +name = "repeat" +required-features = ["string_expressions"] diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs new file mode 100644 index 000000000000..916c8374e5fb --- /dev/null +++ b/datafusion/functions/benches/repeat.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, OffsetSizeTrait}; +use arrow::util::bench_util::{ + create_string_array_with_len, create_string_view_array_with_len, +}; +use criterion::{black_box, criterion_group, criterion_main, Criterion, SamplingMode}; +use datafusion_expr::ColumnarValue; +use datafusion_functions::string; +use std::sync::Arc; +use std::time::Duration; + +fn create_args( + size: usize, + str_len: usize, + repeat_times: i64, + use_string_view: bool, +) -> Vec { + let number_array = Arc::new(Int64Array::from( + (0..size).map(|_| repeat_times).collect::>(), + )); + + if use_string_view { + let string_array = + Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false)); + vec![ + ColumnarValue::Array(string_array), + ColumnarValue::Array(number_array), + ] + } else { + let string_array = + Arc::new(create_string_array_with_len::(size, 0.1, str_len)); + + vec![ + ColumnarValue::Array(string_array), + ColumnarValue::Array(Arc::clone(&number_array) as ArrayRef), + ] + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let repeat = string::repeat(); + for size in [1024, 4096] { + // REPEAT 3 TIMES + let repeat_times = 3; + let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); + + let args = create_args::(size, 32, repeat_times, true); + group.bench_function( + &format!( + "repeat_string_view [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_large_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + group.finish(); + + // REPEAT 30 TIMES + let repeat_times = 30; + let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); + + let args = create_args::(size, 32, repeat_times, true); + group.bench_function( + &format!( + 
"repeat_string_view [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_large_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + group.finish(); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/string/common.rs b/datafusion/functions/src/string/common.rs index 7037c1d1c3c3..54aebb039046 100644 --- a/datafusion/functions/src/string/common.rs +++ b/datafusion/functions/src/string/common.rs @@ -19,8 +19,9 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use arrow::array::{ - new_null_array, Array, ArrayDataBuilder, ArrayRef, GenericStringArray, - GenericStringBuilder, OffsetSizeTrait, StringArray, + new_null_array, Array, ArrayAccessor, ArrayDataBuilder, ArrayIter, ArrayRef, + GenericStringArray, GenericStringBuilder, OffsetSizeTrait, StringArray, + StringViewArray, }; use arrow::buffer::{Buffer, MutableBuffer, NullBuffer}; use arrow::datatypes::DataType; @@ -251,6 +252,22 @@ impl<'a> ColumnarValueRef<'a> { } } +pub trait StringArrayType<'a>: ArrayAccessor + Sized { + fn iter(&self) -> ArrayIter; +} + +impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { + fn iter(&self) -> ArrayIter { + GenericStringArray::::iter(self) + } +} + +impl<'a> StringArrayType<'a> for &'a StringViewArray { + fn iter(&self) -> ArrayIter { + StringViewArray::iter(self) + } +} + /// Optimized version of the StringBuilder in Arrow that: /// 1. Precalculating the expected length of the result, avoiding reallocations. /// 2. 
Avoids creating / incrementally creating a `NullBufferBuilder` diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index a377dee06f41..20e4462784b8 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -18,17 +18,20 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait, StringArray}; +use arrow::array::{ + ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, + OffsetSizeTrait, StringViewArray, +}; use arrow::datatypes::DataType; +use arrow::datatypes::DataType::{Int64, LargeUtf8, Utf8, Utf8View}; -use datafusion_common::cast::{ - as_generic_string_array, as_int64_array, as_string_view_array, -}; +use datafusion_common::cast::as_int64_array; use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; +use crate::string::common::StringArrayType; use crate::utils::{make_scalar_function, utf8_to_str_type}; #[derive(Debug)] @@ -44,7 +47,6 @@ impl Default for RepeatFunc { impl RepeatFunc { pub fn new() -> Self { - use DataType::*; Self { signature: Signature::one_of( vec![ @@ -79,51 +81,53 @@ impl ScalarUDFImpl for RepeatFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - match args[0].data_type() { - DataType::Utf8View => make_scalar_function(repeat_utf8view, vec![])(args), - DataType::Utf8 => make_scalar_function(repeat::, vec![])(args), - DataType::LargeUtf8 => make_scalar_function(repeat::, vec![])(args), - other => exec_err!("Unsupported data type {other:?} for function repeat. Expected Utf8, Utf8View or LargeUtf8"), - } + make_scalar_function(repeat, vec![])(args) } } /// Repeats string the specified number of times. /// repeat('Pg', 4) = 'PgPgPgPg' -fn repeat(args: &[ArrayRef]) -> Result { - let string_array = as_generic_string_array::(&args[0])?; +fn repeat(args: &[ArrayRef]) -> Result { let number_array = as_int64_array(&args[1])?; - - let result = string_array - .iter() - .zip(number_array.iter()) - .map(|(string, number)| repeat_common(string, number)) - .collect::>(); - - Ok(Arc::new(result) as ArrayRef) + match args[0].data_type() { + Utf8View => { + let string_view_array = args[0].as_string_view(); + repeat_impl::(string_view_array, number_array) + } + Utf8 => { + let string_array = args[0].as_string::(); + repeat_impl::>(string_array, number_array) + } + LargeUtf8 => { + let string_array = args[0].as_string::(); + repeat_impl::>(string_array, number_array) + } + other => exec_err!( + "Unsupported data type {other:?} for function repeat. \ + Expected Utf8, Utf8View or LargeUtf8." 
+ ), + } } -fn repeat_utf8view(args: &[ArrayRef]) -> Result { - let string_view_array = as_string_view_array(&args[0])?; - let number_array = as_int64_array(&args[1])?; - - let result = string_view_array +fn repeat_impl<'a, T, S>(string_array: S, number_array: &Int64Array) -> Result +where + T: OffsetSizeTrait, + S: StringArrayType<'a>, +{ + let mut builder: GenericStringBuilder = GenericStringBuilder::new(); + string_array .iter() .zip(number_array.iter()) - .map(|(string, number)| repeat_common(string, number)) - .collect::(); - - Ok(Arc::new(result) as ArrayRef) -} - -fn repeat_common(string: Option<&str>, number: Option) -> Option { - match (string, number) { - (Some(string), Some(number)) if number >= 0 => { - Some(string.repeat(number as usize)) - } - (Some(_), Some(_)) => Some("".to_string()), - _ => None, - } + .for_each(|(string, number)| match (string, number) { + (Some(string), Some(number)) if number >= 0 => { + builder.append_value(string.repeat(number as usize)) + } + (Some(_), Some(_)) => builder.append_value(""), + _ => builder.append_null(), + }); + let array = builder.finish(); + + Ok(Arc::new(array) as ArrayRef) } #[cfg(test)] diff --git a/datafusion/functions/src/unicode/lpad.rs b/datafusion/functions/src/unicode/lpad.rs index 521cdc5d0ff0..e102673c4253 100644 --- a/datafusion/functions/src/unicode/lpad.rs +++ b/datafusion/functions/src/unicode/lpad.rs @@ -20,8 +20,8 @@ use std::fmt::Write; use std::sync::Arc; use arrow::array::{ - Array, ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, - GenericStringBuilder, Int64Array, OffsetSizeTrait, StringViewArray, + Array, ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, + OffsetSizeTrait, StringViewArray, }; use arrow::datatypes::DataType; use unicode_segmentation::UnicodeSegmentation; @@ -32,6 +32,7 @@ use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use crate::string::common::StringArrayType; use crate::utils::{make_scalar_function, utf8_to_str_type}; #[derive(Debug)] @@ -248,20 +249,6 @@ where Ok(Arc::new(array) as ArrayRef) } -trait StringArrayType<'a>: ArrayAccessor + Sized { - fn iter(&self) -> ArrayIter; -} -impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { - fn iter(&self) -> ArrayIter { - GenericStringArray::::iter(self) - } -} -impl<'a> StringArrayType<'a> for &'a StringViewArray { - fn iter(&self) -> ArrayIter { - StringViewArray::iter(self) - } -} - #[cfg(test)] mod tests { use crate::unicode::lpad::LPadFunc; From 300a08c622588c935bd481e6565f97556c3e629a Mon Sep 17 00:00:00 2001 From: Dmitry Bugakov Date: Fri, 16 Aug 2024 13:13:52 +0200 Subject: [PATCH 5/8] support Utf8View (#12019) --- datafusion/functions/src/unicode/substr.rs | 124 +++++++++++++++--- .../sqllogictest/test_files/string_view.slt | 28 +++- 2 files changed, 129 insertions(+), 23 deletions(-) diff --git a/datafusion/functions/src/unicode/substr.rs b/datafusion/functions/src/unicode/substr.rs index 9d15920bb655..9fd8c75eab23 100644 --- a/datafusion/functions/src/unicode/substr.rs +++ b/datafusion/functions/src/unicode/substr.rs @@ -19,10 +19,12 @@ use std::any::Any; use std::cmp::max; use std::sync::Arc; -use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; +use arrow::array::{ + ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, OffsetSizeTrait, +}; use arrow::datatypes::DataType; -use datafusion_common::cast::{as_generic_string_array, 
as_int64_array}; +use datafusion_common::cast::as_int64_array; use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; @@ -51,6 +53,8 @@ impl SubstrFunc { Exact(vec![LargeUtf8, Int64]), Exact(vec![Utf8, Int64, Int64]), Exact(vec![LargeUtf8, Int64, Int64]), + Exact(vec![Utf8View, Int64]), + Exact(vec![Utf8View, Int64, Int64]), ], Volatility::Immutable, ), @@ -77,11 +81,7 @@ impl ScalarUDFImpl for SubstrFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - match args[0].data_type() { - DataType::Utf8 => make_scalar_function(substr::, vec![])(args), - DataType::LargeUtf8 => make_scalar_function(substr::, vec![])(args), - other => exec_err!("Unsupported data type {other:?} for function substr"), - } + make_scalar_function(substr, vec![])(args) } fn aliases(&self) -> &[String] { @@ -89,18 +89,39 @@ impl ScalarUDFImpl for SubstrFunc { } } +pub fn substr(args: &[ArrayRef]) -> Result { + match args[0].data_type() { + DataType::Utf8 => { + let string_array = args[0].as_string::(); + calculate_substr::<_, i32>(string_array, &args[1..]) + } + DataType::LargeUtf8 => { + let string_array = args[0].as_string::(); + calculate_substr::<_, i64>(string_array, &args[1..]) + } + DataType::Utf8View => { + let string_array = args[0].as_string_view(); + calculate_substr::<_, i32>(string_array, &args[1..]) + } + other => exec_err!("Unsupported data type {other:?} for function substr"), + } +} + /// Extracts the substring of string starting at the start'th character, and extending for count characters if that is specified. (Same as substring(string from start for count).) /// substr('alphabet', 3) = 'phabet' /// substr('alphabet', 3, 2) = 'ph' /// The implementation uses UTF-8 code points as characters -pub fn substr(args: &[ArrayRef]) -> Result { +fn calculate_substr<'a, V, T>(string_array: V, args: &[ArrayRef]) -> Result +where + V: ArrayAccessor, + T: OffsetSizeTrait, +{ match args.len() { - 2 => { - let string_array = as_generic_string_array::(&args[0])?; - let start_array = as_int64_array(&args[1])?; + 1 => { + let iter = ArrayIter::new(string_array); + let start_array = as_int64_array(&args[0])?; - let result = string_array - .iter() + let result = iter .zip(start_array.iter()) .map(|(string, start)| match (string, start) { (Some(string), Some(start)) => { @@ -113,16 +134,14 @@ pub fn substr(args: &[ArrayRef]) -> Result { _ => None, }) .collect::>(); - Ok(Arc::new(result) as ArrayRef) } - 3 => { - let string_array = as_generic_string_array::(&args[0])?; - let start_array = as_int64_array(&args[1])?; - let count_array = as_int64_array(&args[2])?; + 2 => { + let iter = ArrayIter::new(string_array); + let start_array = as_int64_array(&args[0])?; + let count_array = as_int64_array(&args[1])?; - let result = string_array - .iter() + let result = iter .zip(start_array.iter()) .zip(count_array.iter()) .map(|((string, start), count)| match (string, start, count) { @@ -162,6 +181,71 @@ mod tests { #[test] fn test_functions() -> Result<()> { + test_function!( + SubstrFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::Utf8View(None)), + ColumnarValue::Scalar(ScalarValue::from(1i64)), + ], + Ok(None), + &str, + Utf8, + StringArray + ); + test_function!( + SubstrFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from( + "alphabet" + )))), + ColumnarValue::Scalar(ScalarValue::from(0i64)), + ], + Ok(Some("alphabet")), + &str, + Utf8, + StringArray + ); + test_function!( + 
SubstrFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from( + "joséésoj" + )))), + ColumnarValue::Scalar(ScalarValue::from(5i64)), + ], + Ok(Some("ésoj")), + &str, + Utf8, + StringArray + ); + test_function!( + SubstrFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from( + "alphabet" + )))), + ColumnarValue::Scalar(ScalarValue::from(3i64)), + ColumnarValue::Scalar(ScalarValue::from(2i64)), + ], + Ok(Some("ph")), + &str, + Utf8, + StringArray + ); + test_function!( + SubstrFunc::new(), + &[ + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from( + "alphabet" + )))), + ColumnarValue::Scalar(ScalarValue::from(3i64)), + ColumnarValue::Scalar(ScalarValue::from(20i64)), + ], + Ok(Some("phabet")), + &str, + Utf8, + StringArray + ); test_function!( SubstrFunc::new(), &[ diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt index e094bcaf1b5d..82a714a432ba 100644 --- a/datafusion/sqllogictest/test_files/string_view.slt +++ b/datafusion/sqllogictest/test_files/string_view.slt @@ -521,7 +521,30 @@ logical_plan 01)Projection: test.column1_utf8view LIKE Utf8View("foo") AS like, test.column1_utf8view ILIKE Utf8View("foo") AS ilike 02)--TableScan: test projection=[column1_utf8view] +## Ensure no casts for SUBSTR +query TT +EXPLAIN SELECT + SUBSTR(column1_utf8view, 1, 3) as c1, + SUBSTR(column2_utf8, 1, 3) as c2, + SUBSTR(column2_large_utf8, 1, 3) as c3 +FROM test; +---- +logical_plan +01)Projection: substr(test.column1_utf8view, Int64(1), Int64(3)) AS c1, substr(test.column2_utf8, Int64(1), Int64(3)) AS c2, substr(test.column2_large_utf8, Int64(1), Int64(3)) AS c3 +02)--TableScan: test projection=[column2_utf8, column2_large_utf8, column1_utf8view] + +query TTT +SELECT + SUBSTR(column1_utf8view, 1, 3) as c1, + SUBSTR(column2_utf8, 1, 3) as c2, + SUBSTR(column2_large_utf8, 1, 3) as c3 +FROM test; +---- +And X X +Xia Xia Xia +Rap R R +NULL R R ## Ensure no casts for ASCII @@ -1047,9 +1070,8 @@ EXPLAIN SELECT FROM test; ---- logical_plan -01)Projection: substr(__common_expr_1, Int64(1)) AS c, substr(__common_expr_1, Int64(1), Int64(2)) AS c2 -02)--Projection: CAST(test.column1_utf8view AS Utf8) AS __common_expr_1 -03)----TableScan: test projection=[column1_utf8view] +01)Projection: substr(test.column1_utf8view, Int64(1)) AS c, substr(test.column1_utf8view, Int64(1), Int64(2)) AS c2 +02)--TableScan: test projection=[column1_utf8view] ## Ensure no casts for SUBSTRINDEX query TT From 08f6e54074ba3207fc68665675c33faaffc7282a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Fri, 16 Aug 2024 23:32:34 +0800 Subject: [PATCH 6/8] Minor: Remove wrong comment on `Accumulator::evaluate` and `Accumulator::state` (#12001) * Remove wrong comment * Remove wrong comment on Accumulator::state * Not call twice comment * Adjust comment order --- datafusion/expr-common/src/accumulator.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index 262646d8ba3a..eac91c4f8efc 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -64,8 +64,8 @@ pub trait Accumulator: Send + Sync + Debug { /// For example, the `SUM` accumulator maintains a running sum, /// and `evaluate` will produce that running sum as its output. 
/// - /// After this call, the accumulator's internal state should be - /// equivalent to when it was first created. + /// This function should not be called twice, otherwise it will + /// result in potentially non-deterministic behavior. /// /// This function gets `&mut self` to allow for the accumulator to build /// arrow compatible internal state that can be returned without copying @@ -85,8 +85,8 @@ pub trait Accumulator: Send + Sync + Debug { /// Returns the intermediate state of the accumulator, consuming the /// intermediate state. /// - /// After this call, the accumulator's internal state should be - /// equivalent to when it was first created. + /// This function should not be called twice, otherwise it will + /// result in potentially non-deterministic behavior. /// /// This function gets `&mut self` to allow for the accumulator to build /// arrow compatible internal state that can be returned without copying From dc84fa56f53534dcb418e01ad3b37cfadcb493e7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 16 Aug 2024 11:38:33 -0400 Subject: [PATCH 7/8] Minor: cleanup `.gitignore` (#12035) --- .gitignore | 46 +--------------------------------------------- 1 file changed, 1 insertion(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index 05479fd0f07d..05570eacf630 100644 --- a/.gitignore +++ b/.gitignore @@ -16,45 +16,11 @@ # under the License. apache-rat-*.jar -arrow-src.tar -arrow-src.tar.gz - -# Compiled source -*.a -*.dll -*.o -*.py[ocd] -*.so -*.so.* -*.bundle -*.dylib -.build_cache_dir -dependency-reduced-pom.xml -MANIFEST -compile_commands.json -build.ninja - -# Generated Visual Studio files -*.vcxproj -*.vcxproj.* -*.sln -*.iml # Linux perf sample data perf.data perf.data.old -cpp/.idea/ -.clangd/ -cpp/.clangd/ -cpp/apidoc/xml/ -docs/example.gz -docs/example1.dat -docs/example3.dat -python/.eggs/ -python/doc/ -# Egg metadata -*.egg-info .vscode .idea/ @@ -66,16 +32,9 @@ docker_cache .*.swp .*.swo -site/ - -# R files -**/.Rproj.user -**/*.Rcheck/ -**/.Rhistory -.Rproj.user +venv/* # macOS -cpp/Brewfile.lock.json .DS_Store # docker volumes used for caching @@ -90,9 +49,6 @@ rusty-tags.vi .history .flatbuffers/ -.vscode -venv/* - # apache release artifacts dev/dist From 2a16704db7af0045d465cda39b90d1a17e68dbe8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 16 Aug 2024 13:35:01 -0400 Subject: [PATCH 8/8] Improve documentation about `ParquetExec` / Parquet predicate pushdown (#11994) * Minor: improve ParquetExec docs * typo * clippy * fix whitespace so rustdoc does not treat as tests * Apply suggestions from code review Co-authored-by: Oleks V * expound upon column rewriting in the context of schema evolution --------- Co-authored-by: Oleks V --- datafusion/common/src/tree_node.rs | 3 + .../datasource/physical_plan/parquet/mod.rs | 60 ++++-- .../physical_plan/parquet/row_filter.rs | 194 ++++++++++++++---- 3 files changed, 192 insertions(+), 65 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index bcf4d7664acc..88300e3edd0e 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -486,6 +486,9 @@ pub trait TreeNodeVisitor<'n>: Sized { /// A [Visitor](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively /// rewriting [`TreeNode`]s via [`TreeNode::rewrite`]. /// +/// For example you can implement this trait on a struct to rewrite `Expr` or +/// `LogicalPlan` that needs to track state during the rewrite. 
+/// /// See [`TreeNode`] for more details on available APIs /// /// When passed to [`TreeNode::rewrite`], [`TreeNodeRewriter::f_down`] and diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 72aabefba595..cb026522cfa8 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -116,13 +116,12 @@ pub use writer::plan_to_parquet; /// /// Supports the following optimizations: /// -/// * Concurrent reads: Can read from one or more files in parallel as multiple +/// * Concurrent reads: reads from one or more files in parallel as multiple /// partitions, including concurrently reading multiple row groups from a single /// file. /// -/// * Predicate push down: skips row groups and pages based on -/// min/max/null_counts in the row group metadata, the page index and bloom -/// filters. +/// * Predicate push down: skips row groups, pages, rows based on metadata +/// and late materialization. See "Predicate Pushdown" below. /// /// * Projection pushdown: reads and decodes only the columns required. /// @@ -132,9 +131,8 @@ pub use writer::plan_to_parquet; /// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more /// details. /// -/// * Schema adapters: read parquet files with different schemas into a unified -/// table schema. This can be used to implement "schema evolution". See -/// [`SchemaAdapterFactory`] for more details. +/// * Schema evolution: read parquet files with different schemas into a unified +/// table schema. See [`SchemaAdapterFactory`] for more details. /// /// * metadata_size_hint: controls the number of bytes read from the end of the /// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a @@ -144,6 +142,29 @@ pub use writer::plan_to_parquet; /// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages /// based on external information. See "Implementing External Indexes" below /// +/// # Predicate Pushdown +/// +/// `ParquetExec` uses the provided [`PhysicalExpr`] predicate as a filter to +/// skip reading unnecessary data and improve query performance using several techniques: +/// +/// * Row group pruning: skips entire row groups based on min/max statistics +/// found in [`ParquetMetaData`] and any Bloom filters that are present. +/// +/// * Page pruning: skips individual pages within a ColumnChunk using the +/// [Parquet PageIndex], if present. +/// +/// * Row filtering: skips rows within a page using a form of late +/// materialization. When possible, predicates are applied by the parquet +/// decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more +/// details). This is only enabled if `ParquetScanOptions::pushdown_filters` is set to true. +/// +/// Note: If the predicate can not be used to accelerate the scan, it is ignored +/// (no error is raised on predicate evaluation errors). +/// +/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate +/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter +/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// /// # Implementing External Indexes /// /// It is possible to restrict the row groups and selections within those row @@ -199,10 +220,11 @@ pub use writer::plan_to_parquet; /// applying predicates to metadata. The plan and projections are used to /// determine what pages must be read. 
/// -/// * Step 4: The stream begins reading data, fetching the required pages -/// and incrementally decoding them. +/// * Step 4: The stream begins reading data, fetching the required parquet +/// pages incrementally decoding them, and applying any row filters (see +/// [`Self::with_pushdown_filters`]). /// -/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a +/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a /// [`SchemaAdapter`] to match the table schema. By default missing columns are /// filled with nulls, but this can be customized via [`SchemaAdapterFactory`]. /// @@ -268,13 +290,10 @@ impl ParquetExecBuilder { } } - /// Set the predicate for the scan. - /// - /// The ParquetExec uses this predicate to filter row groups and data pages - /// using the Parquet statistics and bloom filters. + /// Set the filter predicate when reading. /// - /// If the predicate can not be used to prune the scan, it is ignored (no - /// error is raised). + /// See the "Predicate Pushdown" section of the [`ParquetExec`] documenation + /// for more details. pub fn with_predicate(mut self, predicate: Arc) -> Self { self.predicate = Some(predicate); self @@ -291,7 +310,7 @@ impl ParquetExecBuilder { self } - /// Set the table parquet options that control how the ParquetExec reads. + /// Set the options for controlling how the ParquetExec reads parquet files. /// /// See also [`Self::new_with_options`] pub fn with_table_parquet_options( @@ -480,11 +499,8 @@ impl ParquetExec { self } - /// If true, any filter [`Expr`]s on the scan will converted to a - /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the - /// `ParquetRecordBatchStream`. These filters are applied by the - /// parquet decoder to skip unecessairly decoding other columns - /// which would not pass the predicate. Defaults to false + /// If true, the predicate will be used during the parquet scan. + /// Defaults to false /// /// [`Expr`]: datafusion_expr::Expr pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 9de132169389..23fdadc2cdee 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -15,6 +15,50 @@ // specific language governing permissions and limitations // under the License. +//! Utilities to push down of DataFusion filter predicates (any DataFusion +//! `PhysicalExpr` that evaluates to a [`BooleanArray`]) to the parquet decoder +//! level in `arrow-rs`. +//! +//! DataFusion will use a `ParquetRecordBatchStream` to read data from parquet +//! into [`RecordBatch`]es. +//! +//! The `ParquetRecordBatchStream` takes an optional `RowFilter` which is itself +//! a Vec of `Box`. During decoding, the predicates are +//! evaluated in order, to generate a mask which is used to avoid decoding rows +//! in projected columns which do not pass the filter which can significantly +//! reduce the amount of compute required for decoding and thus improve query +//! performance. +//! +//! Since the predicates are applied serially in the order defined in the +//! `RowFilter`, the optimal ordering depends on the exact filters. The best +//! filters to execute first have two properties: +//! +//! 1. They are relatively inexpensive to evaluate (e.g. they read +//! column chunks which are relatively small) +//! +//! 2. 
They filter many (contiguous) rows, reducing the amount of decoding
+//!    required for subsequent filters and projected columns
+//!
+//! If requested, this code will reorder the filters based on heuristics to try and
+//! reduce the evaluation cost.
+//!
+//! The basic algorithm for constructing the `RowFilter` is as follows
+//!
+//! 1. Break conjunctions into separate predicates. An expression
+//!    like `a = 1 AND (b = 2 AND c = 3)` would be
+//!    separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
+//! 2. Determine whether each predicate can be evaluated as an `ArrowPredicate`.
+//! 3. Determine, for each predicate, the total compressed size of all
+//!    columns required to evaluate the predicate.
+//! 4. Determine, for each predicate, whether all columns required to
+//!    evaluate the expression are sorted.
+//! 5. Re-order the predicates by total size (from step 3).
+//! 6. Partition the predicates according to whether they are sorted (from step 4)
+//! 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
+//! 8. Build the `RowFilter` with the sorted predicates followed by
+//!    the unsorted predicates. Within each partition, predicates are
+//!    still sorted by size.
+
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
@@ -40,41 +84,24 @@ use crate::physical_plan::metrics;
 
 use super::ParquetFileMetrics;
 
-/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
-/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
-/// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet into arrow `RecordBatch`es.
-/// When constructing the `ParquetRecordBatchStream` you can provide a `RowFilter` which is itself just a vector
-/// of `Box`. During decoding, the predicates are evaluated to generate a mask which is used
-/// to avoid decoding rows in projected columns which are not selected which can significantly reduce the amount
-/// of compute required for decoding.
+/// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform
+/// row-level filtering during parquet decoding.
 ///
-/// Since the predicates are applied serially in the order defined in the `RowFilter`, the optimal ordering
-/// will depend on the exact filters. The best filters to execute first have two properties:
-/// 1. The are relatively inexpensive to evaluate (e.g. they read column chunks which are relatively small)
-/// 2. They filter a lot of rows, reducing the amount of decoding required for subsequent filters and projected columns
+/// See the module level documentation for more information.
 ///
-/// Given the metadata exposed by parquet, the selectivity of filters is not easy to estimate so the heuristics we use here primarily
-/// focus on the evaluation cost.
+/// Implements the `ArrowPredicate` trait used by the parquet decoder
 ///
-/// The basic algorithm for constructing the `RowFilter` is as follows
-/// 1. Recursively break conjunctions into separate predicates. An expression like `a = 1 AND (b = 2 AND c = 3)` would be
-///    separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
-/// 2. Determine whether each predicate is suitable as an `ArrowPredicate`. As long as the predicate does not reference any projected columns
-///    or columns with non-primitive types, then it is considered suitable.
-/// 3. Determine, for each predicate, the total compressed size of all columns required to evaluate the predicate.
-/// 4.
Determine, for each predicate, whether all columns required to evaluate the expression are sorted. -/// 5. Re-order the predicate by total size (from step 3). -/// 6. Partition the predicates according to whether they are sorted (from step 4) -/// 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`. -/// 8. Build the `RowFilter` with the sorted predicates followed by the unsorted predicates. Within each partition -/// the predicates will still be sorted by size. - -/// A predicate which can be passed to `ParquetRecordBatchStream` to perform row-level -/// filtering during parquet decoding. +/// An expression can be evaluated as a `DatafusionArrowPredicate` if it: +/// * Does not reference any projected columns +/// * Does not reference columns with non-primitive types (e.g. structs / lists) #[derive(Debug)] pub(crate) struct DatafusionArrowPredicate { + /// the filter expression physical_expr: Arc, + /// Path to the columns in the parquet schema required to evaluate the + /// expression projection_mask: ProjectionMask, + /// Columns required to evaluate the expression in the arrow schema projection: Vec, /// how many rows were filtered out by this predicate rows_filtered: metrics::Count, @@ -85,6 +112,7 @@ pub(crate) struct DatafusionArrowPredicate { } impl DatafusionArrowPredicate { + /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate` pub fn try_new( candidate: FilterCandidate, schema: &Schema, @@ -152,9 +180,12 @@ impl ArrowPredicate for DatafusionArrowPredicate { } } -/// A candidate expression for creating a `RowFilter` contains the -/// expression as well as data to estimate the cost of evaluating -/// the resulting expression. +/// A candidate expression for creating a `RowFilter`. +/// +/// Each candidate contains the expression as well as data to estimate the cost +/// of evaluating the resulting expression. +/// +/// See the module level documentation for more information. pub(crate) struct FilterCandidate { expr: Arc, required_bytes: usize, @@ -162,19 +193,55 @@ pub(crate) struct FilterCandidate { projection: Vec, } -/// Helper to build a `FilterCandidate`. This will do several things +/// Helper to build a `FilterCandidate`. +/// +/// This will do several things /// 1. Determine the columns required to evaluate the expression /// 2. Calculate data required to estimate the cost of evaluating the filter -/// 3. Rewrite column expressions in the predicate which reference columns not in the particular file schema. -/// This is relevant in the case where we have determined the table schema by merging all individual file schemas -/// and any given file may or may not contain all columns in the merged schema. If a particular column is not present -/// we replace the column expression with a literal expression that produces a null value. +/// 3. Rewrite column expressions in the predicate which reference columns not +/// in the particular file schema. +/// +/// # Schema Rewrite +/// +/// When parquet files are read in the context of "schema evolution" there are +/// potentially wo schemas: +/// +/// 1. The table schema (the columns of the table that the parquet file is part of) +/// 2. The file schema (the columns actually in the parquet file) +/// +/// There are times when the table schema contains columns that are not in the +/// file schema, such as when new columns have been added in new parquet files +/// but old files do not have the columns. 
+/// +/// When a file is missing a column from the table schema, the value of the +/// missing column is filled in with `NULL` via a `SchemaAdapter`. +/// +/// When a predicate is pushed down to the parquet reader, the predicate is +/// evaluated in the context of the file schema. If the predicate references a +/// column that is in the table schema but not in the file schema, the column +/// reference must be rewritten to a literal expression that represents the +/// `NULL` value that would be produced by the `SchemaAdapter`. +/// +/// For example, if: +/// * The table schema is `id, name, address` +/// * The file schema is `id, name` (missing the `address` column) +/// * predicate is `address = 'foo'` +/// +/// When evaluating the predicate as a filter on the parquet file, the predicate +/// must be rewritten to `NULL = 'foo'` as the `address` column will be filled +/// in with `NULL` values during the rest of the evaluation. struct FilterCandidateBuilder<'a> { expr: Arc, + /// The schema of this parquet file file_schema: &'a Schema, + /// The schema of the table (merged schema) -- columns may be in different + /// order than in the file and have columns that are not in the file schema table_schema: &'a Schema, required_column_indices: BTreeSet, + /// Does the expression require any non-primitive columns (like structs)? non_primitive_columns: bool, + /// Does the expression reference any columns that are in the table + /// schema but not in the file schema? projected_columns: bool, } @@ -194,6 +261,13 @@ impl<'a> FilterCandidateBuilder<'a> { } } + /// Attempt to build a `FilterCandidate` from the expression + /// + /// # Return values + /// + /// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter + /// * `Ok(None)` if the expression cannot be used as an ArrowFilter + /// * `Err(e)` if an error occurs while building the candidate pub fn build( mut self, metadata: &ParquetMetaData, @@ -217,9 +291,13 @@ impl<'a> FilterCandidateBuilder<'a> { } } +/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that +/// walks the expression tree and rewrites it in preparation of becoming +/// `FilterCandidate`. impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> { type Node = Arc; + /// Called before visiting each child fn f_down( &mut self, node: Arc, @@ -243,13 +321,19 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> { Ok(Transformed::no(node)) } + /// After visiting all children, rewrite column references to nulls if + /// they are not in the file schema fn f_up( &mut self, expr: Arc, ) -> Result>> { + // if the expression is a column, is it in the file schema? if let Some(column) = expr.as_any().downcast_ref::() { if self.file_schema.field_with_name(column.name()).is_err() { - // the column expr must be in the table schema + // Replace the column reference with a NULL (using the type from the table schema) + // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'` + // + // See comments on `FilterCandidateBuilder` for more information return match self.table_schema.field_with_name(column.name()) { Ok(field) => { // return the null value corresponding to the data type @@ -294,9 +378,11 @@ fn remap_projection(src: &[usize]) -> Vec { projection } -/// Calculate the total compressed size of all `Column's required for -/// predicate `Expr`. This should represent the total amount of file IO -/// required to evaluate the predicate. +/// Calculate the total compressed size of all `Column`'s required for +/// predicate `Expr`. 
+/// +/// This value represents the total amount of IO required to evaluate the +/// predicate. fn size_of_columns( columns: &BTreeSet, metadata: &ParquetMetaData, @@ -312,8 +398,10 @@ fn size_of_columns( Ok(total_size) } -/// For a given set of `Column`s required for predicate `Expr` determine whether all -/// columns are sorted. Sorted columns may be queried more efficiently in the presence of +/// For a given set of `Column`s required for predicate `Expr` determine whether +/// all columns are sorted. +/// +/// Sorted columns may be queried more efficiently in the presence of /// a PageIndex. fn columns_sorted( _columns: &BTreeSet, @@ -323,7 +411,20 @@ fn columns_sorted( Ok(false) } -/// Build a [`RowFilter`] from the given predicate `Expr` +/// Build a [`RowFilter`] from the given predicate `Expr` if possible +/// +/// # returns +/// * `Ok(Some(row_filter))` if the expression can be used as RowFilter +/// * `Ok(None)` if the expression cannot be used as an RowFilter +/// * `Err(e)` if an error occurs while building the filter +/// +/// Note that the returned `RowFilter` may not contains all conjuncts in the +/// original expression. This is because some conjuncts may not be able to be +/// evaluated as an `ArrowPredicate` and will be ignored. +/// +/// For example, if the expression is `a = 1 AND b = 2 AND c = 3` and `b = 2` +/// can not be evaluated for some reason, the returned `RowFilter` will contain +/// `a = 1` and `c = 3`. pub fn build_row_filter( expr: &Arc, file_schema: &Schema, @@ -336,8 +437,11 @@ pub fn build_row_filter( let rows_filtered = &file_metrics.pushdown_rows_filtered; let time = &file_metrics.pushdown_eval_time; + // Split into conjuncts: + // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] let predicates = split_conjunction(expr); + // Determine which conjuncts can be evaluated as ArrowPredicates, if any let mut candidates: Vec = predicates .into_iter() .flat_map(|expr| { @@ -347,9 +451,11 @@ pub fn build_row_filter( }) .collect(); + // no candidates if candidates.is_empty() { Ok(None) } else if reorder_predicates { + // attempt to reorder the predicates by size and whether they are sorted candidates.sort_by_key(|c| c.required_bytes); let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) = @@ -385,6 +491,8 @@ pub fn build_row_filter( Ok(Some(RowFilter::new(filters))) } else { + // otherwise evaluate the predicates in the order the appeared in the + // original expressions let mut filters: Vec> = vec![]; for candidate in candidates { let filter = DatafusionArrowPredicate::try_new(