Skip to content

Commit

Permalink
feat: streamFrom/signedStreamFrom
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Mar 14, 2024
1 parent 445a68e commit 40780f3
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

## master

- Added AnyCable signed streams support. ([@palkan][])

Two new methods have been added to connect to streams directly without any channels: `cable.streamFrom(name)` and `cable.streamFromSigned(signedName)`. See [signed streams docs](https://docs.anycable.io/edge/anycable-go/signed_streams).

## 0.7.12 (2024-01-08)

- Omit `undefined` in serialized channel identifiers. ([@ardecvz][])
Expand Down
31 changes: 29 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ Multiple reasons that forced us to implement an alternative client library for A

## Usage: Web

> See the [demo application](https://github.com/anycable/anycable_rails_demo/pull/21) using AnyCable web client
### Install

```sh
Expand Down Expand Up @@ -51,10 +49,39 @@ By default, the connection URL is looked up in meta tags (`action-cable-url` or
createCable('ws://cable.example.com/my_cable')
```

### Pub/Sub

> [!IMPORTANT]
> This feature is backed by AnyCable _signed streams_ (available since v1.5). See the [documentation](https://docs.anycable.io/edge/anycable-go/signed_streams).
You can subscribe directly to data streams as follows:

```js
const cable = createCable();

const chatChannel = cable.streamFrom('room/42');

chatChannel.on('message', (msg) => {
// ...
});
```

In most cases, however, you'd prefer to use secured (_signed_) stream names generated by your backend:

```js
const cable = createCable();
const signedName = await obtainSignedStreamNameFromWhenever();
const chatChannel = cable.streamFromSigned(signedName);
// ...
```

### Channels

AnyCable client provides multiple ways to subscribe to channels: class-based subscriptions and _headless_ subscriptions.

> [!TIP]
> Read more about the concept of channels and how AnyCable uses it [here](https://docs.anycable.io/edge/anycable-go/rpc).
#### Class-based subscriptions

Class-based APIs allows provides an abstraction layer to hide implementation details of subscriptions.
Expand Down
4 changes: 4 additions & 0 deletions packages/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master

- Added AnyCable signed streams support. ([@palkan][])

Two new methods have been added to connect to streams directly without any channels: `cable.streamFrom(name)` and `cable.streamFromSigned(signedName)`. See [signed streams docs](https://docs.anycable.io/edge/anycable-go/signed_streams).

## 0.7.13 (2024-02-26)

- Do not try to send `pong` if cable is no longer connected. ([@palkan][])
Expand Down
16 changes: 16 additions & 0 deletions packages/core/cable/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ export type CableState =

export class GhostChannel extends Channel {}

export const PUBSUB_CHANNEL: string

type PubSubChannelParams =
| { stream_name: string }
| { signed_stream_name: string }

export class PubSubChannel extends Channel<
PubSubChannelParams,
Message,
ChannelEvents<Message>,
never
> {}

export class Cable {
transport: Transport
hub: Hub
Expand Down Expand Up @@ -74,6 +87,9 @@ export class Cable {
...args: {} extends P ? [undefined?] : [P]
): T

streamFrom(name: string): PubSubChannel
streamFromSigned(signedName: string): PubSubChannel

keepalive(msg?: Message): void

send(msg: object): void
Expand Down
24 changes: 23 additions & 1 deletion packages/core/cable/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ export class GhostChannel extends Channel {
}
}

const STATE = Symbol('state')
export const PUBSUB_CHANNEL = '$pubsub'

export class PubSubChannel extends Channel {
static identifier = PUBSUB_CHANNEL

async perform() {
throw Error('not implemented')
}
}

export const STATE = Symbol('state')

export class Cable {
constructor({ transport, protocol, encoder, logger, lazy, hubOptions }) {
Expand Down Expand Up @@ -273,6 +283,18 @@ export class Cable {
this.emit('keepalive', msg)
}

streamFrom(name) {
let channel = new PubSubChannel({ stream_name: name })

return this.subscribe(channel)
}

streamFromSigned(name) {
let channel = new PubSubChannel({ signed_stream_name: name })

return this.subscribe(channel)
}

subscribeTo(ChannelClass, params) {
let channel
let ghostName
Expand Down
94 changes: 94 additions & 0 deletions packages/core/cable/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from '../index.js'
import { TestTransport } from '../transport/testing'
import { TestLogger } from '../logger/testing'
import { PUBSUB_CHANNEL, PubSubChannel } from './index.js'

class TestProtocol implements Protocol {
cable!: Cable
Expand Down Expand Up @@ -1613,6 +1614,99 @@ describe('subscribeTo', () => {
})
})

describe('streamFrom / streamFromSigned', () => {
beforeEach(() => {
cable.connect()
cable.connected()

jest
.spyOn(protocol, 'subscribe')
.mockImplementation(async (identifier, params) => {
return JSON.stringify({ identifier, ...params })
})
})

it('subscribes to $pubsub channel', async () => {
let channel = cable.streamFrom('chat_15')
await channel.ensureSubscribed()

let signedChannel = cable.streamFromSigned('xyz-chat-zyx')
await signedChannel.ensureSubscribed()

expect(cable.hub.size).toEqual(2)
expect(channel.state).toEqual('connected')
expect(signedChannel.state).toEqual('connected')

let p = new Promise<Message>(resolve => channel.on('message', resolve))
let p2 = new Promise<Message>(resolve =>
signedChannel.on('message', resolve)
)

transport.receive(
JSON.stringify({
identifier: JSON.stringify({
identifier: '$pubsub',
stream_name: 'chat_15'
}),
payload: { foo: 'clear' }
})
)

let received = await p

expect(received).toEqual({ foo: 'clear' })

transport.receive(
JSON.stringify({
identifier: JSON.stringify({
identifier: PUBSUB_CHANNEL,
signed_stream_name: 'xyz-chat-zyx'
}),
payload: { foo: 'signed' }
})
)

let received2 = await p2

expect(received2).toEqual({ foo: 'signed' })
})

it('using PubSubChannel class', async () => {
let channel = new PubSubChannel({ stream_name: 'chat_2' })
cable.subscribe(channel)
await channel.ensureSubscribed()

expect(cable.hub.size).toEqual(1)
expect(channel.state).toEqual('connected')

let p = new Promise<Message>(resolve => channel.on('message', resolve))

transport.receive(
JSON.stringify({
identifier: JSON.stringify({
identifier: '$pubsub',
stream_name: 'chat_2'
}),
payload: { foo: 'clear' }
})
)

let received = await p

expect(received).toEqual({ foo: 'clear' })
})

it('rejects perform attempts', async () => {
let channel = cable.streamFrom('chat_15')
await channel.ensureSubscribed()

// @ts-ignore
expect(channel.perform('keepalive')).rejects.toEqual(
Error('not implemented')
)
})
})

describe('setSessionID', () => {
it('sets session id and updates transport params', () => {
cable.setSessionId('session-id')
Expand Down
7 changes: 0 additions & 7 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@
"@jridgewell/gen-mapping" "^0.1.0"
"@jridgewell/trace-mapping" "^0.3.9"

"@anycable/core@^0.6.0":
version "0.6.0"
resolved "https://registry.npmjs.org/@anycable/core/-/core-0.6.0.tgz"
integrity sha512-70dzfCUCg0yJx98siCn9g96deKf6iTGlBADw1B2TLuhRrYtVQ3GD/dGbPhb/cQ2znwNV7DfiLNkVhkBDS4mG6A==
dependencies:
nanoevents "^7.0.1"

"@babel/code-frame@^7.0.0", "@babel/code-frame@^7.12.13", "@babel/code-frame@^7.18.6":
version "7.18.6"
resolved "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.18.6.tgz"
Expand Down

0 comments on commit 40780f3

Please sign in to comment.