Skip to content

Commit

Permalink
fix: add timeout for n option with batch
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed Jul 5, 2024
1 parent d4f4810 commit d9dcc4e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/four-months-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix: add timeout with `n` option for buffer
15 changes: 8 additions & 7 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,12 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
* @param options.yieldEmpty - If `timeout` is reached and no items have been emitted on the
* stream, still emit an empty array.
*/
batch(
options:
| { n: number; yieldRemaining?: boolean }
| { timeout: number; yieldEmpty?: boolean },
): Stream<T[], E> {
batch(options: {
n?: number;
timeout?: number;
yieldRemaining?: boolean;
yieldEmpty?: boolean;
}): Stream<T[], E> {
return this.consume(async function* (it) {
const atoms = it[Symbol.asyncIterator]();

Expand Down Expand Up @@ -480,7 +481,7 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
(result === "timeout" &&
"timeout" in options &&
(batch.length > 0 || options.yieldEmpty)) ||
("n" in options && batch.length >= options.n)
(options?.n && batch.length >= options.n)
) {
const end = ("n" in options && options?.n) || batch.length;
yield ok<T[], E>(batch.splice(0, end));
Expand All @@ -497,7 +498,7 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {

// Yield the rest of the batch
yield ok(batch);
} else if ("n" in options) {
} else if (options?.n) {
while (batch.length >= options.n) {
yield ok(batch.splice(0, options.n));
}
Expand Down
14 changes: 14 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,5 +402,19 @@ describe("stream transforms", () => {
expect(mapper).toHaveBeenCalledTimes(2);
expect(mapper).toHaveBeenNthCalledWith(2, []);
});

test("n with timeout", async ({ expect }) => {
const mapper = vi.fn();

$.from([1, 2, 3, 4, 5]).batch({ n: 3, timeout: 100 }).map(mapper).exhaust();

await vi.advanceTimersByTimeAsync(50);
expect(mapper).toHaveBeenCalledTimes(1);
expect(mapper).toHaveBeenNthCalledWith(1, [1, 2, 3]);

await vi.advanceTimersByTimeAsync(50);
expect(mapper).toHaveBeenCalledTimes(2);
expect(mapper).toHaveBeenNthCalledWith(2, [4, 5]);
});
});
});

0 comments on commit d9dcc4e

Please sign in to comment.