diff --git a/lib/implementations/batch/batch.js b/lib/implementations/batch/batch.js index 7c3c8e0..3464374 100644 --- a/lib/implementations/batch/batch.js +++ b/lib/implementations/batch/batch.js @@ -10,16 +10,16 @@ module.exports = class Batch extends Builder.Flow { if (new.target === Batch) { throw new Error('Cannot construct Batch instances directly'); } - this._storeOptions = _.pick(options, ["number", "groupBy", "attributes", "timeout"]); + this._storeOptions = _.pick(options, ["number", "groupBy", "attributes", "timeout"]); this._lambda = lambda; this._timeout = options.timeout; this._aggrigator = aggrigator } - build(app, name ) { + build(app, name) { const self = this; this._name = name; - this._store = store.get(this._name , this._storeOptions); + this._store = store.get(this._name, this._storeOptions); this._upStream = this._buildUpstream(); this._downStream = this._buildDownStream(); this._store.on('next', (batchData) => { self._process(batchData) }); @@ -27,11 +27,14 @@ module.exports = class Batch extends Builder.Flow { _process(batchData) { let data, error; + if (batchData.error) { + error = batchData.error + } try { - if (_.isArray(batchData)) { - data = this._processGroup(batchData); + if (_.isArray(batchData.result)) { + data = this._processGroup(batchData.result); } else { - data = _.values(batchData).map(batchData => this._processGroup(batchData)) + data = _.values(batchData.result).map(batchData => this._processGroup(batchData)) } } catch (err) { error = err; @@ -43,13 +46,13 @@ module.exports = class Batch extends Builder.Flow { _processGroup(groupData) { let result = { argdata: null, ids: [] }; - result.argdata = this._aggrigation(groupData,this._lambda, _.clone(this._aggrigator)); + result.argdata = this._aggrigation(groupData, this._lambda, _.clone(this._aggrigator)); result.ids = groupData.map(data => data.id); - if(_.isArray(groupData) && groupData.length > 0){ + if (_.isArray(groupData) && groupData.length > 0) { const data = groupData[0]; - if(!_.isEmpty(data)) { + if (!_.isEmpty(data)) { data.groups.map(key => { - if(_.isUndefined(result.groupedBy)) { + if (_.isUndefined(result.groupedBy)) { result.groupedBy = {}; } if (!!data[key]) { @@ -61,7 +64,7 @@ module.exports = class Batch extends Builder.Flow { return result; } - _aggrigation(data, lambda, aggrigator={}) { + _aggrigation(data, lambda, aggrigator = {}) { throw "_aggrigation() method is not implimented" } @@ -71,9 +74,6 @@ module.exports = class Batch extends Builder.Flow { return new Klass({ objectMode: true, write(message, encoding, callback) { - if (!_.isEmpty(message.error)) { - return callback(); - } trans._store.add(message); callback(); } @@ -87,7 +87,7 @@ module.exports = class Batch extends Builder.Flow { objectMode: true, readableObjectMode: true, write(data, encoding, callback) { - this.push({ data: data }); + this.push({ data, error: data.error }); callback(); }, read(size) { diff --git a/lib/implementations/lambda/sink.js b/lib/implementations/lambda/sink.js index 0611ad5..a7d494e 100644 --- a/lib/implementations/lambda/sink.js +++ b/lib/implementations/lambda/sink.js @@ -2,7 +2,7 @@ const { ProducerStream } = require('kafka-node'), { Writable } = require('stream'), { Builder } = require('../../builder'), store = require('./../../storage'); - _ = require('lodash'); +_ = require('lodash'); $ = require('steeltoe'); class Producer extends Builder.Sink { @@ -18,14 +18,11 @@ class Producer extends Builder.Sink { this._writeStream = new Klass({ objectMode: true, write(message, encoding, callback) { - if (!_.isEmpty(message.error)) { - return callback(); - } if (message.data.aggrigated) { let storage = store.get(name); if ($(message)('data')('data')() && _.isArrayLike(message.data.data)) { message.data.data.forEach(x => (storage.pop(x.ids))); - } else if ($ (message)('data')('data')('ids')()) { + } else if ($(message)('data')('data')('ids')()) { storage.pop(message.data.data.ids); } } @@ -36,6 +33,10 @@ class Producer extends Builder.Sink { return callback(); } transaction.commit(message); + } else { + if (_.isError(message.error)) { + console.log(message.error); + } } callback(); }); diff --git a/lib/storage/inmemory.js b/lib/storage/inmemory.js index f55f4ae..bcd11d0 100644 --- a/lib/storage/inmemory.js +++ b/lib/storage/inmemory.js @@ -7,6 +7,8 @@ module.exports = class Inmemory { this._maxBucketSize = options.number || Math.floor(maxRecordSize / 2); this._timeout = options.timeout; this._timerRef; + this._active = true; + this._error = null; if (this._maxBucketSize > maxRecordSize / 2) { throw `Bucket size cannot be larger than ${maxRecordSize / 2}`; } @@ -26,7 +28,7 @@ module.exports = class Inmemory { } _timer() { - setTimeout(() => { + return setTimeout(() => { if (this._currentBucket().length > 0) { this.next(); } @@ -75,10 +77,17 @@ module.exports = class Inmemory { }, ""); } }); - return result; + return { result, error: this._error }; } } + _flushBucket(error) { + this._active = false; + this._error = error; + clearTimeout(this._timerRef); + this._events.emit('next', this._getLastBucket()); + } + _moveBucket() { this._buckts.unshift([]); this._events.emit('next', this._getLastBucket()); @@ -89,12 +98,18 @@ module.exports = class Inmemory { } add(obj) { - try { - const id = this._incriment(); - this._tupels.set(id, obj); - this._addToBucket(id, obj.data); - } catch (err) { - this._events.emit('error', err) + if (_.isError(obj.error) && this._active) { + this._flushBucket(obj.error); + } else if (!this._active) { + console.error(`can't store store inactive`); + } else { + try { + const id = this._incriment(); + this._tupels.set(id, obj); + this._addToBucket(id, obj.data); + } catch (err) { + this._flushBucket(obj.error); + } } } @@ -105,7 +120,6 @@ module.exports = class Inmemory { } else { viewObj = _.clone(data); } - viewObj['id'] = id; viewObj['groups'] = this._groupBy; this._currentBucket().push(viewObj);