Skip to content

Commit

Permalink
WIP: support intermediate flushes when encoding
Browse files Browse the repository at this point in the history
due to the use of ready!(), whenever the underlying reader returns
Poll::Pending, it is transmitted directly to the caller, so there is no
flush until the entire data has been read
  • Loading branch information
Geal committed Aug 24, 2022
1 parent ada65c6 commit a35d8e5
Showing 1 changed file with 71 additions and 10 deletions.
81 changes: 71 additions & 10 deletions src/tokio/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
enum State {
Encoding,
Flushing,
Finishing,
Done,
}

Expand Down Expand Up @@ -57,27 +58,80 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
output: &mut PartialBuffer<&mut [u8]>,
) -> Poll<Result<()>> {
let mut this = self.project();
let mut read = 0usize;

loop {
println!("encoder state: {:?}", this.state);
*this.state = match this.state {
State::Encoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
this.encoder.encode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
State::Encoding
let res = this.reader.as_mut().poll_fill_buf(cx);

match res {
Poll::Pending => {
println!(
"[{}] encoder got pending, read={}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
read
);
if read == 0 {
return Poll::Pending;
} else {
println!(
"will flush, output unwritten = {}",
output.unwritten().len()
);

State::Flushing
}
}
Poll::Ready(res) => {
println!(
"[{}]encoder: res_err={:?}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
match &res {
Ok(_) => String::new(),
Err(e) => e.clone().to_string(),
}
);
let input = res?;

if input.is_empty() {
State::Finishing
} else {
println!(
"encoder got input: {}",
std::str::from_utf8(input).unwrap()
);
let mut input = PartialBuffer::new(input);
this.encoder.encode(&mut input, output)?;
let len = input.written().len();
this.reader.as_mut().consume(len);
read += len;
State::Encoding
}
}
}
}

State::Flushing => {
if this.encoder.flush(output)? {
State::Encoding
} else {
State::Flushing
}
}

State::Finishing => {
if this.encoder.finish(output)? {
State::Done
} else {
State::Flushing
State::Finishing
}
}

Expand All @@ -87,7 +141,14 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
if let State::Done = *this.state {
return Poll::Ready(Ok(()));
}

println!(
"should send chunk: output.unwritten().len()={}, output.written().len()={}",
output.unwritten().len(),
output.written().len()
);
if output.unwritten().is_empty() {
println!("returning chunk");
return Poll::Ready(Ok(()));
}
}
Expand Down

0 comments on commit a35d8e5

Please sign in to comment.