Skip to content

Commit

Permalink
Merge pull request #82 from ayeo-flex-org/feature/consumer-state-chan…
Browse files Browse the repository at this point in the history
…ge-handler

Consumer State Change Handling Feature
  • Loading branch information
ronfarkash authored Jan 31, 2022
2 parents 0d38a98 + e805956 commit 4986bd9
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 19 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const consumer = new Consumer({
receiveQueueSize: 1000,
logLevel: logLevel.INFO,
// you can also provide logCreator function
stateChangeHandler: ({previousState, newState}) => {
console.log(`Consumer previous state ${previousState}.`)
console.log(`Consumer new state ${newState}.`)
}
})

const run = async () => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pulsar-flex",
"version": "1.0.1-beta.9",
"version": "1.1.0-beta.0",
"description": "A package that natively supports pulsar api",
"main": "src/index.js",
"scripts": {
Expand Down
14 changes: 13 additions & 1 deletion src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module.exports = class Consumer {
reconnectInterval = 5000,
logLevel,
logCreator = defaultLogger,
stateChangeHandler = null,
}) {
this._logger = createLogger({ logLevel, logCreator });
this._client = new Pulsar({
Expand Down Expand Up @@ -63,6 +64,8 @@ module.exports = class Consumer {
this._onMessageParams = {};
this._processTimeoutInterval = null;

this._onStateChangeHandler = stateChangeHandler;

this._receiveQueue = new PriorityQueue({
maxQueueSize: receiveQueueSize,
logger: this._logger,
Expand Down Expand Up @@ -177,12 +180,21 @@ module.exports = class Consumer {
};

_setState = (state) => {
const previousState = this._consumerState;
this._consumerState = state;
this._logger.info(
`Changing consumer state -> consumer: ${this._consumerName}(${
this._consumerId
}) STATE: ${this.getState()}`
);
if (this._onStateChangeHandler) {
this._logger.debug('Executing consumer state change handler.');
try {
this._onStateChangeHandler({ previousState, newState: this._consumerState });
} catch (e) {
this._logger.error(`Error executing state change handler function ${e}.`);
}
}
};

_setRedeliveringUnacknowledgedMessages = (redeliveringUnacknowledgedMessages) => {
Expand Down Expand Up @@ -331,7 +343,7 @@ module.exports = class Consumer {
await this._flow(this._receiveQueueSize);
this._logger.trace(`Started processing messages...`);
process().catch((e) => {
this._logger.error(`Error with the first process call, error: ${e}`);
this._logger.error(`Error with the first process call, error: ${e}`);
});
};
};
96 changes: 79 additions & 17 deletions test/consumer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,22 @@ describe('Consumer tests', function () {
receiveQueueSize: 5,
logLevel: LEVELS.INFO,
});
const consumers = [
cons,
cons2,
sharedConsumer1,
sharedConsumer2,
unackPrioritySharedConsumer,
smallReceiveQueueConsumer,
];
beforeEach(async function () {
await utils.clearBacklog();
});
afterEach(async function () {
if (cons._isSubscribed) {
await cons.unsubscribe();
}
if (cons2._isSubscribed) {
await cons2.unsubscribe();
}
if (sharedConsumer1._isSubscribed) {
await sharedConsumer1.unsubscribe();
}
if (sharedConsumer2._isSubscribed) {
await sharedConsumer2.unsubscribe();
}
if (unackPrioritySharedConsumer._isSubscribed) {
await unackPrioritySharedConsumer.unsubscribe();
}
if (smallReceiveQueueConsumer._isSubscribed) {
await smallReceiveQueueConsumer.unsubscribe();
for (const consumer of consumers) {
if (consumer._isSubscribed) {
await consumer.unsubscribe();
}
}
});
describe('Flow tests', function () {
Expand Down Expand Up @@ -623,4 +618,71 @@ describe('Consumer tests', function () {
assert.deepEqual(receivedMessages, ['first', 'second', 'third', 'second', 'third']);
});
});
describe('Consumer State Change Handling Tests', function () {
let stateChangeConsumerRef;
const stateChangeErrorConsumer = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
subscription: 'subscription',
subType: Consumer.SUB_TYPES.FAILOVER,
consumerName: 'Consy7',
readCompacted: false,
receiveQueueSize,
logLevel: LEVELS.TRACE,
stateChangeHandler: ({ previousState, newState }) => {
throw new Error('Unexpected fake error!');
},
});
afterEach(async function () {
if (stateChangeErrorConsumer._isSubscribed) {
await stateChangeErrorConsumer.unsubscribe();
}
if (stateChangeConsumerRef._isSubscribed) {
await stateChangeConsumerRef.unsubscribe();
}
});
it('Should run custom state change function provided.', async function () {
const stateChanged = await new Promise(async (resolve, reject) => {
const stateChangeConsumer = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
subscription: 'subscription',
subType: Consumer.SUB_TYPES.FAILOVER,
consumerName: 'Consy6',
readCompacted: false,
receiveQueueSize,
logLevel: LEVELS.INFO,
stateChangeHandler: ({ previousState, newState }) => {
if (previousState !== newState) {
resolve(true);
}
},
});
stateChangeConsumerRef = stateChangeConsumer;
// triggers state change
await stateChangeConsumer.subscribe();
});
assert.ok(stateChanged);
});
it('Should continue reading even if custom consumer state change function throws errors.', async function () {
let expectedNumOfMessages = 20;
let actualNumOfMessages = 0;
const messages = Array(expectedNumOfMessages).fill('message');
await utils.produceMessages({ messages });

await new Promise(async (resolve, reject) => {
// triggers state change
await stateChangeErrorConsumer.subscribe();
await stateChangeErrorConsumer.run({
onMessage: async ({ message }) => {
actualNumOfMessages++;
if (actualNumOfMessages === 10) await utils.unloadTopic();
if (actualNumOfMessages >= expectedNumOfMessages) resolve();
},
});
});
});
});
});

0 comments on commit 4986bd9

Please sign in to comment.