diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d925c6..63e301c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## [0.1.1] - 2015-11-16 +### Fixed +- Do not write if there are no records in the queue when the stream gets closed + ## [0.1.0] - 2015-11-16 ### Changed - Add property `records` to error events which contains the records diff --git a/index.js b/index.js index d3b527b..34e9625 100644 --- a/index.js +++ b/index.js @@ -60,21 +60,15 @@ function ElasticsearchBulkIndexWritable(client, options) { } /** - * Write items in queue to Elasticsearch + * Bulk write records to Elasticsearch * * @private + * @param {array} records * @param {Function} callback - * @return {undefined} */ -ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) { - try { - var records = transformRecords(this.queue); - } catch (error) { - return callback(error); - } - +ElasticsearchBulkIndexWritable.prototype.bulkWrite = function bulkWrite(records, callback) { if (this.logger) { - this.logger.debug('Writing %d records to Elasticsearch', this.queue.length); + this.logger.debug('Writing %d records to Elasticsearch', records.length); } this.client.bulk({ body: records }, function bulkCallback(err, data) { @@ -102,7 +96,34 @@ ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) { } if (this.logger) { - this.logger.info('Wrote %d records to Elasticsearch', this.queue.length); + this.logger.info('Wrote %d records to Elasticsearch', records.length); + } + + callback(); + }.bind(this)); +}; + +/** + * Flush method needed by the underlying stream implementation + * + * @private + * @param {Function} callback + * @return {undefined} + */ +ElasticsearchBulkIndexWritable.prototype._flush = function _flush(callback) { + if (this.queue.length === 0) { + return callback(); + } + + try { + var records = transformRecords(this.queue); + } catch (error) { + return callback(error); + } + + this.bulkWrite(records, function(err) { + if (err) { + return callback(err); } this.writtenRecords += this.queue.length; diff --git a/package.json b/package.json index 9e22914..860149a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "elasticsearch-bulk-index-stream", - "version": "0.1.0", + "version": "0.1.1", "description": "A writable stream for bulk indexing records in Elasticsearch", "main": "index.js", "scripts": { diff --git a/test/elasticsearch-bulk-stream.js b/test/elasticsearch-bulk-stream.js index b4b1a77..63a1df4 100644 --- a/test/elasticsearch-bulk-stream.js +++ b/test/elasticsearch-bulk-stream.js @@ -93,7 +93,9 @@ describe('ElastisearchBulkIndexWritable', function() { bulk: this.sinon.stub() }; - this.stream = new ElasticsearchBulkIndexWritable(this.client); + this.stream = new ElasticsearchBulkIndexWritable(this.client, { + highWaterMark: 6 + }); }); it('should write records to elasticsearch', function(done) { @@ -106,6 +108,22 @@ describe('ElastisearchBulkIndexWritable', function() { }.bind(this)); }); + it('should do nothing if there is nothing in the queue when the stream is closed', function(done) { + this.client.bulk.yields(null, successResponseFixture); + + this.stream.on('finish', function() { + expect(this.client.bulk).to.have.been.calledOnce; + + done(); + }.bind(this)); + + for (var i = 0; i < 6; i++) { + this.stream.write(recordFixture); + } + + this.stream.end(); + }); + it('should trigger error on elasticsearch error', function(done) { this.client.bulk.yields(new Error('Fail'));