Skip to content

Commit

Permalink
fix: substrait limit when fetch is None (apache#7669)
Browse files Browse the repository at this point in the history
* fix: substrait limit when fetch is None

Signed-off-by: Ruihang Xia <[email protected]>

* Add comments

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and Ted-Jiang committed Oct 7, 2023
1 parent ccf9ed5 commit 7ffd198
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 4 deletions.
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl LimitStream {

match &poll {
Poll::Ready(Some(Ok(batch))) => {
if batch.num_rows() > 0 && self.skip == 0 {
if batch.num_rows() > 0 {
break poll;
} else {
// continue to poll input stream
Expand Down
9 changes: 7 additions & 2 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,13 @@ pub async fn from_substrait_rel(
from_substrait_rel(ctx, input, extensions).await?,
);
let offset = fetch.offset as usize;
let count = fetch.count as usize;
input.limit(offset, Some(count))?.build()
// Since protobuf can't directly distinguish `None` vs `0` `None` is encoded as `MAX`
let count = if fetch.count as usize == usize::MAX {
None
} else {
Some(fetch.count as usize)
};
input.limit(offset, count)?.build()
} else {
not_impl_err!("Fetch without an input is not valid")
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ pub fn to_substrait_rel(
}
LogicalPlan::Limit(limit) => {
let input = to_substrait_rel(limit.input.as_ref(), ctx, extension_info)?;
let limit_fetch = limit.fetch.unwrap_or(0);
// Since protobuf can't directly distinguish `None` vs `0` encode `None` as `MAX`
let limit_fetch = limit.fetch.unwrap_or(usize::MAX);
Ok(Box::new(Rel {
rel_type: Some(RelType::Fetch(Box::new(FetchRel {
common: None,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ async fn select_with_limit() -> Result<()> {
roundtrip_fill_na("SELECT * FROM data LIMIT 100").await
}

#[tokio::test]
async fn select_without_limit() -> Result<()> {
roundtrip_fill_na("SELECT * FROM data OFFSET 10").await
}

#[tokio::test]
async fn select_with_limit_offset() -> Result<()> {
roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await
Expand Down

0 comments on commit 7ffd198

Please sign in to comment.