Skip to content

Commit

Permalink
moved the sequenceId to increase before sending, so it always increases
Browse files Browse the repository at this point in the history
  • Loading branch information
galrose committed Nov 29, 2021
1 parent 1c1722b commit 3d94eba
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/producer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class Producer {
throw new errors.PulsarFlexProducerCreationError({
message: 'Cannot send messages over not connected producer',
});
this._sequenceId++;
const { command } = await services.sendMessage({
producerId: this._producerId,
producerName: this._producerName,
Expand Down Expand Up @@ -160,7 +161,6 @@ class Producer {
);
});
}
this._sequenceId++;
return true;
};

Expand All @@ -182,6 +182,7 @@ class Producer {
throw new errors.PulsarFlexProducerCreationError({
message: 'Cannot send batch over not connected producer',
});
this._sequenceId++;
const { command } = await services.sendBatch({
producerId: this._producerId,
producerName: this._producerName,
Expand Down Expand Up @@ -212,7 +213,6 @@ class Producer {
);
});
}
this._sequenceId++;
return true;
};

Expand Down
30 changes: 30 additions & 0 deletions test/producer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,36 @@ describe('Producer tests', function () {
await producer.close();
});
});
describe('on sending messages check that sequenceId increases', function () {
it('should increase the sequenceId, per message on send message', async function () {
const expectedSequenceId = 50;
const producer = new Producer({
discoveryServers,
jwt,
topic,
});
await producer.create();
for (let i = 0; i < expectedSequenceId; i++) {
await producer.sendMessage({ payload: 'galrose', properties: { sinai: 'noob' } });
}
assert.deepStrictEqual(producer._sequenceId, expectedSequenceId);
});
it('should increase the sequenceId, per batch on send batch', async function () {
const expectedSequenceId = 50;
const producer = new Producer({
discoveryServers,
jwt,
topic,
});
await producer.create();
for (let i = 0; i < expectedSequenceId; i++) {
await producer.sendBatch({
messages: [{ payload: 'galrose', properties: { sinai: 'noob' } }],
});
}
assert.deepStrictEqual(producer._sequenceId, expectedSequenceId);
});
});
describe('on sending message should contain payload and properties', function () {
it('should not throw exception', async function () {
const topic = 'public/default/testSendMessage';
Expand Down

0 comments on commit 3d94eba

Please sign in to comment.