diff --git a/src/tokio/bufread/generic/encoder.rs b/src/tokio/bufread/generic/encoder.rs index a80fa122..786eaf1d 100644 --- a/src/tokio/bufread/generic/encoder.rs +++ b/src/tokio/bufread/generic/encoder.rs @@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; enum State { Encoding, Flushing, + Finishing, Done, } @@ -57,27 +58,80 @@ impl Encoder { output: &mut PartialBuffer<&mut [u8]>, ) -> Poll> { let mut this = self.project(); + let mut read = 0usize; loop { + println!("encoder state: {:?}", this.state); *this.state = match this.state { State::Encoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - State::Encoding + let res = this.reader.as_mut().poll_fill_buf(cx); + + match res { + Poll::Pending => { + println!( + "[{}] encoder got pending, read={}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + read + ); + if read == 0 { + return Poll::Pending; + } else { + println!( + "will flush, output unwritten = {}", + output.unwritten().len() + ); + + State::Flushing + } + } + Poll::Ready(res) => { + println!( + "[{}]encoder: res_err={:?}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + match &res { + Ok(_) => String::new(), + Err(e) => e.clone().to_string(), + } + ); + let input = res?; + + if input.is_empty() { + State::Finishing + } else { + println!( + "encoder got input: {}", + std::str::from_utf8(input).unwrap() + ); + let mut input = PartialBuffer::new(input); + this.encoder.encode(&mut input, output)?; + let len = input.written().len(); + this.reader.as_mut().consume(len); + read += len; + State::Encoding + } + } } } State::Flushing => { + if this.encoder.flush(output)? { + State::Encoding + } else { + State::Flushing + } + } + + State::Finishing => { if this.encoder.finish(output)? { State::Done } else { - State::Flushing + State::Finishing } } @@ -87,7 +141,14 @@ impl Encoder { if let State::Done = *this.state { return Poll::Ready(Ok(())); } + + println!( + "should send chunk: output.unwritten().len()={}, output.written().len()={}", + output.unwritten().len(), + output.written().len() + ); if output.unwritten().is_empty() { + println!("returning chunk"); return Poll::Ready(Ok(())); } }