fix: max_wait_time in http handler not work when result set is large … (#16267)

* fix: max_wait_time in the http handler does not work when the result set is large and returns quickly.

* fix
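For context, here is a rough sketch of the behavior being fixed, under stated assumptions: the 10 MiB page budget and max_rows_per_page come from page_manager.rs, while the standalone functions old_keeps_waiting/new_keeps_waiting are hypothetical stand-ins for the loop conditions in collect_new_page, not databend APIs. Before this change, the wait loop only ended when the row budget was exhausted, so a result set that filled the 10 MiB size budget quickly could still hold the response open until max_wait_time expired; after it, exhausting either budget ends the page.

// Hypothetical model of collect_new_page's termination condition, not the
// actual databend code (see the diff below for the real loop).
fn old_keeps_waiting(rows_collected: usize, max_rows_per_page: usize) -> bool {
    // Old: only the row budget ends the wait, so a page already full by
    // size keeps waiting for new blocks until max_wait_time expires.
    rows_collected < max_rows_per_page
}

fn new_keeps_waiting(remain_rows: usize, remain_size: usize) -> bool {
    // New: running out of either rows or bytes ends the wait immediately.
    remain_rows > 0 && remain_size > 0
}

fn main() {
    let max_rows_per_page = 10_000;
    // Suppose one large, fast block consumed the whole 10 MiB byte budget
    // after only 500 rows:
    let rows_collected = 500;
    let remain_rows = max_rows_per_page - rows_collected;
    let remain_size = 0;
    assert!(old_keeps_waiting(rows_collected, max_rows_per_page)); // stalls
    assert!(!new_keeps_waiting(remain_rows, remain_size)); // page returns now
}
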
youngsofun committed Aug 18, 2024
1 parent a8da519 commit 9197028
Showing 1 changed file with 34 additions and 38 deletions.
src/query/service/src/servers/http/v1/query/page_manager.rs: 34 additions & 38 deletions
@@ -124,75 +124,71 @@ impl PageManager {

     fn append_block(
         &mut self,
-        rows: &mut Vec<Vec<Option<String>>>,
+        res: &mut Vec<Vec<Option<String>>>,
         block: DataBlock,
-        remain_rows: usize,
+        remain_rows: &mut usize,
         remain_size: &mut usize,
     ) -> Result<()> {
         let format_settings = {
             let guard = self.format_settings.read();
             guard.as_ref().unwrap().clone()
         };
-        let mut iter = block_to_strings(&block, &format_settings)?
-            .into_iter()
-            .peekable();
-        let chunk: Vec<_> = iter
-            .by_ref()
-            .take(remain_rows)
-            .take_while(|r| {
-                let size = row_size(r);
-                let ok = *remain_size > size;
-                if ok {
-                    *remain_size -= size;
-                }
-                ok
-            })
-            .collect();
-        rows.extend(chunk);
-        self.row_buffer = iter.by_ref().collect();
+        let rows = block_to_strings(&block, &format_settings)?;
+        let mut i = 0;
+        while *remain_rows > 0 && *remain_size > 0 && i < rows.len() {
+            let size = row_size(&rows[i]);
+            if *remain_size > size {
+                *remain_size -= size;
+                *remain_rows -= 1;
+                i += 1;
+            } else {
+                *remain_size = 0;
+            }
+        }
+        res.extend_from_slice(&rows[..i]);
+        self.row_buffer = rows[i..].iter().cloned().collect();
         Ok(())
     }

     #[async_backtrace::framed]
     async fn collect_new_page(&mut self, tp: &Wait) -> Result<(StringBlock, bool)> {
         let mut res: Vec<Vec<Option<String>>> = Vec::with_capacity(self.max_rows_per_page);
-        let mut max_size_per_page = 10 * 1024 * 1024;
-        while res.len() < self.max_rows_per_page {
+        let mut remain_size = 10 * 1024 * 1024;
+        let mut remain_rows = self.max_rows_per_page;
+        while remain_rows > 0 && remain_size > 0 {
             if let Some(row) = self.row_buffer.pop_front() {
                 let size = row_size(&row);
-                if max_size_per_page > size {
+                if remain_size > size {
                     res.push(row);
-                    max_size_per_page -= size;
-                    continue;
+                    remain_size -= size;
+                    remain_rows -= 1;
+                } else {
+                    remain_size = 0;
                 }
+            } else {
+                break;
             }
-            break;
         }
-        loop {
-            assert!(self.max_rows_per_page >= res.len());
-            let remain_rows = self.max_rows_per_page - res.len();
-            if remain_rows == 0 {
-                break;
-            }
+
+        while remain_rows > 0 && remain_size > 0 {
             match tp {
                 Wait::Async => match self.block_receiver.try_recv() {
                     Some(block) => {
-                        self.append_block(&mut res, block, remain_rows, &mut max_size_per_page)?
+                        self.append_block(&mut res, block, &mut remain_rows, &mut remain_size)?
                     }
                     None => break,
                 },
                 Wait::Deadline(t) => {
                     let now = Instant::now();
                     let d = *t - now;
                     if d.is_zero() {
                         // timeout() will return Ok if the future completes immediately
                         break;
                     }
                     match tokio::time::timeout(d, self.block_receiver.recv()).await {
                         Ok(Some(block)) => {
                             debug!("http query got new block with {} rows", block.num_rows());
-                            self.append_block(
-                                &mut res,
-                                block,
-                                remain_rows,
-                                &mut max_size_per_page,
-                            )?;
+                            self.append_block(&mut res, block, &mut remain_rows, &mut remain_size)?;
                         }
                         Ok(None) => {
                             info!("http query reach end of blocks");
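A reader's note on the append_block rewrite above, separate from anything stated in the commit: Rust's Iterator::take_while consumes the first element that fails its predicate, so in the removed combinator version the row that overflowed the size budget was pulled out of the iterator and never reached either the page or row_buffer. The new index-based loop splits rows at i and cannot drop anything. A standalone sketch of the pitfall, using toy integers in place of rows:

// Demonstrates that take_while consumes (and here loses) the first element
// failing its predicate; plain integers stand in for result rows.
fn main() {
    let mut iter = vec![1, 2, 3, 4, 5].into_iter().peekable();
    // Keep elements while they fit a "budget" of x < 3.
    let taken: Vec<i32> = iter.by_ref().take_while(|&x| x < 3).collect();
    // Everything still in the iterator is carried over, like row_buffer.
    let carried_over: Vec<i32> = iter.collect();
    assert_eq!(taken, vec![1, 2]);
    // 3 failed the predicate and was consumed by take_while, so it is gone:
    assert_eq!(carried_over, vec![4, 5]);
}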
