diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b07a71..c07f88a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,12 @@ all APIs might be changed. ## Unreleased - xxxx-xx-xx +### Breaking Changes + +- Subscription IDs sent to the server are now just monotonic numbers rather + than uuids. +- `SubscriptionStream` no longer takes `GraphqlClient` as a generic parameter + ## v0.7.0 - 2024-01-03 ### Breaking Changes diff --git a/Cargo.toml b/Cargo.toml index 20e0993..24c9c3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ members = ["examples", "examples-wasm"] default = ["async-tungstenite"] client-cynic = ["async-tungstenite", "cynic"] client-graphql-client = ["async-tungstenite", "graphql_client"] -ws_stream_wasm = ["dep:ws_stream_wasm", "uuid/js", "no-logging", "pharos", "pin-project-lite"] +ws_stream_wasm = ["dep:ws_stream_wasm", "no-logging", "pharos", "pin-project-lite"] no-logging = [] [dependencies] @@ -31,7 +31,6 @@ pin-project = "1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -uuid = { version = "1.0", features = ["v4"] } cynic = { version = "3", optional = true } async-tungstenite = { version = "0.24", optional = true } diff --git a/src/client.rs b/src/client.rs index 91a876f..03b23cb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,15 @@ -use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc}; +use std::{ + collections::HashMap, + marker::PhantomData, + pin::Pin, + sync::{ + atomic::{self, AtomicU64}, + Arc, + }, +}; use futures::{ - channel::{mpsc, oneshot}, + channel::mpsc, future::RemoteHandle, lock::Mutex, sink::{Sink, SinkExt}, @@ -9,7 +17,6 @@ use futures::{ task::{Context, Poll, SpawnExt}, }; use serde::Serialize; -use uuid::Uuid; use super::{ graphql::{self, GraphqlOperation}, @@ -27,6 +34,7 @@ where { inner: Arc>, sender_sink: mpsc::Sender, + next_id: AtomicU64, phantom: PhantomData, } @@ -133,15 +141,14 @@ where let (mut sender_sink, sender_stream) = mpsc::channel(1); - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let sender_handle = runtime - .spawn_with_handle(sender_loop( - sender_stream, - websocket_sink, - Arc::clone(&operations), - shutdown_receiver, - )) + .spawn_with_handle(async move { + sender_stream + .map(Ok) + .forward(websocket_sink) + .await + .map_err(|error| Error::Send(error.to_string())) + }) .map_err(|err| Error::SpawnHandle(err.to_string()))?; // wait for ack before entering receiver loop: @@ -185,7 +192,6 @@ where websocket_stream, sender_sink.clone(), Arc::clone(&operations), - shutdown_sender, )) .map_err(|err| Error::SpawnHandle(err.to_string()))?; @@ -195,6 +201,7 @@ where operations, sender_handle, }), + next_id: 0.into(), sender_sink, phantom: PhantomData, }) @@ -218,12 +225,12 @@ where pub async fn streaming_operation<'a, Operation>( &mut self, op: Operation, - ) -> Result, Error> + ) -> Result, Error> where Operation: GraphqlOperation + Unpin + Send + 'static, { - let id = Uuid::new_v4(); + let id = self.next_id.fetch_add(1, atomic::Ordering::Relaxed); let (sender, receiver) = mpsc::channel(SUBSCRIPTION_BUFFER_SIZE); self.inner.operations.lock().await.insert(id, sender); @@ -242,7 +249,7 @@ where let mut sender_clone = self.sender_sink.clone(); let id_clone = id.to_string(); - Ok(SubscriptionStream:: { + Ok(SubscriptionStream:: { id: id.to_string(), stream: Box::pin(receiver.map(move |response| { op.decode(response) @@ -260,7 +267,6 @@ where Ok(()) }) }), - phantom: PhantomData, }) } } @@ -269,21 +275,18 @@ where /// /// Emits an item for each message received by the subscription. #[pin_project::pin_project] -pub struct SubscriptionStream +pub struct SubscriptionStream where - GraphqlClient: graphql::GraphqlClient, - Operation: GraphqlOperation, + Operation: GraphqlOperation, { id: String, stream: Pin> + Send>>, cancel_func: Box futures::future::BoxFuture<'static, Result<(), Error>> + Send>, - phantom: PhantomData, } -impl SubscriptionStream +impl SubscriptionStream where - GraphqlClient: graphql::GraphqlClient + Send, - Operation: GraphqlOperation + Send, + Operation: GraphqlOperation + Send, { /// Stops the operation by sending a Complete message to the server. pub async fn stop_operation(self) -> Result<(), Error> { @@ -291,10 +294,9 @@ where } } -impl Stream for SubscriptionStream +impl Stream for SubscriptionStream where - GraphqlClient: graphql::GraphqlClient, - Operation: GraphqlOperation + Unpin, + Operation: GraphqlOperation + Unpin, { type Item = Result; @@ -305,13 +307,12 @@ where type OperationSender = mpsc::Sender; -type OperationMap = Arc>>>; +type OperationMap = Arc>>>; async fn receiver_loop( mut receiver: S, mut sender: mpsc::Sender, operations: OperationMap, - shutdown: oneshot::Sender<()>, ) -> Result<(), Error> where S: Stream> + Unpin, @@ -330,9 +331,10 @@ where } } - shutdown - .send(()) - .map_err(|_| Error::SenderShutdown("Couldn't shutdown sender".to_owned())) + // Clear out any operations + operations.lock().await.clear(); + + Ok(()) } async fn handle_message( @@ -355,7 +357,10 @@ where }; let id = match event.id() { - Some(id) => Some(Uuid::parse_str(id).map_err(|err| Error::Decode(err.to_string()))?), + Some(id) => Some( + id.parse::() + .map_err(|err| Error::Decode(err.to_string()))?, + ), None => None, }; @@ -414,50 +419,6 @@ where Ok(()) } -async fn sender_loop( - message_stream: mpsc::Receiver, - mut ws_sender: S, - operations: OperationMap, - shutdown: oneshot::Receiver<()>, -) -> Result<(), Error> -where - M: WebsocketMessage, - S: Sink + Unpin, - E: std::error::Error, -{ - use futures::{future::FutureExt, select}; - - let mut message_stream = message_stream.fuse(); - let mut shutdown = shutdown.fuse(); - - loop { - select! { - msg = message_stream.next() => { - if let Some(msg) = msg { - trace!("Sending message: {:?}", msg); - ws_sender - .send(msg) - .await - .map_err(|err| Error::Send(err.to_string()))?; - } else { - return Ok(()); - } - } - _ = shutdown => { - // Shutdown the incoming message stream - let mut message_stream = message_stream.into_inner(); - message_stream.close(); - while message_stream.next().await.is_some() {} - - // Clear out any operations - operations.lock().await.clear(); - - return Ok(()); - } - } - } -} - struct ClientInner where GraphqlClient: crate::graphql::GraphqlClient,