Skip to content

Commit

Permalink
Fixes for chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Oct 19, 2023
1 parent fba20c0 commit a9a063a
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,14 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
metadata.ChunkId <- chunkId
metadata.NumChunksFromMsg <- totalChunks
metadata.TotalChunkMsgSize <- payloadLength
let chunkStream = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
let chunkStream = MemoryStreamManager.GetStream("chunk") :?> RecyclableMemoryStream
compressedPayload.Seek(readStartIndex, SeekOrigin.Begin) |> ignore
let chunkSize = Math.Min(maxMessageSize, payloadLength - readStartIndex)
let targetSpan = chunkStream.GetSpan(chunkSize).Slice(0, chunkSize)
compressedPayload.Read(targetSpan) |> ignore
compressedPayload.Dispose()
let writtenBytes = compressedPayload.Read(targetSpan)
chunkStream.Advance(writtenBytes)
if compressedPayload.Position = compressedPayload.Length then
compressedPayload.Dispose()
chunkStream :> MemoryStream
else
compressedPayload
Expand Down

0 comments on commit a9a063a

Please sign in to comment.