From 3d94ebae7817567c9f80183f5a882520ab079a21 Mon Sep 17 00:00:00 2001 From: galrose Date: Mon, 29 Nov 2021 12:14:28 +0200 Subject: [PATCH] moved the sequenceId to increase before sending, so it always increases --- src/producer/index.js | 4 ++-- test/producer/index-spec.js | 30 ++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/producer/index.js b/src/producer/index.js index c3907df..ad0fa5c 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -126,6 +126,7 @@ class Producer { throw new errors.PulsarFlexProducerCreationError({ message: 'Cannot send messages over not connected producer', }); + this._sequenceId++; const { command } = await services.sendMessage({ producerId: this._producerId, producerName: this._producerName, @@ -160,7 +161,6 @@ class Producer { ); }); } - this._sequenceId++; return true; }; @@ -182,6 +182,7 @@ class Producer { throw new errors.PulsarFlexProducerCreationError({ message: 'Cannot send batch over not connected producer', }); + this._sequenceId++; const { command } = await services.sendBatch({ producerId: this._producerId, producerName: this._producerName, @@ -212,7 +213,6 @@ class Producer { ); }); } - this._sequenceId++; return true; }; diff --git a/test/producer/index-spec.js b/test/producer/index-spec.js index ee722c9..e3aa706 100644 --- a/test/producer/index-spec.js +++ b/test/producer/index-spec.js @@ -281,6 +281,36 @@ describe('Producer tests', function () { await producer.close(); }); }); + describe('on sending messages check that sequenceId increases', function () { + it('should increase the sequenceId, per message on send message', async function () { + const expectedSequenceId = 50; + const producer = new Producer({ + discoveryServers, + jwt, + topic, + }); + await producer.create(); + for (let i = 0; i < expectedSequenceId; i++) { + await producer.sendMessage({ payload: 'galrose', properties: { sinai: 'noob' } }); + } + assert.deepStrictEqual(producer._sequenceId, expectedSequenceId); + }); + it('should increase the sequenceId, per batch on send batch', async function () { + const expectedSequenceId = 50; + const producer = new Producer({ + discoveryServers, + jwt, + topic, + }); + await producer.create(); + for (let i = 0; i < expectedSequenceId; i++) { + await producer.sendBatch({ + messages: [{ payload: 'galrose', properties: { sinai: 'noob' } }], + }); + } + assert.deepStrictEqual(producer._sequenceId, expectedSequenceId); + }); + }); describe('on sending message should contain payload and properties', function () { it('should not throw exception', async function () { const topic = 'public/default/testSendMessage';