Skip to content

Commit

Permalink
feat: Adds cachedFlatMap operator (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed May 7, 2024
2 parents ebd7c47 + 0136323 commit a49c4ad
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/tame-geese-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Adds the `cachedFlatMap` operator
48 changes: 48 additions & 0 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,54 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
);
}

/**
* Map over each value in the stream, produce a stream from it, cache the resultant stream
* and flatten all the value streams together
*
* @group Higher Order
*/
cachedFlatMap<U>(
cb: (value: T) => MaybePromise<Stream<U, E>>,
keyFn: (value: T) => string | number | symbol,
): Stream<U, E> {
const trace = this.trace("cachedFlatMap");

return this.consume(async function* (it) {
const cache = new Map<PropertyKey, Atom<U, E>[]>();

for await (const atom of it) {
if (!isOk(atom)) {
yield atom;
continue;
}

const key = keyFn(atom.value);
const cachedValues = cache.get(key);

if (cachedValues !== undefined) {
yield* cachedValues;
continue;
}

// Run the flat map handler
const streamAtom = await run(() => cb(atom.value), trace);

// If an error was emitted whilst initialising the new stream, return it
if (!isOk(streamAtom)) {
yield streamAtom;
continue;
}

// Otherwise, consume the iterator
const values = await streamAtom.value.toArray({ atoms: true });

cache.set(key, values);

yield* values;
}
});
}

/**
* Produce a new stream from the stream that has any nested streams flattened
*
Expand Down
105 changes: 105 additions & 0 deletions test/higher-order.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,111 @@ describe.concurrent("higher order streams", () => {
});
});

describe.concurrent("cachedFlatMap", () => {
test("lookup non-repeating strings returning single atom", async ({ expect }) => {
expect.assertions(2);

const lookup = vi.fn((param: string) => $.of(param));

const s = $.from(["a", "b", "c"]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([$.ok("a"), $.ok("b"), $.ok("c")]);
expect(lookup).toBeCalledTimes(3);
});

test("lookup repeating strings returning single atom", async ({ expect }) => {
expect.assertions(2);

const lookup = vi.fn((param: string) => $.of(param));

const s = $.from(["a", "b", "c", "a", "a"]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok("a"),
$.ok("b"),
$.ok("c"),
$.ok("a"),
$.ok("a"),
]);
expect(lookup).toBeCalledTimes(3);
});

test("lookup repeating numbers returning multiple atoms", async ({ expect }) => {
expect.assertions(2);

const lookup = vi.fn((n: number) => $.fromArray([n, n * 2, n * 4]));

const s = $.from([1, 100, 200, 1, 10]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(2),
$.ok(4),
$.ok(100),
$.ok(200),
$.ok(400),
$.ok(200),
$.ok(400),
$.ok(800),
$.ok(1),
$.ok(2),
$.ok(4),
$.ok(10),
$.ok(20),
$.ok(40),
]);
expect(lookup).toBeCalledTimes(4);
});

test("lookup repeating numbers returning multiple atoms", async ({ expect }) => {
expect.assertions(2);

const oneHundredDividedBy = vi.fn((n: number) => {
if (n === 0) {
throw "Cannot divide by zero!";
}

return $.of(100 / n);
});

const s = $.from([5, 0, 50, 5, 5]).cachedFlatMap(oneHundredDividedBy, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(20),
$.unknown("Cannot divide by zero!", ["cachedFlatMap"]),
$.ok(2),
$.ok(20),
$.ok(20),
]);
expect(oneHundredDividedBy).toBeCalledTimes(3);
});

test("lookup repeating numbers, including an error, returning multiple atoms", async ({
expect,
}) => {
expect.assertions(2);

const lookup = vi.fn((n: number) => $.of(n));

const s = $.from<number, unknown>([
$.ok(1),
$.ok(2),
$.error("oh no!"),
$.ok(2),
$.ok(1),
]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(2),
$.error("oh no!"),
$.ok(2),
$.ok(1),
]);
expect(lookup).toBeCalledTimes(2);
});
});

describe.concurrent("flatten", () => {
test("simple nested stream", async ({ expect }) => {
expect.assertions(1);
Expand Down

0 comments on commit a49c4ad

Please sign in to comment.