Skip to content

Commit

Permalink
fix socket issues
Browse files Browse the repository at this point in the history
Signed-off-by: James Elias Sigurdarson <[email protected]>
  • Loading branch information
jamiees2 committed Jun 12, 2021
1 parent 2aa5d9c commit 7218360
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,20 @@ class FluentSender {
if (this._status === 'established') {
return;
}
if (!this._socket) {
const error = new FluentLoggerError.HandshakeError('socket went away before handshake');
this._handleEvent('error', error);
this._disconnect();
return;
}
this._status = 'helo';
this._socket.once('data', (data) => {
if (!this._socket) {
const error = new FluentLoggerError.HandshakeError('socket went away during handshake');
this._handleEvent('error', error);
this._disconnect();
return;
}
this._socket.pause();
const heloStatus = this._checkHelo(data);
if (!heloStatus.succeeded) {
Expand All @@ -310,6 +322,12 @@ class FluentSender {
}
this._status = 'pingpong';
this._socket.write(this._generatePing(), () => {
if (!this._socket) {
const error = new FluentLoggerError.HandshakeError('socket went away during ping');
this._handleEvent('error', error);
this._disconnect();
return;
}
this._socket.resume();
this._socket.once('data', (data) => {
const pongStatus = this._checkPong(data);
Expand Down Expand Up @@ -406,33 +424,40 @@ class FluentSender {
const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize;
this._socket.write(packet, () => {
if (this.requireAckResponse) {
this._socket.once('data', (data) => {
timeoutId && clearTimeout(timeoutId);
const response = msgpack.decode(data, { codec: codec });
if (response.ack !== options.chunk) {
const error = new FluentLoggerError.ResponseError(
'ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: options.chunk }
);
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
} else { // no error on ack
callbacks.forEach((callback) => {
callback && callback();
});
}
this._sendQueueSize -= sendPacketSize;
process.nextTick(() => {
this._waitToWrite();
});
});
timeoutId = setTimeout(() => {
const error = new FluentLoggerError.ResponseTimeout('ack response timeout');
if (!this._socket) {
const error = new FluentLoggerError.ResponseError('server went away');
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
}, this.ackResponseTimeout);
} else {
this._socket.once('data', (data) => {
timeoutId && clearTimeout(timeoutId);
const response = msgpack.decode(data, { codec: codec });
if (response.ack !== options.chunk) {
const error = new FluentLoggerError.ResponseError(
'ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: options.chunk }
);
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
} else { // no error on ack
callbacks.forEach((callback) => {
callback && callback();
});
}
this._sendQueueSize -= sendPacketSize;
process.nextTick(() => {
this._waitToWrite();
});
});
timeoutId = setTimeout(() => {
const error = new FluentLoggerError.ResponseTimeout('ack response timeout');
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
}, this.ackResponseTimeout);
}
} else {
this._sendQueueSize -= sendPacketSize;
callbacks.forEach((callback) => {
Expand Down

0 comments on commit 7218360

Please sign in to comment.