Skip to content

Commit

Permalink
fix: do not pushdown when skip is applied
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Sep 9, 2024
1 parent b6fd751 commit 2fcad95
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ impl From<LimitExec> for Arc<dyn ExecutionPlan> {
/// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
/// an instance of `GlobalRequirements` and modifies these parameters while
/// checking if the limits can be pushed down or not.
///
/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
/// return a [`TreeNodeRecursion::Continue`].
pub fn pushdown_limit_helper(
mut pushdown_plan: Arc<dyn ExecutionPlan>,
mut global_state: GlobalRequirements,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
// Extract limit, if exist, and return child inputs.
if let Some(limit_exec) = extract_limit(&pushdown_plan) {
// If we have fetch/skip info in the global state already, we need to
// decide which one to continue with:
Expand Down Expand Up @@ -190,7 +194,8 @@ pub fn pushdown_limit_helper(

let skip_and_fetch = Some(global_fetch + global_state.skip);

if pushdown_plan.supports_limit_pushdown() {
let global_skip = global_state.skip;
if global_skip == 0 && pushdown_plan.supports_limit_pushdown() {
if !combines_input_partitions(&pushdown_plan) {
// We have information in the global state and the plan pushes down,
// continue:
Expand Down Expand Up @@ -223,7 +228,6 @@ pub fn pushdown_limit_helper(
// to add the fetch info and return the plan.

// There's no push down, change fetch & skip to default values:
let global_skip = global_state.skip;
global_state.fetch = None;
global_state.skip = 0;

Expand Down Expand Up @@ -256,21 +260,23 @@ pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
global_state: GlobalRequirements,
) -> Result<Arc<dyn ExecutionPlan>> {
// Apply limit push down
let (mut new_node, mut global_state) =
pushdown_limit_helper(pushdown_plan, global_state)?;

// While limits exist, continue combining the global_state.
while new_node.tnr == TreeNodeRecursion::Stop {
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
}

// Pushdown limits in children
let children = new_node.data.children();
let new_children = children
.into_iter()
.map(|child| {
pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
})
.collect::<Result<_>>()?;

new_node.data.with_new_children(new_children)
}

Expand Down

0 comments on commit 2fcad95

Please sign in to comment.