Skip to content

Commit

Permalink
feat: add info event to propagate protocol-level events
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jul 31, 2024
1 parent eb0d8bb commit 2908314
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

## master

- Add `info` event to Cable and Channel. ([@palkan][])

This event can be used to notify of some protocol-level events that happen under the hood and have no representation at the Channel API level. A example of such event is a stream history retrieval failure (`{type: "history_not_found"}`).

## 0.9.0 (2024-05-21)

- Types improvements. ([@cmdoptesc][])
Expand Down
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,36 @@ This is a recommended way to use this feature with Hotwire applications, where i

You can also disable retrieving history since the specified time completely by setting the `historyTimestamp` option to `false`.

#### Handling history retrieval failures

AnyCable reliable streams store history for a finite period of time and also have an upper size limit. Thus, in some cases, clients may fail to retrieve the missed messages (e.g., after a long-term disconnect). To gracefully handle this situation, you may decide to fallback to a full state reset (e.g., a browser page reload). You can use the specific "info" event to react on various protocol-level events not exposed to the generic Channel interface:

```js
import { createCable, Channel } from '@anycable/web'

const cable = createCable({protocol: 'actioncable-v1-ext-json'});

class ChatChannel extends Channel {
static identifier = 'ChatChannel'

constructor(params) {
super(params)

this.on("info", (evt) => {
if (evt.type === "history_not_found") {
// Restore state by performing an action
this.perform("resetState")
}

// Successful history retrieval is also notified
if (evt.type === "history_received") {
// ...
}
})
}
}
```

#### PONGs support

The extended protocol also support sending `pong` commands in response to `ping` messages. A server (AnyCable-Go) keeps track of pongs and disconnect the client if no pongs received in time. This helps to identify broken connections quicker.
Expand Down
4 changes: 4 additions & 0 deletions packages/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master

- Add `notification` event to Cable and Channel. ([@palkan][])

This event can be used to notify of some protocol-level events that happen under the hood and have no representation at the Channel API level. A example of such notification is a stream history retrieval failure.

## 0.9.0 (2024-05-21)

