fix: panic and incorrect results in LogFunc::output_ordering() #11571

Merged 2 commits, Jul 24, 2024
123 changes: 119 additions & 4 deletions datafusion/functions/src/math/log.rs
@@ -82,10 +82,16 @@ impl ScalarUDFImpl for LogFunc {
     }

     fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
-        match (input[0].sort_properties, input[1].sort_properties) {
-            (first @ SortProperties::Ordered(value), SortProperties::Ordered(base))
-                if !value.descending && base.descending
-                    || value.descending && !base.descending =>
+        let (base_sort_properties, num_sort_properties) = if input.len() == 1 {
+            // log(x) defaults to log(10, x)
+            (SortProperties::Singleton, input[0].sort_properties)
Contributor

This change makes sense to me 👍🏼

Contributor

👍🏻

+        } else {
+            (input[0].sort_properties, input[1].sort_properties)
+        };
+        match (num_sort_properties, base_sort_properties) {
Contributor

It looks like the following logic is not completely correct 🤔
For example:
base is DESC NULLS FIRST
num is ASC NULLS LAST
This implementation will report that log(base, num) is ASC NULLS LAST,
but the actual output might look like:

NULL
NULL
1
2
NULL

Which is unordered.
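
To make the example above concrete, here is a minimal sketch in plain Rust (no DataFusion types). The rows are hypothetical values chosen so that `base` is DESC NULLS FIRST and `num` is ASC NULLS LAST; `log_nullable`, `is_sorted_asc`, and `example_output` are illustrative helper names, not part of any library.

```rust
/// log(base, num) with SQL-style NULL propagation:
/// the result is NULL (None) if either input is NULL.
fn log_nullable(base: Option<f64>, num: Option<f64>) -> Option<f64> {
    match (base, num) {
        (Some(b), Some(n)) => Some(n.log(b)),
        _ => None,
    }
}

/// Returns true if `values` is sorted ascending with all NULLs grouped
/// at the front (`nulls_first == true`) or at the back (`nulls_first == false`).
fn is_sorted_asc(values: &[Option<f64>], nulls_first: bool) -> bool {
    values.windows(2).all(|w| match (w[0], w[1]) {
        (Some(a), Some(b)) => a <= b,
        (None, None) => true,
        // A NULL followed by a value is only valid with NULLS FIRST.
        (None, Some(_)) => nulls_first,
        // A value followed by a NULL is only valid with NULLS LAST.
        (Some(_), None) => !nulls_first,
    })
}

/// Hypothetical rows: `base` is DESC NULLS FIRST, `num` is ASC NULLS LAST.
fn example_output() -> Vec<Option<f64>> {
    let base = [None, None, Some(16.0), Some(4.0), Some(2.0)];
    let num = [Some(1.0), Some(2.0), Some(16.0), Some(16.0), None];
    base.iter()
        .zip(num.iter())
        .map(|(b, n)| log_nullable(*b, *n))
        .collect()
}
```

With these rows the output has NULLs at both ends and values in the middle, so `is_sorted_asc` rejects it for both NULL placements.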

Member Author @jonahgao, Jul 21, 2024

I agree with you. I noticed that other logic also does not consider nulls_first, for example

pub fn add(&self, rhs: &Self) -> Self {
    match (self, rhs) {
        (Self::Singleton, _) => *rhs,
        (_, Self::Singleton) => *self,
        (Self::Ordered(lhs), Self::Ordered(rhs))
            if lhs.descending == rhs.descending =>
        {
            Self::Ordered(SortOptions {
                descending: lhs.descending,
                nulls_first: lhs.nulls_first || rhs.nulls_first,
            })
        }
        _ => Self::Unordered,
    }
}

Maybe @berkaysynnada can help confirm it.

Contributor

You are correct. We also need to check the placement of nulls in output_ordering() (and possibly in other implementations of it for multi-input functions). As @jonahgao noticed, impl SortProperties {} needs similar handling as well, but that can be addressed in a separate PR. I can create a ticket for it.
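
As an illustration of the concern raised in this thread, the sketch below reproduces the quoted `add` logic on local stand-in types and contrasts it with a stricter variant. The `SortOptions` and `SortProperties` definitions here are simplified copies written for this example, and `add_nulls_aware` is a hypothetical name showing one possible shape of a fix. It is not the change the project made.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum SortProperties {
    Ordered(SortOptions),
    Unordered,
    Singleton,
}

impl SortProperties {
    /// The `add` logic quoted in the thread, on the stand-in types.
    fn add(&self, rhs: &Self) -> Self {
        match (self, rhs) {
            (Self::Singleton, _) => *rhs,
            (_, Self::Singleton) => *self,
            (Self::Ordered(lhs), Self::Ordered(rhs))
                if lhs.descending == rhs.descending =>
            {
                Self::Ordered(SortOptions {
                    descending: lhs.descending,
                    nulls_first: lhs.nulls_first || rhs.nulls_first,
                })
            }
            _ => Self::Unordered,
        }
    }

    /// Hypothetical stricter variant: two ordered inputs only yield an
    /// ordered sum when both the direction and the NULL placement agree.
    fn add_nulls_aware(&self, rhs: &Self) -> Self {
        match (self, rhs) {
            (Self::Singleton, _) => *rhs,
            (_, Self::Singleton) => *self,
            (Self::Ordered(lhs), Self::Ordered(rhs))
                if lhs.descending == rhs.descending
                    && lhs.nulls_first == rhs.nulls_first =>
            {
                Self::Ordered(*lhs)
            }
            _ => Self::Unordered,
        }
    }
}
```

For an ASC NULLS FIRST input added to an ASC NULLS LAST input, `add` reports ASC NULLS FIRST, while the stricter variant reports `Unordered`, since NULLs can appear at both ends of the sum.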

Member Author

Fixed the nulls_first problem for log in 3feeaa8. We can create separate PRs for the other fixes.

cc @2010YOUY01 @berkaysynnada

+            (first @ SortProperties::Ordered(num), SortProperties::Ordered(base))
+                if num.descending != base.descending
+                    && num.nulls_first == base.nulls_first =>
             {
                 Ok(first)
             }
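
The hunk above shows only the first match arm. As a rough, self-contained sketch of the overall rule, the code below uses local stand-in types instead of the DataFusion ones. The remaining arms are an assumption inferred from the expectations in `test_log_output_ordering` in this PR, and `log_output_ordering` and `reversed` are hypothetical names.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum SortProperties {
    Ordered(SortOptions),
    Unordered,
    Singleton,
}

/// Flip the direction of an ordering, leaving `nulls_first` untouched.
fn reversed(p: SortProperties) -> SortProperties {
    match p {
        SortProperties::Ordered(o) => SortProperties::Ordered(SortOptions {
            descending: !o.descending,
            nulls_first: o.nulls_first,
        }),
        other => other,
    }
}

/// Ordering of log(num) or log(base, num).
/// `input` must hold one or two entries, with the base first when both are given.
fn log_output_ordering(input: &[SortProperties]) -> SortProperties {
    let (base, num) = if input.len() == 1 {
        // log(x) is treated as log(10, x): the base is a constant.
        (SortProperties::Singleton, input[0])
    } else {
        (input[0], input[1])
    };
    match (num, base) {
        // Opposite directions with matching NULL placement: follow `num`.
        (first @ SortProperties::Ordered(n), SortProperties::Ordered(b))
            if n.descending != b.descending && n.nulls_first == b.nulls_first =>
        {
            first
        }
        // Constant base: the result follows `num` (ordered or constant).
        (
            first @ (SortProperties::Ordered(_) | SortProperties::Singleton),
            SortProperties::Singleton,
        ) => first,
        // Constant num: the result runs opposite to the base.
        (SortProperties::Singleton, second @ SortProperties::Ordered(_)) => {
            reversed(second)
        }
        _ => SortProperties::Unordered,
    }
}
```

Checking the slice length before indexing is what avoids the out-of-bounds panic for the single-argument form, and the `nulls_first` comparison is what rejects the mixed NULL placement case discussed in the review thread.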
@@ -230,6 +236,7 @@ mod tests {

     use super::*;

+    use arrow::compute::SortOptions;
     use datafusion_common::cast::{as_float32_array, as_float64_array};
     use datafusion_common::DFSchema;
     use datafusion_expr::execution_props::ExecutionProps;
@@ -334,4 +341,112 @@ mod tests {
         assert_eq!(args[0], lit(2));
         assert_eq!(args[1], lit(3));
     }
+
+    #[test]
+    fn test_log_output_ordering() {
+        // [Unordered, Ascending, Descending, Literal]
+        let orders = vec![
+            ExprProperties::new_unknown(),
+            ExprProperties::new_unknown().with_order(SortProperties::Ordered(
+                SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                },
+            )),
+            ExprProperties::new_unknown().with_order(SortProperties::Ordered(
+                SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                },
+            )),
+            ExprProperties::new_unknown().with_order(SortProperties::Singleton),
+        ];
+
+        let log = LogFunc::new();
+
+        // Test log(num)
+        for order in orders.iter().cloned() {
+            let result = log.output_ordering(&[order.clone()]).unwrap();
+            assert_eq!(result, order.sort_properties);
+        }
+
+        // Test log(base, num), where `nulls_first` is the same
+        let mut results = Vec::with_capacity(orders.len() * orders.len());
+        for base_order in orders.iter() {
+            for num_order in orders.iter().cloned() {
+                let result = log
+                    .output_ordering(&[base_order.clone(), num_order])
+                    .unwrap();
+                results.push(result);
+            }
+        }
+        let expected = vec![
+            // base: Unordered
+            SortProperties::Unordered,
+            SortProperties::Unordered,
+            SortProperties::Unordered,
+            SortProperties::Unordered,
+            // base: Ascending, num: Unordered
+            SortProperties::Unordered,
+            // base: Ascending, num: Ascending
+            SortProperties::Unordered,
+            // base: Ascending, num: Descending
+            SortProperties::Ordered(SortOptions {
+                descending: true,
+                nulls_first: true,
+            }),
+            // base: Ascending, num: Literal
+            SortProperties::Ordered(SortOptions {
+                descending: true,
+                nulls_first: true,
+            }),
+            // base: Descending, num: Unordered
+            SortProperties::Unordered,
+            // base: Descending, num: Ascending
+            SortProperties::Ordered(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            // base: Descending, num: Descending
+            SortProperties::Unordered,
+            // base: Descending, num: Literal
+            SortProperties::Ordered(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            // base: Literal, num: Unordered
+            SortProperties::Unordered,
+            // base: Literal, num: Ascending
+            SortProperties::Ordered(SortOptions {
+                descending: false,
+                nulls_first: true,
+            }),
+            // base: Literal, num: Descending
+            SortProperties::Ordered(SortOptions {
+                descending: true,
+                nulls_first: true,
+            }),
+            // base: Literal, num: Literal
+            SortProperties::Singleton,
+        ];
+        assert_eq!(results, expected);
+
+        // Test with different `nulls_first`
+        let base_order = ExprProperties::new_unknown().with_order(
+            SortProperties::Ordered(SortOptions {
+                descending: true,
+                nulls_first: true,
+            }),
+        );
+        let num_order = ExprProperties::new_unknown().with_order(
+            SortProperties::Ordered(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+        );
+        assert_eq!(
+            log.output_ordering(&[base_order, num_order]).unwrap(),
+            SortProperties::Unordered
+        );
+    }
Contributor

Thanks for adding these tests. The output_ordering() API of scalar functions is experimental and needs unit tests (#10595). This is a good example for the rest of them.

Contributor

output_ordering() API of scalar functions are experimental

What do you mean by "experimental"? If that is the case, perhaps we can update the documentation to explain what that means. I don't see any mention of it:

https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.ScalarUDFImpl.html#method.output_ordering

}
31 changes: 19 additions & 12 deletions datafusion/sqllogictest/test_files/order.slt
@@ -326,6 +326,13 @@ select column1 + column2 from foo group by column1, column2 ORDER BY column2 des
 7
 3

+# Test issue: https://github.com/apache/datafusion/issues/11549
+query I
+select column1 from foo order by log(column2);
+----
+1
+3
+5

 # Cleanup
 statement ok
@@ -512,7 +519,7 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
 )
 STORED AS CSV
 WITH ORDER(c11)
-WITH ORDER(c12 DESC)
+WITH ORDER(c12 DESC NULLS LAST)
Member Author

This makes c11 and c12 have the same nulls_first.

 LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 OPTIONS ('format.has_header' 'true');

@@ -547,34 +554,34 @@ physical_plan
 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true

 query TT
-EXPLAIN SELECT LOG(c11, c12) as log_c11_base_c12
+EXPLAIN SELECT LOG(c12, c11) as log_c11_base_c12
Member Author

The base should be passed as the first parameter.

 FROM aggregate_test_100
 ORDER BY log_c11_base_c12;
 ----
 logical_plan
 01)Sort: log_c11_base_c12 ASC NULLS LAST
-02)--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c11_base_c12
+02)--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c11_base_c12
 03)----TableScan: aggregate_test_100 projection=[c11, c12]
 physical_plan
 01)SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c11_base_c12]
+02)--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c11_base_c12]
 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC]], has_header=true
+04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], has_header=true

 query TT
-EXPLAIN SELECT LOG(c12, c11) as log_c12_base_c11
+EXPLAIN SELECT LOG(c11, c12) as log_c12_base_c11
 FROM aggregate_test_100
-ORDER BY log_c12_base_c11 DESC;
+ORDER BY log_c12_base_c11 DESC NULLS LAST;
 ----
 logical_plan
-01)Sort: log_c12_base_c11 DESC NULLS FIRST
-02)--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c12_base_c11
+01)Sort: log_c12_base_c11 DESC NULLS LAST
+02)--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c12_base_c11
 03)----TableScan: aggregate_test_100 projection=[c11, c12]
 physical_plan
-01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC]
-02)--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c12_base_c11]
+01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC NULLS LAST]
+02)--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c12_base_c11]
 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC]], has_header=true
+04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], has_header=true

 statement ok
 drop table aggregate_test_100;