Skip to content

Commit

Permalink
VTX-4075: fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fsdvh committed Jan 24, 2024
1 parent e3944be commit fe7e05d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3570,7 +3570,7 @@ mod tests {
// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"UnionExec",
"UnionExec: sort_expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
Expand All @@ -3580,7 +3580,7 @@ mod tests {
let expected = &[
"SortExec: expr=[c@2 ASC]",
"CoalescePartitionsExec",
"UnionExec",
"UnionExec: sort_expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
Expand Down Expand Up @@ -4172,13 +4172,13 @@ mod tests {
// should not sort (as the data was already sorted)
let expected_parquet = &[
"SortPreservingMergeExec: [c@2 ASC]",
"UnionExec",
"UnionExec: sort_expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
let expected_csv = &[
"SortPreservingMergeExec: [c@2 ASC]",
"UnionExec",
"UnionExec: sort_expr=[c@2 ASC]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
];
Expand Down
42 changes: 21 additions & 21 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ mod tests {

let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
Expand All @@ -1194,7 +1194,7 @@ mod tests {
let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
Expand Down Expand Up @@ -1270,7 +1270,7 @@ mod tests {
// one input to the union is already sorted, one is not.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
Expand Down Expand Up @@ -1301,7 +1301,7 @@ mod tests {
// one input to the union is already sorted, one is not.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
Expand Down Expand Up @@ -1333,13 +1333,13 @@ mod tests {
// First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy the
// required ordering of SortPreservingMergeExec.
let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]"];

let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
Expand Down Expand Up @@ -1371,15 +1371,15 @@ mod tests {
// Second input to the union is already Sorted (matches with the required ordering by the SortPreservingMergeExec above).
// Third input to the union is not Sorted (SortExec is matches required ordering by the SortPreservingMergeExec above).
let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]"];
// should adjust sorting in the first input of the union such that it is not unnecessarily fine
let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
Expand Down Expand Up @@ -1412,14 +1412,14 @@ mod tests {
// `UnionExec` satisfy the ordering, OR add a single sort after
// the `UnionExec` (both of which are equally good for this example).
let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]"];
let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
Expand Down Expand Up @@ -1461,13 +1461,13 @@ mod tests {
// example below. However, we should be able to change the unnecessarily
// fine `SortExec`s below with required `SortExec`s that are absolutely necessary.
let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]"];
let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC]",
Expand Down Expand Up @@ -1513,7 +1513,7 @@ mod tests {
// Should adjust the requirement in the third input of the union so
// that it is not unnecessarily fine.
let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
Expand Down Expand Up @@ -1542,14 +1542,14 @@ mod tests {

// Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering.
let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]"];
// Union preserves the inputs ordering and we should not change any of the SortExecs under UnionExec
let expected_output = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC]",
Expand Down Expand Up @@ -1640,15 +1640,15 @@ mod tests {
let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 DESC NULLS LAST]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
let expected_optimized = [
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Expand Down Expand Up @@ -1677,14 +1677,14 @@ mod tests {
// The unnecessary SortExecs should be removed
let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]"];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Expand Down Expand Up @@ -1723,15 +1723,15 @@ mod tests {

// Should not change the unnecessarily fine `SortExec`s because there is `LimitExec`
let expected_input = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]"];
let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" UnionExec: sort_expr=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" GlobalLimitExec: skip=0, fetch=100",
Expand Down

0 comments on commit fe7e05d

Please sign in to comment.