Skip to content

Commit

Permalink
fix broken types
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed May 7, 2024
1 parent 06686a3 commit 52f03a1
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/loud-jeans-switch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix broken types
3 changes: 0 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,4 @@ export type {
// Re-export useful utility types
export type { MaybePromise, Truthy, CallbackOrStream } from "./util";

// Export the `StreamEnd` type
export { StreamEnd } from "./stream";

export default Stream;
26 changes: 14 additions & 12 deletions src/stream/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ import { Stream } from ".";
import { Readable, Writable } from "stream";
import { createNodeCallback } from "../util";

/**
* Marker for the end of a stream.
*/
export const StreamEnd = Symbol.for("STREAM_END");

/**
* Unique type to represent the stream end marker.
*/
export type StreamEnd = typeof StreamEnd;
export type StreamEnd = typeof StreamBase.StreamEnd;

export class StreamBase {
protected stream: Readable;
protected stackTrace: string[] = [];
protected traceComplete: boolean = false;

/**
* Marker for the end of a stream.
*/
static StreamEnd = Symbol.for("STREAM_END");

constructor(stream: Readable) {
this.stream = stream;
}
Expand Down Expand Up @@ -124,7 +124,7 @@ export class StreamBase {

return normalise(await promise);
} else {
return StreamEnd;
return StreamBase.StreamEnd;
}
});
}
Expand All @@ -144,7 +144,7 @@ export class StreamBase {
const { value, done } = result instanceof Promise ? await result : result;

if (done) {
return StreamEnd;
return StreamBase.StreamEnd;
} else {
return normalise(value);
}
Expand Down Expand Up @@ -178,7 +178,7 @@ export class StreamBase {
*/
static fromArray<T, E>(array: MaybeAtom<T, E>[]): Stream<T, E> {
return Stream.fromNext(async () => {
return array.shift() ?? StreamEnd;
return array.shift() ?? StreamBase.StreamEnd;
});
}

Expand All @@ -199,9 +199,11 @@ export class StreamBase {
const value = await next();

// Promise returned as normal
if (value === StreamEnd) {
if (value === StreamBase.StreamEnd) {
this.push(null);
} else {
// @ts-expect-error - The previous `if` statement doesn't cause TS to
// type-narrow out `symbol`
this.push(normalise(value));
}
} catch (e) {
Expand All @@ -226,7 +228,7 @@ export class StreamBase {
consumed = true;
return value;
} else {
return StreamEnd;
return StreamBase.StreamEnd;
}
});
}
Expand Down Expand Up @@ -285,7 +287,7 @@ export class StreamBase {
},
async final(callback) {
// Emit a `StreamEnd` to close the stream
enqueue(StreamEnd);
enqueue(StreamBase.StreamEnd);

callback();
},
Expand Down
2 changes: 1 addition & 1 deletion src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "../atom";
import { HigherOrderStream } from "./higher-order";

export { StreamEnd } from "./base";
export type { StreamEnd } from "./base";

/**
* @template T - Type of the 'values' on the stream.
Expand Down
3 changes: 1 addition & 2 deletions test/benchmarks/index.bench.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { describe, bench } from "vitest";
import Stream from "../../src";
import Highland from "highland";
import { StreamEnd } from "../../src/stream/base";
import { error, ok } from "../../src/atom";

const SAMPLE_SIZE = 10;
Expand All @@ -14,7 +13,7 @@ describe("stream creation from next function", () => {
if (i < SAMPLE_SIZE) {
return i++;
} else {
return StreamEnd;
return Stream.StreamEnd;
}
}).toArray();
});
Expand Down
8 changes: 4 additions & 4 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test } from "vitest";
import $, { StreamEnd } from "../src";
import $ from "../src";
import { Readable } from "stream";

describe.concurrent("stream creation", () => {
Expand Down Expand Up @@ -109,7 +109,7 @@ describe.concurrent("stream creation", () => {
if (i < 4) {
return i++;
} else {
return StreamEnd;
return $.StreamEnd;
}
});

Expand All @@ -124,7 +124,7 @@ describe.concurrent("stream creation", () => {
if (atoms.length > 0) {
return atoms.shift();
} else {
return StreamEnd;
return $.StreamEnd;
}
});

Expand All @@ -151,7 +151,7 @@ describe.concurrent("stream creation", () => {
return i;
}

return StreamEnd;
return $.StreamEnd;
});

expect(await s.toArray({ atoms: true })).toEqual([
Expand Down

0 comments on commit 52f03a1

Please sign in to comment.