Streams without fixed size #337

Closed
wants to merge 42 commits

Conversation

@vadymshturkhal vadymshturkhal commented Jul 24, 2022

Issue: #334

  • code is properly formatted (npm run fmt)

@vadymshturkhal
Author

vadymshturkhal commented Jul 24, 2022

Works fine with @rohiievych's metacom-streams-example.
The crucial idea is to add an extra delivery-status byte after streamId in the encode method of MetacomChunk.
When end() is called, we encode the stream id, set the delivery status to end, and write it to the stream like a default write().
We then process the delivery status in dist/metacom binary() and lib/channel binary().
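
Roughly, the layout looks like this (a simplified sketch only; the constants, byte order, and helper names are illustrative, not metacom's actual implementation):

const STREAM_ID_LENGTH = 4; // bytes for the stream id (assumed)
const STATUS_DATA = 0; // regular chunk (assumed constant)
const STATUS_END = 1; // final "end" marker (assumed constant)

// Encode: [streamId (4 bytes LE), status (1 byte), payload bytes]
const encodeChunk = (streamId, status, payload = new Uint8Array(0)) => {
  const chunk = new Uint8Array(STREAM_ID_LENGTH + 1 + payload.length);
  new DataView(chunk.buffer).setUint32(0, streamId, true);
  chunk[STREAM_ID_LENGTH] = status;
  chunk.set(payload, STREAM_ID_LENGTH + 1);
  return chunk;
};

// Decode: read the id and status back and slice off the payload
const decodeChunk = (chunk) => {
  const view = new DataView(chunk.buffer, chunk.byteOffset, chunk.byteLength);
  const streamId = view.getUint32(0, true);
  const status = chunk[STREAM_ID_LENGTH];
  const payload = chunk.subarray(STREAM_ID_LENGTH + 1);
  return { streamId, status, payload };
};

A regular write sends encodeChunk(streamId, STATUS_DATA, payload); end() sends encodeChunk(streamId, STATUS_END) with an empty payload.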

Bugs & problems:

  • Too many duplicated constants across files;
  • In lib/streams MetaChunk decode, if we delete chunkView = new Uint8Array(chunkView) and try to upload files, an unexpected bug happens;
  • We can still create a stream with a fixed size, but it has no effect;
  • Dead code in dist/metacom and lib/channel;
  • Delivery status handling in dist/metacom binary() and lib/channel binary().

@rohiievych
Member

@vadymshturkhal
It is not efficient to check every chunk only for a single end case. The end command { stream: {id}, status: 'end' } is fine here. One problem I ran into was related to a Node.js readable that calls the metacom writable's end() and then calls write(), sending a few more chunks, which are lost. It's an integration problem with native Node.js streams.
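
To make the failure mode concrete (purely illustrative, not metacom's actual code): once the end command is sent, late write() calls should be rejected or flushed first rather than silently dropped.

// Illustrative guard with assumed names: remember that the stream has ended
// and refuse late writes instead of losing them.
class GuardedWritable {
  constructor(send) {
    this.send = send; // transport callback (message) => void, assumed
    this.ended = false;
  }
  write(chunk) {
    if (this.ended) throw new Error('write() after end(): chunk would be lost');
    this.send({ chunk });
  }
  end() {
    this.ended = true;
    this.send({ status: 'end' });
  }
}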

@vadymshturkhal
Author

Works fine with @rohiievych's metacom-streams-example.
The critical bug is here: lib/streams, MetaChunk decode, chunkView = new Uint8Array(chunkView).

@vadymshturkhal
Author

vadymshturkhal commented Jul 25, 2022

Works fine.
Maybe the default stream size should be Infinity.
What should we do if stream.bytesRead > stream.size?

@rohiievych
Member

Works fine. Maybe the default stream size should be Infinity. What should we do if stream.bytesRead > stream.size?

You can actually make the default stream size Infinity and update it on the end command.
Try this method in MetacomReadable:

updateFinalSize() {
  const sum = (length, chunk) => length + chunk.length;
  this.size = this.queue.reduce(sum, this.bytesRead);
}

If this doesn't work, try counting all the bytes written and sending { stream: {id}, status: 'end', totalBytes: {size} }. Then update the size:

setFinalSize(size) {
  this.size = size;
}
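
For example, the receiving side could then do something like this (a sketch with assumed wiring; only setFinalSize, updateFinalSize, and the totalBytes field come from the suggestion above):

// Pick the declared total if the end command carried one, otherwise derive
// the size from the bytes already read and queued.
const handleEndCommand = (readable, message) => {
  if (typeof message.totalBytes === 'number') {
    readable.setFinalSize(message.totalBytes);
  } else {
    readable.updateFinalSize();
  }
};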

@vadymshturkhal
Author

Unfortunately, JSON can't serialize Infinity
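
For reference, JSON.stringify turns Infinity into null, so a declared size of Infinity would not survive the wire:

// Infinity has no JSON representation and serializes as null
JSON.stringify({ size: Infinity }); // '{"size":null}'
JSON.parse('{"size":null}').size; // null, not Infinity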

@rohiievych
Member

Unfortunately, JSON can't serialize Infinity

We don't need to serialize Infinity; it's internal only.

@vadymshturkhal
Author

Users can create streams with a negative size.
When the user calls Writer end(), the size is normalized.

@vadymshturkhal
Author

vadymshturkhal commented Jul 28, 2022

If stream.bytesRead > stream.size, maybe we should throw an error.
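
Something along these lines (an illustrative check with assumed field names, not a patch to the actual code):

// Fail fast when more bytes arrive than the declared stream size allows;
// a size of Infinity means the stream is unlimited.
const checkOverflow = (stream) => {
  if (stream.size !== Infinity && stream.bytesRead > stream.size) {
    throw new Error(
      `Stream overflow: read ${stream.bytesRead} of ${stream.size} bytes`,
    );
  }
};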

@rohiievych
Member

Users can create streams with a negative size. When the user calls Writer end(), the size is normalized.

This is an exceptional case and should be validated.
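
A possible guard (illustrative; the function name is assumed, not the actual metacom check):

// Reject negative or non-integer sizes up front; undefined means unlimited
const validateStreamSize = (size) => {
  if (size === undefined) return;
  if (!Number.isInteger(size) || size < 0) {
    throw new Error(`Invalid stream size: ${size}`);
  }
};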

lib/streams.js (outdated, resolved)
@vadymshturkhal
Author

Task is done.

@tshemsedinov
Member

@rohiievych ping, we can't move forward without your review

@rohiievych
Member

@rohiievych ping, we can't move forward without your review

pong, will review in the next few days

dist/streams.js Outdated
@@ -55,8 +57,8 @@ class MetacomReadable extends EventEmitter {
  await this.waitEvent(PULL_EVENT);
  return this.push(data);
}
- this.queue.push(data);
- if (this.queue.length === 1) this.emit(PUSH_EVENT);
+ if (data) this.queue.push(data);
Member

Please explain this change

Author

Please explain this change

data can be null

Member

@rohiievych rohiievych Aug 23, 2022

  1. In this case you're skipping not only null but all falsy values like 0 or ''.
    It's better to allow all possible values by replacing the null terminator with a symbol.
    Consider adding these changes:
const STREAM_ID_LENGTH = 4;
const STREAM_FINISHER = Symbol(); // Added symbol
async stop() {
  if (this.packetsRead === this.expectedPackets) {
    this.streaming = false;
    this.emit(PUSH_EVENT, STREAM_FINISHER);
    return;
  }
  await this.waitEvent(PULL_EVENT);
  await this.stop();
}
async read() {
  if (this.queue.length > 0) return this.pull();
  const finisher = await this.waitEvent(PUSH_EVENT);
  if (finisher === STREAM_FINISHER) return STREAM_FINISHER;
  return this.pull();
}
async *[Symbol.asyncIterator]() {
  while (this.streaming) {
    const chunk = await this.read();
    if (chunk === STREAM_FINISHER) return;
    yield chunk;
  }
}
  2. If you remove if (this.queue.length === 1), then the stream emits PUSH_EVENT on every push, which is redundant, because the readable keeps being read while this.queue is not empty. Why was it removed?

@vadymshturkhal
Author

@rohiievych this pull request implements two issues:

  1. Remove the fixed size from streams (main);
  2. Restrict the written data length by the stream size (side).

@rohiievych
Member

@vadymshturkhal
What was the bug here?
5d6b118
If you don't stop MetacomReadable, you prevent the generator from returning, so it gets stuck.
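
A minimal illustration of the hang (generic JavaScript, not metacom's code): an async iterator that never sees a stop condition keeps awaiting the next chunk, so for await never returns.

// If nothing flips `streaming` to false or emits a final event,
// read() stays pending forever after the last chunk and the loop hangs.
async function* iterate(readable) {
  while (readable.streaming) {
    yield await readable.read();
  }
}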

Member

@rohiievych rohiievych left a comment

Why do we need to introduce message counters if we already count bytes?

this.packetsRead = 0;
this.expectedPackets = 0;

Instead, we can just add expected bytes:

this.bytesRead = 0;
this.bytesExpected = 0;
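
End-of-stream detection then becomes byte-based (a sketch with assumed field names, not the actual implementation):

// The stream is finished once every expected byte has been read
const isStreamFinished = ({ bytesRead, bytesExpected }) =>
  bytesExpected > 0 && bytesRead >= bytesExpected;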

lib/streams.js (outdated, resolved)
@vadymshturkhal
Author

Too many questions about the optional size issue.

At this point we have implemented unlimited streams.
Therefore size is unused.
What we should do with size can be decided in another issue.
As we have seen, it's too complicated to do both in one PR.

@rohiievych
I highly recommend splitting the task into two issues:

  • Implement unlimited streams;
  • Decide what to do with the optional size.

@rohiievych
Member

Too many questions about the optional size issue.

At this point we have implemented unlimited streams. Therefore size is unused. What we should do with size can be decided in another issue. As we have seen, it's too complicated to do both in one PR.

@rohiievych I highly recommend splitting the task into two issues:

  • Implement unlimited streams;
  • Decide what to do with the optional size.

I don't see a reason to divide this task. An unlimited stream means there is no fixed size of data being streamed, so you don't declare a size and have to stop the stream manually. A fixed size is just a piece of information used to end the stream automatically.

dist/metacom.js (outdated, resolved)
Member

@rohiievych rohiievych left a comment

@vadymshturkhal
Good job! We'll land it after testing.

@tshemsedinov
Member

@rohiievych May I land this before publishing a new alpha version?

@rohiievych
Member

@rohiievych May I land this before publishing a new alpha version?

We should test it on different systems under several conditions, but for an alpha it's OK, I suppose.

@tshemsedinov
Member

This should be checked and thoroughly reworked if it is still relevant; it can't easily be rebased on master.
