Skip to content

Commit

Permalink
Merge pull request #67 from ayeo-flex-org/task/batchConsumeProperties
Browse files Browse the repository at this point in the history
added properties for batch
  • Loading branch information
galrose authored Aug 30, 2021
2 parents 076bd79 + 831d5a1 commit 15eeacd
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 25 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"pretest": "npm run test-setup",
"test-setup": "node ./scripts/test-setup.js 2.8.0 60000",
"test": "mocha --recursive --timeout 60000 --exit test/",
"test-without-setup": "mocha --recursive --exit test/",
"test-without-setup": "mocha --recursive --timeout 60000 --exit test/",
"test-consumer": "mocha --timeout 30000 test/consumer/index-spec.js --exit",
"test-producer": "mocha test/producer/index-spec.js --timeout 60000 --exit",
"interact": "scripts/docker exec -it pulsar /bin/bash"
Expand Down
4 changes: 2 additions & 2 deletions src/client/resolver/data.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ const data = (emitter) => {
const { type, command } = serde.simpleCommand.deserializer(slicedBuffer);
emitter.emit(type, { command });
} else {
const { type, command, payload, metadata } =
const { type, command, messages, metadata } =
serde.payloadCommand.deserializer(slicedBuffer);
emitter.emit(type, { command, metadata, payload });
emitter.emit(type, { command, metadata, messages });
}
currentBufferIndex += expectedFrameSize;
}
Expand Down
4 changes: 2 additions & 2 deletions src/client/serde/payloadCommand/batch/deserializer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const deserializer = ({ metadata, buffer, messages }) => {
);
const objectSingleMetadata = singleMetadata.toObject();
index += singleMetadataSize;
const message = buffer.slice(index, objectSingleMetadata.payloadSize + index);
messages.push(message);
const payload = buffer.slice(index, objectSingleMetadata.payloadSize + index);
messages.push({ payload, singleMessageMetadata: objectSingleMetadata });
index += objectSingleMetadata.payloadSize;
}
return messages;
Expand Down
8 changes: 4 additions & 4 deletions src/client/serde/payloadCommand/deserializer.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const deserializer = (buffer) => {
const typeNumber = deserializedBaseCommand.getType();
const [type, command] = Object.entries(baseCommandObject)[typeNumber - 1];

const payload = deserializePayload({
const messages = deserializePayload({
metadata,
messageId: command.messageId,
buffer: payloadBuffer,
Expand All @@ -42,17 +42,17 @@ const deserializer = (buffer) => {
type,
command,
metadata,
payload,
messages,
};
};

const deserializePayload = ({ metadata, buffer, isBatch }) => {
let messages = [];
const messages = [];
if (isBatch) {
// mutating
batch.deserializer({ metadata, buffer, messages });
} else {
messages.push(buffer);
messages.push({ payload: buffer });
}

return messages;
Expand Down
8 changes: 5 additions & 3 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ module.exports = class Consumer {
this._isSubscribed = false;
this._enqueueMessage = (data) => {
// Classic for, for performance
for (let i = 0; i < data.payload.length; i++) {
for (let i = 0; i < data.messages.length; i++) {
this._receiveQueue.enqueue(
{
command: data.command,
metadata: data.metadata,
payload: data.payload[i],
metadata: data.messages[i].singleMessageMetadata
? data.messages[i].singleMessageMetadata
: data.metadata,
payload: data.messages[i].payload,
},
this._isRedeliveringUnacknowledgedMessages && this._prioritizeUnacknowledgedMessages
? 1
Expand Down
113 changes: 101 additions & 12 deletions test/consumer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('Consumer tests', function () {
receiveQueueSize: 1000,
logLevel: LEVELS.INFO,
});
const sharedConsumer = new Consumer({
const sharedConsumer1 = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
Expand All @@ -41,7 +41,7 @@ describe('Consumer tests', function () {
receiveQueueSize: 1000,
logLevel: LEVELS.INFO,
});
const unackPrioritySharedConsumer = new Consumer({
const sharedConsumer2 = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
Expand All @@ -51,6 +51,17 @@ describe('Consumer tests', function () {
readCompacted: false,
receiveQueueSize: 1000,
logLevel: LEVELS.INFO,
});
const unackPrioritySharedConsumer = new Consumer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
subscription: 'subscription',
subType: Consumer.SUB_TYPES.SHARED,
consumerName: 'Consy5',
readCompacted: false,
receiveQueueSize: 1000,
logLevel: LEVELS.INFO,
prioritizeUnacknowledgedMessages: true,
});
beforeEach(async function () {
Expand All @@ -63,8 +74,11 @@ describe('Consumer tests', function () {
if (cons2._isSubscribed) {
await cons2.unsubscribe();
}
if (sharedConsumer._isSubscribed) {
await sharedConsumer.unsubscribe();
if (sharedConsumer1._isSubscribed) {
await sharedConsumer1.unsubscribe();
}
if (sharedConsumer2._isSubscribed) {
await sharedConsumer2.unsubscribe();
}
if (unackPrioritySharedConsumer._isSubscribed) {
await unackPrioritySharedConsumer.unsubscribe();
Expand Down Expand Up @@ -151,7 +165,42 @@ describe('Consumer tests', function () {
onMessage: ({ message, properties }) => {
messages.push(message.toString());
receivedProperties = properties;
console.log(properties);
if (messages.length >= expectedMessages.length) {
resolve();
}
},
});
});
assert.deepEqual(messages, expectedMessages);
assert.deepEqual(receivedProperties, properties);
} catch (e) {
console.log(e);
assert.ok(false);
}
});
it('should consume the batch successfully with headers', async function () {
try {
await cons.subscribe();
let expectedMessages = ['galrose'];
let properties = { sinai: 'noob' };
let messages = [];
let receivedProperties;

const producer = new Producer({
discoveryServers,
jwt,
topic: 'persistent://public/default/test',
});
await producer.create();
await producer.sendBatch({
messages: [{ payload: 'galrose', properties: { sinai: 'noob' } }],
});
await producer.close();
await new Promise((resolve, reject) => {
cons.run({
onMessage: ({ message, properties }) => {
messages.push(message.toString());
receivedProperties = properties;
if (messages.length >= expectedMessages.length) {
resolve();
}
Expand Down Expand Up @@ -350,7 +399,7 @@ describe('Consumer tests', function () {
});
});
describe('Redeliver unacknowledged messages', function () {
it('Should read the unacknowledged messages again, in failover consumer', async function () {
it('Should read the unacknowledged messages again, in failover subscription', async function () {
const messages = ['first', 'second', 'third'];
const receivedMessages = [];
await cons.subscribe();
Expand All @@ -370,7 +419,7 @@ describe('Consumer tests', function () {
});
assert.deepEqual(receivedMessages, ['first', 'first', 'second', 'third']);
});
it('Should read the unacknowledged messages again,in batch, in failover consumer', async function () {
it('Should read the unacknowledged messages again,in batch, in failover subscription', async function () {
const messages = ['first', 'second', 'third'];
const receivedMessages = [];
await cons.subscribe();
Expand All @@ -397,16 +446,56 @@ describe('Consumer tests', function () {
});
assert.deepEqual(receivedMessages, ['first', 'first', 'second', 'third']);
});
it('Should read the unacknowledged message again after the current flow, in shared consumer', async function () {
it('Should increase the redeliveryCount on multiple consumers in the same shared subscription', async function () {
let expectedRedeliveryCount = 0;
let receivedRedeliveryCount = 0;
let readByConsumer1 = false;
let readByConsumer2 = false;
await sharedConsumer1.subscribe();
await sharedConsumer2.subscribe();
await utils.produceMessages({ messages: ['galrose'] });
await new Promise((resolve, reject) => {
sharedConsumer1.run({
onMessage: async ({ ack, redeliveryCount }) => {
readByConsumer1 = true;
if (!readByConsumer1 || !readByConsumer2) {
expectedRedeliveryCount++;
await ack({ type: Consumer.ACK_TYPES.NEGATIVE });
} else {
receivedRedeliveryCount = redeliveryCount;
await ack({ type: Consumer.ACK_TYPES.INDIVIDUAL });
resolve();
}
},
autoAck: false,
});
sharedConsumer2.run({
onMessage: async ({ ack, redeliveryCount }) => {
readByConsumer2 = true;
if (!readByConsumer1 || !readByConsumer2) {
expectedRedeliveryCount++;
await ack({ type: Consumer.ACK_TYPES.NEGATIVE });
} else {
receivedRedeliveryCount = redeliveryCount;
await ack({ type: Consumer.ACK_TYPES.INDIVIDUAL });
resolve();
}
},
autoAck: false,
});
});
assert(expectedRedeliveryCount, receivedRedeliveryCount);
});
it('Should read the unacknowledged message again after the current flow, in shared subscription', async function () {
const messages = ['first', 'second', 'third'];
const expectedRedeliveryCount = [0, 0, 0, 1];
const receivedMessages = [];
const receivedRedeliveryCount = [];
await sharedConsumer.subscribe();
await sharedConsumer1.subscribe();
let messageCounter = 0;
await utils.produceMessages({ messages });
await new Promise((resolve, reject) => {
sharedConsumer.run({
sharedConsumer1.run({
onMessage: async ({ ack, message, redeliveryCount }) => {
receivedRedeliveryCount.push(redeliveryCount);
if (messageCounter === 0) await ack({ type: Consumer.ACK_TYPES.NEGATIVE });
Expand All @@ -421,7 +510,7 @@ describe('Consumer tests', function () {
assert.deepEqual(receivedRedeliveryCount, expectedRedeliveryCount);
assert.deepEqual(receivedMessages, ['first', 'second', 'third', 'first']);
});
it('Should read the unacknowledged message again before the rest of the flow, in shared consumer', async function () {
it('Should read the unacknowledged message again before the rest of the flow, in shared subscription', async function () {
const messages = ['first', 'second', 'third'];
const expectedRedeliveryCount = [0, 1, 0, 0];
const receivedMessages = [];
Expand All @@ -445,7 +534,7 @@ describe('Consumer tests', function () {
assert.deepEqual(receivedRedeliveryCount, expectedRedeliveryCount);
assert.deepEqual(receivedMessages, ['first', 'first', 'second', 'third']);
});
it('Should read the unacknowledged message again before the rest of the flow,in batch, in shared consumer', async function () {
it('Should read the unacknowledged message again before the rest of the flow,in batch, in shared subscription', async function () {
const messages = ['first', 'second', 'third'];
const receivedMessages = [];
await unackPrioritySharedConsumer.subscribe();
Expand Down

0 comments on commit 15eeacd

Please sign in to comment.