Skip to content

Commit

Permalink
An easier one shot subscribe API (#65)
Browse files Browse the repository at this point in the history
`graphql-ws-client` was designed to allow multiple subscriptions to be
run on a single connection. This is still something I want to support,
but it's also common for users to only want to run a single subscription
per connection. Right now the API for doing that is a bit too verbose.

This PR will introduce a `streaming_operation` function on the
`ClientBuilder` to handle this use case, combining the connect,
subscribe and actor spawning into a single function call.

Todo:
- [x] Examples
- [x] Tests
- [x] Changelog
  • Loading branch information
obmarg committed Feb 10, 2024
1 parent 985e773 commit d27829b
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 63 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ all APIs might be changed.

## Unreleased - xxxx-xx-xx

### New Features

- Added a `streaming_operation` function to `next::ClientBuilder` to make
creating a single subscription on a given connection easier.

## v0.8.0-alpha.2 - 2024-01-30

### Breaking Changes
Expand Down
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ pharos = { version = "0.5.2", optional = true }

[dev-dependencies]
assert_matches = "1.5"
async-graphql = "5.0.10"
async-graphql-axum = "5"
async-graphql = "7.0.1"
async-graphql-axum = "7"
async-tungstenite = { version = "0.24", features = ["tokio-runtime"] }
axum = "0.6"
axum = "0.7"
axum-macros = "0.4"
cynic = { version = "3" }
insta = "1.11"
tokio = { version = "1", features = ["macros"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-stream = { version = "0.1", features = ["sync"] }

graphql-ws-client.path = "."
graphql-ws-client.features = ["client-cynic", "client-graphql-client", "async-tungstenite"]
4 changes: 2 additions & 2 deletions examples-wasm/examples/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::future::IntoFuture;

use graphql_ws_client::{next::ClientBuilder, ws_stream_wasm::Connection};
use graphql_ws_client::{next::Client, ws_stream_wasm::Connection};

mod schema {
cynic::use_schema!("../schemas/books.graphql");
Expand Down Expand Up @@ -57,7 +57,7 @@ async fn main() {

let connection = Connection::new(ws_conn).await;

let (mut client, actor) = ClientBuilder::new().build(connection).await.unwrap();
let (mut client, actor) = Client::build(connection).await.unwrap();
wasm_bindgen_futures::spawn_local(actor.into_future());

let mut stream = client.streaming_operation(build_query()).await.unwrap();
Expand Down
85 changes: 85 additions & 0 deletions examples/examples/mulitiple-subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//! An example of running a multiple subscriptions on a single connection
//! using `graphql-ws-client` and `async-tungstenite`
//!
//! Talks to the the tide subscription example in `async-graphql`

mod schema {
cynic::use_schema!("../schemas/books.graphql");
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(schema_path = "../schemas/books.graphql", graphql_type = "Book")]
#[allow(dead_code)]
struct Book {
id: String,
name: String,
author: String,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(schema_path = "../schemas/books.graphql", graphql_type = "BookChanged")]
#[allow(dead_code)]
struct BookChanged {
id: cynic::Id,
book: Option<Book>,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(
schema_path = "../schemas/books.graphql",
graphql_type = "SubscriptionRoot"
)]
#[allow(dead_code)]
struct BooksChangedSubscription {
books: BookChanged,
}

#[async_std::main]
async fn main() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;
use graphql_ws_client::CynicClientBuilder;

let mut request = "ws://localhost:8000/graphql".into_client_request().unwrap();
request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_str("graphql-transport-ws").unwrap(),
);

let (connection, _) = async_tungstenite::async_std::connect_async(request)
.await
.unwrap();

println!("Connected");

let (sink, stream) = connection.split();

let mut client = CynicClientBuilder::new()
.build(stream, sink, async_executors::AsyncStd)
.await
.unwrap();

// In reality you'd probably want to different subscriptions, but for the sake of this example
// these are the same subscriptions
let mut first_subscription = client.streaming_operation(build_query()).await.unwrap();
let mut second_subscription = client.streaming_operation(build_query()).await.unwrap();

futures::join!(
async move {
while let Some(item) = first_subscription.next().await {
println!("{:?}", item);
}
},
async move {
while let Some(item) = second_subscription.next().await {
println!("{:?}", item);
}
}
);
}

fn build_query() -> cynic::StreamingOperation<BooksChangedSubscription> {
use cynic::SubscriptionBuilder;

BooksChangedSubscription::build(())
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! An example of using subscriptions with `graphql-ws-client` and
//! `async-tungstenite`
//! An example of creating a connection and running a single subscription on it
//! using `graphql-ws-client` and `async-tungstenite`
//!
//! Talks to the the tide subscription example in `async-graphql`

Expand Down Expand Up @@ -38,7 +38,7 @@ struct BooksChangedSubscription {
async fn main() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;
use graphql_ws_client::CynicClientBuilder;
use graphql_ws_client::next::Client;

let mut request = "ws://localhost:8000/graphql".into_client_request().unwrap();
request.headers_mut().insert(
Expand All @@ -52,16 +52,12 @@ async fn main() {

println!("Connected");

let (sink, stream) = connection.split();

let mut client = CynicClientBuilder::new()
.build(stream, sink, async_executors::AsyncStd)
let mut subscription = Client::build(connection)
.streaming_operation(build_query())
.await
.unwrap();

let mut stream = client.streaming_operation(build_query()).await.unwrap();
println!("Running subscription");
while let Some(item) = stream.next().await {
while let Some(item) = subscription.next().await {
println!("{:?}", item);
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/doc_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{next::Message, Error};

pub struct Conn;

#[async_trait::async_trait]
impl crate::next::Connection for Conn {
async fn receive(&mut self) -> Option<Message> {
unimplemented!()
}

async fn send(&mut self, _: Message) -> Result<(), Error> {
unimplemented!()
}
}

#[derive(serde::Serialize)]
pub struct Subscription;

impl crate::graphql::GraphqlOperation for Subscription {
type Response = ();

type Error = crate::Error;

fn decode(&self, _data: serde_json::Value) -> Result<Self::Response, Self::Error> {
unimplemented!()
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ mod client;
mod logging;
mod protocol;

#[doc(hidden)]
#[path = "doc_utils.rs"]
pub mod __doc_utils;

pub mod graphql;
pub mod websockets;

Expand Down
126 changes: 104 additions & 22 deletions src/next/builder.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,59 @@
use std::collections::HashMap;
use std::{collections::HashMap, future::IntoFuture};

use futures::channel::mpsc;
use futures::{channel::mpsc, future::BoxFuture, stream::BoxStream, FutureExt, StreamExt};
use serde::Serialize;

use crate::{logging::trace, protocol::Event, Error};
use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};

use super::{
actor::ConnectionActor,
connection::{Connection, Message},
Client,
Client, SubscriptionStream,
};

/// A websocket client builder
#[derive(Default)]
/// Builder for Clients.
///
/// ```rust
/// use graphql_ws_client::next::{Client};
/// use std::future::IntoFuture;
/// #
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = Client::build(connection).await?;
/// # Ok(())
/// # }
/// ```
pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
connection: Box<dyn Connection + Send>,
}

impl ClientBuilder {
/// Constructs an AsyncWebsocketClientBuilder
pub fn new() -> ClientBuilder {
ClientBuilder::default()
impl super::Client {
/// Creates a ClientBuilder with the given connection.
///
/// ```rust
/// use graphql_ws_client::next::{Client};
/// use std::future::IntoFuture;
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = Client::build(connection).await?;
/// # Ok(())
/// # }
/// ```
pub fn build<Conn>(connection: Conn) -> ClientBuilder
where
Conn: Connection + Send + 'static,
{
ClientBuilder {
payload: None,
subscription_buffer_size: None,
connection: Box::new(connection),
}
}
}

impl ClientBuilder {
/// Add payload to `connection_init`
pub fn payload<NewPayload>(self, payload: NewPayload) -> Result<ClientBuilder, Error>
where
Expand All @@ -44,6 +74,61 @@ impl ClientBuilder {
..self
}
}

/// Initialise a Client and use it to run a single streaming operation
///
/// ```rust
/// use graphql_ws_client::next::{Client};
/// use std::future::IntoFuture;
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
/// let stream = Client::build(connection).streaming_operation(subscription).await?;
/// # Ok(())
/// # }
/// ```
///
/// Note that this takes ownership of the client, so it cannot be
/// used to run any more operations.
///
/// If users want to run mutliple operations on a connection they
/// should use the `IntoFuture` impl to construct a `Client`
pub async fn streaming_operation<'a, Operation>(
self,
op: Operation,
) -> Result<SubscriptionStream<Operation>, Error>
where
Operation: GraphqlOperation + Unpin + Send + 'static,
{
let (mut client, actor) = self.await?;

let mut actor_future = actor.into_future().fuse();

let subscribe_future = client.streaming_operation(op).fuse();
futures::pin_mut!(subscribe_future);

// Temporarily run actor_future while we start the subscription
let stream = futures::select! {
() = actor_future => {
return Err(Error::Unknown("actor ended before subscription started".into()))
},
result = subscribe_future => {
result?
}
};

Ok(stream.join(actor_future))
}
}

impl IntoFuture for ClientBuilder {
type Output = Result<(Client, ConnectionActor), Error>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.build())
}
}

impl ClientBuilder {
Expand All @@ -52,18 +137,14 @@ impl ClientBuilder {
/// Accepts an already built websocket connection, and returns the connection
/// and a future that must be awaited somewhere - if the future is dropped the
/// connection will also drop.
pub async fn build<Conn>(self, connection: Conn) -> Result<(Client, ConnectionActor), Error>
where
Conn: Connection + Send + 'static,
{
self.build_impl(Box::new(connection)).await
}
pub async fn build(self) -> Result<(Client, ConnectionActor), Error> {
let Self {
payload,
subscription_buffer_size,
mut connection,
} = self;

async fn build_impl(
self,
mut connection: Box<dyn Connection + Send>,
) -> Result<(Client, ConnectionActor), Error> {
connection.send(Message::init(self.payload)).await?;
connection.send(Message::init(payload)).await?;

// wait for ack before entering receiver loop:
loop {
Expand Down Expand Up @@ -108,7 +189,8 @@ impl ClientBuilder {

let actor = ConnectionActor::new(connection, command_receiver);

let client = Client::new(command_sender, self.subscription_buffer_size.unwrap_or(5));
let client =
Client::new_internal(command_sender, self.subscription_buffer_size.unwrap_or(5));

Ok((client, actor))
}
Expand Down
2 changes: 1 addition & 1 deletion src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Client {
}

impl Client {
pub(super) fn new(
pub(super) fn new_internal(
actor: mpsc::Sender<ConnectionCommand>,
subscription_buffer_size: usize,
) -> Self {
Expand Down
Loading

0 comments on commit d27829b

Please sign in to comment.