diff --git a/examples-wasm/examples/subscriptions.rs b/examples-wasm/examples/subscriptions.rs index 4548f96..1b4ea5b 100644 --- a/examples-wasm/examples/subscriptions.rs +++ b/examples-wasm/examples/subscriptions.rs @@ -57,7 +57,7 @@ async fn main() { let connection = Connection::new(ws_conn).await; - let (client, actor) = Client::build(connection).await.unwrap(); + let (client, actor) = Client::builder().build(connection).await.unwrap(); wasm_bindgen_futures::spawn_local(actor.into_future()); let mut stream = client.subscribe(build_query()).await.unwrap(); diff --git a/examples/examples/cynic-mulitiple-subscriptions.rs b/examples/examples/cynic-mulitiple-subscriptions.rs index da06220..91dba0f 100644 --- a/examples/examples/cynic-mulitiple-subscriptions.rs +++ b/examples/examples/cynic-mulitiple-subscriptions.rs @@ -55,7 +55,7 @@ async fn main() { println!("Connected"); - let (client, actor) = Client::build(connection).await.unwrap(); + let (client, actor) = Client::builder().build(connection).await.unwrap(); async_std::task::spawn(actor.into_future()); // In reality you'd probably want to different subscriptions, but for the sake of this example diff --git a/examples/examples/cynic-single-subscription.rs b/examples/examples/cynic-single-subscription.rs index a0ba83c..33771f3 100644 --- a/examples/examples/cynic-single-subscription.rs +++ b/examples/examples/cynic-single-subscription.rs @@ -52,8 +52,8 @@ async fn main() { println!("Connected"); - let mut subscription = Client::build(connection) - .subscribe(build_query()) + let mut subscription = Client::builder() + .subscribe(connection, build_query()) .await .unwrap(); diff --git a/examples/examples/graphql-client-single-subscription.rs b/examples/examples/graphql-client-single-subscription.rs index b83ac01..3ebe8f1 100644 --- a/examples/examples/graphql-client-single-subscription.rs +++ b/examples/examples/graphql-client-single-subscription.rs @@ -32,10 +32,11 @@ async fn main() { println!("Connected"); - let mut subscription = Client::build(connection) - .subscribe(StreamingOperation::::new( - books_changed::Variables, - )) + let mut subscription = Client::builder() + .subscribe( + connection, + StreamingOperation::::new(books_changed::Variables), + ) .await .unwrap(); diff --git a/examples/examples/tokio.rs b/examples/examples/tokio.rs index e6e64b8..9e92415 100644 --- a/examples/examples/tokio.rs +++ b/examples/examples/tokio.rs @@ -54,7 +54,7 @@ async fn main() { println!("Connected"); - let (client, actor) = Client::build(connection).await.unwrap(); + let (client, actor) = Client::builder().build(connection).await.unwrap(); tokio::spawn(actor.into_future()); let mut stream = client.subscribe(build_query()).await.unwrap(); diff --git a/src/next/builder.rs b/src/next/builder.rs index c27b5b7..e447ead 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -16,6 +16,221 @@ use super::{ Client, Subscription, }; +const DEFAULT_SUBSCRIPTION_BUFFER_SIZE: usize = 5; + +pub mod next { + use super::{ + run_startup, Client, Connection, ConnectionActor, Event, KeepAliveSettings, Message, + Subscription, DEFAULT_SUBSCRIPTION_BUFFER_SIZE, + }; + use crate::{graphql::GraphqlOperation, logging::trace, Error}; + use serde::Serialize; + use std::{future::IntoFuture, time::Duration}; + + /// Builder for Graphql over Websocket clients + /// + /// This can be used to configure the client prior to construction, but can also create + /// subscriptions directly in the case where users only need to run one per connection. + /// + /// ```rust + /// use graphql_ws_client::{Client, ClientBuilder}; + /// # + /// # async fn example() -> Result<(), graphql_ws_client::Error> { + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// let (client, actor) = ClientBuilder::new().build(connection).await?; + /// // or + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// let (client, actor) = Client::builder().build(connection).await?; + /// # Ok(()) + /// # } + /// ``` + #[derive(Clone, Debug, Default)] + pub struct ClientBuilder { + payload: Option, + subscription_buffer_size: Option, + keep_alive: KeepAliveSettings, + } + + impl super::Client { + /// Creates a ClientBuilder. + /// + /// Same as calling `ClientBuilder::new()`. + /// + /// ```rust + /// use graphql_ws_client::{Client, ClientBuilder}; + /// # + /// # async fn example() -> Result<(), graphql_ws_client::Error> { + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// let (client, actor) = Client::builder().build(connection).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } + } + + impl ClientBuilder { + /// Creates a ClientBuilder. + pub fn new() -> Self { + Self::default() + } + + /// Add payload to `connection_init` + pub fn payload(self, payload: impl Serialize) -> Result { + Ok(Self { + payload: Some( + serde_json::to_value(payload) + .map_err(|error| Error::Serializing(error.to_string()))?, + ), + ..self + }) + } + + /// Sets the size of the incoming message buffer that subscriptions created by this client will + /// use + pub fn subscription_buffer_size(self, new: usize) -> Self { + ClientBuilder { + subscription_buffer_size: Some(new), + ..self + } + } + + /// Sets the interval between keep alives. + /// + /// Any incoming messages automatically reset this interval so keep alives may not be sent + /// on busy connections even if this is set. + pub fn keep_alive_interval(mut self, new: Duration) -> Self { + self.keep_alive.interval = Some(new); + self + } + + /// The number of keepalive retries before a connection is considered broken. + /// + /// This defaults to 3, but has no effect if `keep_alive_interval` is not called. + pub fn keep_alive_retries(mut self, count: usize) -> Self { + self.keep_alive.retries = count; + self + } + + /// Initialize a Client and use it to run a single subscription + /// + /// ```rust + /// use graphql_ws_client::{Client, ClientBuilder}; + /// # async fn example() -> Result<(), graphql_ws_client::Error> { + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// # let subscription = graphql_ws_client::__doc_utils::Subscription; + /// let stream = ClientBuilder::new().subscribe(connection, subscription).await?; + /// // or + /// # let connection = graphql_ws_client::__doc_utils::Conn; + /// # let subscription = graphql_ws_client::__doc_utils::Subscription; + /// let stream = Client::builder().subscribe(connection, subscription).await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// Note that this takes ownership of the client, so it cannot be + /// used to run any more operations. + /// + /// If users want to run multiple operations on a connection they + /// should `build` the `Client`. + pub async fn subscribe( + self, + connection: Conn, + operation: Operation, + ) -> Result, Error> + where + Conn: Connection + Send + 'static, + Operation: GraphqlOperation + Unpin + Send + 'static, + { + let (client, actor) = self.build(connection).await?; + + let actor_future = actor.into_future(); + let subscribe_future = client.subscribe(operation); + + let (stream, actor_future) = run_startup(subscribe_future, actor_future).await?; + + Ok(stream.join(actor_future)) + } + + /// Constructs a Client + /// + /// Accepts an already built websocket connection, and returns the connection + /// and a future that must be awaited somewhere - if the future is dropped the + /// connection will also drop. + pub async fn build( + self, + mut connection: Conn, + ) -> Result<(Client, ConnectionActor), Error> + where + Conn: Connection + Send + 'static, + { + let Self { + payload, + subscription_buffer_size, + keep_alive, + } = self; + let subscription_buffer_size = + subscription_buffer_size.unwrap_or(DEFAULT_SUBSCRIPTION_BUFFER_SIZE); + + connection.send(Message::init(payload)).await?; + + // Wait for ack before entering receiver loop: + loop { + match connection.receive().await { + None => return Err(Error::Unknown("connection dropped".into())), + Some(Message::Close { code, reason }) => { + return Err(Error::Close( + code.unwrap_or_default(), + reason.unwrap_or_default(), + )) + } + Some(Message::Ping) | Some(Message::Pong) => {} + Some(message @ Message::Text(_)) => { + let event = message.deserialize::()?; + match event { + // Pings can be sent at any time + Event::Ping { .. } => { + connection.send(Message::graphql_pong()).await?; + } + Event::Pong { .. } => {} + Event::ConnectionAck { .. } => { + // Handshake completed, ready to enter main receiver loop + trace!("connection_ack received, handshake completed"); + break; + } + event => { + connection + .send(Message::Close { + code: Some(4950), + reason: Some( + "Unexpected message while waiting for ack".into(), + ), + }) + .await + .ok(); + return Err(Error::Decode(format!( + "expected a connection_ack or ping, got {}", + event.r#type() + ))); + } + } + } + } + } + + let (command_sender, command_receiver) = + async_channel::bounded(subscription_buffer_size); + + let actor = ConnectionActor::new(Box::new(connection), command_receiver, keep_alive); + + let client = Client::new_internal(command_sender, subscription_buffer_size); + + Ok((client, actor)) + } + } +} + /// Builder for Graphql over Websocket clients /// /// This can be used to configure the client prior to construction, but can also create @@ -31,6 +246,7 @@ use super::{ /// # Ok(()) /// # } /// ``` +#[deprecated(since = "0.11.0", note = "use Client::builder() instead")] pub struct ClientBuilder { payload: Option, subscription_buffer_size: Option, @@ -50,6 +266,7 @@ impl super::Client { /// # Ok(()) /// # } /// ``` + #[deprecated(since = "0.11.0", note = "use Client::builder() instead")] pub fn build(connection: Conn) -> ClientBuilder where Conn: Connection + Send + 'static, diff --git a/src/next/keepalive.rs b/src/next/keepalive.rs index 752e31f..5fe35fc 100644 --- a/src/next/keepalive.rs +++ b/src/next/keepalive.rs @@ -4,7 +4,7 @@ use futures_lite::{stream, Stream}; use crate::ConnectionCommand; -#[derive(Clone)] +#[derive(Clone, Debug)] pub(super) struct KeepAliveSettings { /// How often to send a keep alive ping pub(super) interval: Option, diff --git a/src/next/mod.rs b/src/next/mod.rs index f535927..83aed4e 100644 --- a/src/next/mod.rs +++ b/src/next/mod.rs @@ -24,7 +24,7 @@ mod stream; pub use self::{ actor::ConnectionActor, - builder::ClientBuilder, + builder::next::ClientBuilder, connection::{Connection, Message}, stream::Subscription, }; diff --git a/tests/cynic-tests.rs b/tests/cynic-tests.rs index af5d4d1..ed3974f 100644 --- a/tests/cynic-tests.rs +++ b/tests/cynic-tests.rs @@ -72,7 +72,10 @@ async fn main_test() { println!("Connected"); - let (client, actor) = graphql_ws_client::Client::build(connection).await.unwrap(); + let (client, actor) = graphql_ws_client::Client::builder() + .build(connection) + .await + .unwrap(); tokio::spawn(actor.into_future()); @@ -135,8 +138,8 @@ async fn oneshot_operation_test() { println!("Connected"); - let stream = graphql_ws_client::Client::build(connection) - .subscribe(build_query()) + let stream = graphql_ws_client::Client::builder() + .subscribe(connection, build_query()) .await .unwrap(); diff --git a/tests/graphql-client-tests.rs b/tests/graphql-client-tests.rs index 8cd6d2f..e4300ae 100644 --- a/tests/graphql-client-tests.rs +++ b/tests/graphql-client-tests.rs @@ -37,7 +37,10 @@ async fn main_test() { println!("Connected"); - let (client, actor) = graphql_ws_client::Client::build(connection).await.unwrap(); + let (client, actor) = graphql_ws_client::Client::builder() + .build(connection) + .await + .unwrap(); tokio::spawn(actor.into_future()); @@ -100,8 +103,8 @@ async fn oneshot_operation_test() { println!("Connected"); - let stream = graphql_ws_client::Client::build(connection) - .subscribe(build_query()) + let stream = graphql_ws_client::Client::builder() + .subscribe(connection, build_query()) .await .unwrap();