- Types improvements. ([@cmdoptesc][])
Expand Down
2 changes: 2 additions & 0 deletions packages/core/action_cable_ext/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol {

if (type === 'confirm_history') {
this.logger.debug('history result received', msg)
this.cable.notify('history_received', identifier)
return
}

if (type === 'reject_history') {
this.logger.warn('failed to retrieve history', msg)
this.cable.notify('history_not_found', identifier)
return
}

Expand Down
22 changes: 22 additions & 0 deletions packages/core/action_cable_ext/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ describe('history', () => {
expect(logger.logs[0].message).toEqual('history result received')
})

it('notifies history_received', () => {
protocol.receive({ type: 'confirm_history', identifier })

expect(cable.mailbox).toHaveLength(1)
expect(cable.mailbox[0]).toMatchObject({
type: 'info',
event: 'history_received',
identifier
})
})

it('logs reject_history', () => {
expect(
protocol.receive({ type: 'reject_history', identifier })
Expand All @@ -337,4 +348,15 @@ describe('history', () => {
expect(logger.warnings).toHaveLength(1)
expect(logger.warnings[0].message).toEqual('failed to retrieve history')
})

it('notifies history_not_found', () => {
protocol.receive({ type: 'reject_history', identifier })

expect(cable.mailbox).toHaveLength(1)
expect(cable.mailbox[0]).toMatchObject({
type: 'info',
event: 'history_not_found',
identifier
})
})
})
9 changes: 9 additions & 0 deletions packages/core/cable/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ type ConnectEvent = Partial<{
reconnect: boolean
}>

export type InfoEvent = {
type: string
identifier?: Identifier
data?: object
}

export interface CableEvents {
connect: (event: ConnectEvent) => void
disconnect: (event: ReasonError) => void
close: (event?: ReasonError) => void
keepalive: (msg?: Message) => void
info: (event: InfoEvent) => void
}

export type CableOptions = {
Expand Down Expand Up @@ -108,6 +115,8 @@ export class Cable {
restored(remoteIds: string[]): void
disconnected(reason?: ReasonError): void
closed(reason?: string | ReasonError): void
notify(event: string, data?: object): void
notify(event: string, identifier?: Identifier, data?: object): void

setSessionId(sid: string): void
}
Expand Down
19 changes: 19 additions & 0 deletions packages/core/cable/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,25 @@ export class Cable {
this.emit('connect', { reconnect, restored })
}

notify(event, identifier, data) {
if (identifier && typeof identifier !== 'string') {
data = identifier
identifier = undefined
}

// If identifier is present then it's a channel-level notification
if (!identifier) {
this.emit('info', { type: event, data })
} else {
let sub = this.hub.subscriptions.get(identifier)
if (sub) {
sub.channels.forEach(channel =>
channel.emit('info', { type: event, data })
)
}
}
}

handleClose(err) {
this.logger.debug('transport closed', { error: err })

Expand Down
60 changes: 59 additions & 1 deletion packages/core/cable/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
SubscriptionRejectedError,
StaleConnectionError,
ReasonError,
Message
Message,
InfoEvent
} from '../index.js'
import { TestTransport } from '../transport/testing'
import { TestLogger } from '../logger/testing'
Expand Down Expand Up @@ -999,6 +1000,63 @@ describe('channels', () => {
).rejects.toEqual(Error('failed'))
})

it('notify w/o identifier', async () => {
let received: InfoEvent[] = []
let promise = new Promise<void>((resolve, reject) => {
let tid = setTimeout(() => {
reject(Error('Timed out to receive notification event'))
}, 500)

cable.on('info', evt => {
received.push(evt)

if (received.length === 2) {
clearTimeout(tid)
resolve()
}
})
})

cable.notify('test_notification')
cable.notify('test_notification', { foo: 'bar' })

await promise

expect(received).toEqual([
{ type: 'test_notification', data: undefined },
{ type: 'test_notification', data: { foo: 'bar' } }
])
})

it('notify', async () => {
cable.subscribe(channel)
expect(cable.hub.size).toEqual(1)

await channel.ensureSubscribed()

let promise = new Promise<void>((resolve, reject) => {
let tid = setTimeout(() => {
reject(Error('Timed out to receive notification event'))
}, 500)

cable.on('info', () => {
clearTimeout(tid)
reject(Error('Should not receive info event for cable'))
})

channel.on('info', evt => {
clearTimeout(tid)
expect(evt.type).toEqual('test_notification')
expect(evt.data).toEqual({ foo: 'bar' })
resolve()
})
})

cable.notify('test_notification', channel.identifier, { foo: 'bar' })

await promise
})

describe('closure and recovery with channels', () => {
let channel2: TestChannel
let firstError: Promise<void>
Expand Down
6 changes: 6 additions & 0 deletions packages/core/channel/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ type ConnectEvent = Partial<{
reconnect: boolean
}>

export type InfoEvent = {
type: string
data?: object
}

export interface ChannelEvents<T> {
connect: (event: ConnectEvent) => void
disconnect: (event: ReasonError) => void
close: (event?: ReasonError) => void
message: (msg: T, meta?: MessageMeta) => void
info: (event: InfoEvent) => void
}

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down
6 changes: 4 additions & 2 deletions packages/core/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ export {
ChannelEvents,
Message,
MessageMeta,
Identifier
Identifier,
InfoEvent as ChannelInfoEvent
} from './channel/index.js'
export { Transport, FallbackTransport } from './transport/index.js'
export { Encoder, JSONEncoder } from './encoder/index.js'
Expand All @@ -29,7 +30,8 @@ export {
CableOptions,
Cable,
NoConnectionError,
CableEvents
CableEvents,
InfoEvent
} from './cable/index.js'
export {
Monitor,
Expand Down
1 change: 1 addition & 0 deletions packages/core/protocol/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface Consumer {
closed(reason?: string | ReasonError): void
keepalive(msg?: Message): void
send(msg: object): void
notify(event: string, identifier?: Identifier, data?: object): void
}

export type ProcessedMessage = Partial<{
Expand Down
6 changes: 5 additions & 1 deletion packages/core/protocol/testing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*eslint n/no-unsupported-features/es-syntax: ["error", {version: "14.0"}] */
import { Consumer, ReasonError } from '../index.js'
import { Consumer, Identifier, ReasonError } from '../index.js'

type State = 'idle' | 'connected' | 'restored' | 'disconnected' | 'closed'

Expand Down Expand Up @@ -51,4 +51,8 @@ export class TestConsumer implements Consumer {
keepalive(msg: number) {
this.lastPingedAt = msg | 0
}

notify(event: string, identifier?: Identifier, data?: object): void {
this.mailbox.push({ type: 'info', event, identifier, data })
}
}

0 comments on commit 2908314

Please sign in to comment.