Skip to content

Commit

Permalink
fix: single partition in SortPreservingMergeExec with fetch (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
haohuaijin committed Aug 23, 2024
1 parent 7be9897 commit 8fd9d69
Showing 1 changed file with 91 additions and 6 deletions.
97 changes: 91 additions & 6 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;

use crate::common::spawn_buffered;
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::sorts::streaming_merge;
use crate::{
Expand Down Expand Up @@ -238,12 +239,23 @@ impl ExecutionPlan for SortPreservingMergeExec {
0 => internal_err!(
"SortPreservingMergeExec requires at least one input partition"
),
1 => {
// bypass if there is only one partition to merge (no metrics in this case either)
let result = self.input.execute(0, context);
debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input");
result
}
1 => match self.fetch {
Some(fetch) => {
let stream = self.input.execute(0, context)?;
debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}");
Ok(Box::pin(LimitStream::new(
stream,
0,
Some(fetch),
BaselineMetrics::new(&self.metrics, partition),
)))
}
None => {
let stream = self.input.execute(0, context);
debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input without fetch");
stream
}
},
_ => {
let receivers = (0..input_partitions)
.map(|partition| {
Expand Down Expand Up @@ -817,6 +829,79 @@ mod tests {
);
}

/// With a single input partition and `fetch = 2`, the merge must still apply
/// the limit: only the first two rows of the input are returned.
#[tokio::test]
async fn test_sort_merge_single_partition_with_fetch() {
    // One batch in one partition; column "b" is already in ascending order.
    let batch = RecordBatch::try_from_iter(vec![
        (
            "a",
            Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])) as ArrayRef,
        ),
        (
            "b",
            Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as ArrayRef,
        ),
    ])
    .unwrap();
    let schema = batch.schema();

    // Merge ordering: "b" ascending, nulls first.
    let ordering = vec![PhysicalSortExpr {
        expr: col("b", &schema).unwrap(),
        options: SortOptions {
            descending: false,
            nulls_first: true,
        },
    }];

    let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap());
    let plan = SortPreservingMergeExec::new(ordering, input).with_fetch(Some(2));

    let ctx = Arc::new(TaskContext::default());
    let batches = collect(Arc::new(plan), ctx).await.unwrap();
    assert_eq!(batches.len(), 1);

    let expected = [
        "+---+---+",
        "| a | b |",
        "+---+---+",
        "| 1 | a |",
        "| 2 | b |",
        "+---+---+",
    ];
    assert_batches_eq!(expected, &batches);
}

/// With a single input partition and no `fetch`, the merge passes the input
/// through unchanged: all five rows come back in their original order.
#[tokio::test]
async fn test_sort_merge_single_partition_without_fetch() {
    // Build the only input batch: an Int32 column "a" and a Utf8 column "b".
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
    let strs: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
    let columns = vec![("a", ints), ("b", strs)];
    let input_batch = RecordBatch::try_from_iter(columns).unwrap();
    let schema = input_batch.schema();

    // Merge ordering: "b" ascending, nulls first.
    let sort_options = SortOptions {
        descending: false,
        nulls_first: true,
    };
    let sort_exprs = vec![PhysicalSortExpr {
        expr: col("b", &schema).unwrap(),
        options: sort_options,
    }];

    // A single partition holding the single batch; no fetch limit configured.
    let partitions = [vec![input_batch]];
    let source = MemoryExec::try_new(&partitions, schema, None).unwrap();
    let merge_plan = Arc::new(SortPreservingMergeExec::new(sort_exprs, Arc::new(source)));

    let ctx = Arc::new(TaskContext::default());
    let output = collect(merge_plan, ctx).await.unwrap();
    assert_eq!(output.len(), 1);

    let expected = [
        "+---+---+",
        "| a | b |",
        "+---+---+",
        "| 1 | a |",
        "| 2 | b |",
        "| 7 | c |",
        "| 9 | d |",
        "| 3 | e |",
        "+---+---+",
    ];
    assert_batches_eq!(expected, &output);
}

#[tokio::test]
async fn test_async() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
Expand Down

0 comments on commit 8fd9d69

Please sign in to comment.