From 8509ae22513d200cb801c84036015545aa45d232 Mon Sep 17 00:00:00 2001 From: Geoffrey Hendrey Date: Fri, 27 Dec 2024 18:29:20 -0800 Subject: [PATCH] fix this issue https://github.com/ayeo-flex-org/pulsar-flex/issues/90. Closes underlying socket and dangling timeout (#91) --- src/producer/services/close.js | 3 ++- src/responseMediators/abstract/responseMediator.js | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/producer/services/close.js b/src/producer/services/close.js index c0e6ede..27aad2a 100644 --- a/src/producer/services/close.js +++ b/src/producer/services/close.js @@ -8,11 +8,12 @@ const close = async ({ producerId, client, connected, requestId, responseMediato message: 'Cannot close not connected producer', }); - const { sendSimpleCommandRequest } = client.getCnx(); + const { sendSimpleCommandRequest, close } = client.getCnx(); const closeProducer = commands.closeProducer({ producerId, requestId }); const { command } = await sendSimpleCommandRequest({ command: closeProducer }, responseMediator); if (!utils.isNil(command.error)) throw new errors.PulsarFlexProducerCloseError({ message: command.message }); + close(); //close the underlying socket to ensure there are no dangling resources }; module.exports = close; diff --git a/src/responseMediators/abstract/responseMediator.js b/src/responseMediators/abstract/responseMediator.js index 04ee868..622fab4 100644 --- a/src/responseMediators/abstract/responseMediator.js +++ b/src/responseMediators/abstract/responseMediator.js @@ -6,6 +6,7 @@ class ResponseMediator { this._commands = []; this._responseEvents = client.getResponseEvents(); this._timeout = timeout; + this._pendingTimeout = null; } _startToMediate() { @@ -28,13 +29,14 @@ class ResponseMediator { purgeRequests({ error }) { Object.values(this._requests).forEach(({ reject }) => reject(new error({}))); + clearTimeout(this._pendingTimeout); //ensure there are no dangling resources } response({ data, autoResolve }) { const id = this._idFunc(this._parseCommand(data)); return new Promise((resolve, reject) => { autoResolve && resolve(); - setTimeout(() => reject(new errors.PulsarFlexResponseTimeoutError({ id })), this._timeout); + this._pendingTimeout = setTimeout(() => reject(new errors.PulsarFlexResponseTimeoutError({ id })), this._timeout); this._requests[id] = { resolve, reject }; }); }