Skip to content

Commit

Permalink
[WIP] I wasn't happy with the implemnetation so I'm tweaking it whils…
Browse files Browse the repository at this point in the history
…t working on moving stuff to eventsub
  • Loading branch information
BarryCarlyon committed May 23, 2024
1 parent 75c42c5 commit b73d851
Showing 1 changed file with 93 additions and 55 deletions.
148 changes: 93 additions & 55 deletions eventsub/conduits/eventsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,109 +14,143 @@ class eventsubSocket extends EventEmitter {
4007: "Invalid Reconnect",
};

constructor(connect) {
constructor({
url = "wss://eventsub.wss.twitch.tv/ws",
connect = false,
silenceReconnect = true,
disableAutoReconnect = false,
}) {
super();

this.silenceReconnect = silenceReconnect;
this.disableAutoReconnect = disableAutoReconnect;
this.mainUrl = url;

if (connect) {
this.connect();
}
}

mainUrl = "wss://eventsub.wss.twitch.tv/ws";
//mainUrl = "ws://127.0.0.1:8080/ws";
backoff = 0;
backoffStack = 100;

connect(url, is_reconnect) {
this.eventsub = {};
this.counter++;

url = url ? url : "wss://eventsub.wss.twitch.tv/ws";
url = url ? url : this.mainUrl;
is_reconnect = is_reconnect ? is_reconnect : false;

console.log(`Connecting to ${url}|${is_reconnect}`);
console.debug(`Connecting to ${url}`);
// this overrites and kills the old reference
this.eventsub = new WebSocket(url);
this.eventsub.is_reconnecting = is_reconnect;
this.eventsub.counter = this.counter;

this.eventsub.addEventListener("open", () => {
console.log(`Opened Connection to Twitch`);
// tidy/reset flags
this.eventsub.is_reconnecting = false;
this.backoff = 0;
console.debug(`Opened Connection to Twitch`);
});

// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close_event
// https://github.com/Luka967/websocket-close-codes
this.eventsub.addEventListener("close", (close) => {
//console.log('EventSub close', close, this.eventsub);
console.log(
// forward the close event
this.emit("close", close);

console.debug(
`${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Connection Closed: ${close.code} Reason - ${this.closeCodes[close.code]}`,
);

if (!this.eventsub.is_reconnecting) {
console.log(
`${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Is not Twitch reconnecting, Websocket reconnect`,
// 4000 well damn
// 4001 we should never get...
// 4002 make a new socket
if (close.code == 4003) {
console.debug(
"Did not subscribe to anything, the client should decide to reconnect (when it is ready)",
);
//new initSocket();
this.connect();
return;
}
if (close.code == 4004) {
// this is the old connection dying
// we should of made a new connection to the new socket
console.debug("Old Connection is 4004-ing");
return;
}
// 4005 make a new socket
// 4006 make a new socket
// 4007 make a new socket as we screwed up the reconnect?

if (close.code == 1006) {
// do a single retry
// this is wrong?
//this.eventsub.is_reconnecting = true;
// anything else we should auto reconnect
// but only if the user wants
if (this.disableAutoReconnect) {
return;
}

//console.debug(`for ${this.eventsub.counter} making new`);
this.backoff++;
console.debug("retry in", this.backoff * this.backoffStack);
setTimeout(() => {
this.connect();
}, this.backoff * this.backoffStack);
});
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event
this.eventsub.addEventListener("error", (err) => {
console.log(err);
console.log(
//console.debug(err);
console.debug(
`${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Connection Error`,
);
});
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event
this.eventsub.addEventListener("message", (message) => {
//console.log('Message');
//console.log(this.eventsub.counter, message);
//console.debug('Message');
//console.debug(this.eventsub.counter, message);
let { data } = message;
data = JSON.parse(data);

let { metadata, payload } = data;
let { message_id, message_type, message_timestamp } = metadata;
//console.log(`Recv ${message_id} - ${message_type}`);
//console.debug(`Recv ${message_id} - ${message_type}`);

switch (message_type) {
case "session_welcome":
let { session } = payload;
let { id, keepalive_timeout_seconds } = session;

console.log(`${this.eventsub.counter} This is Socket ID ${id}`);
console.debug(`${this.eventsub.counter} This is Socket ID ${id}`);
this.eventsub.twitch_websocket_id = id;

console.log(
console.debug(
`${this.eventsub.counter} This socket declared silence as ${keepalive_timeout_seconds} seconds`,
);

if (!this.eventsub.is_reconnecting) {
console.log("Dirty disconnect or first spawn");
this.emit("connected", id);
// now you would spawn your topics
} else {
// is this a reconnect?
if (is_reconnect) {
// we carried subscriptions over
this.emit("reconnected", id);
// no need to spawn topics as carried over
} else {
// now you would spawn your topics
this.emit("connected", id);
}

this.silence(keepalive_timeout_seconds);

break;
case "session_keepalive":
//console.log(`Recv KeepAlive - ${message_type}`);
//console.debug(`Recv KeepAlive - ${message_type}`);
this.emit("session_keepalive");
this.silence();
break;

case "notification":
//console.log('notification', metadata, payload);
//console.debug('notification', metadata, payload);
let { subscription } = payload;
let { type } = subscription;

// chat.message is NOISY
if (type != "channel.chat.message") {
console.log(
console.debug(
`${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Recv notification ${type}`,
);
}
Expand All @@ -130,32 +164,34 @@ class eventsubSocket extends EventEmitter {
case "session_reconnect":
this.eventsub.is_reconnecting = true;

let reconnect_url = payload.session.reconnect_url;
let { reconnect_url } = payload.session;

console.log("Connect to new url", reconnect_url);
console.log(
console.debug(
`${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Reconnect request ${reconnect_url}`,
);

//this.eventsub.close();
//new initSocket(reconnect_url, true);
this.emit("session_reconnect", reconnect_url);
// stash old socket?
//this.eventsub_dying = this.eventsub;
//this.eventsub_dying.dying = true;
// make new socket
this.connect(reconnect_url, true);

break;
case "websocket_disconnect":
console.log(`${this.eventsub.counter} Recv Disconnect`);
console.log("websocket_disconnect", payload);
console.debug(`${this.eventsub.counter} Recv Disconnect`);
console.debug("websocket_disconnect", payload);

break;

case "revocation":
console.log(`${this.eventsub.counter} Recv Topic Revocation`);
console.log("revocation", payload);
console.debug(`${this.eventsub.counter} Recv Topic Revocation`);
console.debug("revocation", payload);
this.emit("revocation", { metadata, payload });
break;

default:
console.log(`${this.eventsub.counter} unexpected`, metadata, payload);
console.debug(`${this.eventsub.counter} unexpected`, metadata, payload);
break;
}
});
Expand All @@ -179,7 +215,9 @@ class eventsubSocket extends EventEmitter {
clearTimeout(this.silenceHandler);
this.silenceHandler = setTimeout(() => {
this.emit("session_silenced"); // -> self reconnecting
this.close(); // close it and let it self loop
if (this.silenceReconnect) {
this.close(); // close it and let it self loop
}
}, this.silenceTime * 1000);
}
}
Expand Down Expand Up @@ -307,13 +345,13 @@ class Twitch extends EventEmitter {
}
if (this.twitch_client_id == "") {
// infer
console.log("Inferring CID");
console.debug("Inferring CID");
this.twitch_client_id = validateRes.client_id;
}

// check the duration left on the token
// account for legacy inifinity tokens
console.log(`The Token has ${validateRes.expires_in}`);
console.debug(`The Token has ${validateRes.expires_in}`);
if (validateRes.expires_in < 30 * 60) {
// need refresh
if (!this.infinityCheck && validateRes.expires_in == 0) {
Expand All @@ -338,19 +376,19 @@ class Twitch extends EventEmitter {
this.emit("validated", validateRes);

if (!this.allow_auto_maintain) {
console.log("allow auto maitain is off");
console.debug("allow auto maitain is off");
return;
}
console.log("allow auto maitain is on");
console.log(this.twitch_refresh);
console.log(this.twitch_client_secret);
console.debug("allow auto maitain is on");
console.debug(this.twitch_refresh);
console.debug(this.twitch_client_secret);

// initiate maintaince timer
if (this.twitch_refresh != "" || this.twitch_client_secret != "") {
var n = new Date();
console.log("now maintian", n);
console.debug("now maintian", n);
n.setMinutes(n.getMinutes() + 15);
console.log("next maintian", n);
console.debug("next maintian", n);
// we got here as a client secret exists as well
// otherwise we threw earlier
clearTimeout(this._maintainceTimer);
Expand All @@ -368,7 +406,7 @@ class Twitch extends EventEmitter {
"Accept": "application/json",
"Accept-Encoding": "gzip",
};
console.log("headers", this.headers);
console.debug("headers", this.headers);
};
setToken = (token) => {
this.twitch_token = token;
Expand Down Expand Up @@ -696,7 +734,7 @@ class Twitch extends EventEmitter {
};

logHelixResponse = (resp) => {
console.log(
console.debug(
`Helix: ${resp.status} - ${resp.headers.get("ratelimit-remaining")}/${resp.headers.get("ratelimit-limit")}`,
);
};
Expand Down

0 comments on commit b73d851

Please sign in to comment.