From 7218360f9aeb951c9a6607b6db6101131032af07 Mon Sep 17 00:00:00 2001 From: James Elias Sigurdarson Date: Sat, 12 Jun 2021 10:45:08 +0000 Subject: [PATCH] fix socket issues Signed-off-by: James Elias Sigurdarson --- lib/sender.js | 73 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 1668cb1..0cd0566 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -299,8 +299,20 @@ class FluentSender { if (this._status === 'established') { return; } + if (!this._socket) { + const error = new FluentLoggerError.HandshakeError('socket went away before handshake'); + this._handleEvent('error', error); + this._disconnect(); + return; + } this._status = 'helo'; this._socket.once('data', (data) => { + if (!this._socket) { + const error = new FluentLoggerError.HandshakeError('socket went away during handshake'); + this._handleEvent('error', error); + this._disconnect(); + return; + } this._socket.pause(); const heloStatus = this._checkHelo(data); if (!heloStatus.succeeded) { @@ -310,6 +322,12 @@ class FluentSender { } this._status = 'pingpong'; this._socket.write(this._generatePing(), () => { + if (!this._socket) { + const error = new FluentLoggerError.HandshakeError('socket went away during ping'); + this._handleEvent('error', error); + this._disconnect(); + return; + } this._socket.resume(); this._socket.once('data', (data) => { const pongStatus = this._checkPong(data); @@ -406,33 +424,40 @@ class FluentSender { const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize; this._socket.write(packet, () => { if (this.requireAckResponse) { - this._socket.once('data', (data) => { - timeoutId && clearTimeout(timeoutId); - const response = msgpack.decode(data, { codec: codec }); - if (response.ack !== options.chunk) { - const error = new FluentLoggerError.ResponseError( - 'ack in response and chunk id in sent data are different', - { ack: response.ack, chunk: options.chunk } - ); - callbacks.forEach((callback) => { - this._handleEvent('error', error, callback); - }); - } else { // no error on ack - callbacks.forEach((callback) => { - callback && callback(); - }); - } - this._sendQueueSize -= sendPacketSize; - process.nextTick(() => { - this._waitToWrite(); - }); - }); - timeoutId = setTimeout(() => { - const error = new FluentLoggerError.ResponseTimeout('ack response timeout'); + if (!this._socket) { + const error = new FluentLoggerError.ResponseError('server went away'); callbacks.forEach((callback) => { this._handleEvent('error', error, callback); }); - }, this.ackResponseTimeout); + } else { + this._socket.once('data', (data) => { + timeoutId && clearTimeout(timeoutId); + const response = msgpack.decode(data, { codec: codec }); + if (response.ack !== options.chunk) { + const error = new FluentLoggerError.ResponseError( + 'ack in response and chunk id in sent data are different', + { ack: response.ack, chunk: options.chunk } + ); + callbacks.forEach((callback) => { + this._handleEvent('error', error, callback); + }); + } else { // no error on ack + callbacks.forEach((callback) => { + callback && callback(); + }); + } + this._sendQueueSize -= sendPacketSize; + process.nextTick(() => { + this._waitToWrite(); + }); + }); + timeoutId = setTimeout(() => { + const error = new FluentLoggerError.ResponseTimeout('ack response timeout'); + callbacks.forEach((callback) => { + this._handleEvent('error', error, callback); + }); + }, this.ackResponseTimeout); + } } else { this._sendQueueSize -= sendPacketSize; callbacks.forEach((callback) => {