Skip to content

Commit

Permalink
fix: catch unhandled errors in fromNext (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed May 7, 2024
2 parents 5589c04 + af01d2f commit ebd7c47
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changeset/bright-kangaroos-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

catch unhandled errors in `fromNext` stream creation
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ export type {
export type { MaybePromise, Truthy, CallbackOrStream } from "./util";

// Export the `StreamEnd` type
export type { StreamEnd } from "./stream";
export { StreamEnd } from "./stream";

export default Stream;
18 changes: 12 additions & 6 deletions src/stream/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,18 @@ export class StreamBase {
new Readable({
objectMode: true,
async read() {
const value = await next();

if (value === StreamEnd) {
this.push(null);
} else {
this.push(normalise(value));
try {
const value = await next();

// Promise returned as normal
if (value === StreamEnd) {
this.push(null);
} else {
this.push(normalise(value));
}
} catch (e) {
// Promise was rejected, add as an unknown error
this.push(unknown(e, []));
}
},
}),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "../atom";
import { HigherOrderStream } from "./higher-order";

export type { StreamEnd } from "./base";
export { StreamEnd } from "./base";

/**
* @template T - Type of the 'values' on the stream.
Expand Down
63 changes: 62 additions & 1 deletion test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test } from "vitest";
import $ from "../src";
import $, { StreamEnd } from "../src";
import { Readable } from "stream";

describe.concurrent("stream creation", () => {
Expand Down Expand Up @@ -99,4 +99,65 @@ describe.concurrent("stream creation", () => {
expect(await s.toArray({ atoms: true })).toEqual([$.error("an error")]);
});
});

describe.concurrent("from next function", () => {
test("simple count up", async ({ expect }) => {
expect.assertions(1);

let i = 0;
const s = $.fromNext(async () => {
if (i < 4) {
return i++;
} else {
return StreamEnd;
}
});

expect(await s.toArray({ atoms: true })).toEqual([$.ok(0), $.ok(1), $.ok(2), $.ok(3)]);
});

test("next atoms produces atoms", async ({ expect }) => {
expect.assertions(1);

const atoms = [$.ok(0), $.error("some error"), $.ok(1), $.unknown("unknown error", [])];
const s = $.fromNext(async () => {
if (atoms.length > 0) {
return atoms.shift();
} else {
return StreamEnd;
}
});

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(0),
$.error("some error"),
$.ok(1),
$.unknown("unknown error", []),
]);
});

test("next catches unhandled errors", async ({ expect }) => {
expect.assertions(1);

let i = 0;
const s = $.fromNext(async () => {
i += 1;

if (i === 1) {
throw "some error";
}

if (i == 2) {
return i;
}

return StreamEnd;
});

expect(await s.toArray({ atoms: true })).toEqual([
$.unknown("some error", []),
$.ok(2),
]);
});
});
});

0 comments on commit ebd7c47

Please sign in to comment.