Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

An easier one shot subscribe API #65

Merged
merged 10 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ all APIs might be changed.

## Unreleased - xxxx-xx-xx

### New Features

- Added a `streaming_operation` function to `next::ClientBuilder` to make
creating a single subscription on a given connection easier.

## v0.8.0-alpha.2 - 2024-01-30

### Breaking Changes
Expand Down
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ pharos = { version = "0.5.2", optional = true }

[dev-dependencies]
assert_matches = "1.5"
async-graphql = "5.0.10"
async-graphql-axum = "5"
async-graphql = "7.0.1"
async-graphql-axum = "7"
async-tungstenite = { version = "0.24", features = ["tokio-runtime"] }
axum = "0.6"
axum = "0.7"
axum-macros = "0.4"
cynic = { version = "3" }
insta = "1.11"
tokio = { version = "1", features = ["macros"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-stream = { version = "0.1", features = ["sync"] }

graphql-ws-client.path = "."
graphql-ws-client.features = ["client-cynic", "client-graphql-client", "async-tungstenite"]
4 changes: 2 additions & 2 deletions examples-wasm/examples/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::future::IntoFuture;

use graphql_ws_client::{next::ClientBuilder, ws_stream_wasm::Connection};
use graphql_ws_client::{next::Client, ws_stream_wasm::Connection};

mod schema {
cynic::use_schema!("../schemas/books.graphql");
Expand Down Expand Up @@ -57,7 +57,7 @@ async fn main() {

let connection = Connection::new(ws_conn).await;

let (mut client, actor) = ClientBuilder::new().build(connection).await.unwrap();
let (mut client, actor) = Client::build(connection).await.unwrap();
wasm_bindgen_futures::spawn_local(actor.into_future());

let mut stream = client.streaming_operation(build_query()).await.unwrap();
Expand Down
85 changes: 85 additions & 0 deletions examples/examples/mulitiple-subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//! An example of running a multiple subscriptions on a single connection
//! using `graphql-ws-client` and `async-tungstenite`
//!
//! Talks to the the tide subscription example in `async-graphql`

mod schema {
cynic::use_schema!("../schemas/books.graphql");
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(schema_path = "../schemas/books.graphql", graphql_type = "Book")]
#[allow(dead_code)]
struct Book {
id: String,
name: String,
author: String,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(schema_path = "../schemas/books.graphql", graphql_type = "BookChanged")]
#[allow(dead_code)]
struct BookChanged {
id: cynic::Id,
book: Option<Book>,
}

#[derive(cynic::QueryFragment, Debug)]
#[cynic(
schema_path = "../schemas/books.graphql",
graphql_type = "SubscriptionRoot"
)]
#[allow(dead_code)]
struct BooksChangedSubscription {
books: BookChanged,
}

#[async_std::main]
async fn main() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;
use graphql_ws_client::CynicClientBuilder;

let mut request = "ws://localhost:8000/graphql".into_client_request().unwrap();
request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_str("graphql-transport-ws").unwrap(),
);

let (connection, _) = async_tungstenite::async_std::connect_async(request)
.await
.unwrap();

println!("Connected");

let (sink, stream) = connection.split();

let mut client = CynicClientBuilder::new()
.build(stream, sink, async_executors::AsyncStd)
.await
.unwrap();

// In reality you'd probably want to different subscriptions, but for the sake of this example
// these are the same subscriptions
let mut first_subscription = client.streaming_operation(build_query()).await.unwrap();
let mut second_subscription = client.streaming_operation(build_query()).await.unwrap();

futures::join!(
async move {
while let Some(item) = first_subscription.next().await {
println!("{:?}", item);
}
},
async move {
while let Some(item) = second_subscription.next().await {
println!("{:?}", item);
}
}
);
}

