diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index fcd7e064e7c..00c18ee16f3 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -438,19 +438,13 @@ impl AsyncWrite for BufWriter { } BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from), BufWriterState::Write(x) => { - let upload = x.take().ok_or_else(|| { - std::io::Error::new( - ErrorKind::InvalidInput, - "Cannot shutdown a writer that has already been shut down", + if let Some(upload) = x.take() { + self.state = BufWriterState::Flush( + async move { upload.finish().await.map(|_| ()) }.boxed(), ) - })?; - self.state = BufWriterState::Flush( - async move { - upload.finish().await?; - Ok(()) - } - .boxed(), - ) + } else { + return Poll::Ready(Ok(())); + } } } } diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 4df4d8fd46a..11a71d5543a 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -221,6 +221,7 @@ impl WriteMultipart { /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(mut self) -> Result { if !self.buffer.is_empty() { + self.wait_for_capacity(0).await?; let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) }