Skip to content

Commit

Permalink
Tests fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Oct 9, 2023
1 parent cbb9fa5 commit c232288
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 37 deletions.
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Internal/ChunkedMessageTracker.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Pulsar.Client.Internal
open System.Buffers
open System.Collections.Generic
open System.Diagnostics
open System.IO
open Pulsar.Client.Common
open FSharp.UMX
open Microsoft.Extensions.Logging
Expand All @@ -18,6 +19,7 @@ type internal ChunkedMessageCtx(totalChunksCount: int, totalChunksSize: int) =
member this.MessageReceived(msg: RawMessage) =
// append the chunked payload and update lastChunkedMessage-id
chunkedMessageIds[%msg.Metadata.ChunkId] <- msg.MessageId
msg.Payload.Seek(0L, SeekOrigin.Begin) |> ignore
msg.Payload.Read(chunkedMsgBuffer, currentBufferLength, int msg.Payload.Length) |> ignore
currentBufferLength <- currentBufferLength + int msg.Payload.Length
lastChunkId <- msg.Metadata.ChunkId
Expand Down
11 changes: 8 additions & 3 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,17 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
metadata.ChunkId <- chunkId
metadata.NumChunksFromMsg <- totalChunks
metadata.TotalChunkMsgSize <- payloadLength
new MemoryStream(payloadBuffer, readStartIndex, Math.Min(maxMessageSize, payloadLength - readStartIndex) )
let chunkStream = MemoryStreamManager.GetStream()
compressedPayload.Seek(readStartIndex, SeekOrigin.Begin) |> ignore
let chunkSize = Math.Min(maxMessageSize, payloadLength - readStartIndex)
for i in 1..chunkSize do
chunkStream.WriteByte(compressedPayload.ReadByte() |> byte)
chunkStream
else
compressedPayload
let encryptResult = encrypt metadata chunkPayload
match encryptResult with
| Ok encryptedPayload ->
stats.UpdateNumMsgsSent(1, int chunkPayload.Length)
let payload =
Commands.newSend producerId sequenceId None 1 metadata encryptedPayload
let chunkDetails =
Expand All @@ -467,8 +471,9 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
Callback = SingleCallback (chunkDetails, message, channel)
CreatedAt = %Stopwatch.GetTimestamp()
}
lastSequenceIdPushed <- %Math.Max(%lastSequenceIdPushed, %sequenceId)
stats.UpdateNumMsgsSent(1, int encryptedPayload.Length)
sendMessage pendingMessage
lastSequenceIdPushed <- %Math.Max(%lastSequenceIdPushed, %sequenceId)
chunkId <- chunkId + 1
readStartIndex <- chunkId * maxMessageSize
| Error ex ->
Expand Down
68 changes: 34 additions & 34 deletions tests/IntegrationTests/Chunks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ open Serilog
[<Tests>]
let tests =
testList "Chunks" [

testTask "Two chunks-message delivered successfully" {
Log.Debug("Started Two chunks-message delivered successfully")
let client = getClient()
Expand All @@ -30,14 +30,14 @@ let tests =
.EnableBatching(false)
.EnableChunking(true)
.CompressionType(CompressionType.Snappy)
.CreateAsync()
.CreateAsync()

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.ConsumerName(name)
.SubscriptionName("test-subscription")
.SubscribeAsync()
.SubscribeAsync()

let payload = Array.zeroCreate 10_000_000
Random().NextBytes(payload)
Expand All @@ -48,23 +48,23 @@ let tests =
let! msgId =
producer.NewMessage(payload)
|> producer.SendAsync


let! (msg : Message<byte[]>) =
consumer.ReceiveAsync()


Expect.equal "" msgId msg.MessageId
Expect.equal "" 1uy msg.Data.[1]
Expect.equal "" 1uy msg.Data.[8_000_000]
Expect.equal "" 0uy msg.Data.[0]
Expect.equal "" 0uy msg.Data.[9_000_000]
do! consumer.UnsubscribeAsync()
Expect.equal "" 1uy msg.Data.[1]
Expect.equal "" 1uy msg.Data.[8_000_000]
Expect.equal "" 0uy msg.Data.[0]
Expect.equal "" 0uy msg.Data.[9_000_000]

do! consumer.UnsubscribeAsync()
do! Task.Delay 100
Log.Debug("Ended Two chunks-message delivered successfully")
}

testTask "Two parallel chunks-message delivered successfully with short queue" {
Log.Debug("Started Two parallel chunks-message delivered successfully with short queue")
let client = getClient()
Expand All @@ -77,24 +77,24 @@ let tests =
.ProducerName(name + "1")
.EnableBatching(false)
.EnableChunking(true)
.CreateAsync()
.CreateAsync()

let! (producer2 : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.ProducerName(name + "2")
.EnableBatching(false)
.EnableChunking(true)
.CreateAsync()
.CreateAsync()

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.ConsumerName(name)
.SubscriptionName("test-subscription")
.MaxPendingChunkedMessage(1)
.AckTimeout(TimeSpan.FromMilliseconds(1000.0))
.SubscribeAsync()
.SubscribeAsync()

let payload1 = Array.zeroCreate 10_000_000
let payload2 = Array.zeroCreate 10_000_000
Expand All @@ -106,8 +106,8 @@ let tests =
[| producer1.NewMessage(payload1)|> producer1.SendAsync
producer2.NewMessage(payload2)|> producer2.SendAsync |]
|> Task.WhenAll


let! ([| msg1; msg2 |] : Message<byte[]>[]) =
[| task {
let! msg = consumer.ReceiveAsync()
Expand All @@ -120,9 +120,9 @@ let tests =
return msg
} |]
|> Task.WhenAll
let [ one; two ] =


let [ one; two ] =
if msg1.Data.[0] = 0uy then
[ msg1; msg2 ]
else
Expand All @@ -137,10 +137,10 @@ let tests =
Expect.equal "" 0uy two.Data.[8_000_000]
Expect.equal "" 1uy two.Data.[0]
Expect.equal "" 1uy two.Data.[9_000_000]
do! consumer.UnsubscribeAsync()

do! consumer.UnsubscribeAsync()
do! Task.Delay 100

Log.Debug("Ended Two parallel chunks-message delivered successfully with short queue")
}

Expand All @@ -157,19 +157,19 @@ let tests =
.EnableBatching(false)
.EnableChunking(true)
.CompressionType(CompressionType.Snappy)
.CreateAsync()
.CreateAsync()

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.ConsumerName(name)
.StartMessageIdInclusive()
.SubscriptionName("test-subscription")
.SubscribeAsync()
.SubscribeAsync()

let payload = Array.zeroCreate 10_000_000
Random().NextBytes(payload)

let! (msgIds: MessageId[]) =
[| for i in 0 .. 9 do
producer.NewMessage(payload)
Expand All @@ -183,19 +183,19 @@ let tests =
for i in 1 .. 9 do
let! (msgAfterSeek : Message<byte[]>) = consumer.ReceiveAsync()
Expect.equal "" msgIds.[i] msgAfterSeek.MessageId

do! consumer.UnsubscribeAsync()

let! (reader : IReader<byte[]>) =
client.NewReader()
.Topic(topicName)
.StartMessageIdInclusive()
.StartMessageId(msgIds.[1])
.CreateAsync()

let! (readMsg : Message<byte[]>) = reader.ReadNextAsync()
Expect.equal "" msgIds.[1] readMsg.MessageId
Expect.equal "" msgIds.[1] readMsg.MessageId

Log.Debug("Ended Seek chunk messages and receive correctly")
}
]

0 comments on commit c232288

Please sign in to comment.