From 587c1b45e00e9afdcef0a65250f3ce4ed4beb73f Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Sat, 3 Feb 2024 15:54:40 +0000 Subject: [PATCH 01/10] start building an easier one-shot subscribe API --- examples-wasm/examples/subscriptions.rs | 4 +- src/doc_utils.rs | 14 ++++ src/lib.rs | 4 + src/next/builder.rs | 101 +++++++++++++++++++----- src/next/mod.rs | 2 +- src/next/stream.rs | 75 +++++++++++++++++- tests/cynic-tests.rs | 3 +- 7 files changed, 175 insertions(+), 28 deletions(-) create mode 100644 src/doc_utils.rs diff --git a/examples-wasm/examples/subscriptions.rs b/examples-wasm/examples/subscriptions.rs index 7f8df70..1dbf9ed 100644 --- a/examples-wasm/examples/subscriptions.rs +++ b/examples-wasm/examples/subscriptions.rs @@ -5,7 +5,7 @@ use std::future::IntoFuture; -use graphql_ws_client::{next::ClientBuilder, ws_stream_wasm::Connection}; +use graphql_ws_client::{next::Client, ws_stream_wasm::Connection}; mod schema { cynic::use_schema!("../schemas/books.graphql"); @@ -57,7 +57,7 @@ async fn main() { let connection = Connection::new(ws_conn).await; - let (mut client, actor) = ClientBuilder::new().build(connection).await.unwrap(); + let (mut client, actor) = Client::build(connection).await.unwrap(); wasm_bindgen_futures::spawn_local(actor.into_future()); let mut stream = client.streaming_operation(build_query()).await.unwrap(); diff --git a/src/doc_utils.rs b/src/doc_utils.rs new file mode 100644 index 0000000..2ea9d63 --- /dev/null +++ b/src/doc_utils.rs @@ -0,0 +1,14 @@ +use crate::{next::Message, Error}; + +pub struct Conn; + +#[async_trait::async_trait] +impl crate::next::Connection for Conn { + async fn receive(&mut self) -> Option { + unimplemented!() + } + + async fn send(&mut self, _: Message) -> Result<(), Error> { + unimplemented!() + } +} diff --git a/src/lib.rs b/src/lib.rs index 694aa9d..cd28cd8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,10 @@ mod client; mod logging; mod protocol; +#[doc(hidden)] +#[path = "doc_utils.rs"] +pub mod __doc_utils; + pub mod graphql; pub mod websockets; diff --git a/src/next/builder.rs b/src/next/builder.rs index 1034199..551028b 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -1,29 +1,47 @@ -use std::collections::HashMap; +use std::{collections::HashMap, future::IntoFuture}; -use futures::channel::mpsc; +use futures::{channel::mpsc, future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; use serde::Serialize; -use crate::{logging::trace, protocol::Event, Error}; +use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error}; use super::{ actor::ConnectionActor, connection::{Connection, Message}, - Client, + Client, SubscriptionStream, }; /// A websocket client builder -#[derive(Default)] pub struct ClientBuilder { payload: Option, subscription_buffer_size: Option, + connection: Box, } -impl ClientBuilder { - /// Constructs an AsyncWebsocketClientBuilder - pub fn new() -> ClientBuilder { - ClientBuilder::default() +impl super::Client { + /// Starts building a new Client. + /// + /// ```rust + /// use graphql_ws_client::next::{Client}; + /// use std::future::IntoFuture; + /// # let conn = graphql_ws_client::Conn + /// # fn main() -> Result<(), graphql_ws_client_::Error> { + /// let (client, actor) = Client::new(conn).await? + /// # } + /// ``` + pub fn build(connection: Conn) -> ClientBuilder + where + Conn: Connection + Send + 'static, + { + ClientBuilder { + payload: None, + subscription_buffer_size: None, + connection: Box::new(connection), + } } +} +impl ClientBuilder { /// Add payload to `connection_init` pub fn payload(self, payload: NewPayload) -> Result where @@ -44,6 +62,50 @@ impl ClientBuilder { ..self } } + + /// Initialise a Client and use it to run a single streaming operation + /// + /// ``` + /// todo!("a doctest") + /// ``` + /// If users want to run mutliple operations on a connection they + /// should use the `IntoFuture` impl to construct a `Client` + pub async fn streaming_operation<'a, Operation>( + self, + op: Operation, + ) -> Result, Error> + where + Operation: GraphqlOperation + Unpin + Send + 'static, + { + let (mut client, actor) = self.await?; + + let mut actor_future = actor.into_future().fuse(); + + let subscribe_future = client.streaming_operation(op).fuse(); + futures::pin_mut!(subscribe_future); + + // Temporarily run actor_future while we start the subscription + let stream = futures::select! { + () = actor_future => { + return Err(Error::Unknown("actor ended before subscription started".into())) + }, + result = subscribe_future => { + result? + } + }; + + Ok(stream.join(actor_future)) + } +} + +impl IntoFuture for ClientBuilder { + type Output = Result<(Client, ConnectionActor), Error>; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + todo!() + } } impl ClientBuilder { @@ -52,18 +114,14 @@ impl ClientBuilder { /// Accepts an already built websocket connection, and returns the connection /// and a future that must be awaited somewhere - if the future is dropped the /// connection will also drop. - pub async fn build(self, connection: Conn) -> Result<(Client, ConnectionActor), Error> - where - Conn: Connection + Send + 'static, - { - self.build_impl(Box::new(connection)).await - } + pub async fn build(self) -> Result<(Client, ConnectionActor), Error> { + let Self { + payload, + subscription_buffer_size, + mut connection, + } = self; - async fn build_impl( - self, - mut connection: Box, - ) -> Result<(Client, ConnectionActor), Error> { - connection.send(Message::init(self.payload)).await?; + connection.send(Message::init(payload)).await?; // wait for ack before entering receiver loop: loop { @@ -108,7 +166,8 @@ impl ClientBuilder { let actor = ConnectionActor::new(connection, command_receiver); - let client = Client::new(command_sender, self.subscription_buffer_size.unwrap_or(5)); + let client = + Client::new_internal(command_sender, self.subscription_buffer_size.unwrap_or(5)); Ok((client, actor)) } diff --git a/src/next/mod.rs b/src/next/mod.rs index 7a38e03..e82e5e6 100644 --- a/src/next/mod.rs +++ b/src/next/mod.rs @@ -36,7 +36,7 @@ pub struct Client { } impl Client { - pub(super) fn new( + pub(super) fn new_internal( actor: mpsc::Sender, subscription_buffer_size: usize, ) -> Self { diff --git a/src/next/stream.rs b/src/next/stream.rs index fdbf3d1..b1fe9f6 100644 --- a/src/next/stream.rs +++ b/src/next/stream.rs @@ -3,7 +3,12 @@ use std::{ task::{Context, Poll}, }; -use futures::{channel::mpsc, SinkExt, Stream}; +use futures::{ + channel::mpsc, + future::{self, BoxFuture, Fuse}, + stream::{self, BoxStream}, + Future, FutureExt, SinkExt, Stream, StreamExt, +}; use crate::{graphql::GraphqlOperation, Error}; @@ -18,7 +23,7 @@ where Operation: GraphqlOperation, { pub(super) id: usize, - pub(super) stream: Pin> + Send>>, + pub(super) stream: BoxStream<'static, Result>, pub(super) actor: mpsc::Sender, } @@ -33,6 +38,16 @@ where .await .map_err(|error| Error::Send(error.to_string())) } + + pub(super) fn join(self, future: Fuse>) -> Self + where + Operation::Response: 'static, + { + Self { + stream: self.stream.join(future).boxed(), + ..self + } + } } impl Stream for SubscriptionStream @@ -45,3 +60,59 @@ where self.project().stream.as_mut().poll_next(cx) } } + +trait JoinStreamExt<'a> { + type Item; + + /// Joins a future onto the execution of a stream returning a stream that also polls + /// the given future. + /// + /// If the future ends the stream will still continue till completion but if the stream + /// ends the future will be cancelled. + /// + /// This can be used when you have the receivng side of a channel and a future that sends + /// on that channel - combining the two into a single stream that'll run till the channel + /// is exhausted. If you drop the stream you also cancel the underlying process. + fn join(self, future: Fuse>) -> impl Stream; +} + +impl<'a, Item> JoinStreamExt<'a> for BoxStream<'static, Item> +where + Item: 'static, +{ + type Item = Item; + + fn join(self, future: Fuse>) -> impl Stream + 'a { + futures::stream::unfold( + ProducerState::Running(self.fuse(), future), + |mut state| async { + loop { + match state { + ProducerState::Running(mut stream, mut producer) => { + futures::select! { + output = stream.next() => { + return Some((output?, ProducerState::Running(stream, producer))); + } + _ = producer => { + state = ProducerState::Draining(stream); + continue; + } + } + } + ProducerState::Draining(mut stream) => { + return Some((stream.next().await?, ProducerState::Draining(stream))) + } + } + } + }, + ) + } +} + +enum ProducerState<'a, Item> { + Running( + stream::Fuse>, + future::Fuse>, + ), + Draining(stream::Fuse>), +} diff --git a/tests/cynic-tests.rs b/tests/cynic-tests.rs index 538beb8..8c76c23 100644 --- a/tests/cynic-tests.rs +++ b/tests/cynic-tests.rs @@ -75,8 +75,7 @@ async fn main_test() { println!("Connected"); - let (mut client, actor) = graphql_ws_client::next::ClientBuilder::new() - .build(connection) + let (mut client, actor) = graphql_ws_client::next::Client::build(connection) .await .unwrap(); From 0b8c6eb740accfbaa6a0d5b03f03af3e87208c4d Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Fri, 9 Feb 2024 19:04:42 +0000 Subject: [PATCH 02/10] Update src/next/builder.rs --- src/next/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/next/builder.rs b/src/next/builder.rs index 551028b..5fed05b 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -26,7 +26,7 @@ impl super::Client { /// use std::future::IntoFuture; /// # let conn = graphql_ws_client::Conn /// # fn main() -> Result<(), graphql_ws_client_::Error> { - /// let (client, actor) = Client::new(conn).await? + /// let (client, actor) = Client::build(conn).await? /// # } /// ``` pub fn build(connection: Conn) -> ClientBuilder From 5c3024fab381f03da2e33732debb9a7c8a1d45ed Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Sat, 3 Feb 2024 16:59:47 +0000 Subject: [PATCH 03/10] some docs & a fix --- src/next/builder.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/next/builder.rs b/src/next/builder.rs index 5fed05b..7a984e0 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -11,7 +11,16 @@ use super::{ Client, SubscriptionStream, }; -/// A websocket client builder +/// Builder for Clients. +/// +/// ```rust +/// use graphql_ws_client::next::{Client}; +/// use std::future::IntoFuture; +/// # let conn = graphql_ws_client::Conn +/// # fn main() -> Result<(), graphql_ws_client_::Error> { +/// let (client, actor) = Client::new(conn).await? +/// # } +/// ``` pub struct ClientBuilder { payload: Option, subscription_buffer_size: Option, @@ -19,7 +28,7 @@ pub struct ClientBuilder { } impl super::Client { - /// Starts building a new Client. + /// Creates a ClientBuilder with the given connection. /// /// ```rust /// use graphql_ws_client::next::{Client}; @@ -104,7 +113,7 @@ impl IntoFuture for ClientBuilder { type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { - todo!() + Box::pin(self.build()) } } From b401dee3c6479ed7289aecb820756edc5993c403 Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Fri, 9 Feb 2024 19:32:21 +0000 Subject: [PATCH 04/10] fix doctests for new builder --- src/doc_utils.rs | 13 +++++++++++++ src/next/builder.rs | 30 ++++++++++++++++++++++-------- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/src/doc_utils.rs b/src/doc_utils.rs index 2ea9d63..b38fa26 100644 --- a/src/doc_utils.rs +++ b/src/doc_utils.rs @@ -12,3 +12,16 @@ impl crate::next::Connection for Conn { unimplemented!() } } + +#[derive(serde::Serialize)] +pub struct Subscription; + +impl crate::graphql::GraphqlOperation for Subscription { + type Response = (); + + type Error = crate::Error; + + fn decode(&self, _data: serde_json::Value) -> Result { + unimplemented!() + } +} diff --git a/src/next/builder.rs b/src/next/builder.rs index 7a984e0..040b3c7 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -16,9 +16,11 @@ use super::{ /// ```rust /// use graphql_ws_client::next::{Client}; /// use std::future::IntoFuture; -/// # let conn = graphql_ws_client::Conn -/// # fn main() -> Result<(), graphql_ws_client_::Error> { -/// let (client, actor) = Client::new(conn).await? +/// # +/// # async fn example() -> Result<(), graphql_ws_client::Error> { +/// # let connection = graphql_ws_client::__doc_utils::Conn; +/// let (client, actor) = Client::build(connection).await?; +/// # Ok(()) /// # } /// ``` pub struct ClientBuilder { @@ -33,9 +35,10 @@ impl super::Client { /// ```rust /// use graphql_ws_client::next::{Client}; /// use std::future::IntoFuture; - /// # let conn = graphql_ws_client::Conn - /// # fn main() -> Result<(), graphql_ws_client_::Error> { - /// let (client, actor) = Client::build(conn).await? + /// # async fn example() -> Result<(), graphql_ws_client::Error> { + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// let (client, actor) = Client::build(connection).await?; + /// # Ok(()) /// # } /// ``` pub fn build(connection: Conn) -> ClientBuilder @@ -74,9 +77,20 @@ impl ClientBuilder { /// Initialise a Client and use it to run a single streaming operation /// + /// ```rust + /// use graphql_ws_client::next::{Client}; + /// use std::future::IntoFuture; + /// # async fn example() -> Result<(), graphql_ws_client::Error> { + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// # let subscription = graphql_ws_client::__doc_utils::Subscription; + /// let stream = Client::build(connection).streaming_operation(subscription).await?; + /// # Ok(()) + /// # } /// ``` - /// todo!("a doctest") - /// ``` + /// + /// Note that this takes ownership of the client, so it cannot be + /// used to run any more operations. + /// /// If users want to run mutliple operations on a connection they /// should use the `IntoFuture` impl to construct a `Client` pub async fn streaming_operation<'a, Operation>( From a337e3b7cbb199c6337e86514ee39127471a99f1 Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Fri, 9 Feb 2024 19:58:06 +0000 Subject: [PATCH 05/10] wip test updates --- tests/cynic-tests.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/cynic-tests.rs b/tests/cynic-tests.rs index 8c76c23..5b68e8e 100644 --- a/tests/cynic-tests.rs +++ b/tests/cynic-tests.rs @@ -119,6 +119,70 @@ async fn main_test() { ); } +#[tokio::test] +async fn oneshot_operation_test() { + use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue}; + use futures::StreamExt; + + let (channel, _) = tokio::sync::broadcast::channel(10); + + subscription_server::start(57433, channel.clone()).await; + + sleep(Duration::from_millis(20)).await; + + let mut request = "ws://localhost:57433/ws".into_client_request().unwrap(); + request.headers_mut().insert( + "Sec-WebSocket-Protocol", + HeaderValue::from_str("graphql-transport-ws").unwrap(), + ); + + let (connection, _) = async_tungstenite::tokio::connect_async(request) + .await + .unwrap(); + + println!("Connected"); + + let stream = graphql_ws_client::next::Client::build(connection) + .streaming_operation(build_query()) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let updates = [ + subscription_server::BookChanged { + id: "123".into(), + book: None, + }, + subscription_server::BookChanged { + id: "456".into(), + book: None, + }, + subscription_server::BookChanged { + id: "789".into(), + book: None, + }, + ]; + + futures::join!( + async { + for update in &updates { + channel.send(update.to_owned()).unwrap(); + } + }, + async { + let received_updates = stream.take(updates.len()).collect::>().await; + + for (expected, update) in updates.iter().zip(received_updates) { + let update = update.unwrap(); + assert_matches!(update.errors, None); + let data = update.data.unwrap(); + assert_eq!(data.books.id.inner(), expected.id.0); + } + } + ); +} + fn build_query() -> cynic::StreamingOperation { use cynic::SubscriptionBuilder; From 568f95ba9a45fa5f15bdb4f10838cdd03f0ec539 Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Fri, 9 Feb 2024 20:14:06 +0000 Subject: [PATCH 06/10] extract producer_handler --- src/next/stream.rs | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/src/next/stream.rs b/src/next/stream.rs index b1fe9f6..f567741 100644 --- a/src/next/stream.rs +++ b/src/next/stream.rs @@ -85,30 +85,10 @@ where fn join(self, future: Fuse>) -> impl Stream + 'a { futures::stream::unfold( ProducerState::Running(self.fuse(), future), - |mut state| async { - loop { - match state { - ProducerState::Running(mut stream, mut producer) => { - futures::select! { - output = stream.next() => { - return Some((output?, ProducerState::Running(stream, producer))); - } - _ = producer => { - state = ProducerState::Draining(stream); - continue; - } - } - } - ProducerState::Draining(mut stream) => { - return Some((stream.next().await?, ProducerState::Draining(stream))) - } - } - } - }, + producer_handler, ) } } - enum ProducerState<'a, Item> { Running( stream::Fuse>, @@ -116,3 +96,26 @@ enum ProducerState<'a, Item> { ), Draining(stream::Fuse>), } + +async fn producer_handler( + mut state: ProducerState<'_, Item>, +) -> Option<(Item, ProducerState<'_, Item>)> { + loop { + match state { + ProducerState::Running(mut stream, mut producer) => { + futures::select! { + output = stream.next() => { + return Some((output?, ProducerState::Running(stream, producer))); + } + _ = producer => { + state = ProducerState::Draining(stream); + continue; + } + } + } + ProducerState::Draining(mut stream) => { + return Some((stream.next().await?, ProducerState::Draining(stream))) + } + } + } +} From adc8a549c4602434f0cf8dcc3e11eed100b0f00a Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Sat, 10 Feb 2024 13:00:33 +0000 Subject: [PATCH 07/10] improve subscription server from tests --- Cargo.toml | 12 +++-- tests/cynic-tests.rs | 19 +++----- tests/subscription_server/mod.rs | 76 ++++++++++++++++++++++++++------ 3 files changed, 78 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ce24543..f7f826b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,11 +43,15 @@ pharos = { version = "0.5.2", optional = true } [dev-dependencies] assert_matches = "1.5" -async-graphql = "5.0.10" -async-graphql-axum = "5" +async-graphql = "7.0.1" +async-graphql-axum = "7" async-tungstenite = { version = "0.24", features = ["tokio-runtime"] } -axum = "0.6" +axum = "0.7" +axum-macros = "0.4" cynic = { version = "3" } insta = "1.11" tokio = { version = "1", features = ["macros"] } -tokio-stream = { version = "0.1", features = ["sync"] } \ No newline at end of file +tokio-stream = { version = "0.1", features = ["sync"] } + +graphql-ws-client.path = "." +graphql-ws-client.features = ["client-cynic", "client-graphql-client", "async-tungstenite"] \ No newline at end of file diff --git a/tests/cynic-tests.rs b/tests/cynic-tests.rs index 5b68e8e..00673d8 100644 --- a/tests/cynic-tests.rs +++ b/tests/cynic-tests.rs @@ -1,8 +1,7 @@ -#![cfg(feature = "cynic")] - use std::{future::IntoFuture, time::Duration}; use assert_matches::assert_matches; +use subscription_server::SubscriptionServer; use tokio::time::sleep; mod subscription_server; @@ -57,13 +56,11 @@ async fn main_test() { use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue}; use futures::StreamExt; - let (channel, _) = tokio::sync::broadcast::channel(10); - - subscription_server::start(57432, channel.clone()).await; + let server = SubscriptionServer::start().await; sleep(Duration::from_millis(20)).await; - let mut request = "ws://localhost:57432/ws".into_client_request().unwrap(); + let mut request = server.websocket_url().into_client_request().unwrap(); request.headers_mut().insert( "Sec-WebSocket-Protocol", HeaderValue::from_str("graphql-transport-ws").unwrap(), @@ -103,7 +100,7 @@ async fn main_test() { futures::join!( async { for update in &updates { - channel.send(update.to_owned()).unwrap(); + server.send(update.to_owned()).unwrap(); } }, async { @@ -124,13 +121,11 @@ async fn oneshot_operation_test() { use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue}; use futures::StreamExt; - let (channel, _) = tokio::sync::broadcast::channel(10); - - subscription_server::start(57433, channel.clone()).await; + let server = SubscriptionServer::start().await; sleep(Duration::from_millis(20)).await; - let mut request = "ws://localhost:57433/ws".into_client_request().unwrap(); + let mut request = server.websocket_url().into_client_request().unwrap(); request.headers_mut().insert( "Sec-WebSocket-Protocol", HeaderValue::from_str("graphql-transport-ws").unwrap(), @@ -167,7 +162,7 @@ async fn oneshot_operation_test() { futures::join!( async { for update in &updates { - channel.send(update.to_owned()).unwrap(); + server.send(update.to_owned()).unwrap(); } }, async { diff --git a/tests/subscription_server/mod.rs b/tests/subscription_server/mod.rs index 1047948..ddac633 100644 --- a/tests/subscription_server/mod.rs +++ b/tests/subscription_server/mod.rs @@ -3,29 +3,79 @@ use async_graphql::{EmptyMutation, Object, Schema, SimpleObject, Subscription, ID}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription}; -use axum::{extract::Extension, routing::post, Router, Server}; +use axum::{extract::Extension, routing::post, Router}; use futures::{Stream, StreamExt}; use tokio::sync::broadcast::Sender; use tokio_stream::wrappers::BroadcastStream; pub type BooksSchema = Schema; -pub async fn start(port: u16, channel: Sender) { - let schema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot { channel }).finish(); +pub struct SubscriptionServer { + shutdown: Option>, + port: u16, + sender: Sender, +} + +impl Drop for SubscriptionServer { + fn drop(&mut self) { + if let Some(shutdown) = self.shutdown.take() { + shutdown.send(()).ok(); + } + } +} + +impl SubscriptionServer { + pub async fn start() -> SubscriptionServer { + let (channel, _) = tokio::sync::broadcast::channel(16); + + let schema = Schema::build( + QueryRoot, + EmptyMutation, + SubscriptionRoot { + channel: channel.clone(), + }, + ) + .finish(); + + let app = Router::new() + .route("/", post(graphql_handler)) + .route_service("/ws", GraphQLSubscription::new(schema.clone())) + .layer(Extension(schema)); - let app = Router::new() - .route("/", post(graphql_handler)) - .route_service("/ws", GraphQLSubscription::new(schema.clone())) - .layer(Extension(schema)); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); - tokio::spawn(async move { - Server::bind(&format!("127.0.0.1:{port}").parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); - }); + let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>(); + + tokio::spawn(async move { + axum::serve(listener, app.with_state(())) + .with_graceful_shutdown(async move { + shutdown_receiver.await.ok(); + }) + .await + .unwrap(); + }); + + SubscriptionServer { + port, + shutdown: Some(shutdown_sender), + sender: channel, + } + } + + pub fn websocket_url(&self) -> String { + format!("ws://localhost:{}/ws", self.port) + } + + pub fn send( + &self, + change: BookChanged, + ) -> Result<(), tokio::sync::broadcast::error::SendError> { + self.sender.send(change).map(|_| ()) + } } +#[axum_macros::debug_handler] async fn graphql_handler(schema: Extension, req: GraphQLRequest) -> GraphQLResponse { schema.execute(req.into_inner()).await.into() } From 853d0796d56ea789a66297145b3cd973ce7c4ec8 Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Sat, 10 Feb 2024 13:00:46 +0000 Subject: [PATCH 08/10] fix the test --- src/next/stream.rs | 2 +- tests/cynic-tests.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/next/stream.rs b/src/next/stream.rs index f567741..2c81959 100644 --- a/src/next/stream.rs +++ b/src/next/stream.rs @@ -114,7 +114,7 @@ async fn producer_handler( } } ProducerState::Draining(mut stream) => { - return Some((stream.next().await?, ProducerState::Draining(stream))) + return Some((stream.next().await?, ProducerState::Draining(stream))); } } } diff --git a/tests/cynic-tests.rs b/tests/cynic-tests.rs index 00673d8..5441c47 100644 --- a/tests/cynic-tests.rs +++ b/tests/cynic-tests.rs @@ -142,8 +142,6 @@ async fn oneshot_operation_test() { .await .unwrap(); - sleep(Duration::from_millis(100)).await; - let updates = [ subscription_server::BookChanged { id: "123".into(), @@ -161,6 +159,7 @@ async fn oneshot_operation_test() { futures::join!( async { + sleep(Duration::from_millis(10)).await; for update in &updates { server.send(update.to_owned()).unwrap(); } From 00260dd2e04a6bb4ea4e128550567cf5578d3ef5 Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Sat, 10 Feb 2024 13:09:18 +0000 Subject: [PATCH 09/10] one shot examples --- examples/examples/mulitiple-subscriptions.rs | 85 +++++++++++++++++++ ...ubscriptions.rs => single-subscription.rs} | 16 ++-- 2 files changed, 91 insertions(+), 10 deletions(-) create mode 100644 examples/examples/mulitiple-subscriptions.rs rename examples/examples/{subscriptions.rs => single-subscription.rs} (76%) diff --git a/examples/examples/mulitiple-subscriptions.rs b/examples/examples/mulitiple-subscriptions.rs new file mode 100644 index 0000000..9c0c7d0 --- /dev/null +++ b/examples/examples/mulitiple-subscriptions.rs @@ -0,0 +1,85 @@ +//! An example of running a multiple subscriptions on a single connection +//! using `graphql-ws-client` and `async-tungstenite` +//! +//! Talks to the the tide subscription example in `async-graphql` + +mod schema { + cynic::use_schema!("../schemas/books.graphql"); +} + +#[derive(cynic::QueryFragment, Debug)] +#[cynic(schema_path = "../schemas/books.graphql", graphql_type = "Book")] +#[allow(dead_code)] +struct Book { + id: String, + name: String, + author: String, +} + +#[derive(cynic::QueryFragment, Debug)] +#[cynic(schema_path = "../schemas/books.graphql", graphql_type = "BookChanged")] +#[allow(dead_code)] +struct BookChanged { + id: cynic::Id, + book: Option, +} + +#[derive(cynic::QueryFragment, Debug)] +#[cynic( + schema_path = "../schemas/books.graphql", + graphql_type = "SubscriptionRoot" +)] +#[allow(dead_code)] +struct BooksChangedSubscription { + books: BookChanged, +} + +#[async_std::main] +async fn main() { + use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue}; + use futures::StreamExt; + use graphql_ws_client::CynicClientBuilder; + + let mut request = "ws://localhost:8000/graphql".into_client_request().unwrap(); + request.headers_mut().insert( + "Sec-WebSocket-Protocol", + HeaderValue::from_str("graphql-transport-ws").unwrap(), + ); + + let (connection, _) = async_tungstenite::async_std::connect_async(request) + .await + .unwrap(); + + println!("Connected"); + + let (sink, stream) = connection.split(); + + let mut client = CynicClientBuilder::new() + .build(stream, sink, async_executors::AsyncStd) + .await + .unwrap(); + + // In reality you'd probably want to different subscriptions, but for the sake of this example + // these are the same subscriptions + let mut first_subscription = client.streaming_operation(build_query()).await.unwrap(); + let mut second_subscription = client.streaming_operation(build_query()).await.unwrap(); + + futures::join!( + async move { + while let Some(item) = first_subscription.next().await { + println!("{:?}", item); + } + }, + async move { + while let Some(item) = second_subscription.next().await { + println!("{:?}", item); + } + } + ); +} + +fn build_query() -> cynic::StreamingOperation { + use cynic::SubscriptionBuilder; + + BooksChangedSubscription::build(()) +} diff --git a/examples/examples/subscriptions.rs b/examples/examples/single-subscription.rs similarity index 76% rename from examples/examples/subscriptions.rs rename to examples/examples/single-subscription.rs index 6366151..6406f1a 100644 --- a/examples/examples/subscriptions.rs +++ b/examples/examples/single-subscription.rs @@ -1,5 +1,5 @@ -//! An example of using subscriptions with `graphql-ws-client` and -//! `async-tungstenite` +//! An example of creating a connection and running a single subscription on it +//! using `graphql-ws-client` and `async-tungstenite` //! //! Talks to the the tide subscription example in `async-graphql` @@ -38,7 +38,7 @@ struct BooksChangedSubscription { async fn main() { use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue}; use futures::StreamExt; - use graphql_ws_client::CynicClientBuilder; + use graphql_ws_client::next::Client; let mut request = "ws://localhost:8000/graphql".into_client_request().unwrap(); request.headers_mut().insert( @@ -52,16 +52,12 @@ async fn main() { println!("Connected"); - let (sink, stream) = connection.split(); - - let mut client = CynicClientBuilder::new() - .build(stream, sink, async_executors::AsyncStd) + let mut subscription = Client::build(connection) + .streaming_operation(build_query()) .await .unwrap(); - let mut stream = client.streaming_operation(build_query()).await.unwrap(); - println!("Running subscription"); - while let Some(item) = stream.next().await { + while let Some(item) = subscription.next().await { println!("{:?}", item); } } From 72f1ea6bdc43dab966f2c04f5339eb494664b9c1 Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Sat, 10 Feb 2024 13:24:51 +0000 Subject: [PATCH 10/10] changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48f2805..c049222 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ all APIs might be changed. ## Unreleased - xxxx-xx-xx +### New Features + +- Added a `streaming_operation` function to `next::ClientBuilder` to make + creating a single subscription on a given connection easier. + ## v0.8.0-alpha.2 - 2024-01-30 ### Breaking Changes