Skip to content

Commit

Permalink
tweak batching behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed Sep 9, 2024
1 parent 0a73db1 commit 6e3d0fa
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,12 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
(batch) => batch.length >= (options?.n ?? 1) || options?.yieldRemaining,
);

if (ready.length === 0 && options?.yieldEmpty) {
yield ok([]);
if (ready.reduce((total, batch) => total + batch.length, 0) === 0) {
if (options?.yieldEmpty) {
// Only yield an empty batch if there are absolutely no items ready to
// be yielded and if the configuration allows it
yield ok([]);
}
} else {
for (const batch of ready) {
const items = batch.splice(0, options?.n ?? batch.length);
Expand Down
27 changes: 27 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ describe("stream transforms", () => {
});

test("timeout bucket no items", async ({ expect }) => {
expect.assertions(2);

const mapper = vi.fn();

$.fromNext(() => new Promise<number>(() => {}))
Expand All @@ -461,6 +463,31 @@ describe("stream transforms", () => {
expect(mapper).toHaveBeenCalledTimes(0);
});

test("timeout don't yield empty", async ({ expect }) => {
expect.assertions(3);

const mapper = vi.fn();

const testItems = [1, 1, 1, 1];
$.fromNext(async () => {
if (testItems.length > 0) {
return testItems.shift();
}

return new Promise(() => {});
})
.batch({ timeout: 100, n: 10, yieldRemaining: true, yieldEmpty: false })
.map(mapper)
.exhaust();

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(1);
expect(mapper).toHaveBeenNthCalledWith(1, [1, 1, 1, 1]);

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(1);
});

describe("batch weirdness", () => {
test("5 items, n = 10", async ({ expect }) => {
expect.assertions(1);
Expand Down

0 comments on commit 6e3d0fa

Please sign in to comment.