Rename bounded_order_preserving_variants config to prefer_exising_sort and update docs #7723

Merged
merged 7 commits into from
Oct 4, 2023
Changes from 3 commits
10 changes: 6 additions & 4 deletions datafusion/common/src/config.rs
@@ -453,11 +453,13 @@ config_namespace! {
/// ```
pub repartition_sorts: bool, default = true

/// When true, DataFusion will opportunistically remove sorts by replacing
/// `RepartitionExec` with `SortPreservingRepartitionExec`, and
/// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
/// replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and
Member

Hmm, actually there is no `SortPreservingRepartitionExec` operator; it is a variant of `RepartitionExec` with `preserve_order` set to true. It is a little confusing at first if you try to look for a `SortPreservingRepartitionExec` type in an IDE.

Maybe:

Suggested change
/// replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and
/// replacing `RepartitionExec` with `SortPreservingRepartitionExec` (i.e., `RepartitionExec` with `preserve_order` as true), and

Contributor Author

That makes sense, thank you @viirya -- I tried to improve the wording in 35c6748. Let me know what you think.

Member

Looks good to me. Thanks @alamb

/// `CoalescePartitionsExec` with `SortPreservingMergeExec`,
/// even when the query is bounded.
pub bounded_order_preserving_variants: bool, default = false
///
/// When false, DataFusion will prefer to maximize the parallelism using
/// `Repartition/Coalesce` and resort the data subsequently with `SortExec`
pub prefer_exising_sort: bool, default = false

/// When set to true, the logical plan optimizer will produce warning
/// messages if any optimization rules produce errors and then proceed to the next
11 changes: 5 additions & 6 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1228,7 +1228,7 @@ fn ensure_distribution(
// - it is desired according to config
// - when plan is unbounded
let order_preserving_variants_desirable =
is_unbounded || config.optimizer.bounded_order_preserving_variants;
is_unbounded || config.optimizer.prefer_exising_sort;

if dist_context.plan.children().is_empty() {
return Ok(Transformed::No(dist_context));
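
The condition changed in this hunk can be read as a small truth table: order-preserving variants are used when the plan is unbounded or when the option is set. Below is a minimal stand-alone sketch of that rule. The `OptimizerOptions` struct and the free function `order_preserving_variants_desirable` are hypothetical stand-ins written for illustration and are not DataFusion's actual types; only the boolean expression and the field name (spelled as in the diff) come from the change above.

```rust
/// Hypothetical stand-in for the optimizer options (assumption, not the
/// real DataFusion type). Only the field renamed by this PR is modeled,
/// spelled exactly as in the diff. `Default` yields `false`, matching
/// the documented default.
#[derive(Debug, Clone, Copy, Default)]
pub struct OptimizerOptions {
    pub prefer_exising_sort: bool,
}

/// Mirrors the expression in `ensure_distribution`: order-preserving
/// variants are desirable when the plan is unbounded, or when the user
/// opted in through the config flag.
pub fn order_preserving_variants_desirable(
    is_unbounded: bool,
    options: &OptimizerOptions,
) -> bool {
    is_unbounded || options.prefer_exising_sort
}
```

With the default options, only unbounded plans take the order-preserving path; setting the flag to `true` extends that choice to bounded plans as well.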
@@ -2085,8 +2085,7 @@ mod tests {
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.bounded_order_preserving_variants =
bounded_order_preserving_variants;
config.optimizer.prefer_exising_sort = bounded_order_preserving_variants;
ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
}

@@ -2124,7 +2123,7 @@ mod tests {
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.bounded_order_preserving_variants = $BOUNDED_ORDER_PRESERVING_VARIANTS;
config.optimizer.prefer_exising_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;

// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
@@ -4516,7 +4515,7 @@ mod tests {
let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.bounded_order_preserving_variants = false;
config.optimizer.prefer_exising_sort = false;
let distribution_plan =
EnforceDistribution::new().optimize(physical_plan, &config)?;
assert_plan_txt!(expected, distribution_plan);
@@ -4558,7 +4557,7 @@ mod tests {
let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.bounded_order_preserving_variants = false;
config.optimizer.prefer_exising_sort = false;
let distribution_plan =
EnforceDistribution::new().optimize(physical_plan, &config)?;
assert_plan_txt!(expected, distribution_plan);
Original file line number Diff line number Diff line change
@@ -251,7 +251,7 @@ pub(crate) fn replace_with_order_preserving_variants(
// any case, as doing so helps fix the pipeline.
// Also do the replacement if opted-in via config options.
let use_order_preserving_variant =
config.optimizer.bounded_order_preserving_variants || unbounded_output(plan);
config.optimizer.prefer_exising_sort || unbounded_output(plan);
let updated_sort_input = get_updated_plan(
exec_tree,
is_spr_better || use_order_preserving_variant,
Expand Down Expand Up @@ -336,7 +336,7 @@ mod tests {

// Run the rule top-down
// let optimized_physical_plan = physical_plan.transform_down(&replace_repartition_execs)?;
let config = SessionConfig::new().with_bounded_order_preserving_variants($ALLOW_BOUNDED);
let config = SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED);
let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan);
let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options()))?;
let optimized_physical_plan = parallel.plan;
20 changes: 12 additions & 8 deletions datafusion/execution/src/config.rs
@@ -145,10 +145,12 @@ impl SessionConfig {
self.options.optimizer.repartition_sorts
}

/// Remove sorts by replacing with order-preserving variants of operators,
/// even when query is bounded?
pub fn bounded_order_preserving_variants(&self) -> bool {
self.options.optimizer.bounded_order_preserving_variants
/// Prefer existing sort (true) or maximize parallelism (false). See
/// [prefer_existing_sort] for more details
///
/// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
pub fn prefer_existing_sort(&self) -> bool {
self.options.optimizer.prefer_exising_sort
}

/// Are statistics collected during execution?
@@ -221,10 +223,12 @@ impl SessionConfig {
self
}

/// Enables or disables the use of order-preserving variants of `CoalescePartitions`
/// and `RepartitionExec` operators, even when the query is bounded
pub fn with_bounded_order_preserving_variants(mut self, enabled: bool) -> Self {
self.options.optimizer.bounded_order_preserving_variants = enabled;
/// Prefer existing sort (true) or maximize parallelism (false). See
/// [prefer_existing_sort] for more details
///
/// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
self.options.optimizer.prefer_exising_sort = enabled;
self
}
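
To show how the renamed getter and builder method fit together, here is a minimal stand-alone sketch of the same pattern. All three structs are simplified, hypothetical stand-ins defined locally (assumptions for illustration, not the real `datafusion` types); only the method names, the field path `options.optimizer.prefer_exising_sort`, and the `false` default are taken from the diff.

```rust
/// Hypothetical stand-in for the optimizer options; the field name is
/// spelled as in the diff.
#[derive(Debug, Clone, Default)]
pub struct OptimizerOptions {
    pub prefer_exising_sort: bool,
}

/// Hypothetical stand-in for the top-level options container.
#[derive(Debug, Clone, Default)]
pub struct ConfigOptions {
    pub optimizer: OptimizerOptions,
}

/// Hypothetical stand-in for the session configuration.
#[derive(Debug, Clone, Default)]
pub struct SessionConfig {
    options: ConfigOptions,
}

impl SessionConfig {
    /// All flags start at their defaults (`false` for this option).
    pub fn new() -> Self {
        Self::default()
    }

    /// Prefer existing sort (true) or maximize parallelism (false).
    pub fn prefer_existing_sort(&self) -> bool {
        self.options.optimizer.prefer_exising_sort
    }

    /// Consuming builder: takes `self` by value so calls can be chained.
    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
        self.options.optimizer.prefer_exising_sort = enabled;
        self
    }
}
```

In this sketch a caller would write `SessionConfig::new().with_prefer_existing_sort(true)` and read the value back with `prefer_existing_sort()`, which is the same shape the updated test macro in this PR uses.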

2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -87,7 +87,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. |
| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level |
| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` |
| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. |
| datafusion.optimizer.prefer_exising_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, When false, DataFusion will prefer to maximize the parallelism using `Repartition/Coalesce` and resort the data subsequently with `SortExec` |
| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail |
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |