Skip to content

Commit

Permalink
feat: Implement .toReadable() for streams (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
giraugh committed May 22, 2024
2 parents 035361d + e70811c commit 593a0eb
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-forks-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Implement `.toReadable()` method for streams
5 changes: 5 additions & 0 deletions .changeset/four-donkeys-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

Fix creating streams from arrays with nullish values
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/stream/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ export class StreamBase {
array = [...array];

return Stream.fromNext(async () => {
return array.shift() ?? StreamBase.StreamEnd;
if (array.length === 0) return StreamBase.StreamEnd;
return array.shift()!;
});
}

Expand Down
85 changes: 85 additions & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class StreamConsumption<T, E> extends StreamBase {
* @param options.atoms - By default, only `ok` values are serialised, however enabling this
* will serialise all values.
*
* @see {@link Stream#toReadable} if serialisation is not required
* @group Consumption
*/
serialise(options?: { single?: boolean; atoms?: boolean }): Readable {
Expand Down Expand Up @@ -133,4 +134,88 @@ export class StreamConsumption<T, E> extends StreamBase {

return s;
}

/**
* Produce a readable node stream with the values from the stream.
*
* @param kind - What kind of readable stream to produce. When "raw" only strings and buffers can be emitted on the stream. Use "object" to preserve
* objects in the readable stream. Note that object values are not serialised, they are emitted as objects.
* @param options - Options for configuring how atoms are output on the stream
*
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "raw" | "object", options?: { atoms?: boolean }): Readable;

/**
* Produce a readable node stream with the raw values from the stream
* @note the stream must only contain atoms of type `string` or `Buffer`. If not, a
* stream error will be emitted.
*
* @param options.single - Whether to emit only the first atom
*
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "raw"): Readable;

/**
* Produce a readable node stream in object mode with the values from the stream
*
* @param options.atoms - By default, only `ok` values are emitted, however enabling this
* will emit all values.
*
* @note When not using `options.atoms`, any `null` atom values will be skipped when piping to the readable stream
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "object", options?: { atoms?: boolean }): Readable;

toReadable(
kind: "raw" | "object",
options: { single?: boolean; atoms?: boolean } = {},
): Readable {
// Set up a new readable stream that does nothing
const s = new Readable({
read() {},
objectMode: kind === "object",
});

// Spin off asynchronously so that the stream can be immediately returned
(async () => {
for await (const atom of this) {
// Determine whether non-ok values should be filtered out
if (options?.atoms !== true && !isOk(atom)) {
continue;
}

// monitor for non raw values when not using object mode
if (
kind === "raw" &&
!(typeof atom.value === "string" || atom.value instanceof Buffer)
) {
const message = `Stream indicated it would emit raw values but emitted a '${typeof atom.value}' object`;
console.error(message);
s.emit("error", new Error(message));
break;
}

// Show a warning if any atom value is null
if (!options?.atoms && atom.value === null) {
console.warn(
"Stream attempted to emit a `null` value in object mode which would have ended the stream early. (Skipping)",
);
continue;
}

// Emit atom or atom value
s.push(options?.atoms ? atom : atom.value);
}

// End the stream
s.push(null);
})();

return s;
}
}
67 changes: 66 additions & 1 deletion test/consumption.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { describe, test } from "vitest";
import $ from "../src";
import { Readable } from "node:stream";

describe.concurrent("stream consumption", () => {
describe.concurrent("to array", () => {
describe.concurrent("toArray", () => {
test("values", async ({ expect }) => {
expect.assertions(1);

Expand Down Expand Up @@ -71,4 +72,68 @@ describe.concurrent("stream consumption", () => {
expect(array).toEqual([]);
});
});

describe.concurrent("toReadable", () => {
test("object values", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([1, 2, 3]).toReadable("object");
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([1, 2, 3]);
});

test("object atoms", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([$.ok(1), $.ok(2), $.error(3)]).toReadable("object", {
atoms: true,
});
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([$.ok(1), $.ok(2), $.error(3)]);
});

test("null in object stream", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([1, null, 2, 3]).toReadable("object");
expect(stream).to.be.instanceof(Readable);
const values = await promisifyStream(stream);
expect(values).to.deep.equal([1, 2, 3]);
});

test("raw values", async ({ expect }) => {
expect.assertions(2);

const stream = $.from(["hello", " ", "world"]).toReadable("raw");

expect(stream).to.be.instanceof(Readable);
const values = await promisifyStream(stream);
expect(values.join("")).to.equal("hello world");
});

test("error when using object in raw stream", async ({ expect }) => {
expect.assertions(1);

// Creating the stream wont panic
const stream = $.from([1]).toReadable("raw");

// But reading it will emit an error so this should reject
const streamPromise = promisifyStream(stream);
expect(streamPromise).rejects.toBeTruthy();
});
});
});

function promisifyStream(stream: Readable): Promise<unknown[]> {
const data: unknown[] = [];

return new Promise((resolve, reject) => {
stream.on("data", (value) => data.push(value));
stream.on("error", reject);
stream.on("end", () => resolve(data));
});
}
12 changes: 12 additions & 0 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ describe.concurrent("stream creation", () => {
expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("array with nullish values", async ({ expect }) => {
expect.assertions(1);

const s = $.fromArray([1, null, undefined]);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(null),
$.ok(undefined),
]);
});

test("don't modify original array", async ({ expect }) => {
expect.assertions(2);

Expand Down

0 comments on commit 593a0eb

Please sign in to comment.