Skip to content

Commit

Permalink
support intermediate flushes when encoding
Browse files Browse the repository at this point in the history
due to the use of ready!(), whenever the underlying reader returns
Poll::Pending, it is transmitted directly to the caller, so there is no
flush until the entire data has been read.

This commit flushes compressed data when we get Poll::Pending, and stays
in the flushing state in case there is more data to send.
  • Loading branch information
Geal committed Aug 25, 2022
1 parent 149c9f8 commit 9800cd0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
12 changes: 0 additions & 12 deletions src/codec/flate/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,6 @@ impl Encode for FlateEncoder {
FlushCompress::Sync,
)?;

loop {
let old_len = output.written().len();
self.encode(
&mut PartialBuffer::new(&[][..]),
output,
FlushCompress::None,
)?;
if output.written().len() == old_len {
break;
}
}

self.flushed = true;
Ok(!output.unwritten().is_empty())
}
Expand Down
49 changes: 40 additions & 9 deletions src/tokio/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
enum State {
Encoding,
Flushing,
Finishing,
Done,
}

Expand Down Expand Up @@ -57,27 +58,56 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();
let mut read = 0usize;

loop {
*this.state = match this.state {
State::Encoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
let res = this.reader.as_mut().poll_fill_buf(cx);

match res {
Poll::Pending => {
if read == 0 {
return Poll::Pending;
} else {
State::Flushing
}
}
Poll::Ready(res) => {
let input = res?;

if input.is_empty() {
State::Finishing
} else {
let mut input = PartialBuffer::new(input);
this.encoder.encode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
read += len;
State::Encoding
}
}
}
}

State::Flushing => {
if read == 0 {
let mut input = PartialBuffer::new(&[][..]);
this.encoder.encode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
}

if this.encoder.flush(output)? {
State::Encoding
} else {
State::Flushing
}
}

State::Flushing => {
State::Finishing => {
if this.encoder.finish(output)? {
State::Done
} else {
State::Flushing
State::Finishing
}
}

Expand All @@ -87,6 +117,7 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
if let State::Done = *this.state {
return Poll::Ready(Ok(()));
}

if output.unwritten().is_empty() {
return Poll::Ready(Ok(()));
}
Expand Down

0 comments on commit 9800cd0

Please sign in to comment.