diff --git a/lib/sender.js b/lib/sender.js index 1668cb1..2909daf 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -37,7 +37,7 @@ class FluentSender { if (this._eventMode === 'Message') { this._sendQueue = []; // queue for items waiting for being sent. this._flushInterval = 0; - this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0; + this._messageQueueSizeLimit = options.messageQueueSizeLimit || 1000; } else { this._sendQueue = new Map(); this._flushInterval = options.flushInterval || 100; @@ -110,10 +110,11 @@ class FluentSender { return; } - this._push(tag, timestamp, data, callback); + this._push(tag, timestamp, data); this._connect(() => { this._flushSendQueue(); }); + callback() } end(label, data, callback) { @@ -182,11 +183,10 @@ class FluentSender { return msgpack.encode([time, data], { codec: codec }); } - _push(tag, time, data, callback) { + _push(tag, time, data) { if (this._eventMode === 'Message') { // Message mode const item = this._makePacketItem(tag, time, data); - item.callback = callback; if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) { this._sendQueue.shift(); } @@ -199,13 +199,10 @@ class FluentSender { const eventEntryData = this._sendQueue.get(tag); eventEntryData.eventEntries.push(eventEntry); eventEntryData.size += eventEntry.length; - if (callback) eventEntryData.callbacks.push(callback); } else { - const callbacks = callback ? [callback] : []; this._sendQueue.set(tag, { eventEntries: [eventEntry], size: eventEntry.length, - callbacks: callbacks }); } } @@ -292,7 +289,7 @@ class FluentSender { this._socket && this._socket.destroy(); this._socket = null; this._status = null; - this._connecting = false; + setTimeout(()=>{this._connecting = false},this.reconnectInterval) } _handshake(callback) { @@ -375,7 +372,7 @@ class FluentSender { // nothing written; return; } - this._doWrite(item.packet, item.options, timeoutId, [item.callback]); + this._doWrite(item.packet, item.options, timeoutId); } else { if (this._sendQueue.size === 0) { this._flushingSendQueue = false; @@ -398,11 +395,11 @@ class FluentSender { eventEntryDataSize: eventEntryData.size }; const packet = msgpack.encode([tag, entries, options], { codec: codec }); - this._doWrite(packet, options, timeoutId, eventEntryData.callbacks); + this._doWrite(packet, options, timeoutId); } } - _doWrite(packet, options, timeoutId, callbacks) { + _doWrite(packet, options, timeoutId) { const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize; this._socket.write(packet, () => { if (this.requireAckResponse) { @@ -414,13 +411,7 @@ class FluentSender { 'ack in response and chunk id in sent data are different', { ack: response.ack, chunk: options.chunk } ); - callbacks.forEach((callback) => { - this._handleEvent('error', error, callback); - }); - } else { // no error on ack - callbacks.forEach((callback) => { - callback && callback(); - }); + this._handleEvent('error', error); } this._sendQueueSize -= sendPacketSize; process.nextTick(() => { @@ -429,15 +420,10 @@ class FluentSender { }); timeoutId = setTimeout(() => { const error = new FluentLoggerError.ResponseTimeout('ack response timeout'); - callbacks.forEach((callback) => { - this._handleEvent('error', error, callback); - }); + this._handleEvent('error', error); }, this.ackResponseTimeout); } else { this._sendQueueSize -= sendPacketSize; - callbacks.forEach((callback) => { - callback && callback(); - }); process.nextTick(() => { this._waitToWrite(); });