diff --git a/src/Pulsar.Client/Internal/ChunkedMessageTracker.fs b/src/Pulsar.Client/Internal/ChunkedMessageTracker.fs index 632b991a..b994a71a 100644 --- a/src/Pulsar.Client/Internal/ChunkedMessageTracker.fs +++ b/src/Pulsar.Client/Internal/ChunkedMessageTracker.fs @@ -3,6 +3,7 @@ namespace Pulsar.Client.Internal open System.Buffers open System.Collections.Generic open System.Diagnostics +open System.IO open Pulsar.Client.Common open FSharp.UMX open Microsoft.Extensions.Logging @@ -18,6 +19,7 @@ type internal ChunkedMessageCtx(totalChunksCount: int, totalChunksSize: int) = member this.MessageReceived(msg: RawMessage) = // append the chunked payload and update lastChunkedMessage-id chunkedMessageIds[%msg.Metadata.ChunkId] <- msg.MessageId + msg.Payload.Seek(0L, SeekOrigin.Begin) |> ignore msg.Payload.Read(chunkedMsgBuffer, currentBufferLength, int msg.Payload.Length) |> ignore currentBufferLength <- currentBufferLength + int msg.Payload.Length lastChunkId <- msg.Metadata.ChunkId diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index 4c2c279b..2fc87dbd 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -446,13 +446,17 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c metadata.ChunkId <- chunkId metadata.NumChunksFromMsg <- totalChunks metadata.TotalChunkMsgSize <- payloadLength - new MemoryStream(payloadBuffer, readStartIndex, Math.Min(maxMessageSize, payloadLength - readStartIndex) ) + let chunkStream = MemoryStreamManager.GetStream() + compressedPayload.Seek(readStartIndex, SeekOrigin.Begin) |> ignore + let chunkSize = Math.Min(maxMessageSize, payloadLength - readStartIndex) + for i in 1..chunkSize do + chunkStream.WriteByte(compressedPayload.ReadByte() |> byte) + chunkStream else compressedPayload let encryptResult = encrypt metadata chunkPayload match encryptResult with | Ok encryptedPayload -> - stats.UpdateNumMsgsSent(1, int chunkPayload.Length) let payload = Commands.newSend producerId sequenceId None 1 metadata encryptedPayload let chunkDetails = @@ -467,8 +471,9 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c Callback = SingleCallback (chunkDetails, message, channel) CreatedAt = %Stopwatch.GetTimestamp() } - lastSequenceIdPushed <- %Math.Max(%lastSequenceIdPushed, %sequenceId) + stats.UpdateNumMsgsSent(1, int encryptedPayload.Length) sendMessage pendingMessage + lastSequenceIdPushed <- %Math.Max(%lastSequenceIdPushed, %sequenceId) chunkId <- chunkId + 1 readStartIndex <- chunkId * maxMessageSize | Error ex -> diff --git a/tests/IntegrationTests/Chunks.fs b/tests/IntegrationTests/Chunks.fs index 06ff8a60..34bda178 100644 --- a/tests/IntegrationTests/Chunks.fs +++ b/tests/IntegrationTests/Chunks.fs @@ -16,7 +16,7 @@ open Serilog [] let tests = testList "Chunks" [ - + testTask "Two chunks-message delivered successfully" { Log.Debug("Started Two chunks-message delivered successfully") let client = getClient() @@ -30,14 +30,14 @@ let tests = .EnableBatching(false) .EnableChunking(true) .CompressionType(CompressionType.Snappy) - .CreateAsync() + .CreateAsync() let! (consumer : IConsumer) = client.NewConsumer() .Topic(topicName) .ConsumerName(name) .SubscriptionName("test-subscription") - .SubscribeAsync() + .SubscribeAsync() let payload = Array.zeroCreate 10_000_000 Random().NextBytes(payload) @@ -48,23 +48,23 @@ let tests = let! msgId = producer.NewMessage(payload) |> producer.SendAsync - - + + let! (msg : Message) = consumer.ReceiveAsync() - - + + Expect.equal "" msgId msg.MessageId - Expect.equal "" 1uy msg.Data.[1] - Expect.equal "" 1uy msg.Data.[8_000_000] - Expect.equal "" 0uy msg.Data.[0] - Expect.equal "" 0uy msg.Data.[9_000_000] - - do! consumer.UnsubscribeAsync() + Expect.equal "" 1uy msg.Data.[1] + Expect.equal "" 1uy msg.Data.[8_000_000] + Expect.equal "" 0uy msg.Data.[0] + Expect.equal "" 0uy msg.Data.[9_000_000] + + do! consumer.UnsubscribeAsync() do! Task.Delay 100 Log.Debug("Ended Two chunks-message delivered successfully") } - + testTask "Two parallel chunks-message delivered successfully with short queue" { Log.Debug("Started Two parallel chunks-message delivered successfully with short queue") let client = getClient() @@ -77,7 +77,7 @@ let tests = .ProducerName(name + "1") .EnableBatching(false) .EnableChunking(true) - .CreateAsync() + .CreateAsync() let! (producer2 : IProducer) = client.NewProducer() @@ -85,8 +85,8 @@ let tests = .ProducerName(name + "2") .EnableBatching(false) .EnableChunking(true) - .CreateAsync() - + .CreateAsync() + let! (consumer : IConsumer) = client.NewConsumer() .Topic(topicName) @@ -94,7 +94,7 @@ let tests = .SubscriptionName("test-subscription") .MaxPendingChunkedMessage(1) .AckTimeout(TimeSpan.FromMilliseconds(1000.0)) - .SubscribeAsync() + .SubscribeAsync() let payload1 = Array.zeroCreate 10_000_000 let payload2 = Array.zeroCreate 10_000_000 @@ -106,8 +106,8 @@ let tests = [| producer1.NewMessage(payload1)|> producer1.SendAsync producer2.NewMessage(payload2)|> producer2.SendAsync |] |> Task.WhenAll - - + + let! ([| msg1; msg2 |] : Message[]) = [| task { let! msg = consumer.ReceiveAsync() @@ -120,9 +120,9 @@ let tests = return msg } |] |> Task.WhenAll - - - let [ one; two ] = + + + let [ one; two ] = if msg1.Data.[0] = 0uy then [ msg1; msg2 ] else @@ -137,10 +137,10 @@ let tests = Expect.equal "" 0uy two.Data.[8_000_000] Expect.equal "" 1uy two.Data.[0] Expect.equal "" 1uy two.Data.[9_000_000] - - do! consumer.UnsubscribeAsync() + + do! consumer.UnsubscribeAsync() do! Task.Delay 100 - + Log.Debug("Ended Two parallel chunks-message delivered successfully with short queue") } @@ -157,7 +157,7 @@ let tests = .EnableBatching(false) .EnableChunking(true) .CompressionType(CompressionType.Snappy) - .CreateAsync() + .CreateAsync() let! (consumer : IConsumer) = client.NewConsumer() @@ -165,11 +165,11 @@ let tests = .ConsumerName(name) .StartMessageIdInclusive() .SubscriptionName("test-subscription") - .SubscribeAsync() + .SubscribeAsync() let payload = Array.zeroCreate 10_000_000 Random().NextBytes(payload) - + let! (msgIds: MessageId[]) = [| for i in 0 .. 9 do producer.NewMessage(payload) @@ -183,19 +183,19 @@ let tests = for i in 1 .. 9 do let! (msgAfterSeek : Message) = consumer.ReceiveAsync() Expect.equal "" msgIds.[i] msgAfterSeek.MessageId - + do! consumer.UnsubscribeAsync() - + let! (reader : IReader) = client.NewReader() .Topic(topicName) .StartMessageIdInclusive() .StartMessageId(msgIds.[1]) .CreateAsync() - + let! (readMsg : Message) = reader.ReadNextAsync() - Expect.equal "" msgIds.[1] readMsg.MessageId - + Expect.equal "" msgIds.[1] readMsg.MessageId + Log.Debug("Ended Seek chunk messages and receive correctly") } ] \ No newline at end of file