diff --git a/src/stream/transforms.ts b/src/stream/transforms.ts index 4e2352e..8ad13b2 100644 --- a/src/stream/transforms.ts +++ b/src/stream/transforms.ts @@ -517,8 +517,12 @@ export class StreamTransforms extends StreamConsumption { (batch) => batch.length >= (options?.n ?? 1) || options?.yieldRemaining, ); - if (ready.length === 0 && options?.yieldEmpty) { - yield ok([]); + if (ready.reduce((total, batch) => total + batch.length, 0) === 0) { + if (options?.yieldEmpty) { + // Only yield an empty batch if there are absolutely no items ready to + // be yielded and if the configuration allows it + yield ok([]); + } } else { for (const batch of ready) { const items = batch.splice(0, options?.n ?? batch.length); diff --git a/test/transforms.test.ts b/test/transforms.test.ts index 00d89e1..d713504 100644 --- a/test/transforms.test.ts +++ b/test/transforms.test.ts @@ -447,6 +447,8 @@ describe("stream transforms", () => { }); test("timeout bucket no items", async ({ expect }) => { + expect.assertions(2); + const mapper = vi.fn(); $.fromNext(() => new Promise(() => {})) @@ -461,6 +463,31 @@ describe("stream transforms", () => { expect(mapper).toHaveBeenCalledTimes(0); }); + test("timeout don't yield empty", async ({ expect }) => { + expect.assertions(3); + + const mapper = vi.fn(); + + const testItems = [1, 1, 1, 1]; + $.fromNext(async () => { + if (testItems.length > 0) { + return testItems.shift(); + } + + return new Promise(() => {}); + }) + .batch({ timeout: 100, n: 10, yieldRemaining: true, yieldEmpty: false }) + .map(mapper) + .exhaust(); + + await vi.advanceTimersByTimeAsync(100); + expect(mapper).toHaveBeenCalledTimes(1); + expect(mapper).toHaveBeenNthCalledWith(1, [1, 1, 1, 1]); + + await vi.advanceTimersByTimeAsync(100); + expect(mapper).toHaveBeenCalledTimes(1); + }); + describe("batch weirdness", () => { test("5 items, n = 10", async ({ expect }) => { expect.assertions(1);