Skip to content

Commit

Permalink
Merge pull request #76 from ayeo-flex-org/task/fixSequenceId
Browse files Browse the repository at this point in the history
moved the sequenceId to increase before sending, so it always increases
  • Loading branch information
galrose authored Nov 29, 2021
2 parents 1c1722b + 5411c87 commit df6d956
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pulsar-flex",
"version": "1.0.1-beta.6",
"version": "1.0.1-beta.7",
"description": "A package that natively supports pulsar api",
"main": "src/index.js",
"scripts": {
Expand Down
4 changes: 2 additions & 2 deletions src/producer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class Producer {
throw new errors.PulsarFlexProducerCreationError({
message: 'Cannot send messages over not connected producer',
});
this._sequenceId++;
const { command } = await services.sendMessage({
producerId: this._producerId,
producerName: this._producerName,
Expand Down Expand Up @@ -160,7 +161,6 @@ class Producer {
);
});
}
this._sequenceId++;
return true;
};

Expand All @@ -182,6 +182,7 @@ class Producer {
throw new errors.PulsarFlexProducerCreationError({
message: 'Cannot send batch over not connected producer',
});
this._sequenceId++;
const { command } = await services.sendBatch({
producerId: this._producerId,
producerName: this._producerName,
Expand Down Expand Up @@ -212,7 +213,6 @@ class Producer {
);
});
}
this._sequenceId++;
return true;
};

Expand Down
30 changes: 30 additions & 0 deletions test/producer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,36 @@ describe('Producer tests', function () {
await producer.close();
});
});
describe('on sending messages check that sequenceId increases', function () {
it('should increase the sequenceId, per message on send message', async function () {
const expectedSequenceId = 50;
const producer = new Producer({
discoveryServers,
jwt,
topic,
});
await producer.create();
for (let i = 0; i < expectedSequenceId; i++) {
await producer.sendMessage({ payload: 'galrose', properties: { sinai: 'noob' } });
}
assert.deepStrictEqual(producer._sequenceId, expectedSequenceId);
});
it('should increase the sequenceId, per batch on send batch', async function () {
const expectedSequenceId = 50;
const producer = new Producer({
discoveryServers,
jwt,
topic,
});
await producer.create();
for (let i = 0; i < expectedSequenceId; i++) {
await producer.sendBatch({
messages: [{ payload: 'galrose', properties: { sinai: 'noob' } }],
});
}
assert.deepStrictEqual(producer._sequenceId, expectedSequenceId);
});
});
describe('on sending message should contain payload and properties', function () {
it('should not throw exception', async function () {
const topic = 'public/default/testSendMessage';
Expand Down

0 comments on commit df6d956

Please sign in to comment.