Skip to content

Commit

Permalink
Duplicated to lib/.
Browse files Browse the repository at this point in the history
  • Loading branch information
vadymshturkhal committed Jul 24, 2022
1 parent 6873248 commit 34841d8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 41 deletions.
18 changes: 8 additions & 10 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ const {
MetacomChunk,
} = require('./streams.js');

const DELIVERY_STATUS_IN_PROCESS = 1;
const DELIVERY_STATUS_END = 0;
const EMPTY_PACKET = Buffer.from('{}');

class Session {
Expand Down Expand Up @@ -160,17 +158,17 @@ class Channel {
this.error(500, { error, pass: true });
}

async binary(data) {
binary(data) {
try {
const { streamId, deliveryStatus, payload } = MetacomChunk.decode(data);
const { streamId, payload } = MetacomChunk.decode(data);
if (payload.length === 0) {
const packet = { stream: streamId, status: 'end' };
this.handleStreamPacket(packet);
return;
}
const upstream = this.client.streams.get(streamId);
if (upstream) {
if (deliveryStatus === DELIVERY_STATUS_END) {
await upstream.close();
this.client.streams.delete(streamId);
} else if (deliveryStatus === DELIVERY_STATUS_IN_PROCESS) {
upstream.push(payload);
}
upstream.push(payload);
} else {
const error = new Error(`Stream ${streamId} is not initialized`);
this.error(400, { callId: streamId, error, pass: true });
Expand Down
39 changes: 8 additions & 31 deletions lib/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ const { Blob } = require('buffer');
// todo implement remote backpressure: send msg to client to pause stream

const STREAM_ID_LENGTH = 4;
const DELIVERY_STATUS_LENGTH = 1;
const DELIVERY_STATUS_IN_PROCESS = 1;
const DELIVERY_STATUS_END = 0;

const createStreamIdBuffer = (num) => {
const buffer = new ArrayBuffer(STREAM_ID_LENGTH);
Expand All @@ -22,43 +19,24 @@ const createStreamIdBuffer = (num) => {

const getStreamId = (buffer) => {
const view = new DataView(buffer);
return view.getInt32(0);
};

const createDeliveryStatus = (num) => {
const buffer = new ArrayBuffer(DELIVERY_STATUS_LENGTH);
const view = new DataView(buffer);
view.setInt8(0, num);
return buffer;
};

const getDeliveryStatus = (buffer) => {
const view = new DataView(buffer);
return view.getInt8(STREAM_ID_LENGTH);
return view.getInt32(buffer);
};

class MetacomChunk {
static encode(streamId, payload, deliverySt = DELIVERY_STATUS_IN_PROCESS) {
static encode(streamId, payload) {
const streamIdView = new Uint8Array(createStreamIdBuffer(streamId));
const deliveryStatus = new Uint8Array(createDeliveryStatus(deliverySt));
const chunkView = new Uint8Array(
STREAM_ID_LENGTH + DELIVERY_STATUS_LENGTH + payload.length,
);
const chunkView = new Uint8Array(STREAM_ID_LENGTH + payload.length);
chunkView.set(streamIdView);
chunkView.set(deliveryStatus, STREAM_ID_LENGTH);
chunkView.set(payload, STREAM_ID_LENGTH + DELIVERY_STATUS_LENGTH);
chunkView.set(payload, STREAM_ID_LENGTH);
return chunkView;
}

static decode(chunkView) {
// Need for avoid unexpected bug
// For avoid critical bug
chunkView = new Uint8Array(chunkView);
const streamId = getStreamId(chunkView.buffer);
const deliveryStatus = getDeliveryStatus(chunkView.buffer);
const payload = chunkView.subarray(
STREAM_ID_LENGTH + DELIVERY_STATUS_LENGTH,
);
return { streamId, deliveryStatus, payload };
const payload = chunkView.subarray(STREAM_ID_LENGTH);
return { streamId, payload };
}
}

Expand Down Expand Up @@ -200,8 +178,7 @@ class MetacomWritable extends EventEmitter {

// implements nodejs writable end method
end() {
const data = [];
const chunk = MetacomChunk.encode(this.streamId, data, DELIVERY_STATUS_END);
const chunk = MetacomChunk.encode(this.streamId, []);
this.transport.send(chunk);
}

Expand Down

0 comments on commit 34841d8

Please sign in to comment.