Skip to content

Commit

Permalink
Graceful handling for PulsarClient.CloseAsync (#251)
Browse files Browse the repository at this point in the history
* Fix Client.CloseAsync causing exceptions in the log.

* Move exception matching inside Flatten.

* Fix typo.

* Use correct exception type
  • Loading branch information
LeoSht authored Feb 12, 2024
1 parent 76e397f commit 5db4ef0
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/Pulsar.Client/Internal/ClientCnx.fs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
let mutable numberOfRejectedRequests = 0
let mutable isActive = true
let mutable waitingForPingResponse = false
let socketReadCancellationTokenSource = new CancellationTokenSource()
let requestTimeoutTimer = new Timer()
let keepAliveTimer = new Timer()
let startRequestTimeoutTimer () =
Expand Down Expand Up @@ -809,7 +810,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,

try
while continueLooping do
let! result = reader.ReadAsync()
let! result = reader.ReadAsync(socketReadCancellationTokenSource.Token)
let buffer = result.Buffer
if result.IsCompleted then
if initialConnectionTsc.TrySetException(ConnectException("Unable to initiate connection")) then
Expand All @@ -832,10 +833,14 @@ and internal ClientCnx (config: PulsarClientConfiguration,
Log.Logger.LogError("{0} UnknownCommandType {1}, ignoring message", prefix, unknownType)
reader.AdvanceTo consumed
with Flatten ex ->
if initialConnectionTsc.TrySetException(ConnectException("Unable to initiate connection")) then
Log.Logger.LogWarning("{0} New connection was aborted", prefix)
Log.Logger.LogWarning(ex, "{0} Socket was disconnected exceptionally while reading", prefix)
post operationsMb ChannelInactive
match ex with
| :? OperationCanceledException ->
Log.Logger.LogInformation("{0} Socket read was cancelled", prefix)
| _ ->
if initialConnectionTsc.TrySetException(ConnectException("Unable to initiate connection")) then
Log.Logger.LogWarning("{0} New connection was aborted", prefix)
Log.Logger.LogWarning(ex, "{0} Socket was disconnected exceptionally while reading", prefix)
post operationsMb ChannelInactive

Log.Logger.LogDebug("{0} readSocket stopped", prefix)
} :> Task
Expand Down Expand Up @@ -912,6 +917,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
post operationsMb (AddTransactionMetaStoreHandler (transactionMetaStoreId, transactionMetaStoreOperations))

member this.Dispose() =
socketReadCancellationTokenSource.Cancel()
connection.Dispose()

override this.ToString() =
Expand Down

0 comments on commit 5db4ef0

Please sign in to comment.