From c9bd2910c353ffe4efcf888737b6ee4011f172a6 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 26 Apr 2024 20:37:58 +0800 Subject: [PATCH] ScalarUDF: Remove `supports_zero_argument` and avoid creating null array for empty args (#10193) * Avoid create null array for empty args Signed-off-by: jayzhan211 * fix test Signed-off-by: jayzhan211 * fix test Signed-off-by: jayzhan211 * return scalar instead of array Signed-off-by: jayzhan211 * remove supports 0 args in scalarudf Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * rm test1 Signed-off-by: jayzhan211 * invoke no args and support randomness Signed-off-by: jayzhan211 * rm randomness Signed-off-by: jayzhan211 * add func with no args Signed-off-by: jayzhan211 * array Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- .../physical_optimizer/projection_pushdown.rs | 4 - .../user_defined_scalar_functions.rs | 202 ++++++------------ .../expr/src/type_coercion/functions.rs | 1 + datafusion/expr/src/udf.rs | 29 ++- datafusion/functions-array/src/make_array.rs | 4 + datafusion/functions/src/math/pi.rs | 18 +- datafusion/functions/src/math/random.rs | 48 +---- datafusion/functions/src/string/uuid.rs | 16 +- datafusion/physical-expr/src/functions.rs | 1 - .../physical-expr/src/scalar_function.rs | 34 ++- datafusion/physical-expr/src/udf.rs | 1 - .../proto/src/physical_plan/from_proto.rs | 2 - .../tests/cases/roundtrip_physical_plan.rs | 2 - 13 files changed, 130 insertions(+), 232 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 359916de0f1e..f07cf1fc6f86 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1379,7 +1379,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1448,7 +1447,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 3))), @@ -1520,7 +1518,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1589,7 +1586,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d_new", 3))), diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index c40573a8df80..4f262b54fb20 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -16,10 +16,7 @@ // under the License. use arrow::compute::kernels::numeric::add; -use arrow_array::{ - Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, UInt8Array, -}; -use arrow_schema::DataType::Float64; +use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; use datafusion::prelude::*; @@ -36,9 +33,7 @@ use datafusion_expr::{ Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; -use rand::{thread_rng, Rng}; use std::any::Any; -use std::iter; use std::sync::Arc; /// test that casting happens on udfs. @@ -168,6 +163,48 @@ async fn scalar_udf() -> Result<()> { Ok(()) } +struct Simple0ArgsScalarUDF { + name: String, + signature: Signature, + return_type: DataType, +} + +impl std::fmt::Debug for Simple0ArgsScalarUDF { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("ScalarUDF") + .field("name", &self.name) + .field("signature", &self.signature) + .field("fun", &"") + .finish() + } +} + +impl ScalarUDFImpl for Simple0ArgsScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(self.return_type.clone()) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) + } + + fn invoke_no_args(&self, _number_rows: usize) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(100)))) + } +} + #[tokio::test] async fn scalar_udf_zero_params() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); @@ -179,20 +216,14 @@ async fn scalar_udf_zero_params() -> Result<()> { let ctx = SessionContext::new(); ctx.register_batch("t", batch)?; - // create function just returns 100 regardless of inp - let myfunc = Arc::new(|_args: &[ColumnarValue]| { - Ok(ColumnarValue::Array( - Arc::new((0..1).map(|_| 100).collect::()) as ArrayRef, - )) - }); - ctx.register_udf(create_udf( - "get_100", - vec![], - Arc::new(DataType::Int32), - Volatility::Immutable, - myfunc, - )); + let get_100_udf = Simple0ArgsScalarUDF { + name: "get_100".to_string(), + signature: Signature::exact(vec![], Volatility::Immutable), + return_type: DataType::Int32, + }; + + ctx.register_udf(ScalarUDF::from(get_100_udf)); let result = plan_and_collect(&ctx, "select get_100() a from t").await?; let expected = [ @@ -403,123 +434,6 @@ async fn test_user_defined_functions_with_alias() -> Result<()> { Ok(()) } -#[derive(Debug)] -pub struct RandomUDF { - signature: Signature, -} - -impl RandomUDF { - pub fn new() -> Self { - Self { - signature: Signature::any(0, Volatility::Volatile), - } - } -} - -impl ScalarUDFImpl for RandomUDF { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn name(&self) -> &str { - "random_udf" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - Ok(Float64) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - // This udf is always invoked with zero argument so its argument - // is a null array indicating the batch size. - ColumnarValue::Array(array) if array.data_type().is_null() => array.len(), - _ => { - return Err(datafusion::error::DataFusionError::Internal( - "Invalid argument type".to_string(), - )) - } - }; - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) - } -} - -/// Ensure that a user defined function with zero argument will be invoked -/// with a null array indicating the batch size. -#[tokio::test] -async fn test_user_defined_functions_zero_argument() -> Result<()> { - let ctx = SessionContext::new(); - - let schema = Arc::new(Schema::new(vec![Field::new( - "index", - DataType::UInt8, - false, - )])); - - let batch = RecordBatch::try_new( - schema, - vec![Arc::new(UInt8Array::from_iter_values([1, 2, 3]))], - )?; - - ctx.register_batch("data_table", batch)?; - - let random_normal_udf = ScalarUDF::from(RandomUDF::new()); - ctx.register_udf(random_normal_udf); - - let result = plan_and_collect( - &ctx, - "SELECT random_udf() AS random_udf, random() AS native_random FROM data_table", - ) - .await?; - - assert_eq!(result.len(), 1); - let batch = &result[0]; - let random_udf = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let native_random = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!(random_udf.len(), native_random.len()); - - let mut previous = -1.0; - for i in 0..random_udf.len() { - assert!(random_udf.value(i) >= 0.0 && random_udf.value(i) < 1.0); - assert!(random_udf.value(i) != previous); - previous = random_udf.value(i); - } - - Ok(()) -} - -#[tokio::test] -async fn deregister_udf() -> Result<()> { - let random_normal_udf = ScalarUDF::from(RandomUDF::new()); - let ctx = SessionContext::new(); - - ctx.register_udf(random_normal_udf.clone()); - - assert!(ctx.udfs().contains("random_udf")); - - ctx.deregister_udf("random_udf"); - - assert!(!ctx.udfs().contains("random_udf")); - - Ok(()) -} - #[derive(Debug)] struct CastToI64UDF { signature: Signature, @@ -615,6 +529,22 @@ async fn test_user_defined_functions_cast_to_i64() -> Result<()> { Ok(()) } +#[tokio::test] +async fn deregister_udf() -> Result<()> { + let cast2i64 = ScalarUDF::from(CastToI64UDF::new()); + let ctx = SessionContext::new(); + + ctx.register_udf(cast2i64.clone()); + + assert!(ctx.udfs().contains("cast_to_i64")); + + ctx.deregister_udf("cast_to_i64"); + + assert!(!ctx.udfs().contains("cast_to_i64")); + + Ok(()) +} + #[derive(Debug)] struct TakeUDF { signature: Signature, diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 07516c1f6f53..eb4f325ff818 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -52,6 +52,7 @@ pub fn data_types( ); } } + let valid_types = get_valid_types(&signature.type_signature, current_types)?; if valid_types diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 4557fe60a447..c9c11a6bbfea 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,7 +23,7 @@ use crate::{ ScalarFunctionImplementation, Signature, }; use arrow::datatypes::DataType; -use datafusion_common::{ExprSchema, Result}; +use datafusion_common::{not_impl_err, ExprSchema, Result}; use std::any::Any; use std::fmt; use std::fmt::Debug; @@ -180,6 +180,13 @@ impl ScalarUDF { self.inner.invoke(args) } + /// Invoke the function without `args` but number of rows, returning the appropriate result. + /// + /// See [`ScalarUDFImpl::invoke_no_args`] for more details. + pub fn invoke_no_args(&self, number_rows: usize) -> Result { + self.inner.invoke_no_args(number_rows) + } + /// Returns a `ScalarFunctionImplementation` that can invoke the function /// during execution pub fn fun(&self) -> ScalarFunctionImplementation { @@ -322,10 +329,9 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// The function will be invoked passed with the slice of [`ColumnarValue`] /// (either scalar or array). /// - /// # Zero Argument Functions - /// If the function has zero parameters (e.g. `now()`) it will be passed a - /// single element slice which is a a null array to indicate the batch's row - /// count (so the function can know the resulting array size). + /// If the function does not take any arguments, please use [invoke_no_args] + /// instead and return [not_impl_err] for this function. + /// /// /// # Performance /// @@ -335,7 +341,18 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// /// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments /// to arrays, which will likely be simpler code, but be slower. - fn invoke(&self, args: &[ColumnarValue]) -> Result; + /// + /// [invoke_no_args]: ScalarUDFImpl::invoke_no_args + fn invoke(&self, _args: &[ColumnarValue]) -> Result; + + /// Invoke the function without `args`, instead the number of rows are provided, + /// returning the appropriate result. + fn invoke_no_args(&self, _number_rows: usize) -> Result { + not_impl_err!( + "Function {} does not implement invoke_no_args but called", + self.name() + ) + } /// Returns any aliases (alternate names) for this function. /// diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index 0439a736ee42..770276938f6b 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -104,6 +104,10 @@ impl ScalarUDFImpl for MakeArray { make_scalar_function(make_array_inner)(args) } + fn invoke_no_args(&self, _number_rows: usize) -> Result { + make_scalar_function(make_array_inner)(&[]) + } + fn aliases(&self) -> &[String] { &self.aliases } diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs index 0801e797511b..f9403e411fe2 100644 --- a/datafusion/functions/src/math/pi.rs +++ b/datafusion/functions/src/math/pi.rs @@ -16,13 +16,11 @@ // under the License. use std::any::Any; -use std::sync::Arc; -use arrow::array::Float64Array; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{not_impl_err, Result, ScalarValue}; use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -62,12 +60,14 @@ impl ScalarUDFImpl for PiFunc { Ok(Float64) } - fn invoke(&self, args: &[ColumnarValue]) -> Result { - if !matches!(&args[0], ColumnarValue::Array(_)) { - return exec_err!("Expect pi function to take no param"); - } - let array = Float64Array::from_value(std::f64::consts::PI, 1); - Ok(ColumnarValue::Array(Arc::new(array))) + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) + } + + fn invoke_no_args(&self, _number_rows: usize) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some( + std::f64::consts::PI, + )))) } fn monotonicity(&self) -> Result> { diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs index 2c1ad4136702..b5eece212a3b 100644 --- a/datafusion/functions/src/math/random.rs +++ b/datafusion/functions/src/math/random.rs @@ -16,7 +16,6 @@ // under the License. use std::any::Any; -use std::iter; use std::sync::Arc; use arrow::array::Float64Array; @@ -24,7 +23,7 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; use rand::{thread_rng, Rng}; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{not_impl_err, Result}; use datafusion_expr::ColumnarValue; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; @@ -64,45 +63,14 @@ impl ScalarUDFImpl for RandomFunc { Ok(Float64) } - fn invoke(&self, args: &[ColumnarValue]) -> Result { - random(args) + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) } -} - -/// Random SQL function -fn random(args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - ColumnarValue::Array(array) => array.len(), - _ => return exec_err!("Expect random function to take no param"), - }; - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use arrow::array::NullArray; - - use datafusion_common::cast::as_float64_array; - use datafusion_expr::ColumnarValue; - - use crate::math::random::random; - - #[test] - fn test_random_expression() { - let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))]; - let array = random(&args) - .expect("failed to initialize function random") - .into_array(1) - .expect("Failed to convert to array"); - let floats = - as_float64_array(&array).expect("failed to initialize function random"); - assert_eq!(floats.len(), 1); - assert!(0.0 <= floats.value(0) && floats.value(0) < 1.0); + fn invoke_no_args(&self, num_rows: usize) -> Result { + let mut rng = thread_rng(); + let values = std::iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(num_rows); + let array = Float64Array::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) } } diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs index c68871d42e9f..9c97b4dd7413 100644 --- a/datafusion/functions/src/string/uuid.rs +++ b/datafusion/functions/src/string/uuid.rs @@ -16,7 +16,6 @@ // under the License. use std::any::Any; -use std::iter; use std::sync::Arc; use arrow::array::GenericStringArray; @@ -24,7 +23,7 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Utf8; use uuid::Uuid; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{not_impl_err, Result}; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -58,15 +57,14 @@ impl ScalarUDFImpl for UuidFunc { Ok(Utf8) } + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) + } + /// Prints random (v4) uuid values per row /// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - ColumnarValue::Array(array) => array.len(), - _ => return exec_err!("Expect uuid function to take no param"), - }; - - let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); + fn invoke_no_args(&self, num_rows: usize) -> Result { + let values = std::iter::repeat_with(|| Uuid::new_v4().to_string()).take(num_rows); let array = GenericStringArray::::from_iter_values(values); Ok(ColumnarValue::Array(Arc::new(array))) } diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index ac5b87e701af..06c4bd1c9531 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -74,7 +74,6 @@ pub fn create_physical_expr( input_phy_exprs.to_vec(), return_type, fun.monotonicity()?, - fun.signature().type_signature.supports_zero_argument(), ))) } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 9ae9f3dee3e7..3b360fc20c39 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -58,8 +58,6 @@ pub struct ScalarFunctionExpr { // and it specifies the effect of an increase or decrease in // the corresponding `arg` to the function value. monotonicity: Option, - // Whether this function can be invoked with zero arguments - supports_zero_argument: bool, } impl Debug for ScalarFunctionExpr { @@ -70,7 +68,6 @@ impl Debug for ScalarFunctionExpr { .field("args", &self.args) .field("return_type", &self.return_type) .field("monotonicity", &self.monotonicity) - .field("supports_zero_argument", &self.supports_zero_argument) .finish() } } @@ -83,7 +80,6 @@ impl ScalarFunctionExpr { args: Vec>, return_type: DataType, monotonicity: Option, - supports_zero_argument: bool, ) -> Self { Self { fun, @@ -91,7 +87,6 @@ impl ScalarFunctionExpr { args, return_type, monotonicity, - supports_zero_argument, } } @@ -142,25 +137,21 @@ impl PhysicalExpr for ScalarFunctionExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - // evaluate the arguments, if there are no arguments we'll instead pass in a null array - // indicating the batch size (as a convention) - let inputs = match self.args.is_empty() { - // If the function supports zero argument, we pass in a null array indicating the batch size. - // This is for user-defined functions. - // MakeArray support zero argument but has the different behavior from the array with one null. - true if self.supports_zero_argument && self.name != "make_array" => { - vec![ColumnarValue::create_null_array(batch.num_rows())] - } - _ => self - .args - .iter() - .map(|e| e.evaluate(batch)) - .collect::>>()?, - }; + let inputs = self + .args + .iter() + .map(|e| e.evaluate(batch)) + .collect::>>()?; // evaluate the function match self.fun { - ScalarFunctionDefinition::UDF(ref fun) => fun.invoke(&inputs), + ScalarFunctionDefinition::UDF(ref fun) => { + if self.args.is_empty() { + fun.invoke_no_args(batch.num_rows()) + } else { + fun.invoke(&inputs) + } + } ScalarFunctionDefinition::Name(_) => { internal_err!( "Name function must be resolved to one of the other variants prior to physical planning" @@ -183,7 +174,6 @@ impl PhysicalExpr for ScalarFunctionExpr { children, self.return_type().clone(), self.monotonicity.clone(), - self.supports_zero_argument, ))) } diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs index 368dfdf92f45..aad78b7c2f90 100644 --- a/datafusion/physical-expr/src/udf.rs +++ b/datafusion/physical-expr/src/udf.rs @@ -57,7 +57,6 @@ pub fn create_physical_expr( input_phy_exprs.to_vec(), return_type, fun.monotonicity()?, - fun.signature().type_signature.supports_zero_argument(), ))) } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 1d3edb7b6075..e9728d8542fe 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -342,7 +342,6 @@ pub fn parse_physical_expr( Some(buf) => codec.try_decode_udf(&e.name, buf)?, None => registry.udf(e.name.as_str())?, }; - let signature = udf.signature(); let scalar_fun_def = ScalarFunctionDefinition::UDF(udf.clone()); let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?; @@ -353,7 +352,6 @@ pub fn parse_physical_expr( args, convert_required!(e.return_type)?, None, - signature.type_signature.supports_zero_argument(), )) } ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 642860d6397b..5e446f93fea7 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -626,7 +626,6 @@ fn roundtrip_scalar_udf() -> Result<()> { vec![col("a", &schema)?], DataType::Int64, None, - false, ); let project = @@ -755,7 +754,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { vec![col("text", &schema)?], DataType::Int64, None, - false, )); let filter = Arc::new(FilterExec::try_new(