Skip to content

Commit

Permalink
refactor: new clientbuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
carlocorradini committed Jun 11, 2024
1 parent a167a2a commit 81231df
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 17 deletions.
2 changes: 1 addition & 1 deletion examples-wasm/examples/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn main() {

let connection = Connection::new(ws_conn).await;

let (client, actor) = Client::build(connection).await.unwrap();
let (client, actor) = Client::builder().build(connection).await.unwrap();
wasm_bindgen_futures::spawn_local(actor.into_future());

let mut stream = client.subscribe(build_query()).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/cynic-mulitiple-subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() {

println!("Connected");

let (client, actor) = Client::build(connection).await.unwrap();
let (client, actor) = Client::builder().build(connection).await.unwrap();
async_std::task::spawn(actor.into_future());

// In reality you'd probably want to different subscriptions, but for the sake of this example
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/cynic-single-subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ async fn main() {

println!("Connected");

let mut subscription = Client::build(connection)
.subscribe(build_query())
let mut subscription = Client::builder()
.subscribe(connection, build_query())
.await
.unwrap();

Expand Down
9 changes: 5 additions & 4 deletions examples/examples/graphql-client-single-subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ async fn main() {

println!("Connected");

let mut subscription = Client::build(connection)
.subscribe(StreamingOperation::<BooksChanged>::new(
books_changed::Variables,
))
let mut subscription = Client::builder()
.subscribe(
connection,
StreamingOperation::<BooksChanged>::new(books_changed::Variables),
)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() {

println!("Connected");

let (client, actor) = Client::build(connection).await.unwrap();
let (client, actor) = Client::builder().build(connection).await.unwrap();
tokio::spawn(actor.into_future());

let mut stream = client.subscribe(build_query()).await.unwrap();
Expand Down
217 changes: 217 additions & 0 deletions src/next/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,221 @@ use super::{
Client, Subscription,
};

const DEFAULT_SUBSCRIPTION_BUFFER_SIZE: usize = 5;

pub mod next {
use super::{
run_startup, Client, Connection, ConnectionActor, Event, KeepAliveSettings, Message,
Subscription, DEFAULT_SUBSCRIPTION_BUFFER_SIZE,
};
use crate::{graphql::GraphqlOperation, logging::trace, Error};
use serde::Serialize;
use std::{future::IntoFuture, time::Duration};

/// Builder for Graphql over Websocket clients
///
/// This can be used to configure the client prior to construction, but can also create
/// subscriptions directly in the case where users only need to run one per connection.
///
/// ```rust
/// use graphql_ws_client::{Client, ClientBuilder};
/// #
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = ClientBuilder::new().build(connection).await?;
/// // or
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = Client::builder().build(connection).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
keep_alive: KeepAliveSettings,
}

impl super::Client {
/// Creates a ClientBuilder.
///
/// Same as calling `ClientBuilder::new()`.
///
/// ```rust
/// use graphql_ws_client::{Client, ClientBuilder};
/// #
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = Client::builder().build(connection).await?;
/// # Ok(())
/// # }
/// ```
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
}

impl ClientBuilder {
/// Creates a ClientBuilder.
pub fn new() -> Self {
Self::default()
}

/// Add payload to `connection_init`
pub fn payload(self, payload: impl Serialize) -> Result<Self, Error> {
Ok(Self {
payload: Some(
serde_json::to_value(payload)
.map_err(|error| Error::Serializing(error.to_string()))?,
),
..self
})
}

/// Sets the size of the incoming message buffer that subscriptions created by this client will
/// use
pub fn subscription_buffer_size(self, new: usize) -> Self {
ClientBuilder {
subscription_buffer_size: Some(new),
..self
}
}

/// Sets the interval between keep alives.
///
/// Any incoming messages automatically reset this interval so keep alives may not be sent
/// on busy connections even if this is set.
pub fn keep_alive_interval(mut self, new: Duration) -> Self {
self.keep_alive.interval = Some(new);
self
}

/// The number of keepalive retries before a connection is considered broken.
///
/// This defaults to 3, but has no effect if `keep_alive_interval` is not called.
pub fn keep_alive_retries(mut self, count: usize) -> Self {
self.keep_alive.retries = count;
self
}

/// Initialize a Client and use it to run a single subscription
///
/// ```rust
/// use graphql_ws_client::{Client, ClientBuilder};
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
/// let stream = ClientBuilder::new().subscribe(connection, subscription).await?;
/// // or
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
/// let stream = Client::builder().subscribe(connection, subscription).await?;
/// # Ok(())
/// # }
/// ```
///
/// Note that this takes ownership of the client, so it cannot be
/// used to run any more operations.
///
/// If users want to run multiple operations on a connection they
/// should `build` the `Client`.
pub async fn subscribe<Conn, Operation>(
self,
connection: Conn,
operation: Operation,
) -> Result<Subscription<Operation>, Error>
where
Conn: Connection + Send + 'static,
Operation: GraphqlOperation + Unpin + Send + 'static,
{
let (client, actor) = self.build(connection).await?;

let actor_future = actor.into_future();
let subscribe_future = client.subscribe(operation);

let (stream, actor_future) = run_startup(subscribe_future, actor_future).await?;

Ok(stream.join(actor_future))
}

/// Constructs a Client
///
/// Accepts an already built websocket connection, and returns the connection
/// and a future that must be awaited somewhere - if the future is dropped the
/// connection will also drop.
pub async fn build<Conn>(
self,
mut connection: Conn,
) -> Result<(Client, ConnectionActor), Error>
where
Conn: Connection + Send + 'static,
{
let Self {
payload,
subscription_buffer_size,
keep_alive,
} = self;
let subscription_buffer_size =
subscription_buffer_size.unwrap_or(DEFAULT_SUBSCRIPTION_BUFFER_SIZE);

connection.send(Message::init(payload)).await?;

// Wait for ack before entering receiver loop:
loop {
match connection.receive().await {
None => return Err(Error::Unknown("connection dropped".into())),
Some(Message::Close { code, reason }) => {
return Err(Error::Close(
code.unwrap_or_default(),
reason.unwrap_or_default(),
))
}
Some(Message::Ping) | Some(Message::Pong) => {}
Some(message @ Message::Text(_)) => {
let event = message.deserialize::<Event>()?;
match event {
// Pings can be sent at any time
Event::Ping { .. } => {
connection.send(Message::graphql_pong()).await?;
}
Event::Pong { .. } => {}
Event::ConnectionAck { .. } => {
// Handshake completed, ready to enter main receiver loop
trace!("connection_ack received, handshake completed");
break;
}
event => {
connection
.send(Message::Close {
code: Some(4950),
reason: Some(
"Unexpected message while waiting for ack".into(),
),
})
.await
.ok();
return Err(Error::Decode(format!(
"expected a connection_ack or ping, got {}",
event.r#type()
)));
}
}
}
}
}

let (command_sender, command_receiver) =
async_channel::bounded(subscription_buffer_size);

let actor = ConnectionActor::new(Box::new(connection), command_receiver, keep_alive);

let client = Client::new_internal(command_sender, subscription_buffer_size);

Ok((client, actor))
}
}
}

/// Builder for Graphql over Websocket clients
///
/// This can be used to configure the client prior to construction, but can also create
Expand All @@ -31,6 +246,7 @@ use super::{
/// # Ok(())
/// # }
/// ```
#[deprecated(since = "0.11.0", note = "use Client::builder() instead")]
pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
Expand All @@ -50,6 +266,7 @@ impl super::Client {
/// # Ok(())
/// # }
/// ```
#[deprecated(since = "0.11.0", note = "use Client::builder() instead")]
pub fn build<Conn>(connection: Conn) -> ClientBuilder
where
Conn: Connection + Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion src/next/keepalive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures_lite::{stream, Stream};

use crate::ConnectionCommand;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub(super) struct KeepAliveSettings {
/// How often to send a keep alive ping
pub(super) interval: Option<Duration>,
Expand Down
2 changes: 1 addition & 1 deletion src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod stream;

pub use self::{
actor::ConnectionActor,
builder::ClientBuilder,
builder::next::ClientBuilder,
connection::{Connection, Message},
stream::Subscription,
};
Expand Down
9 changes: 6 additions & 3 deletions tests/cynic-tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ async fn main_test() {

println!("Connected");

let (client, actor) = graphql_ws_client::Client::build(connection).await.unwrap();
let (client, actor) = graphql_ws_client::Client::builder()
.build(connection)
.await
.unwrap();

tokio::spawn(actor.into_future());

Expand Down Expand Up @@ -135,8 +138,8 @@ async fn oneshot_operation_test() {

println!("Connected");

let stream = graphql_ws_client::Client::build(connection)
.subscribe(build_query())
let stream = graphql_ws_client::Client::builder()
.subscribe(connection, build_query())
.await
.unwrap();

Expand Down
9 changes: 6 additions & 3 deletions tests/graphql-client-tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ async fn main_test() {

println!("Connected");

let (client, actor) = graphql_ws_client::Client::build(connection).await.unwrap();
let (client, actor) = graphql_ws_client::Client::builder()
.build(connection)
.await
.unwrap();

tokio::spawn(actor.into_future());

Expand Down Expand Up @@ -100,8 +103,8 @@ async fn oneshot_operation_test() {

println!("Connected");

let stream = graphql_ws_client::Client::build(connection)
.subscribe(build_query())
let stream = graphql_ws_client::Client::builder()
.subscribe(connection, build_query())
.await
.unwrap();

Expand Down

0 comments on commit 81231df

Please sign in to comment.