-
-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streams without fixed size #337
Conversation
Works fine with @rohiievych metacom-streams-example. Bugs & problems:
|
@vadymshturkhal |
Works fine with @rohiievych metacom-streams-example. |
Works fine. |
You can actually make default stream size as updateFinalSize() {
const sum = (length, chunk) => length + chunk.length;
this.size = this.queue.reduce(sum, this.bytesRead);
} If this won't work, try to count all bytes written and send setFinalSize(size) {
this.size = size;
} |
Unfortunately, JSON can't serialize Infinity |
We don't need to serialize |
Users can create streams with a negative size. |
If |
This is an exception case and should be validated |
Task is done. |
@rohiievych ping, we can't move forward without your review |
pong, will review on these days |
dist/streams.js
Outdated
@@ -55,8 +57,8 @@ class MetacomReadable extends EventEmitter { | |||
await this.waitEvent(PULL_EVENT); | |||
return this.push(data); | |||
} | |||
this.queue.push(data); | |||
if (this.queue.length === 1) this.emit(PUSH_EVENT); | |||
if (data) this.queue.push(data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please explain this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please explain this change
data can be null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- In this case you're skipping not only
null
, but all nullish values like0
or''
.
It's better to allow all possible values by replacingnull
terminator with a symbol.
Consider to add these changes:
const STREAM_ID_LENGTH = 4;
const STREAM_FINISHER = Symbol(); // Added symbol
async stop() {
if (this.packetsRead === this.expectedPackets) {
this.streaming = false;
this.emit(PUSH_EVENT, STREAM_FINISHER);
return;
}
await this.waitEvent(PULL_EVENT);
await this.stop();
}
async read() {
if (this.queue.length > 0) return this.pull();
const finisher = await this.waitEvent(PUSH_EVENT);
if (finisher === STREAM_FINISHER) return STREAM_FINISHER;
return this.pull();
}
async *[Symbol.asyncIterator]() {
while (this.streaming) {
const chunk = await this.read();
if (chunk === STREAM_FINISHER) return;
yield chunk;
}
}
- If you remove
if (this.queue.length === 1)
, then stream emits PUSH_EVENT on every push, which is redundant, because readable is being read whilethis.queue
is not empty. Why it was removed?
@rohiievych in this pull request implemented two issues:
|
@vadymshturkhal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to introduce message counters, if we already count bytes?
this.packetsRead = 0;
this.expectedPackets = 0;
Instead, we can just add expected bytes:
this.bytesRead = 0;
this.bytesExpected = 0;
Too many questions about optional size issue. At this point we implemented unlimited streams. @rohiievych
|
I don't see the reason to divide this task. Unlimited stream means that there is no fixed size of data being streamed, thus you don't declare size and have to stop the stream manually. Fixed size is just a piece of information to end stream automatically. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vadymshturkhal
Good job! We'll lend it after testing.
@rohiievych May I land before publishing new alpha version? |
We should test it on different systems with several conditions, but for alpha it's ok I suppose. |
Should be checked and totally reworked if it is still actual, can't easily rebase on master |
Issue: #334
npm run fmt
)