Skip to content

Commit

Permalink
fix: remove futures from ws_stream_wasm module (#111)
Browse files Browse the repository at this point in the history
In #107 I removed futures in favour of futures-lite in various places.
However, because ws_stream_wasm is not a default feature I didn't notice
that I'd not removed futures from there as well.

CI also did not catch this because futures was used for the tungstenite
feature, which is often always on.

This PR removes futures entirely, by implementing our own copy of
StreamExt::send based on the futures impl, and then making sure there's
no remaining uses of futures in the crate.
  • Loading branch information
obmarg committed Jun 8, 2024
1 parent 681d7e8 commit c76a826
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ members = ["examples", "examples-wasm"]
[features]
default = ["logging"]
logging = ["dep:log"]
tungstenite = ["dep:tungstenite", "dep:futures"]
tungstenite = ["dep:tungstenite"]
client-cynic = ["cynic"]
client-graphql-client = ["graphql_client"]
ws_stream_wasm = ["dep:ws_stream_wasm", "dep:pharos"]

[dependencies]
async-channel = "2"
futures-lite = "2"
futures-sink = "0.3"
futures-timer = "3"
log = { version = "0.4", optional = true }
pin-project = "1"
Expand All @@ -39,7 +40,6 @@ thiserror = "1.0"
cynic = { version = "3", optional = true }
tungstenite = { version = "0.23", optional = true }
graphql_client = { version = "0.14.0", optional = true }
futures = { version = "0.3", optional = true }

ws_stream_wasm = { version = "0.7", optional = true }
pharos = { version = "0.5.2", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! ```rust
//! use graphql_ws_client::Client;
//! use std::future::IntoFuture;
//! use futures::StreamExt;
//! use futures_lite::StreamExt;
//! # async fn example() -> Result<(), graphql_ws_client::Error> {
//! # let connection = graphql_ws_client::__doc_utils::Conn;
//! # let subscription = graphql_ws_client::__doc_utils::Subscription;
Expand Down Expand Up @@ -44,6 +44,7 @@
mod error;
mod logging;
mod protocol;
mod sink_ext;

#[doc(hidden)]
#[path = "doc_utils.rs"]
Expand Down
4 changes: 2 additions & 2 deletions src/native.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::{Sink, SinkExt};
use futures_lite::{Stream, StreamExt};
use futures_sink::Sink;
use tungstenite::{self, protocol::CloseFrame};

use crate::{Error, Message};
use crate::{sink_ext::SinkExt, Error, Message};

#[cfg_attr(docsrs, doc(cfg(feature = "tungstenite")))]
impl<T> crate::next::Connection for T
Expand Down
2 changes: 1 addition & 1 deletion src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use self::{
/// ```rust,no_run
/// use graphql_ws_client::Client;
/// use std::future::IntoFuture;
/// use futures::StreamExt;
/// use futures_lite::StreamExt;
/// # use graphql_ws_client::__doc_utils::spawn;
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
Expand Down
59 changes: 59 additions & 0 deletions src/sink_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use futures_lite::ready;
use futures_sink::Sink;

/// A very limited clone of futures::SinkExt to avoid having to pull the original in
pub trait SinkExt<Item>: Sink<Item> {
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where
Self: Unpin,
{
Send::new(self, item)
}
}

impl<Item, T> SinkExt<Item> for T where T: Sink<Item> {}

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, Si: ?Sized, Item> {
sink: &'a mut Si,
item: Option<Item>,
}

// Pinning is never projected to children
impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}

impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
Self {
sink,
item: Some(item),
}
}
}

impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
type Output = Result<(), Si::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut sink = Pin::new(&mut this.sink);

if let Some(item) = this.item.take() {
ready!(sink.as_mut().poll_ready(cx))?;
sink.as_mut().start_send(item)?;
}

// we're done sending the item, but want to block on flushing the
// sink
ready!(sink.poll_flush(cx))?;

Poll::Ready(Ok(()))
}
}
16 changes: 6 additions & 10 deletions src/ws_stream_wasm.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::{FutureExt, SinkExt, StreamExt};
use futures_lite::{FutureExt, StreamExt};
use pharos::{Observable, ObserveConfig};
use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream};

use crate::Error;
use crate::{sink_ext::SinkExt, Error};

/// A websocket connection for ws_stream_wasm
#[cfg_attr(docsrs, doc(cfg(feature = "ws_stream_wasm")))]
Expand Down Expand Up @@ -72,14 +72,10 @@ impl crate::next::Connection for Connection {

impl Connection {
async fn next(&mut self) -> Option<EventOrMessage> {
futures::select! {
event = self.event_stream.next().fuse() => {
event.map(EventOrMessage::Event)
}
message = self.messages.next().fuse() => {
message.map(EventOrMessage::Message)
}
}
let event = async { self.event_stream.next().await.map(EventOrMessage::Event) };
let message = async { self.messages.next().await.map(EventOrMessage::Message) };

event.race(message).await
}
}

Expand Down
17 changes: 9 additions & 8 deletions tests/cynic-tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{future::IntoFuture, time::Duration};

use assert_matches::assert_matches;
use futures_lite::{future, StreamExt};
use subscription_server::SubscriptionServer;
use tokio::time::sleep;

Expand Down Expand Up @@ -54,7 +55,6 @@ struct BooksChangedSubscription {
#[tokio::test]
async fn main_test() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;

let server = SubscriptionServer::start().await;

Expand Down Expand Up @@ -95,7 +95,7 @@ async fn main_test() {
},
];

futures::join!(
future::zip(
async {
for update in &updates {
server.send(update.to_owned()).unwrap();
Expand All @@ -110,14 +110,14 @@ async fn main_test() {
let data = update.data.unwrap();
assert_eq!(data.books.id.inner(), expected.id.0);
}
}
);
},
)
.await;
}

#[tokio::test]
async fn oneshot_operation_test() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;

let server = SubscriptionServer::start().await;

Expand Down Expand Up @@ -155,7 +155,7 @@ async fn oneshot_operation_test() {
},
];

futures::join!(
future::zip(
async {
sleep(Duration::from_millis(10)).await;
for update in &updates {
Expand All @@ -171,8 +171,9 @@ async fn oneshot_operation_test() {
let data = update.data.unwrap();
assert_eq!(data.books.id.inner(), expected.id.0);
}
}
);
},
)
.await;
}

fn build_query() -> cynic::StreamingOperation<BooksChangedSubscription, BooksChangedVariables> {
Expand Down
17 changes: 9 additions & 8 deletions tests/graphql-client-tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{future::IntoFuture, time::Duration};

use assert_matches::assert_matches;
use futures_lite::{future, StreamExt};
use graphql_client::GraphQLQuery;
use graphql_ws_client::graphql::StreamingOperation;
use subscription_server::SubscriptionServer;
Expand All @@ -19,7 +20,6 @@ struct BooksChanged;
#[tokio::test]
async fn main_test() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;

let server = SubscriptionServer::start().await;

Expand Down Expand Up @@ -60,7 +60,7 @@ async fn main_test() {
},
];

futures::join!(
future::zip(
async {
for update in &updates {
server.send(update.to_owned()).unwrap();
Expand All @@ -75,14 +75,14 @@ async fn main_test() {
let data = update.data.unwrap();
assert_eq!(data.books.id, expected.id.0);
}
}
);
},
)
.await;
}

#[tokio::test]
async fn oneshot_operation_test() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;

let server = SubscriptionServer::start().await;

Expand Down Expand Up @@ -120,7 +120,7 @@ async fn oneshot_operation_test() {
},
];

futures::join!(
future::zip(
async {
sleep(Duration::from_millis(10)).await;
for update in &updates {
Expand All @@ -136,8 +136,9 @@ async fn oneshot_operation_test() {
let data = update.data.unwrap();
assert_eq!(data.books.id, expected.id.0);
}
}
);
},
)
.await;
}

fn build_query() -> graphql_ws_client::graphql::StreamingOperation<BooksChanged> {
Expand Down
4 changes: 2 additions & 2 deletions tests/subscription_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use async_graphql::{EmptyMutation, Object, Schema, SimpleObject, Subscription, ID};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
use axum::{extract::Extension, routing::post, Router};
use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use tokio::sync::broadcast::Sender;
use tokio_stream::wrappers::BroadcastStream;

Expand Down Expand Up @@ -116,6 +116,6 @@ pub struct SubscriptionRoot {
impl SubscriptionRoot {
async fn books(&self, _mutation_type: MutationType) -> impl Stream<Item = BookChanged> {
println!("Subscription received");
BroadcastStream::new(self.channel.subscribe()).filter_map(|r| async move { r.ok() })
BroadcastStream::new(self.channel.subscribe()).filter_map(|r| r.ok())
}
}

0 comments on commit c76a826

Please sign in to comment.