fn build_query() -> cynic::StreamingOperation<BooksChangedSubscription> {
use cynic::SubscriptionBuilder;

BooksChangedSubscription::build(())
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! An example of using subscriptions with `graphql-ws-client` and
//! `async-tungstenite`
//! An example of creating a connection and running a single subscription on it
//! using `graphql-ws-client` and `async-tungstenite`
//!
//! Talks to the the tide subscription example in `async-graphql`

Expand Down Expand Up @@ -38,7 +38,7 @@ struct BooksChangedSubscription {
async fn main() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
use futures::StreamExt;
use graphql_ws_client::CynicClientBuilder;
use graphql_ws_client::next::Client;

let mut request = "ws://localhost:8000/graphql".into_client_request().unwrap();
request.headers_mut().insert(
Expand All @@ -52,16 +52,12 @@ async fn main() {

println!("Connected");

let (sink, stream) = connection.split();

let mut client = CynicClientBuilder::new()
.build(stream, sink, async_executors::AsyncStd)
let mut subscription = Client::build(connection)
.streaming_operation(build_query())
.await
.unwrap();

let mut stream = client.streaming_operation(build_query()).await.unwrap();
println!("Running subscription");
while let Some(item) = stream.next().await {
while let Some(item) = subscription.next().await {
println!("{:?}", item);
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/doc_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{next::Message, Error};

pub struct Conn;

#[async_trait::async_trait]
impl crate::next::Connection for Conn {
async fn receive(&mut self) -> Option<Message> {
unimplemented!()
}

async fn send(&mut self, _: Message) -> Result<(), Error> {
unimplemented!()
}
}

#[derive(serde::Serialize)]
pub struct Subscription;

impl crate::graphql::GraphqlOperation for Subscription {
type Response = ();

type Error = crate::Error;

fn decode(&self, _data: serde_json::Value) -> Result<Self::Response, Self::Error> {
unimplemented!()
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ mod client;
mod logging;
mod protocol;

#[doc(hidden)]
#[path = "doc_utils.rs"]
pub mod __doc_utils;

pub mod graphql;
pub mod websockets;

Expand Down
126 changes: 104 additions & 22 deletions src/next/builder.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,59 @@
use std::collections::HashMap;
use std::{collections::HashMap, future::IntoFuture};

use futures::channel::mpsc;
use futures::{channel::mpsc, future::BoxFuture, stream::BoxStream, FutureExt, StreamExt};
use serde::Serialize;

use crate::{logging::trace, protocol::Event, Error};
use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};

use super::{
actor::ConnectionActor,
connection::{Connection, Message},
Client,
Client, SubscriptionStream,
};

/// A websocket client builder
#[derive(Default)]
/// Builder for Clients.
///
/// ```rust
/// use graphql_ws_client::next::{Client};
/// use std::future::IntoFuture;
/// #
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = Client::build(connection).await?;
/// # Ok(())
/// # }
/// ```
pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
connection: Box<dyn Connection + Send>,
}

impl ClientBuilder {
/// Constructs an AsyncWebsocketClientBuilder
pub fn new() -> ClientBuilder {
ClientBuilder::default()
impl super::Client {
/// Creates a ClientBuilder with the given connection.
///
/// ```rust
/// use graphql_ws_client::next::{Client};
/// use std::future::IntoFuture;
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// let (client, actor) = Client::build(connection).await?;
/// # Ok(())
/// # }
/// ```
pub fn build<Conn>(connection: Conn) -> ClientBuilder
where
Conn: Connection + Send + 'static,
{
ClientBuilder {
payload: None,
subscription_buffer_size: None,
connection: Box::new(connection),
}
}
}

impl ClientBuilder {
/// Add payload to `connection_init`
pub fn payload<NewPayload>(self, payload: NewPayload) -> Result<ClientBuilder, Error>
where
Expand All @@ -44,6 +74,61 @@ impl ClientBuilder {
..self
}
}

/// Initialise a Client and use it to run a single streaming operation
///
/// ```rust
/// use graphql_ws_client::next::{Client};
/// use std::future::IntoFuture;
/// # async fn example() -> Result<(), graphql_ws_client::Error> {
/// # let connection = graphql_ws_client::__doc_utils::Conn;
/// # let subscription = graphql_ws_client::__doc_utils::Subscription;
/// let stream = Client::build(connection).streaming_operation(subscription).await?;
/// # Ok(())
/// # }
/// ```
///
/// Note that this takes ownership of the client, so it cannot be
/// used to run any more operations.
///
/// If users want to run mutliple operations on a connection they
/// should use the `IntoFuture` impl to construct a `Client`
pub async fn streaming_operation<'a, Operation>(
self,
op: Operation,
) -> Result<SubscriptionStream<Operation>, Error>
where
Operation: GraphqlOperation + Unpin + Send + 'static,
{
let (mut client, actor) = self.await?;

let mut actor_future = actor.into_future().fuse();

let subscribe_future = client.streaming_operation(op).fuse();
futures::pin_mut!(subscribe_future);

// Temporarily run actor_future while we start the subscription
let stream = futures::select! {
() = actor_future => {
return Err(Error::Unknown("actor ended before subscription started".into()))
},
result = subscribe_future => {
result?
}
};

Ok(stream.join(actor_future))
}
}

impl IntoFuture for ClientBuilder {
type Output = Result<(Client, ConnectionActor), Error>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.build())
}
}

impl ClientBuilder {
Expand All @@ -52,18 +137,14 @@ impl ClientBuilder {
/// Accepts an already built websocket connection, and returns the connection
/// and a future that must be awaited somewhere - if the future is dropped the
/// connection will also drop.
pub async fn build<Conn>(self, connection: Conn) -> Result<(Client, ConnectionActor), Error>
where
Conn: Connection + Send + 'static,
{
self.build_impl(Box::new(connection)).await
}
pub async fn build(self) -> Result<(Client, ConnectionActor), Error> {
let Self {
payload,
subscription_buffer_size,
mut connection,
} = self;

async fn build_impl(
self,
mut connection: Box<dyn Connection + Send>,
) -> Result<(Client, ConnectionActor), Error> {
connection.send(Message::init(self.payload)).await?;
connection.send(Message::init(payload)).await?;

// wait for ack before entering receiver loop:
loop {
Expand Down Expand Up @@ -108,7 +189,8 @@ impl ClientBuilder {

let actor = ConnectionActor::new(connection, command_receiver);

let client = Client::new(command_sender, self.subscription_buffer_size.unwrap_or(5));
let client =
Client::new_internal(command_sender, self.subscription_buffer_size.unwrap_or(5));

Ok((client, actor))
}
Expand Down
2 changes: 1 addition & 1 deletion src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Client {
}

impl Client {
pub(super) fn new(
pub(super) fn new_internal(
actor: mpsc::Sender<ConnectionCommand>,
subscription_buffer_size: usize,
) -> Self {
Expand Down
Loading
Loading