Skip to content

Commit

Permalink
Specifically test MSRV on CI (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
obmarg committed Feb 11, 2024
1 parent 4e2d7ca commit ae70005
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 34 deletions.
10 changes: 3 additions & 7 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- uses: dtolnay/rust-toolchain@1.76.0
with:
profile: minimal
toolchain: stable
components: rustfmt
- name: Check formatting
uses: actions-rs/cargo@v1
with:
Expand All @@ -33,10 +32,7 @@ jobs:

steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
- uses: dtolnay/[email protected]
- uses: Swatinem/rust-cache@v2
- name: Build
uses: actions-rs/cargo@v1
Expand Down
45 changes: 18 additions & 27 deletions src/next/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
Operation::Response: 'static,
{
Self {
stream: self.stream.join(future).boxed(),
stream: join_stream(self.stream, future).boxed(),
..self
}
}
Expand All @@ -61,34 +61,25 @@ where
}
}

trait JoinStreamExt<'a> {
type Item;

/// Joins a future onto the execution of a stream returning a stream that also polls
/// the given future.
///
/// If the future ends the stream will still continue till completion but if the stream
/// ends the future will be cancelled.
///
/// This can be used when you have the receivng side of a channel and a future that sends
/// on that channel - combining the two into a single stream that'll run till the channel
/// is exhausted. If you drop the stream you also cancel the underlying process.
fn join(self, future: Fuse<BoxFuture<'static, ()>>) -> impl Stream<Item = Self::Item>;
/// Joins a future onto the execution of a stream returning a stream that also polls
/// the given future.
///
/// If the future ends the stream will still continue till completion but if the stream
/// ends the future will be cancelled.
///
/// This can be used when you have the receivng side of a channel and a future that sends
/// on that channel - combining the two into a single stream that'll run till the channel
/// is exhausted. If you drop the stream you also cancel the underlying process.
fn join_stream<Item>(
stream: BoxStream<'static, Item>,
future: Fuse<BoxFuture<'static, ()>>,
) -> impl Stream<Item = Item> {
futures::stream::unfold(
ProducerState::Running(stream.fuse(), future),
producer_handler,
)
}

impl<'a, Item> JoinStreamExt<'a> for BoxStream<'static, Item>
where
Item: 'static,
{
type Item = Item;

fn join(self, future: Fuse<BoxFuture<'static, ()>>) -> impl Stream<Item = Self::Item> + 'a {
futures::stream::unfold(
ProducerState::Running(self.fuse(), future),
producer_handler,
)
}
}
enum ProducerState<'a, Item> {
Running(
stream::Fuse<BoxStream<'a, Item>>,
Expand Down

0 comments on commit ae70005

Please sign in to comment.