diff --git a/eventsub/conduits/eventsub.js b/eventsub/conduits/eventsub.js index 9b13e2c..48efbe3 100644 --- a/eventsub/conduits/eventsub.js +++ b/eventsub/conduits/eventsub.js @@ -14,109 +14,143 @@ class eventsubSocket extends EventEmitter { 4007: "Invalid Reconnect", }; - constructor(connect) { + constructor({ + url = "wss://eventsub.wss.twitch.tv/ws", + connect = false, + silenceReconnect = true, + disableAutoReconnect = false, + }) { super(); + this.silenceReconnect = silenceReconnect; + this.disableAutoReconnect = disableAutoReconnect; + this.mainUrl = url; + if (connect) { this.connect(); } } + mainUrl = "wss://eventsub.wss.twitch.tv/ws"; + //mainUrl = "ws://127.0.0.1:8080/ws"; + backoff = 0; + backoffStack = 100; + connect(url, is_reconnect) { this.eventsub = {}; this.counter++; - url = url ? url : "wss://eventsub.wss.twitch.tv/ws"; + url = url ? url : this.mainUrl; is_reconnect = is_reconnect ? is_reconnect : false; - console.log(`Connecting to ${url}|${is_reconnect}`); + console.debug(`Connecting to ${url}`); + // this overrites and kills the old reference this.eventsub = new WebSocket(url); - this.eventsub.is_reconnecting = is_reconnect; this.eventsub.counter = this.counter; this.eventsub.addEventListener("open", () => { - console.log(`Opened Connection to Twitch`); - // tidy/reset flags - this.eventsub.is_reconnecting = false; + this.backoff = 0; + console.debug(`Opened Connection to Twitch`); }); + // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close_event // https://github.com/Luka967/websocket-close-codes this.eventsub.addEventListener("close", (close) => { - //console.log('EventSub close', close, this.eventsub); - console.log( + // forward the close event + this.emit("close", close); + + console.debug( `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Connection Closed: ${close.code} Reason - ${this.closeCodes[close.code]}`, ); - if (!this.eventsub.is_reconnecting) { - console.log( - `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Is not Twitch reconnecting, Websocket reconnect`, + // 4000 well damn + // 4001 we should never get... + // 4002 make a new socket + if (close.code == 4003) { + console.debug( + "Did not subscribe to anything, the client should decide to reconnect (when it is ready)", ); - //new initSocket(); - this.connect(); + return; + } + if (close.code == 4004) { + // this is the old connection dying + // we should of made a new connection to the new socket + console.debug("Old Connection is 4004-ing"); + return; } + // 4005 make a new socket + // 4006 make a new socket + // 4007 make a new socket as we screwed up the reconnect? - if (close.code == 1006) { - // do a single retry - // this is wrong? - //this.eventsub.is_reconnecting = true; + // anything else we should auto reconnect + // but only if the user wants + if (this.disableAutoReconnect) { + return; } + + //console.debug(`for ${this.eventsub.counter} making new`); + this.backoff++; + console.debug("retry in", this.backoff * this.backoffStack); + setTimeout(() => { + this.connect(); + }, this.backoff * this.backoffStack); }); // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event this.eventsub.addEventListener("error", (err) => { - console.log(err); - console.log( + //console.debug(err); + console.debug( `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Connection Error`, ); }); // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event this.eventsub.addEventListener("message", (message) => { - //console.log('Message'); - //console.log(this.eventsub.counter, message); + //console.debug('Message'); + //console.debug(this.eventsub.counter, message); let { data } = message; data = JSON.parse(data); let { metadata, payload } = data; let { message_id, message_type, message_timestamp } = metadata; - //console.log(`Recv ${message_id} - ${message_type}`); + //console.debug(`Recv ${message_id} - ${message_type}`); switch (message_type) { case "session_welcome": let { session } = payload; let { id, keepalive_timeout_seconds } = session; - console.log(`${this.eventsub.counter} This is Socket ID ${id}`); + console.debug(`${this.eventsub.counter} This is Socket ID ${id}`); this.eventsub.twitch_websocket_id = id; - console.log( + console.debug( `${this.eventsub.counter} This socket declared silence as ${keepalive_timeout_seconds} seconds`, ); - if (!this.eventsub.is_reconnecting) { - console.log("Dirty disconnect or first spawn"); - this.emit("connected", id); - // now you would spawn your topics - } else { + // is this a reconnect? + if (is_reconnect) { + // we carried subscriptions over this.emit("reconnected", id); - // no need to spawn topics as carried over + } else { + // now you would spawn your topics + this.emit("connected", id); } this.silence(keepalive_timeout_seconds); break; case "session_keepalive": - //console.log(`Recv KeepAlive - ${message_type}`); + //console.debug(`Recv KeepAlive - ${message_type}`); this.emit("session_keepalive"); this.silence(); break; case "notification": - //console.log('notification', metadata, payload); + //console.debug('notification', metadata, payload); let { subscription } = payload; let { type } = subscription; // chat.message is NOISY if (type != "channel.chat.message") { - console.log( + console.debug( `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Recv notification ${type}`, ); } @@ -130,32 +164,34 @@ class eventsubSocket extends EventEmitter { case "session_reconnect": this.eventsub.is_reconnecting = true; - let reconnect_url = payload.session.reconnect_url; + let { reconnect_url } = payload.session; - console.log("Connect to new url", reconnect_url); - console.log( + console.debug( `${this.eventsub.twitch_websocket_id}/${this.eventsub.counter} Reconnect request ${reconnect_url}`, ); - //this.eventsub.close(); - //new initSocket(reconnect_url, true); + this.emit("session_reconnect", reconnect_url); + // stash old socket? + //this.eventsub_dying = this.eventsub; + //this.eventsub_dying.dying = true; + // make new socket this.connect(reconnect_url, true); break; case "websocket_disconnect": - console.log(`${this.eventsub.counter} Recv Disconnect`); - console.log("websocket_disconnect", payload); + console.debug(`${this.eventsub.counter} Recv Disconnect`); + console.debug("websocket_disconnect", payload); break; case "revocation": - console.log(`${this.eventsub.counter} Recv Topic Revocation`); - console.log("revocation", payload); + console.debug(`${this.eventsub.counter} Recv Topic Revocation`); + console.debug("revocation", payload); this.emit("revocation", { metadata, payload }); break; default: - console.log(`${this.eventsub.counter} unexpected`, metadata, payload); + console.debug(`${this.eventsub.counter} unexpected`, metadata, payload); break; } }); @@ -179,7 +215,9 @@ class eventsubSocket extends EventEmitter { clearTimeout(this.silenceHandler); this.silenceHandler = setTimeout(() => { this.emit("session_silenced"); // -> self reconnecting - this.close(); // close it and let it self loop + if (this.silenceReconnect) { + this.close(); // close it and let it self loop + } }, this.silenceTime * 1000); } } @@ -307,13 +345,13 @@ class Twitch extends EventEmitter { } if (this.twitch_client_id == "") { // infer - console.log("Inferring CID"); + console.debug("Inferring CID"); this.twitch_client_id = validateRes.client_id; } // check the duration left on the token // account for legacy inifinity tokens - console.log(`The Token has ${validateRes.expires_in}`); + console.debug(`The Token has ${validateRes.expires_in}`); if (validateRes.expires_in < 30 * 60) { // need refresh if (!this.infinityCheck && validateRes.expires_in == 0) { @@ -338,19 +376,19 @@ class Twitch extends EventEmitter { this.emit("validated", validateRes); if (!this.allow_auto_maintain) { - console.log("allow auto maitain is off"); + console.debug("allow auto maitain is off"); return; } - console.log("allow auto maitain is on"); - console.log(this.twitch_refresh); - console.log(this.twitch_client_secret); + console.debug("allow auto maitain is on"); + console.debug(this.twitch_refresh); + console.debug(this.twitch_client_secret); // initiate maintaince timer if (this.twitch_refresh != "" || this.twitch_client_secret != "") { var n = new Date(); - console.log("now maintian", n); + console.debug("now maintian", n); n.setMinutes(n.getMinutes() + 15); - console.log("next maintian", n); + console.debug("next maintian", n); // we got here as a client secret exists as well // otherwise we threw earlier clearTimeout(this._maintainceTimer); @@ -368,7 +406,7 @@ class Twitch extends EventEmitter { "Accept": "application/json", "Accept-Encoding": "gzip", }; - console.log("headers", this.headers); + console.debug("headers", this.headers); }; setToken = (token) => { this.twitch_token = token; @@ -696,7 +734,7 @@ class Twitch extends EventEmitter { }; logHelixResponse = (resp) => { - console.log( + console.debug( `Helix: ${resp.status} - ${resp.headers.get("ratelimit-remaining")}/${resp.headers.get("ratelimit-limit")}`, ); };