Skip to content

Commit

Permalink
Fix: answer the _write function call back, immediately. and remove th…
Browse files Browse the repository at this point in the history
…e call backs mechanism for delayed answering.

Signed-off-by: Sajad Masjoodi <[email protected]>
  • Loading branch information
Sajad Masjoodi committed Sep 28, 2024
1 parent 2aa5d9c commit 027383b
Showing 1 changed file with 10 additions and 24 deletions.
34 changes: 10 additions & 24 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class FluentSender {
if (this._eventMode === 'Message') {
this._sendQueue = []; // queue for items waiting for being sent.
this._flushInterval = 0;
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0;
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 1000;
} else {
this._sendQueue = new Map();
this._flushInterval = options.flushInterval || 100;
Expand Down Expand Up @@ -110,10 +110,11 @@ class FluentSender {
return;
}

this._push(tag, timestamp, data, callback);
this._push(tag, timestamp, data);
this._connect(() => {
this._flushSendQueue();
});
callback()
}

end(label, data, callback) {
Expand Down Expand Up @@ -182,11 +183,10 @@ class FluentSender {
return msgpack.encode([time, data], { codec: codec });
}

_push(tag, time, data, callback) {
_push(tag, time, data) {
if (this._eventMode === 'Message') {
// Message mode
const item = this._makePacketItem(tag, time, data);
item.callback = callback;
if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) {
this._sendQueue.shift();
}
Expand All @@ -199,13 +199,10 @@ class FluentSender {
const eventEntryData = this._sendQueue.get(tag);
eventEntryData.eventEntries.push(eventEntry);
eventEntryData.size += eventEntry.length;
if (callback) eventEntryData.callbacks.push(callback);
} else {
const callbacks = callback ? [callback] : [];
this._sendQueue.set(tag, {
eventEntries: [eventEntry],
size: eventEntry.length,
callbacks: callbacks
});
}
}
Expand Down Expand Up @@ -292,7 +289,7 @@ class FluentSender {
this._socket && this._socket.destroy();
this._socket = null;
this._status = null;
this._connecting = false;
setTimeout(()=>{this._connecting = false},this.reconnectInterval)
}

_handshake(callback) {
Expand Down Expand Up @@ -375,7 +372,7 @@ class FluentSender {
// nothing written;
return;
}
this._doWrite(item.packet, item.options, timeoutId, [item.callback]);
this._doWrite(item.packet, item.options, timeoutId);
} else {
if (this._sendQueue.size === 0) {
this._flushingSendQueue = false;
Expand All @@ -398,11 +395,11 @@ class FluentSender {
eventEntryDataSize: eventEntryData.size
};
const packet = msgpack.encode([tag, entries, options], { codec: codec });
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks);
this._doWrite(packet, options, timeoutId);
}
}

_doWrite(packet, options, timeoutId, callbacks) {
_doWrite(packet, options, timeoutId) {
const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize;
this._socket.write(packet, () => {
if (this.requireAckResponse) {
Expand All @@ -414,13 +411,7 @@ class FluentSender {
'ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: options.chunk }
);
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
} else { // no error on ack
callbacks.forEach((callback) => {
callback && callback();
});
this._handleEvent('error', error);
}
this._sendQueueSize -= sendPacketSize;
process.nextTick(() => {
Expand All @@ -429,15 +420,10 @@ class FluentSender {
});
timeoutId = setTimeout(() => {
const error = new FluentLoggerError.ResponseTimeout('ack response timeout');
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
this._handleEvent('error', error);
}, this.ackResponseTimeout);
} else {
this._sendQueueSize -= sendPacketSize;
callbacks.forEach((callback) => {
callback && callback();
});
process.nextTick(() => {
this._waitToWrite();
});
Expand Down

0 comments on commit 027383b

Please sign in to comment.