From f5a97d58d484e93c2f79c5b624b10bd9e75c45f0 Mon Sep 17 00:00:00 2001 From: Eugene Marushchenko Date: Sun, 21 Jan 2024 02:35:00 +1000 Subject: [PATCH] Add hash_join_single_partition_threshold_rows config (#8720) Co-authored-by: Andrew Lamb --- datafusion/common/src/config.rs | 4 + .../src/physical_optimizer/join_selection.rs | 284 ++++++++++-------- .../test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 4 files changed, 165 insertions(+), 126 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e00c17930850..eb516f97a48f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -561,6 +561,10 @@ config_namespace! { /// will be collected into a single partition pub hash_join_single_partition_threshold: usize, default = 1024 * 1024 + /// The maximum estimated size in rows for one input side of a HashJoin + /// will be collected into a single partition + pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128 + /// The default filter selectivity used by Filter Statistics /// when an exact selectivity cannot be determined. Valid values are /// between 0 (no selectivity) and 100 (all rows are selected). 
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index ba66dca55b35..f9b9fdf85cfa 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -87,9 +87,10 @@ fn should_swap_join_order( } } -fn supports_collect_by_size( +fn supports_collect_by_thresholds( plan: &dyn ExecutionPlan, - collection_size_threshold: usize, + threshold_byte_size: usize, + threshold_num_rows: usize, ) -> bool { // Currently we do not trust the 0 value from stats, due to stats collection might have bug // TODO check the logic in datasource::get_statistics_with_limit() @@ -97,10 +98,10 @@ fn supports_collect_by_size( return false; }; - if let Some(size) = stats.total_byte_size.get_value() { - *size != 0 && *size < collection_size_threshold - } else if let Some(row_count) = stats.num_rows.get_value() { - *row_count != 0 && *row_count < collection_size_threshold + if let Some(byte_size) = stats.total_byte_size.get_value() { + *byte_size != 0 && *byte_size < threshold_byte_size + } else if let Some(num_rows) = stats.num_rows.get_value() { + *num_rows != 0 && *num_rows < threshold_num_rows } else { false } @@ -251,9 +252,14 @@ impl PhysicalOptimizerRule for JoinSelection { // - We will also swap left and right sides for cross joins so that the left // side is the small side. 
let config = &config.optimizer; - let collect_left_threshold = config.hash_join_single_partition_threshold; + let collect_threshold_byte_size = config.hash_join_single_partition_threshold; + let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; state.plan.transform_up(&|plan| { - statistical_join_selection_subrule(plan, collect_left_threshold) + statistical_join_selection_subrule( + plan, + collect_threshold_byte_size, + collect_threshold_num_rows, + ) }) } @@ -270,8 +276,8 @@ impl PhysicalOptimizerRule for JoinSelection { /// /// This function will first consider the given join type and check whether the /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides. -/// When the `collect_threshold` is provided, this function will also check left -/// and right sizes. +/// When the `ignore_threshold` is false, this function will also check left +/// and right sizes in bytes or rows. /// /// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`. /// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft` @@ -279,7 +285,9 @@ impl PhysicalOptimizerRule for JoinSelection { /// and [`JoinType::RightAnti`], respectively. 
fn try_collect_left( hash_join: &HashJoinExec, - collect_threshold: Option, + ignore_threshold: bool, + threshold_byte_size: usize, + threshold_num_rows: usize, ) -> Result>> { let left = hash_join.left(); let right = hash_join.right(); @@ -291,9 +299,14 @@ fn try_collect_left( | JoinType::LeftSemi | JoinType::Right | JoinType::RightSemi - | JoinType::RightAnti => collect_threshold.map_or(true, |threshold| { - supports_collect_by_size(&**left, threshold) - }), + | JoinType::RightAnti => { + ignore_threshold + || supports_collect_by_thresholds( + &**left, + threshold_byte_size, + threshold_num_rows, + ) + } }; let right_can_collect = match join_type { JoinType::Right | JoinType::Full | JoinType::RightAnti => false, @@ -301,9 +314,14 @@ fn try_collect_left( | JoinType::RightSemi | JoinType::Left | JoinType::LeftSemi - | JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| { - supports_collect_by_size(&**right, threshold) - }), + | JoinType::LeftAnti => { + ignore_threshold + || supports_collect_by_thresholds( + &**right, + threshold_byte_size, + threshold_num_rows, + ) + } }; match (left_can_collect, right_can_collect) { (true, true) => { @@ -366,52 +384,56 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result, - collect_left_threshold: usize, + collect_threshold_byte_size: usize, + collect_threshold_num_rows: usize, ) -> Result>> { - let transformed = if let Some(hash_join) = - plan.as_any().downcast_ref::() - { - match hash_join.partition_mode() { - PartitionMode::Auto => { - try_collect_left(hash_join, Some(collect_left_threshold))?.map_or_else( - || partitioned_hash_join(hash_join).map(Some), - |v| Ok(Some(v)), + let transformed = + if let Some(hash_join) = plan.as_any().downcast_ref::() { + match hash_join.partition_mode() { + PartitionMode::Auto => try_collect_left( + hash_join, + false, + collect_threshold_byte_size, + collect_threshold_num_rows, )? - } - PartitionMode::CollectLeft => try_collect_left(hash_join, None)? 
.map_or_else( || partitioned_hash_join(hash_join).map(Some), |v| Ok(Some(v)), )?, - PartitionMode::Partitioned => { - let left = hash_join.left(); - let right = hash_join.right(); - if should_swap_join_order(&**left, &**right)? - && supports_swap(*hash_join.join_type()) - { - swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)? - } else { - None + PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)? + .map_or_else( + || partitioned_hash_join(hash_join).map(Some), + |v| Ok(Some(v)), + )?, + PartitionMode::Partitioned => { + let left = hash_join.left(); + let right = hash_join.right(); + if should_swap_join_order(&**left, &**right)? + && supports_swap(*hash_join.join_type()) + { + swap_hash_join(hash_join, PartitionMode::Partitioned).map(Some)? + } else { + None + } } } - } - } else if let Some(cross_join) = plan.as_any().downcast_ref::() { - let left = cross_join.left(); - let right = cross_join.right(); - if should_swap_join_order(&**left, &**right)? { - let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left)); - // TODO avoid adding ProjectionExec again and again, only adding Final Projection - let proj: Arc = Arc::new(ProjectionExec::try_new( - swap_reverting_projection(&left.schema(), &right.schema()), - Arc::new(new_join), - )?); - Some(proj) + } else if let Some(cross_join) = plan.as_any().downcast_ref::() { + let left = cross_join.left(); + let right = cross_join.right(); + if should_swap_join_order(&**left, &**right)? 
{ + let new_join = CrossJoinExec::new(Arc::clone(right), Arc::clone(left)); + // TODO avoid adding ProjectionExec again and again, only adding Final Projection + let proj: Arc = Arc::new(ProjectionExec::try_new( + swap_reverting_projection(&left.schema(), &right.schema()), + Arc::new(new_join), + )?); + Some(proj) + } else { + None + } } else { None - } - } else { - None - }; + }; Ok(if let Some(transformed) = transformed { Transformed::Yes(transformed) @@ -682,22 +704,62 @@ mod tests_statistical { use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalExpr; + /// Return statistics for empty table + fn empty_statistics() -> Statistics { + Statistics { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::new_unknown()], + } + } + + /// Get table thresholds: (num_rows, byte_size) + fn get_thresholds() -> (usize, usize) { + let optimizer_options = ConfigOptions::new().optimizer; + ( + optimizer_options.hash_join_single_partition_threshold_rows, + optimizer_options.hash_join_single_partition_threshold, + ) + } + + /// Return statistics for small table + fn small_statistics() -> Statistics { + let (threshold_num_rows, threshold_byte_size) = get_thresholds(); + Statistics { + num_rows: Precision::Inexact(threshold_num_rows / 128), + total_byte_size: Precision::Inexact(threshold_byte_size / 128), + column_statistics: vec![ColumnStatistics::new_unknown()], + } + } + + /// Return statistics for big table + fn big_statistics() -> Statistics { + let (threshold_num_rows, threshold_byte_size) = get_thresholds(); + Statistics { + num_rows: Precision::Inexact(threshold_num_rows * 2), + total_byte_size: Precision::Inexact(threshold_byte_size * 2), + column_statistics: vec![ColumnStatistics::new_unknown()], + } + } + + /// Return statistics for bigger table + fn bigger_statistics() -> Statistics { + let (threshold_num_rows, threshold_byte_size) = get_thresholds(); + Statistics { + num_rows: 
Precision::Inexact(threshold_num_rows * 4), + total_byte_size: Precision::Inexact(threshold_byte_size * 4), + column_statistics: vec![ColumnStatistics::new_unknown()], + } + } + fn create_big_and_small() -> (Arc, Arc) { let big = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(10), - total_byte_size: Precision::Inexact(100000), - column_statistics: vec![ColumnStatistics::new_unknown()], - }, + big_statistics(), Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), )); let small = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(100000), - total_byte_size: Precision::Inexact(10), - column_statistics: vec![ColumnStatistics::new_unknown()], - }, + small_statistics(), Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), )); (big, small) @@ -821,11 +883,11 @@ mod tests_statistical { assert_eq!( swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(10) + Precision::Inexact(8192) ); assert_eq!( swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(100000) + Precision::Inexact(2097152) ); } @@ -872,11 +934,11 @@ mod tests_statistical { assert_eq!( swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(100000) + Precision::Inexact(2097152) ); assert_eq!( swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(10) + Precision::Inexact(8192) ); } @@ -917,11 +979,11 @@ mod tests_statistical { assert_eq!( swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(10) + Precision::Inexact(8192) ); assert_eq!( swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(100000) + Precision::Inexact(2097152) ); assert_eq!(original_schema, swapped_join.schema()); @@ -1032,11 +1094,11 @@ mod tests_statistical { assert_eq!( swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(10) + Precision::Inexact(8192) ); assert_eq!( 
swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(100000) + Precision::Inexact(2097152) ); } @@ -1078,29 +1140,17 @@ mod tests_statistical { #[tokio::test] async fn test_join_selection_collect_left() { let big = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(10000000), - total_byte_size: Precision::Inexact(10000000), - column_statistics: vec![ColumnStatistics::new_unknown()], - }, + big_statistics(), Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), )); let small = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(10), - total_byte_size: Precision::Inexact(10), - column_statistics: vec![ColumnStatistics::new_unknown()], - }, + small_statistics(), Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), )); let empty = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Absent, - total_byte_size: Precision::Absent, - column_statistics: vec![ColumnStatistics::new_unknown()], - }, + empty_statistics(), Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]), )); @@ -1121,7 +1171,7 @@ mod tests_statistical { Column::new_with_schema("small_col", &small.schema()).unwrap(), )]; check_join_partition_mode( - big, + big.clone(), small.clone(), join_on, true, @@ -1145,8 +1195,8 @@ mod tests_statistical { Column::new_with_schema("small_col", &small.schema()).unwrap(), )]; check_join_partition_mode( - empty, - small, + empty.clone(), + small.clone(), join_on, true, PartitionMode::CollectLeft, @@ -1155,52 +1205,40 @@ mod tests_statistical { #[tokio::test] async fn test_join_selection_partitioned() { - let big1 = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(10000000), - total_byte_size: Precision::Inexact(10000000), - column_statistics: vec![ColumnStatistics::new_unknown()], - }, - Schema::new(vec![Field::new("big_col1", DataType::Int32, false)]), + let bigger = Arc::new(StatisticsExec::new( + 
bigger_statistics(), + Schema::new(vec![Field::new("bigger_col", DataType::Int32, false)]), )); - let big2 = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(20000000), - total_byte_size: Precision::Inexact(20000000), - column_statistics: vec![ColumnStatistics::new_unknown()], - }, - Schema::new(vec![Field::new("big_col2", DataType::Int32, false)]), + let big = Arc::new(StatisticsExec::new( + big_statistics(), + Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), )); let empty = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Absent, - total_byte_size: Precision::Absent, - column_statistics: vec![ColumnStatistics::new_unknown()], - }, + empty_statistics(), Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]), )); let join_on = vec![( - Column::new_with_schema("big_col1", &big1.schema()).unwrap(), - Column::new_with_schema("big_col2", &big2.schema()).unwrap(), + Column::new_with_schema("big_col", &big.schema()).unwrap(), + Column::new_with_schema("bigger_col", &bigger.schema()).unwrap(), )]; check_join_partition_mode( - big1.clone(), - big2.clone(), + big.clone(), + bigger.clone(), join_on, false, PartitionMode::Partitioned, ); let join_on = vec![( - Column::new_with_schema("big_col2", &big2.schema()).unwrap(), - Column::new_with_schema("big_col1", &big1.schema()).unwrap(), + Column::new_with_schema("bigger_col", &bigger.schema()).unwrap(), + Column::new_with_schema("big_col", &big.schema()).unwrap(), )]; check_join_partition_mode( - big2, - big1.clone(), + bigger.clone(), + big.clone(), join_on, true, PartitionMode::Partitioned, @@ -1208,27 +1246,21 @@ mod tests_statistical { let join_on = vec![( Column::new_with_schema("empty_col", &empty.schema()).unwrap(), - Column::new_with_schema("big_col1", &big1.schema()).unwrap(), + Column::new_with_schema("big_col", &big.schema()).unwrap(), )]; check_join_partition_mode( empty.clone(), - big1.clone(), + big.clone(), join_on, false, 
PartitionMode::Partitioned, ); let join_on = vec![( - Column::new_with_schema("big_col1", &big1.schema()).unwrap(), + Column::new_with_schema("big_col", &big.schema()).unwrap(), Column::new_with_schema("empty_col", &empty.schema()).unwrap(), )]; - check_join_partition_mode( - big1, - empty, - join_on, - false, - PartitionMode::Partitioned, - ); + check_join_partition_mode(big, empty, join_on, false, PartitionMode::Partitioned); } fn check_join_partition_mode( diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b37b78ab6d79..768292d3d4b4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -196,6 +196,7 @@ datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_single_partition_threshold 1048576 +datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_hash_join true @@ -272,6 +273,7 @@ datafusion.optimizer.enable_round_robin_repartition true When set to true, the p datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. 
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition +datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index a812b74284cf..7a7460799b1a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. 
HashJoin can work more efficiently than SortMergeJoin but consumes more memory | | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |