Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams without fixed size #337

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
07e6465
Implemented optional size for streams.
vadymshturkhal Jul 23, 2022
b8cf147
Added consistency.
vadymshturkhal Jul 23, 2022
6873248
Implemented end of file based on last data size.
vadymshturkhal Jul 24, 2022
34841d8
Duplicated to lib/.
vadymshturkhal Jul 24, 2022
5eef9af
Changed approach. Works fine with local example. Tests not passed.
vadymshturkhal Jul 25, 2022
1528d4d
Merge branch 'metarhia:master' into master
vadymshturkhal Jul 25, 2022
d33feab
Added return value to write() in Writable.
vadymshturkhal Jul 25, 2022
5d6b118
Fixed bug which called stop() after end.
vadymshturkhal Jul 25, 2022
cfcf94b
Bug fix duplicated.
vadymshturkhal Jul 25, 2022
bb9c444
Updated final stream size to bytes it has read.
vadymshturkhal Jul 25, 2022
ec14dd6
Created const for default stream size.
vadymshturkhal Jul 25, 2022
1156d15
Added Infinity as a default size for a stream.
vadymshturkhal Jul 26, 2022
2aae3c0
Merge branch 'metarhia:master' into master
vadymshturkhal Jul 26, 2022
34723f0
Sync with latest metacom changes.
vadymshturkhal Jul 26, 2022
4cfa4d3
Sync streams lib with latest.
vadymshturkhal Jul 26, 2022
e50d79e
Merge branch 'metarhia:master' into master
vadymshturkhal Jul 28, 2022
f723a38
Implemented size overflow checking for streams.
vadymshturkhal Jul 29, 2022
3c7317a
Fixed stream close parameter.
vadymshturkhal Jul 29, 2022
f259e0a
Update dist/metacom.js
vadymshturkhal Jul 30, 2022
349309a
Reviewed console warn.
vadymshturkhal Jul 30, 2022
bb3725e
Reviewed stream default size.
vadymshturkhal Jul 30, 2022
89126b3
Update dist/streams.js
vadymshturkhal Jul 30, 2022
4cf32bf
Added more compact comparison.
vadymshturkhal Jul 30, 2022
f9fa1bc
Merge branch 'metarhia:master' into master
vadymshturkhal Jul 30, 2022
91e1de1
Removed stream.stop() from overflow block.
vadymshturkhal Jul 30, 2022
2889d1a
Fixed timeout bug.
vadymshturkhal Jul 30, 2022
5f11378
Deleted comments.
vadymshturkhal Jul 31, 2022
a2d6c17
Fixed stream default and optional size.
vadymshturkhal Jul 31, 2022
d8098bf
Slightly simplified implementation.
vadymshturkhal Aug 3, 2022
b7568ea
Slightly simplified implementation of streams.
vadymshturkhal Aug 3, 2022
2fad94f
Renamed variable stream.packetsNeedToRead to stream.expectedPackets.
vadymshturkhal Aug 3, 2022
9450db4
Removed addPacket method.
vadymshturkhal Aug 3, 2022
13c57ea
Update dist/metacom.js
tshemsedinov Aug 3, 2022
bd82b8f
Apply suggestions from code review
tshemsedinov Aug 3, 2022
a4e9c6b
Duplicated simplification.
vadymshturkhal Aug 4, 2022
3357782
Duplicated.
vadymshturkhal Aug 4, 2022
cd48608
Slightly optimized binary method.
vadymshturkhal Aug 11, 2022
0420eff
Implemented unlimited streams.
vadymshturkhal Aug 23, 2022
c57c21b
Throw error if stream size exceeded.
vadymshturkhal Aug 25, 2022
f56ca3d
Update dist/metacom.js
tshemsedinov Aug 30, 2022
3ff5ac1
Merge branch 'metarhia:master' into master
vadymshturkhal Aug 31, 2022
f77eeaf
Changed packets to bytes.
vadymshturkhal Aug 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions dist/metacom.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export class Metacom extends EventEmitter {
} else if (callType === 'stream') {
const { stream: streamId, name, size, status } = packet;
const stream = this.streams.get(streamId);
if (name && typeof name === 'string' && Number.isSafeInteger(size)) {
if (typeof name === 'string' && name !== '') {
if (stream) {
console.error(new Error(`Stream ${name} is already initialized`));
} else {
Expand All @@ -123,6 +123,7 @@ export class Metacom extends EventEmitter {
} else if (!stream) {
console.error(new Error(`Stream ${streamId} is not initialized`));
} else if (status === 'end') {
stream.expectedPackets = packet.totalSent;
await stream.close();
this.streams.delete(streamId);
} else if (status === 'terminate') {
Expand All @@ -140,8 +141,21 @@ export class Metacom extends EventEmitter {
const byteView = new Uint8Array(buffer);
const { streamId, payload } = MetacomChunk.decode(byteView);
const stream = this.streams.get(streamId);
if (stream) await stream.push(payload);
else console.warn(`Stream ${streamId} is not initialized`);
if (!stream) {
console.warn(`Stream ${streamId} is not initialized`);
return;
}
if (stream.size < stream.bytesRead + payload.length) {
console.warn(`
Stream ${streamId} overflow occurred.
Stream size: ${stream.size},
Required size: ${stream.bytesRead + payload.length}`);
vadymshturkhal marked this conversation as resolved.
Show resolved Hide resolved
// Because we can't use payload = undefined;
stream.push();
return;
}

await stream.push(payload);
}

async load(...interfaces) {
Expand Down
31 changes: 20 additions & 11 deletions dist/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ class MetacomReadable extends EventEmitter {
super();
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.size = Number.isSafeInteger(initData.size) ? initData.size : Infinity;
this.highWaterMark = options.highWaterMark || DEFAULT_HIGH_WATER_MARK;
this.queue = [];
this.streaming = true;
this.status = null;
this.bytesRead = 0;
this.packetsRead = 0;
this.expectedPackets = 0;
this.maxListenersCount = this.getMaxListeners() - 1;
}

Expand All @@ -55,8 +57,8 @@ class MetacomReadable extends EventEmitter {
await this.waitEvent(PULL_EVENT);
return this.push(data);
}
this.queue.push(data);
if (this.queue.length === 1) this.emit(PUSH_EVENT);
if (data) this.queue.push(data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain this change

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain this change

data can be null

Copy link
Member

@rohiievych rohiievych Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In this case you're skipping not only null, but all nullish values like 0 or ''.
    It's better to allow all possible values by replacing null terminator with a symbol.
    Consider to add these changes:
const STREAM_ID_LENGTH = 4;
const STREAM_FINISHER = Symbol(); // Added symbol
async stop() {
  if (this.packetsRead === this.expectedPackets) {
    this.streaming = false;
    this.emit(PUSH_EVENT, STREAM_FINISHER);
    return;
  }
  await this.waitEvent(PULL_EVENT);
  await this.stop();
}
async read() {
  if (this.queue.length > 0) return this.pull();
  const finisher = await this.waitEvent(PUSH_EVENT);
  if (finisher === STREAM_FINISHER) return STREAM_FINISHER;
  return this.pull();
}
async *[Symbol.asyncIterator]() {
  while (this.streaming) {
    const chunk = await this.read();
    if (chunk === STREAM_FINISHER) return;
    yield chunk;
  }
}
  1. If you remove if (this.queue.length === 1), then stream emits PUSH_EVENT on every push, which is redundant, because readable is being read while this.queue is not empty. Why it was removed?

this.emit(PUSH_EVENT);
return data;
}

Expand All @@ -70,7 +72,6 @@ class MetacomReadable extends EventEmitter {
this.emit('end');
writable.end();
await waitWritableEvent('close');
await this.close();
}

pipe(writable) {
Expand All @@ -97,13 +98,13 @@ class MetacomReadable extends EventEmitter {
}

async stop() {
if (this.bytesRead === this.size) {
if (this.packetsRead === this.expectedPackets) {
this.streaming = false;
this.emit(PUSH_EVENT, null);
} else {
await this.waitEvent(PULL_EVENT);
await this.stop();
return;
}
await this.waitEvent(PULL_EVENT);
await this.stop();
}

async read() {
Expand All @@ -115,7 +116,8 @@ class MetacomReadable extends EventEmitter {

pull() {
const data = this.queue.shift();
this.bytesRead += data.length;
this.bytesRead += data?.length ? data.length : 0;
this.packetsRead += 1;
this.emit(PULL_EVENT);
return data;
}
Expand Down Expand Up @@ -150,7 +152,8 @@ class MetacomWritable extends EventEmitter {
this.transport = transport;
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.size = Number.isSafeInteger(initData.size) ? initData.size : Infinity;
this.totalSent = 0;
this.init();
}

Expand All @@ -166,10 +169,16 @@ class MetacomWritable extends EventEmitter {
write(data) {
const chunk = MetacomChunk.encode(this.streamId, data);
this.transport.send(chunk);
this.totalSent += 1;
return true;
}

end() {
const packet = { stream: this.streamId, status: 'end' };
const packet = {
stream: this.streamId,
status: 'end',
totalSent: this.totalSent,
};
this.transport.send(JSON.stringify(packet));
}

Expand Down
25 changes: 17 additions & 8 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class Client {
throw new Error(`Can't send metacom streams to http transport`);
}
if (!name) throw new Error('Stream name is not provided');
if (!size) throw new Error('Stream size is not provided');
const streamId = --this.streamId;
const initData = { streamId, name, size };
const transport = channel.connection;
Expand Down Expand Up @@ -161,13 +160,22 @@ class Channel {
binary(data) {
try {
const { streamId, payload } = MetacomChunk.decode(data);
const upstream = this.client.streams.get(streamId);
if (upstream) {
upstream.push(payload);
} else {
const error = new Error(`Stream ${streamId} is not initialized`);
this.error(400, { callId: streamId, error, pass: true });
const stream = this.client.streams.get(streamId);
if (!stream) {
console.warn(`Stream ${streamId} is not initialized`);
return;
}
if (stream.size < stream.bytesRead + payload.length) {
console.warn(`
Stream ${streamId} overflow occurred.
Stream size: ${stream.size},
Required size: ${stream.bytesRead + payload.length}`);
// Because we can't use payload = undefined;
stream.push();
return;
}
// Why not await?
stream.push(payload);
} catch (error) {
this.error(400, { callId: 0, error });
}
Expand All @@ -189,7 +197,7 @@ class Channel {
async handleStreamPacket(packet) {
const { stream: streamId, name, size, status } = packet;
const stream = this.client.streams.get(streamId);
if (name && typeof name === 'string' && Number.isSafeInteger(size)) {
if (name && typeof name === 'string') {
if (stream) {
const error = new Error(`Stream ${name} is already initialized`);
this.error(400, { callId: streamId, error, pass: true });
Expand All @@ -202,6 +210,7 @@ class Channel {
const error = new Error(`Stream ${streamId} is not initialized`);
this.error(400, { callId: streamId, error, pass: true });
} else if (status === 'end') {
stream.expectedPackets = packet.totalSent;
await stream.close();
this.client.streams.delete(streamId);
} else if (status === 'terminate') {
Expand Down
30 changes: 19 additions & 11 deletions lib/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ class MetacomReadable extends EventEmitter {
super();
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.size = Number.isSafeInteger(initData.size) ? initData.size : Infinity;
this.highWaterMark = options.highWaterMark || DEFAULT_HIGH_WATER_MARK;
this.queue = [];
this.streaming = true;
this.status = null;
this.bytesRead = 0;
this.packetsRead = 0;
this.expectedPackets = 0;
this.maxListenersCount = this.getMaxListeners() - 1;
}

Expand All @@ -58,8 +60,8 @@ class MetacomReadable extends EventEmitter {
await this.waitEvent(PULL_EVENT);
return this.push(data);
}
this.queue.push(data);
if (this.queue.length === 1) this.emit(PUSH_EVENT);
if (data) this.queue.push(data);
this.emit(PUSH_EVENT);
return data;
}

Expand All @@ -73,7 +75,6 @@ class MetacomReadable extends EventEmitter {
this.emit('end');
writable.end();
await waitWritableEvent('close');
await this.close();
}

// implements nodejs readable pipe method
Expand Down Expand Up @@ -101,13 +102,13 @@ class MetacomReadable extends EventEmitter {
}

async stop() {
if (this.bytesRead === this.size) {
if (this.packetsRead === this.expectedPackets) {
this.streaming = false;
this.emit(PUSH_EVENT, null);
} else {
await this.waitEvent(PULL_EVENT);
await this.stop();
return;
}
await this.waitEvent(PULL_EVENT);
await this.stop();
}

async read() {
Expand All @@ -119,7 +120,8 @@ class MetacomReadable extends EventEmitter {

pull() {
const data = this.queue.shift();
this.bytesRead += data.length;
this.bytesRead += data?.length ? data.length : 0;
tshemsedinov marked this conversation as resolved.
Show resolved Hide resolved
this.packetsRead += 1;
this.emit(PULL_EVENT);
return data;
}
Expand Down Expand Up @@ -154,7 +156,8 @@ class MetacomWritable extends EventEmitter {
this.transport = transport;
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.size = Number.isSafeInteger(initData.size) ? initData.size : Infinity;
this.totalSent = 0;
this.init();
}

Expand All @@ -171,12 +174,17 @@ class MetacomWritable extends EventEmitter {
write(data) {
const chunk = MetacomChunk.encode(this.streamId, data);
this.transport.send(chunk);
this.totalSent += 1;
return true;
}

// implements nodejs writable end method
end() {
const packet = { stream: this.streamId, status: 'end' };
const packet = {
stream: this.streamId,
status: 'end',
totalSent: this.totalSent,
rohiievych marked this conversation as resolved.
Show resolved Hide resolved
};
this.transport.send(JSON.stringify(packet));
}

Expand Down