From 909d5a1dcc823b0e8c853fc7cf453b010cbc7ee5 Mon Sep 17 00:00:00 2001 From: mdboon Date: Tue, 7 May 2024 16:01:02 +1000 Subject: [PATCH] Adds cachedFlatMap operator --- .changeset/tame-geese-allow.md | 5 ++ src/stream/higher-order.ts | 48 +++++++++++++++ test/higher-order.test.ts | 105 +++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 .changeset/tame-geese-allow.md diff --git a/.changeset/tame-geese-allow.md b/.changeset/tame-geese-allow.md new file mode 100644 index 0000000..94f72ea --- /dev/null +++ b/.changeset/tame-geese-allow.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +Adds the `cachedFlatMap` operator diff --git a/src/stream/higher-order.ts b/src/stream/higher-order.ts index 3aeb3b8..d8833f5 100644 --- a/src/stream/higher-order.ts +++ b/src/stream/higher-order.ts @@ -125,6 +125,54 @@ export class HigherOrderStream extends StreamTransforms { ); } + /** + * Map over each value in the stream, produce a stream from it, cache the resultant stream + * and flatten all the value streams together + * + * @group Higher Order + */ + cachedFlatMap( + cb: (value: T) => MaybePromise>, + keyFn: (value: T) => string | number | symbol, + ): Stream { + const trace = this.trace("cachedFlatMap"); + + return this.consume(async function* (it) { + const cache = new Map[]>(); + + for await (const atom of it) { + if (!isOk(atom)) { + yield atom; + continue; + } + + const key = keyFn(atom.value); + const cachedValues = cache.get(key); + + if (cachedValues !== undefined) { + yield* cachedValues; + continue; + } + + // Run the flat map handler + const streamAtom = await run(() => cb(atom.value), trace); + + // If an error was emitted whilst initialising the new stream, return it + if (!isOk(streamAtom)) { + yield streamAtom; + continue; + } + + // Otherwise, consume the iterator + const values = await streamAtom.value.toArray({ atoms: true }); + + cache.set(key, values); + + yield* values; + } + }); + } + /** * Produce a new stream from the stream that has any nested streams flattened * diff --git a/test/higher-order.test.ts b/test/higher-order.test.ts index 6957b32..3d6701a 100644 --- a/test/higher-order.test.ts +++ b/test/higher-order.test.ts @@ -127,6 +127,111 @@ describe.concurrent("higher order streams", () => { }); }); + describe.concurrent("cachedFlatMap", () => { + test("lookup non-repeating strings returning single atom", async ({ expect }) => { + expect.assertions(2); + + const lookup = vi.fn((param: string) => $.of(param)); + + const s = $.from(["a", "b", "c"]).cachedFlatMap(lookup, (v) => v); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok("a"), $.ok("b"), $.ok("c")]); + expect(lookup).toBeCalledTimes(3); + }); + + test("lookup repeating strings returning single atom", async ({ expect }) => { + expect.assertions(2); + + const lookup = vi.fn((param: string) => $.of(param)); + + const s = $.from(["a", "b", "c", "a", "a"]).cachedFlatMap(lookup, (v) => v); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok("a"), + $.ok("b"), + $.ok("c"), + $.ok("a"), + $.ok("a"), + ]); + expect(lookup).toBeCalledTimes(3); + }); + + test("lookup repeating numbers returning multiple atoms", async ({ expect }) => { + expect.assertions(2); + + const lookup = vi.fn((n: number) => $.fromArray([n, n * 2, n * 4])); + + const s = $.from([1, 100, 200, 1, 10]).cachedFlatMap(lookup, (v) => v); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.ok(2), + $.ok(4), + $.ok(100), + $.ok(200), + $.ok(400), + $.ok(200), + $.ok(400), + $.ok(800), + $.ok(1), + $.ok(2), + $.ok(4), + $.ok(10), + $.ok(20), + $.ok(40), + ]); + expect(lookup).toBeCalledTimes(4); + }); + + test("lookup repeating numbers returning multiple atoms", async ({ expect }) => { + expect.assertions(2); + + const oneHundredDividedBy = vi.fn((n: number) => { + if (n === 0) { + throw "Cannot divide by zero!"; + } + + return $.of(100 / n); + }); + + const s = $.from([5, 0, 50, 5, 5]).cachedFlatMap(oneHundredDividedBy, (v) => v); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(20), + $.unknown("Cannot divide by zero!", ["cachedFlatMap"]), + $.ok(2), + $.ok(20), + $.ok(20), + ]); + expect(oneHundredDividedBy).toBeCalledTimes(3); + }); + + test("lookup repeating numbers, including an error, returning multiple atoms", async ({ + expect, + }) => { + expect.assertions(2); + + const lookup = vi.fn((n: number) => $.of(n)); + + const s = $.from([ + $.ok(1), + $.ok(2), + $.error("oh no!"), + $.ok(2), + $.ok(1), + ]).cachedFlatMap(lookup, (v) => v); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.ok(2), + $.error("oh no!"), + $.ok(2), + $.ok(1), + ]); + expect(lookup).toBeCalledTimes(2); + }); + }); + describe.concurrent("flatten", () => { test("simple nested stream", async ({ expect }) => { expect.assertions(1);