Skip to content

Commit

Permalink
fix this issue #90. Closes underlying socket and dangling timeout (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffhendrey authored Dec 28, 2024
1 parent 019df6e commit 8509ae2
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/producer/services/close.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ const close = async ({ producerId, client, connected, requestId, responseMediato
message: 'Cannot close not connected producer',
});

const { sendSimpleCommandRequest } = client.getCnx();
const { sendSimpleCommandRequest, close } = client.getCnx();
const closeProducer = commands.closeProducer({ producerId, requestId });
const { command } = await sendSimpleCommandRequest({ command: closeProducer }, responseMediator);
if (!utils.isNil(command.error))
throw new errors.PulsarFlexProducerCloseError({ message: command.message });
close(); //close the underlying socket to ensure there are no dangling resources
};

module.exports = close;
4 changes: 3 additions & 1 deletion src/responseMediators/abstract/responseMediator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class ResponseMediator {
this._commands = [];
this._responseEvents = client.getResponseEvents();
this._timeout = timeout;
this._pendingTimeout = null;
}

_startToMediate() {
Expand All @@ -28,13 +29,14 @@ class ResponseMediator {

purgeRequests({ error }) {
Object.values(this._requests).forEach(({ reject }) => reject(new error({})));
clearTimeout(this._pendingTimeout); //ensure there are no dangling resources
}

response({ data, autoResolve }) {
const id = this._idFunc(this._parseCommand(data));
return new Promise((resolve, reject) => {
autoResolve && resolve();
setTimeout(() => reject(new errors.PulsarFlexResponseTimeoutError({ id })), this._timeout);
this._pendingTimeout = setTimeout(() => reject(new errors.PulsarFlexResponseTimeoutError({ id })), this._timeout);
this._requests[id] = { resolve, reject };
});
}
Expand Down

0 comments on commit 8509ae2

Please sign in to comment.