Skip to content

Commit

Permalink
Add __timestamp__ meta, channel.receiveAll
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Feb 13, 2023
1 parent b239584 commit 8449ebc
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 45 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning].

## [Unreleased]

## [0.5.0]

- Add `__timestamp__` field to incoming messages with the receive time (as UTC milliseconds). ([@palkan][])

This should be used to determine the actual time when the message was received (not when it reached JS runtime).

- Add `client.Loop`. ([@palkan][])

This makes it possible to use shared data along with `onMessage` callbacks.
Expand Down
39 changes: 0 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,45 +149,6 @@ You can pass the following options to the `connect` method as the second argumen
**NOTE:** `msgpack` and `protobuf` codecs are only supported by [AnyCable PRO](https://anycable.io#pro).
### OnMessage and Loop
It's possible to write scenarios processing incoming messages asynchronously:
```js
const chatChannel = client.subscribe("ChatChannel");
chatChannel.onMessage((msg) => {
// do smth with message
});
const presenceChannel = client.subscribe("PresenceChannel");
presenceChannel.onMessage((msg) => {
// do smth with message
});
// IMPORTANT: the rest of the scenario must be wrapped into a special loop function
// to avoid JS runtime race conditions
let i = 0;
client.loop(() => {
chatChannel.perform("speak", {message: "hello"});
presenceChannel.perform("update", {cursor: 42});
sleep(randomIntBetween(2, 5));
i++;
// Run 5 times and exit
if (i > 5) {
return true
}
})
sleep(5);
client.disconnect();
```
More examples could be found in the [examples/](./examples) folder.
## Contributing
Expand Down
28 changes: 26 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (ch *Channel) Receive(attr goja.Value) interface{} {
func (ch *Channel) ReceiveN(n int, cond goja.Value) []interface{} {
var results []interface{}
timeout := ch.client.recTimeout
timer := time.NewTimer(ch.client.recTimeout)
timer := time.NewTimer(timeout)
matcher, err := ch.buildMatcher(cond)

if err != nil {
Expand All @@ -87,6 +87,30 @@ func (ch *Channel) ReceiveN(n int, cond goja.Value) []interface{} {
}
}

// ReceiveAll fethes all messages for a given number of seconds.
func (ch *Channel) ReceiveAll(sec int, cond goja.Value) []interface{} {
var results []interface{}
timeout := time.Duration(sec) * time.Second
timer := time.NewTimer(timeout)
matcher, err := ch.buildMatcher(cond)

if err != nil {
panic(err)
}

for {
select {
case msg := <-ch.readCh:
if !matcher.Match(msg.Message) {
continue
}
results = append(results, msg.Message)
case <-timer.C:
return results
}
}
}

// Register callback to receive messages asynchronously
func (ch *Channel) OnMessage(fn goja.Value) {
f, isFunc := goja.AssertFunction(fn)
Expand Down Expand Up @@ -121,7 +145,7 @@ func (ch *Channel) handleAsync(msg *cableMsg) {
}

for _, h := range ch.asyncHandlers {
_, err := h(goja.Undefined(), ch.client.vu.Runtime().ToValue(msg))
_, err := h(goja.Undefined(), ch.client.vu.Runtime().ToValue(msg.Message))

if err != nil {
if !strings.Contains(err.Error(), "context canceled") {
Expand Down
11 changes: 10 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *Client) Subscribe(channelName string, paramsIn goja.Value) (*Channel, e
c.logger.Errorf("subscription to `%v`: rejected\n", channelName)
return nil, nil
case <-timer:
c.logger.Errorf("subscription to `%v`: timeout exceeded. Consider increasing receiveTimeoutMs configuration option\n", channelName)
c.logger.Errorf("subscription to `%v`: timeout exceeded. Consider increasing receiveTimeoutMs configuration option (current: %d)\n", channelName, c.recTimeout)
return nil, nil
}
}
Expand Down Expand Up @@ -135,7 +135,9 @@ func (c *Client) Loop(fn goja.Value) {
return
}

c.mu.Lock()
result := ret.ToBoolean()
c.mu.Unlock()

if result {
return
Expand Down Expand Up @@ -266,6 +268,13 @@ func (c *Client) receiveIgnoringPing() (*cableMsg, error) {
continue
}

timestamp := int64(time.Now().UnixNano()) / 1_000_000

if data, ok := msg.Message.(map[string]interface{}); ok {
data["__timestamp__"] = timestamp
msg.Message = data
}

return &msg, nil
}

Expand Down
4 changes: 1 addition & 3 deletions examples/async_loop.js → examples/benchmark_async_loop.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@ export default function () {

channel.ignoreReads();

channel.onMessage(data => {
channel.onMessage(message => {
let now = Date.now();
let { message } = data;

if (!message) {
console.log(data);
return
}

Expand Down
70 changes: 70 additions & 0 deletions examples/benchmark_receive_all.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Build k6 with xk6-cable like this:
// xk6 build v0.38.3 --with github.com/anycable/[email protected]

import { check, sleep, fail } from "k6";
import cable from "k6/x/cable";
import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.1.0/index.js";

import { Trend, Counter } from "k6/metrics";
let rttTrend = new Trend("rtt", true);
let broadcastsRcvd = new Counter("broadcasts_rcvd");
let broadcastsSent = new Counter("broadcasts_sent");

let config = __ENV

config.URL = config.URL || "ws://localhost:8080/cable";

let url = config.URL;
let channelName = 'BenchmarkChannel';

export default function () {
let cableOptions = {
receiveTimeoutMs: 15000
}

let client = cable.connect(url, cableOptions);

if (
!check(client, {
"successful connection": (obj) => obj,
})
) {
fail("connection failed");
}

let channel = client.subscribe(channelName);

if (
!check(channel, {
"successful subscription": (obj) => obj,
})
) {
fail("failed to subscribe");
}

for(let i = 0; ; i++) {
// Sampling
if (randomIntBetween(1, 10) > 8) {
let start = Date.now();
broadcastsSent.add(1);
// Create message via cable instead of a form
channel.perform("broadcast", { ts: start, content: `hello from ${__VU} numero ${i+1}` });
}

sleep(randomIntBetween(5, 10) / 100);

let incoming = channel.receiveAll(1);

for(let message of incoming) {
let received = message.__timestamp__ || Date.now();

if (message.action == "broadcast") {
broadcastsRcvd.add(1);
let ts = message.ts;
rttTrend.add(received - ts);
}
}

sleep(randomIntBetween(5, 10) / 100);
}
}

0 comments on commit 8449ebc

Please sign in to comment.