diff --git a/README.md b/README.md index 853ed76..477381b 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,10 @@ const consumer = new Consumer({ receiveQueueSize: 1000, logLevel: logLevel.INFO, // you can also provide logCreator function + stateChangeHandler: ({previousState, newState}) => { + console.log(`Consumer previous state ${previousState}.`) + console.log(`Consumer new state ${newState}.`) + } }) const run = async () => { diff --git a/package.json b/package.json index f593033..af7af77 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pulsar-flex", - "version": "1.0.1-beta.9", + "version": "1.1.0-beta.0", "description": "A package that natively supports pulsar api", "main": "src/index.js", "scripts": { diff --git a/src/consumer/index.js b/src/consumer/index.js index 56c194b..5fc8df6 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -36,6 +36,7 @@ module.exports = class Consumer { reconnectInterval = 5000, logLevel, logCreator = defaultLogger, + stateChangeHandler = null, }) { this._logger = createLogger({ logLevel, logCreator }); this._client = new Pulsar({ @@ -63,6 +64,8 @@ module.exports = class Consumer { this._onMessageParams = {}; this._processTimeoutInterval = null; + this._onStateChangeHandler = stateChangeHandler; + this._receiveQueue = new PriorityQueue({ maxQueueSize: receiveQueueSize, logger: this._logger, @@ -177,12 +180,21 @@ module.exports = class Consumer { }; _setState = (state) => { + const previousState = this._consumerState; this._consumerState = state; this._logger.info( `Changing consumer state -> consumer: ${this._consumerName}(${ this._consumerId }) STATE: ${this.getState()}` ); + if (this._onStateChangeHandler) { + this._logger.debug('Executing consumer state change handler.'); + try { + this._onStateChangeHandler({ previousState, newState: this._consumerState }); + } catch (e) { + this._logger.error(`Error executing state change handler function ${e}.`); + } + } }; _setRedeliveringUnacknowledgedMessages = (redeliveringUnacknowledgedMessages) => { @@ -331,7 +343,7 @@ module.exports = class Consumer { await this._flow(this._receiveQueueSize); this._logger.trace(`Started processing messages...`); process().catch((e) => { - this._logger.error(`Error with the first process call, error: ${e}`); + this._logger.error(`Error with the first process call, error: ${e}`); }); }; }; diff --git a/test/consumer/index-spec.js b/test/consumer/index-spec.js index 1cc1b34..7bef750 100644 --- a/test/consumer/index-spec.js +++ b/test/consumer/index-spec.js @@ -74,27 +74,22 @@ describe('Consumer tests', function () { receiveQueueSize: 5, logLevel: LEVELS.INFO, }); + const consumers = [ + cons, + cons2, + sharedConsumer1, + sharedConsumer2, + unackPrioritySharedConsumer, + smallReceiveQueueConsumer, + ]; beforeEach(async function () { await utils.clearBacklog(); }); afterEach(async function () { - if (cons._isSubscribed) { - await cons.unsubscribe(); - } - if (cons2._isSubscribed) { - await cons2.unsubscribe(); - } - if (sharedConsumer1._isSubscribed) { - await sharedConsumer1.unsubscribe(); - } - if (sharedConsumer2._isSubscribed) { - await sharedConsumer2.unsubscribe(); - } - if (unackPrioritySharedConsumer._isSubscribed) { - await unackPrioritySharedConsumer.unsubscribe(); - } - if (smallReceiveQueueConsumer._isSubscribed) { - await smallReceiveQueueConsumer.unsubscribe(); + for (const consumer of consumers) { + if (consumer._isSubscribed) { + await consumer.unsubscribe(); + } } }); describe('Flow tests', function () { @@ -623,4 +618,71 @@ describe('Consumer tests', function () { assert.deepEqual(receivedMessages, ['first', 'second', 'third', 'second', 'third']); }); }); + describe('Consumer State Change Handling Tests', function () { + let stateChangeConsumerRef; + const stateChangeErrorConsumer = new Consumer({ + discoveryServers, + jwt, + topic: 'persistent://public/default/test', + subscription: 'subscription', + subType: Consumer.SUB_TYPES.FAILOVER, + consumerName: 'Consy7', + readCompacted: false, + receiveQueueSize, + logLevel: LEVELS.TRACE, + stateChangeHandler: ({ previousState, newState }) => { + throw new Error('Unexpected fake error!'); + }, + }); + afterEach(async function () { + if (stateChangeErrorConsumer._isSubscribed) { + await stateChangeErrorConsumer.unsubscribe(); + } + if (stateChangeConsumerRef._isSubscribed) { + await stateChangeConsumerRef.unsubscribe(); + } + }); + it('Should run custom state change function provided.', async function () { + const stateChanged = await new Promise(async (resolve, reject) => { + const stateChangeConsumer = new Consumer({ + discoveryServers, + jwt, + topic: 'persistent://public/default/test', + subscription: 'subscription', + subType: Consumer.SUB_TYPES.FAILOVER, + consumerName: 'Consy6', + readCompacted: false, + receiveQueueSize, + logLevel: LEVELS.INFO, + stateChangeHandler: ({ previousState, newState }) => { + if (previousState !== newState) { + resolve(true); + } + }, + }); + stateChangeConsumerRef = stateChangeConsumer; + // triggers state change + await stateChangeConsumer.subscribe(); + }); + assert.ok(stateChanged); + }); + it('Should continue reading even if custom consumer state change function throws errors.', async function () { + let expectedNumOfMessages = 20; + let actualNumOfMessages = 0; + const messages = Array(expectedNumOfMessages).fill('message'); + await utils.produceMessages({ messages }); + + await new Promise(async (resolve, reject) => { + // triggers state change + await stateChangeErrorConsumer.subscribe(); + await stateChangeErrorConsumer.run({ + onMessage: async ({ message }) => { + actualNumOfMessages++; + if (actualNumOfMessages === 10) await utils.unloadTopic(); + if (actualNumOfMessages >= expectedNumOfMessages) resolve(); + }, + }); + }); + }); + }); });