Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ReadableStream #36270

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

Patrick-ring-motive
Copy link
Contributor

Description

Added example for constructing a ReadableStream using Response object

Motivation

This a simple and concise way to construct a ReadableStream from a variety of sources. I find that it is easier to use than the actual constructor and, in my experience, most ReadableStream objects that you interact with are made this way.

Additional details

Already added at the bottom of the page here.

@Patrick-ring-motive Patrick-ring-motive requested a review from a team as a code owner October 8, 2024 18:59
@Patrick-ring-motive Patrick-ring-motive requested review from wbamberg and removed request for a team October 8, 2024 18:59
@github-actions github-actions bot added Content:WebAPI Web API docs size/s [PR only] 6-50 LoC changed labels Oct 8, 2024
Copy link
Contributor

github-actions bot commented Oct 8, 2024

Preview URLs

(comment last updated: 2024-10-08 19:20:03)

Patrick-ring-motive and others added 3 commits October 8, 2024 14:01
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
@Josh-Cena
Copy link
Member

I'm confused: why would you ever want to do this? If you have all the data available upfront anyway, you can just directly dispatch it to whatever receiver of the data without making them lazily consume it. This looks like extra overhead for no particular gains.

@Patrick-ring-motive
Copy link
Contributor Author

Patrick-ring-motive commented Oct 9, 2024

For any of the same reasons that you would make one in the way done here.
https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#convert_an_iterator_or_async_iterator_to_a_stream
Also this is one of the easiest ways to create an async iterator. There could be any number of interfaces that require an async iterator. In a scenario where you want to restrict either inputs or outputs to a ReadableStream. The original use case I had was to easily generate an error message in the form a ReadableStream to maintain a consistently typed interface. Something along the lines of

function makeStream(inputs){
  try{
    return new ReadableStream(inputs);
  }catch(e){
    return new Response(e.message).body;
  }
}

It’s a great way to spin up a failover stream with a reasonable guarantee of not throwing an error unlike the other methods.

@Josh-Cena
Copy link
Member

There are two types of data sources: eager and lazy. Lazy sources include streams and iterators, while eager sources include arrays, blobs, buffers, and others that support random access. It makes sense to transform a lazy source to another lazy source, or transform a lazy source to a eager source by collecting all received chunks, but it rarely makes sense to transform an eager source to a lazy source since you can consume everything in one go without chunking it up. Do you have a real world use case for this?

@Patrick-ring-motive
Copy link
Contributor Author

Patrick-ring-motive commented Oct 9, 2024

Yeah I needed this exact thing and when I found it I thought it was cool enough to share. I have a custom stream transformer that I'm working on improving. The above snippet was a shortened version of the scenario. Here's the relevant code.


globalThis.znewReadableStream = function znewReadableStream(){
  try{
    return new ReadableStream(...arguments);
  }catch(e){
    return new Response(e.message).body;
  }
}


String.prototype.toCharCodes = function toCharCodes() {
    let charCodeArr = [];
    for (let i = 0; i < this.length; i++) {
        const code = this.charCodeAt(i);
        charCodeArr.push(code);
    }
    return new Uint8Array(charCodeArr);
}

globalThis.zdecoder = function zdecoder() {
    if (!globalThis.decoder) {
        globalThis.decoder = new TextDecoder();
        globalThis.decoder.zdecode = function zdecode(raw) {
            try {
                return globalThis.decoder.decode(raw);
            } catch (e) {
                return e.message;
            }
        }
    }
    return globalThis.decoder;
}

globalThis.zencoder = function zencoder() {
    if (!globalThis.encoder) {
        globalThis.encoder = new TextEncoder();
        globalThis.encoder.zencode = function zencode(str) {
            try {
                return globalThis.encoder.encode(str);
            } catch (e) {
                return e.message.toCharCodes();
            }
        }
    }
    return globalThis.encoder;
}

globalThis.getReader = function getReader(stream) { 
    const r = Object.create(null);
    r.reader = stream.getReader();
    r.almostDone = false;
    return r;
}

globalThis.zgetReader = function zgetReader(stream) { 
    try{
		return getReader(stream);
	}catch(e){
		return getReader(znewReadableStream(e.message));
	}
}

globalThis.zread = async function zread(reader) {
    if (reader.almostDone) {
        try {
            reader.reader.releaseLock();
        } catch (e) {}
        return {
            value: undefined,
            done: true
        };
    }
    try {
        const rtrn = await reader.reader.read();
        if (rtrn.done) {
            try {
                reader.reader.releaseLock();
            } catch (e) {}
        }
        return rtrn;
    } catch (e) {
        reader.almostDone = true;
        return {
            value: e.message,
            done: false
        };
    }
};


globalThis.transformStream = async function transformStream(res, transform, ctx) {
    let reader = zgetReader(res.body);
    let resolveStreamProcessed;
    const streamProcessed = new Promise(resolve => resolveStreamProcessed = resolve);
    const stream = znewReadableStream({
        async start(controller) {
            let modifiedChunk = {
                value: "",
                done: false
            };
            while (true) {
                try {
                    const chunk = await (zread(reader));
                    if (chunk.done) {
                        break;
                    }
                    let encodedChunk;
                    if (!modifiedChunk.done) {
                        let decodedChunk = zdecoder().zdecode(chunk.value);
                        modifiedChunk = transform(decodedChunk);
                        encodedChunk = zencoder().zencode(modifiedChunk.value);
                    } else {
                        encodedChunk = chunk.value;
                    }
                    controller.enqueue(encodedChunk);
                } catch (e) {
                    console.log(e.message);
                    break;
                }
            }
            controller.close();
            resolveStreamProcessed();
        }
    });

    ctx.waitUntil(streamProcessed);
    tryReleaseLock(reader.reader);
    res = new Response(stream, res);
    return res;

}

Here's an example application that uses this code albeit a silly one https://stream.patrickring.net/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Content:WebAPI Web API docs size/s [PR only] 6-50 LoC changed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants