Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with message payload being disposed while resending #282

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMe
try
return! temp.CopyToAsync(output)
finally
payload.Dispose()
temp.Dispose()
binaryWriter.Dispose()
} :> Task
Expand Down
3 changes: 2 additions & 1 deletion src/Pulsar.Client/Common/DTO.fs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ type internal PendingMessage<'T> =
CreatedAt: TimeStamp
SequenceId: SequenceId
HighestSequenceId: SequenceId
Payload: SendTask
SendTask: SendTask
Payload: MemoryStream
Callback : PendingCallback<'T>
}

Expand Down
14 changes: 9 additions & 5 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let failPendingMessages (ex: exn) =
while pendingMessages.Count > 0 do
let msg = pendingMessages.Dequeue()
msg.Payload.Dispose()
failPendingMessage msg ex
while blockedRequests.Count > 0 do
let struct(_, channel, _) = blockedRequests.Dequeue()
Expand Down Expand Up @@ -191,7 +192,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
pendingMessages.Enqueue(pendingMessage)
match connectionHandler.ConnectionState with
| Ready clientCnx ->
clientCnx.SendAndForget pendingMessage.Payload
clientCnx.SendAndForget pendingMessage.SendTask
| _ ->
Log.Logger.LogWarning("{0} not connected, skipping send", prefix)

Expand All @@ -208,12 +209,13 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
|> post this.Mb
i <- i - 1
pendingMessages.Dequeue() |> ignore
pendingMessage.Payload.Dispose()

let resendMessages (clientCnx: ClientCnx) =
if pendingMessages.Count > 0 then
Log.Logger.LogInformation("{0} resending {1} pending messages", prefix, pendingMessages.Count)
for pendingMessage in pendingMessages do
clientCnx.SendAndForget pendingMessage.Payload
clientCnx.SendAndForget pendingMessage.SendTask
else
Log.Logger.LogDebug("{0} No pending messages to resend", prefix)
producerCreatedTsc.TrySetResult() |> ignore
Expand All @@ -222,7 +224,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
backgroundTask {
use stream = MemoryStreamManager.GetStream()
use reader = new BinaryReader(stream)
let struct(send, _) = msg.Payload
let struct(send, _) = msg.SendTask
let writer = PipeWriter.Create(stream, StreamPipeWriterOptions(leaveOpen = true))
do! send writer // materialize stream
do! writer.CompleteAsync()
Expand Down Expand Up @@ -324,7 +326,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let pendingMessage = {
SequenceId = lowestSequenceId
HighestSequenceId = highestSequenceId
Payload = sendTask
SendTask = sendTask
Payload = encryptedBatchPayload
Callback = BatchCallbacks batchCallbacks
CreatedAt = %Stopwatch.GetTimestamp()
}
Expand Down Expand Up @@ -468,7 +471,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let pendingMessage = {
SequenceId = sequenceId
HighestSequenceId = %(-1L)
Payload = payload
SendTask = payload
Payload = encryptedPayload
Callback = SingleCallback (chunkDetails, message, channel)
CreatedAt = %Stopwatch.GetTimestamp()
}
Expand Down
12 changes: 12 additions & 0 deletions tests/UnitTests/Common/CommandTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,16 @@ module CommandsTests =
command.Seek.ConsumerId |> Expect.equal "" %consumerId
command.Seek.RequestId |> Expect.equal "" %requestId
}

test "newSend shouldn't dispose the payload" {
let producerId: ProducerId = % 5UL
let sequenceId: SequenceId = % 6L
let numMessages = 1
let metadata = MessageMetadata(ProducerName = "TestMe")
let payload = [| 1uy; 17uy; |]
let streamPayload = new MemoryStream(payload)

serializeDeserializePayloadCommand (newSend producerId sequenceId None numMessages metadata streamPayload) |> ignore
Expect.isTrue "Stream should not be disposed" streamPayload.CanRead
}
]
Loading