diff --git a/package-lock.json b/package-lock.json index 1e9c991..d993b94 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "pulsar-flex", - "version": "1.0.0-beta.2", + "version": "1.0.1-beta.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 1fcbbef..b345a08 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "pretest": "npm run test-setup", "test-setup": "node ./scripts/test-setup.js 2.8.0 60000", "test": "mocha --recursive --timeout 60000 --exit test/", - "test-without-setup": "mocha --recursive --exit test/", + "test-without-setup": "mocha --recursive --timeout 60000 --exit test/", "test-consumer": "mocha --timeout 30000 test/consumer/index-spec.js --exit", "test-producer": "mocha test/producer/index-spec.js --timeout 60000 --exit", "interact": "scripts/docker exec -it pulsar /bin/bash" diff --git a/src/client/resolver/data.js b/src/client/resolver/data.js index 86429c2..21377a5 100644 --- a/src/client/resolver/data.js +++ b/src/client/resolver/data.js @@ -39,9 +39,9 @@ const data = (emitter) => { const { type, command } = serde.simpleCommand.deserializer(slicedBuffer); emitter.emit(type, { command }); } else { - const { type, command, payload, metadata } = + const { type, command, messages, metadata } = serde.payloadCommand.deserializer(slicedBuffer); - emitter.emit(type, { command, metadata, payload }); + emitter.emit(type, { command, metadata, messages }); } currentBufferIndex += expectedFrameSize; } diff --git a/src/client/serde/payloadCommand/batch/deserializer.js b/src/client/serde/payloadCommand/batch/deserializer.js index 945dab7..ca78176 100644 --- a/src/client/serde/payloadCommand/batch/deserializer.js +++ b/src/client/serde/payloadCommand/batch/deserializer.js @@ -10,8 +10,8 @@ const deserializer = ({ metadata, buffer, messages }) => { ); const objectSingleMetadata = singleMetadata.toObject(); index += singleMetadataSize; - const message = buffer.slice(index, objectSingleMetadata.payloadSize + index); - messages.push(message); + const payload = buffer.slice(index, objectSingleMetadata.payloadSize + index); + messages.push({ payload, singleMessageMetadata: objectSingleMetadata }); index += objectSingleMetadata.payloadSize; } return messages; diff --git a/src/client/serde/payloadCommand/deserializer.js b/src/client/serde/payloadCommand/deserializer.js index 802816f..d92d1f9 100644 --- a/src/client/serde/payloadCommand/deserializer.js +++ b/src/client/serde/payloadCommand/deserializer.js @@ -31,7 +31,7 @@ const deserializer = (buffer) => { const typeNumber = deserializedBaseCommand.getType(); const [type, command] = Object.entries(baseCommandObject)[typeNumber - 1]; - const payload = deserializePayload({ + const messages = deserializePayload({ metadata, messageId: command.messageId, buffer: payloadBuffer, @@ -42,17 +42,17 @@ const deserializer = (buffer) => { type, command, metadata, - payload, + messages, }; }; const deserializePayload = ({ metadata, buffer, isBatch }) => { - let messages = []; + const messages = []; if (isBatch) { // mutating batch.deserializer({ metadata, buffer, messages }); } else { - messages.push(buffer); + messages.push({ payload: buffer }); } return messages; diff --git a/src/consumer/index.js b/src/consumer/index.js index 8d6f3ba..d7c2b97 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -75,12 +75,14 @@ module.exports = class Consumer { this._isSubscribed = false; this._enqueueMessage = (data) => { // Classic for, for performance - for (let i = 0; i < data.payload.length; i++) { + for (let i = 0; i < data.messages.length; i++) { this._receiveQueue.enqueue( { command: data.command, - metadata: data.metadata, - payload: data.payload[i], + metadata: data.messages[i].singleMessageMetadata + ? data.messages[i].singleMessageMetadata + : data.metadata, + payload: data.messages[i].payload, }, this._isRedeliveringUnacknowledgedMessages && this._prioritizeUnacknowledgedMessages ? 1 diff --git a/test/consumer/index-spec.js b/test/consumer/index-spec.js index 7fff275..277dba8 100644 --- a/test/consumer/index-spec.js +++ b/test/consumer/index-spec.js @@ -30,7 +30,7 @@ describe('Consumer tests', function () { receiveQueueSize: 1000, logLevel: LEVELS.INFO, }); - const sharedConsumer = new Consumer({ + const sharedConsumer1 = new Consumer({ discoveryServers, jwt, topic: 'persistent://public/default/test', @@ -41,7 +41,7 @@ describe('Consumer tests', function () { receiveQueueSize: 1000, logLevel: LEVELS.INFO, }); - const unackPrioritySharedConsumer = new Consumer({ + const sharedConsumer2 = new Consumer({ discoveryServers, jwt, topic: 'persistent://public/default/test', @@ -51,6 +51,17 @@ describe('Consumer tests', function () { readCompacted: false, receiveQueueSize: 1000, logLevel: LEVELS.INFO, + }); + const unackPrioritySharedConsumer = new Consumer({ + discoveryServers, + jwt, + topic: 'persistent://public/default/test', + subscription: 'subscription', + subType: Consumer.SUB_TYPES.SHARED, + consumerName: 'Consy5', + readCompacted: false, + receiveQueueSize: 1000, + logLevel: LEVELS.INFO, prioritizeUnacknowledgedMessages: true, }); beforeEach(async function () { @@ -63,8 +74,11 @@ describe('Consumer tests', function () { if (cons2._isSubscribed) { await cons2.unsubscribe(); } - if (sharedConsumer._isSubscribed) { - await sharedConsumer.unsubscribe(); + if (sharedConsumer1._isSubscribed) { + await sharedConsumer1.unsubscribe(); + } + if (sharedConsumer2._isSubscribed) { + await sharedConsumer2.unsubscribe(); } if (unackPrioritySharedConsumer._isSubscribed) { await unackPrioritySharedConsumer.unsubscribe(); @@ -151,7 +165,42 @@ describe('Consumer tests', function () { onMessage: ({ message, properties }) => { messages.push(message.toString()); receivedProperties = properties; - console.log(properties); + if (messages.length >= expectedMessages.length) { + resolve(); + } + }, + }); + }); + assert.deepEqual(messages, expectedMessages); + assert.deepEqual(receivedProperties, properties); + } catch (e) { + console.log(e); + assert.ok(false); + } + }); + it('should consume the batch successfully with headers', async function () { + try { + await cons.subscribe(); + let expectedMessages = ['galrose']; + let properties = { sinai: 'noob' }; + let messages = []; + let receivedProperties; + + const producer = new Producer({ + discoveryServers, + jwt, + topic: 'persistent://public/default/test', + }); + await producer.create(); + await producer.sendBatch({ + messages: [{ payload: 'galrose', properties: { sinai: 'noob' } }], + }); + await producer.close(); + await new Promise((resolve, reject) => { + cons.run({ + onMessage: ({ message, properties }) => { + messages.push(message.toString()); + receivedProperties = properties; if (messages.length >= expectedMessages.length) { resolve(); } @@ -350,7 +399,7 @@ describe('Consumer tests', function () { }); }); describe('Redeliver unacknowledged messages', function () { - it('Should read the unacknowledged messages again, in failover consumer', async function () { + it('Should read the unacknowledged messages again, in failover subscription', async function () { const messages = ['first', 'second', 'third']; const receivedMessages = []; await cons.subscribe(); @@ -370,7 +419,7 @@ describe('Consumer tests', function () { }); assert.deepEqual(receivedMessages, ['first', 'first', 'second', 'third']); }); - it('Should read the unacknowledged messages again,in batch, in failover consumer', async function () { + it('Should read the unacknowledged messages again,in batch, in failover subscription', async function () { const messages = ['first', 'second', 'third']; const receivedMessages = []; await cons.subscribe(); @@ -397,16 +446,56 @@ describe('Consumer tests', function () { }); assert.deepEqual(receivedMessages, ['first', 'first', 'second', 'third']); }); - it('Should read the unacknowledged message again after the current flow, in shared consumer', async function () { + it('Should increase the redeliveryCount on multiple consumers in the same shared subscription', async function () { + let expectedRedeliveryCount = 0; + let receivedRedeliveryCount = 0; + let readByConsumer1 = false; + let readByConsumer2 = false; + await sharedConsumer1.subscribe(); + await sharedConsumer2.subscribe(); + await utils.produceMessages({ messages: ['galrose'] }); + await new Promise((resolve, reject) => { + sharedConsumer1.run({ + onMessage: async ({ ack, redeliveryCount }) => { + readByConsumer1 = true; + if (!readByConsumer1 || !readByConsumer2) { + expectedRedeliveryCount++; + await ack({ type: Consumer.ACK_TYPES.NEGATIVE }); + } else { + receivedRedeliveryCount = redeliveryCount; + await ack({ type: Consumer.ACK_TYPES.INDIVIDUAL }); + resolve(); + } + }, + autoAck: false, + }); + sharedConsumer2.run({ + onMessage: async ({ ack, redeliveryCount }) => { + readByConsumer2 = true; + if (!readByConsumer1 || !readByConsumer2) { + expectedRedeliveryCount++; + await ack({ type: Consumer.ACK_TYPES.NEGATIVE }); + } else { + receivedRedeliveryCount = redeliveryCount; + await ack({ type: Consumer.ACK_TYPES.INDIVIDUAL }); + resolve(); + } + }, + autoAck: false, + }); + }); + assert(expectedRedeliveryCount, receivedRedeliveryCount); + }); + it('Should read the unacknowledged message again after the current flow, in shared subscription', async function () { const messages = ['first', 'second', 'third']; const expectedRedeliveryCount = [0, 0, 0, 1]; const receivedMessages = []; const receivedRedeliveryCount = []; - await sharedConsumer.subscribe(); + await sharedConsumer1.subscribe(); let messageCounter = 0; await utils.produceMessages({ messages }); await new Promise((resolve, reject) => { - sharedConsumer.run({ + sharedConsumer1.run({ onMessage: async ({ ack, message, redeliveryCount }) => { receivedRedeliveryCount.push(redeliveryCount); if (messageCounter === 0) await ack({ type: Consumer.ACK_TYPES.NEGATIVE }); @@ -421,7 +510,7 @@ describe('Consumer tests', function () { assert.deepEqual(receivedRedeliveryCount, expectedRedeliveryCount); assert.deepEqual(receivedMessages, ['first', 'second', 'third', 'first']); }); - it('Should read the unacknowledged message again before the rest of the flow, in shared consumer', async function () { + it('Should read the unacknowledged message again before the rest of the flow, in shared subscription', async function () { const messages = ['first', 'second', 'third']; const expectedRedeliveryCount = [0, 1, 0, 0]; const receivedMessages = []; @@ -445,7 +534,7 @@ describe('Consumer tests', function () { assert.deepEqual(receivedRedeliveryCount, expectedRedeliveryCount); assert.deepEqual(receivedMessages, ['first', 'first', 'second', 'third']); }); - it('Should read the unacknowledged message again before the rest of the flow,in batch, in shared consumer', async function () { + it('Should read the unacknowledged message again before the rest of the flow,in batch, in shared subscription', async function () { const messages = ['first', 'second', 'third']; const receivedMessages = []; await unackPrioritySharedConsumer.subscribe();