Skip to content

Commit

Permalink
feat(core): Allow concurrent reading on bytes stream (#4499)
Browse files Browse the repository at this point in the history
feat(core): Allow concurrent work on bytes stream

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Apr 17, 2024
1 parent 5299879 commit 755de58
Showing 1 changed file with 78 additions and 72 deletions.
150 changes: 78 additions & 72 deletions core/src/types/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::ops::RangeBounds;

use bytes::BufMut;
use futures::stream;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;

Expand Down Expand Up @@ -72,7 +71,7 @@ impl Reader {
///
/// - Buffer length smaller than range means we have reached the end of file.
pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
let bufs: Vec<_> = self.into_stream(range).try_collect().await?;
let bufs: Vec<_> = self.clone().into_stream(range).try_collect().await?;
Ok(bufs.into_iter().flatten().collect())
}

Expand All @@ -89,7 +88,7 @@ impl Reader {
buf: &mut impl BufMut,
range: impl RangeBounds<u64>,
) -> Result<usize> {
let mut stream = self.into_stream(range);
let mut stream = self.clone().into_stream(range);

let mut read = 0;
loop {
Expand Down Expand Up @@ -158,17 +157,14 @@ impl Reader {
}

/// Create a buffer stream to read specific range from given reader.
pub fn into_stream(
&self,
range: impl RangeBounds<u64>,
) -> impl Stream<Item = Result<Buffer>> + Unpin + MaybeSend + 'static {
let futs = into_future_iterator::ReadFutureIterator::new(
self.inner.clone(),
self.options.chunk(),
range,
);

stream::iter(futs).buffered(self.options.concurrent())
///
/// # Notes
///
/// This API can be public but we are not sure if it's useful for users.
/// And the name `BufferStream` is not good enough to expose to users.
/// Let's keep it inside for now.
fn into_stream(self, range: impl RangeBounds<u64>) -> into_stream::BufferStream {
into_stream::BufferStream::new(self.inner, self.options, range)
}

/// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
Expand All @@ -180,22 +176,29 @@ impl Reader {

/// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
#[inline]
pub fn into_bytes_stream(self, range: Range<u64>) -> FuturesBytesStream {
FuturesBytesStream::new(self.inner, self.options.chunk(), range)
pub fn into_bytes_stream(self, range: impl RangeBounds<u64>) -> FuturesBytesStream {
let stream = self.into_stream(range);
FuturesBytesStream::new(stream)
}
}

pub mod into_future_iterator {
pub mod into_stream {
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
use std::{
ops::{Bound, RangeBounds},
sync::{atomic::AtomicBool, Arc},
};

use futures::stream::{self, Buffered, Iter};
use futures::{Stream, StreamExt};

use crate::raw::*;
use crate::*;

pub struct ReadFutureIterator {
/// ReadFutureIterator is an iterator that returns future of [`Buffer`] from [`Reader`].
struct ReadFutureIterator {
r: oio::Reader,
chunk: Option<usize>,

Expand All @@ -205,7 +208,8 @@ pub mod into_future_iterator {
}

impl ReadFutureIterator {
pub fn new(r: oio::Reader, chunk: Option<usize>, range: impl RangeBounds<u64>) -> Self {
#[inline]
fn new(r: oio::Reader, chunk: Option<usize>, range: impl RangeBounds<u64>) -> Self {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Expand Down Expand Up @@ -262,6 +266,45 @@ pub mod into_future_iterator {
Some(Box::pin(fut))
}
}

/// BufferStream is a stream that returns [`Buffer`] from [`Reader`].
///
/// This stream will use concurrent read to fetch data from underlying storage.
///
/// # Notes
///
/// BufferStream uses `Buffered<Iter<ReadFutureIterator>>` internally,
/// but we want to hide those details from users.
pub struct BufferStream(Buffered<Iter<ReadFutureIterator>>);

impl BufferStream {
/// Create a new buffer stream from given reader.
#[inline]
pub fn new(r: oio::Reader, options: OpReader, range: impl RangeBounds<u64>) -> Self {
let iter = ReadFutureIterator::new(r, options.chunk(), range);
let stream = stream::iter(iter).buffered(options.concurrent());

BufferStream(stream)
}
}

impl Stream for BufferStream {
type Item = Result<Buffer>;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::raw::MaybeSend;

trait AssertTrait: Unpin + MaybeSend + 'static {}
impl AssertTrait for BufferStream {}
}
}

pub mod into_futures_async_read {
Expand Down Expand Up @@ -418,15 +461,16 @@ pub mod into_futures_async_read {

pub mod into_futures_stream {
use std::io;
use std::ops::Range;
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::Stream;
use futures::StreamExt;

use super::into_stream::BufferStream;
use crate::raw::*;
use crate::*;

Expand All @@ -436,37 +480,17 @@ pub mod into_futures_stream {
///
/// FuturesStream also implements [`Unpin`], [`Send`] and [`Sync`].
pub struct FuturesBytesStream {
r: oio::Reader,
state: State,
offset: u64,
size: u64,
chunk: usize,

cur: u64,
}

enum State {
Idle(Buffer),
Next(BoxedStaticFuture<Result<Buffer>>),
stream: BufferStream,
buf: Buffer,
}

/// # Safety
///
/// FuturesReader only exposes `&mut self` to the outside world, so it's safe to be `Sync`.
unsafe impl Sync for State {}

impl FuturesBytesStream {
/// NOTE: don't allow users to create FuturesStream directly.
#[inline]
pub(crate) fn new(r: oio::Reader, chunk: Option<usize>, range: Range<u64>) -> Self {
pub(crate) fn new(stream: BufferStream) -> Self {
FuturesBytesStream {
r,
state: State::Idle(Buffer::new()),
offset: range.start,
size: range.end - range.start,
chunk: chunk.unwrap_or(8 * 1024 * 1024),

cur: 0,
stream,
buf: Buffer::new(),
}
}
}
Expand All @@ -478,34 +502,16 @@ pub mod into_futures_stream {
let this = self.get_mut();

loop {
match &mut this.state {
State::Idle(buf) => {
// Consume current buffer
if let Some(bs) = buf.next() {
return Poll::Ready(Some(Ok(bs)));
}

// Make sure cur didn't exceed size.
if this.cur >= this.size {
return Poll::Ready(None);
}

let r = this.r.clone();
let next_offset = this.offset + this.cur;
let next_size = (this.size - this.cur).min(this.chunk as u64) as usize;
let fut = async move { r.read_at_dyn(next_offset, next_size).await };
this.state = State::Next(Box::pin(fut));
}
State::Next(fut) => {
let res = ready!(fut.as_mut().poll(cx));
match res {
Ok(buf) => {
this.state = State::Idle(buf);
}
Err(err) => return Poll::Ready(Some(Err(format_std_io_error(err)))),
};
}
// Consume current buffer
if let Some(bs) = Iterator::next(&mut this.buf) {
return Poll::Ready(Some(Ok(bs)));
}

this.buf = match ready!(this.stream.poll_next_unpin(cx)) {
Some(Ok(buf)) => buf,
Some(Err(err)) => return Poll::Ready(Some(Err(format_std_io_error(err)))),
None => return Poll::Ready(None),
};
}
}
}
Expand Down

0 comments on commit 755de58

Please sign in to comment.