diff --git a/src/Pulsar.Client.Proto/Crc32c.cs b/src/Pulsar.Client.Proto/Crc32c.cs index e3b145f1..a1cb6a6a 100644 --- a/src/Pulsar.Client.Proto/Crc32c.cs +++ b/src/Pulsar.Client.Proto/Crc32c.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.IO; using System.Numerics; using Microsoft.IO; @@ -8,19 +9,18 @@ namespace Pulsar.Client.Common { internal static class CRC32C { - internal static uint GetForRMS(RecyclableMemoryStream stream, int size) + internal static uint GetForROS(ReadOnlySequence stream) { var crc = ~0U; //0xFFFFFFFF - var memorySequence = stream.GetReadOnlySequence().Slice(stream.Position); - foreach (var memory in memorySequence) + foreach (var memory in stream) { var span = memory.Span; - CrcAlgorithm(ref size, span, ref crc); + CrcAlgorithm(span, ref crc); } return crc ^ ~0U; //0xFFFFFFFF } - private static void CrcAlgorithm(ref int size, ReadOnlySpan span, ref uint crc) + private static void CrcAlgorithm(ReadOnlySpan span, ref uint crc) { var currentBlockLength = span.Length; var i = 0; @@ -31,24 +31,12 @@ private static void CrcAlgorithm(ref int size, ReadOnlySpan span, ref uint crc = BitOperations.Crc32C(crc, batch); i+=8; } - size -= i; - while (size > 0 && i < currentBlockLength) + while (i < currentBlockLength) { crc = BitOperations.Crc32C(crc, span[i]); - size--; i++; } } - - internal static uint GetForMS(MemoryStream stream, int size) - { - var crc = ~0U; //0xFFFFFFFF - var buf = stream.GetBuffer(); - var offset = (int) stream.Position; - var span = buf.AsSpan(offset, size); - CrcAlgorithm(ref size, span, ref crc); - return crc ^ ~0U; //0xFFFFFFFF - } } } diff --git a/src/Pulsar.Client/Common/Commands.fs b/src/Pulsar.Client/Common/Commands.fs index 2e9fb1e2..2b3e689e 100644 --- a/src/Pulsar.Client/Common/Commands.fs +++ b/src/Pulsar.Client/Common/Commands.fs @@ -92,7 +92,7 @@ let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMe // write CRC temp.Seek(int64 crcPayloadStart, SeekOrigin.Begin) |> ignore - let crc = int32 <| CRC32C.GetForRMS(temp, totalMetadataSize + payloadSize) + let crc = int32 <| CRC32C.GetForROS(temp.GetReadOnlySequence().Slice(temp.Position, totalMetadataSize + payloadSize)) temp.Seek(int64 crcStart, SeekOrigin.Begin) |> ignore binaryWriter.Write(int32ToBigEndian crc) diff --git a/src/Pulsar.Client/Common/Tools.fs b/src/Pulsar.Client/Common/Tools.fs index f14398d3..7c3cee81 100644 --- a/src/Pulsar.Client/Common/Tools.fs +++ b/src/Pulsar.Client/Common/Tools.fs @@ -166,8 +166,8 @@ let postAndAsyncReply (channel: Channel<'T>) f = (f tcs) |> channel.Writer.TryWrite |> ignore tcs.Task -let post (channel: Channel<'T>) = - channel.Writer.TryWrite >> ignore +let post (channel: Channel<'T>) msg = + channel.Writer.TryWrite msg |> ignore let getSpan (stream: MemoryStream) = stream.GetBuffer().AsSpan(0, int stream.Length) diff --git a/src/Pulsar.Client/Internal/ClientCnx.fs b/src/Pulsar.Client/Internal/ClientCnx.fs index 2d4cd398..6ac5b8ce 100644 --- a/src/Pulsar.Client/Internal/ClientCnx.fs +++ b/src/Pulsar.Client/Internal/ClientCnx.fs @@ -338,24 +338,32 @@ and internal ClientCnx (config: PulsarClientConfiguration, Log.Logger.LogInformation("{0} sendMb mailbox has stopped normally", prefix)) |> ignore - let readMessage (reader: BinaryReader) (stream: MemoryStream) frameLength = - reader.ReadInt16() |> int16FromBigEndian |> invalidArgIf ((<>) MagicNumber) "Invalid magicNumber" |> ignore - let messageCheckSum = reader.ReadInt32() |> int32FromBigEndian - let metadataPointer = stream.Position - let metadata = Serializer.DeserializeWithLengthPrefix(stream, PrefixStyle.Fixed32BigEndian) - let payloadPointer = stream.Position - let metadataLength = payloadPointer - metadataPointer |> int - let payloadLength = frameLength - (int payloadPointer) + let readMessage (reader: byref>) frameLength = + let stream = reader.Sequence + let mutable magicNumber = -1s + if (SequenceReaderExtensions.TryReadBigEndian(&reader, &magicNumber) |> not) + || magicNumber <> MagicNumber then + raise <| ArgumentException("Invalid magicNumber") + let mutable messageCheckSum = -1 + SequenceReaderExtensions.TryReadBigEndian(&reader, &messageCheckSum) |> ignore + let metadataPointer = int reader.Consumed + let mutable metadataLength = -1 + SequenceReaderExtensions.TryReadBigEndian(&reader, &metadataLength) |> ignore + let metadata = Serializer.Deserialize(reader.Sequence.Slice(reader.Consumed, metadataLength)) + reader.Advance(metadataLength) + let payloadPointer = int reader.Consumed + let payloadLength = frameLength - payloadPointer let payload = MemoryStreamManager.GetStream("payload", payloadLength) - let payloadBytes = stream.GetBuffer().AsSpan().Slice(int payloadPointer, payloadLength) - payload.Write(payloadBytes) - stream.Seek(metadataPointer, SeekOrigin.Begin) |> ignore - let calculatedCheckSum = CRC32C.GetForMS(stream, metadataLength + payloadLength) |> int32 + let payloadBytes = reader.Sequence.Slice(payloadPointer, payloadLength) + for segment in payloadBytes do + payload.Write(segment.Span) + let size = frameLength-metadataPointer + let calculatedCheckSum = CRC32C.GetForROS(stream.Slice(metadataPointer, size)) |> int32 if (messageCheckSum <> calculatedCheckSum) then Log.Logger.LogError("{0} Invalid checksum. Received: {1} Calculated: {2}", prefix, messageCheckSum, calculatedCheckSum) (metadata, payload, messageCheckSum = calculatedCheckSum) - let readCommand (command: BaseCommand) reader stream frameLength = + let readCommand (command: BaseCommand) (reader: byref>) frameLength = match command.``type`` with | BaseCommand.Type.Connected -> Ok (XCommandConnected command.Connected) @@ -364,7 +372,7 @@ and internal ClientCnx (config: PulsarClientConfiguration, | BaseCommand.Type.SendReceipt -> Ok (XCommandSendReceipt command.SendReceipt) | BaseCommand.Type.Message -> - let metadata,payload,checksumValid = readMessage reader stream frameLength + let metadata,payload,checksumValid = readMessage &reader frameLength Ok (XCommandMessage (command.Message, metadata, payload, checksumValid)) | BaseCommand.Type.LookupResponse -> Ok (XCommandLookupResponse command.lookupTopicResponse) @@ -408,29 +416,24 @@ and internal ClientCnx (config: PulsarClientConfiguration, Result.Error (UnknownCommandType unknownType) let tryParse (buffer: ReadOnlySequence) = - let length = int buffer.Length - if (length >= 8) then - let array = ArrayPool.Shared.Rent length + let length = int buffer.Length // at least 8 + let mutable reader = SequenceReader(buffer) + let mutable totalLength = -1 + SequenceReaderExtensions.TryReadBigEndian(&reader, &totalLength) |> ignore + let frameLength = totalLength + 4 + if (length >= frameLength) then + let mutable baseCommandLength = -1 + SequenceReaderExtensions.TryReadBigEndian(&reader, &baseCommandLength) |> ignore + let command = Serializer.Deserialize(buffer.Slice(reader.Consumed, baseCommandLength)) + reader.Advance baseCommandLength + if Log.Logger.IsEnabled LogLevel.Debug then + Log.Logger.LogDebug("{0} Got message of type {1}", prefix, command.``type``) + let consumed = int64 frameLength |> buffer.GetPosition try - buffer.CopyTo(Span(array)) - use stream = new MemoryStream(array, 0, array.Length, true, true) - use reader = new BinaryReader(stream) - let totalength = reader.ReadInt32() |> int32FromBigEndian - let frameLength = totalength + 4 - if (length >= frameLength) then - let command = Serializer.DeserializeWithLengthPrefix(stream, PrefixStyle.Fixed32BigEndian) - if Log.Logger.IsEnabled LogLevel.Debug then - Log.Logger.LogDebug("{0} Got message of type {1}", prefix, command.``type``) - let consumed = int64 frameLength |> buffer.GetPosition - try - let wrappedCommand = readCommand command reader stream frameLength - wrappedCommand, consumed - with ex -> - Result.Error (CorruptedCommand ex), consumed - else - Result.Error IncompleteCommand, SequencePosition() - finally - ArrayPool.Shared.Return array + let wrappedCommand = readCommand command &reader frameLength + wrappedCommand, consumed + with ex -> + Result.Error (CorruptedCommand ex), consumed else Result.Error IncompleteCommand, SequencePosition() @@ -795,8 +798,7 @@ and internal ClientCnx (config: PulsarClientConfiguration, try while continueLooping do - let! result = reader.ReadAsync() - let buffer = result.Buffer + let! result = reader.ReadAtLeastAsync(8) if result.IsCompleted then if initialConnectionTsc.TrySetException(ConnectException("Unable to initiate connection")) then Log.Logger.LogWarning("{0} New connection was aborted", prefix) @@ -804,6 +806,7 @@ and internal ClientCnx (config: PulsarClientConfiguration, post operationsMb ChannelInactive continueLooping <- false else + let buffer = result.Buffer match tryParse buffer with | Result.Ok xcmd, consumed -> handleCommand xcmd diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index 3bbda9c2..aa0875d9 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -232,7 +232,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c stream.Seek((10+cmdSize) |> int64, SeekOrigin.Begin) |> ignore let checkSum = reader.ReadInt32() |> int32FromBigEndian let checkSumPayload = (int streamSize) - 14 - cmdSize - let computedCheckSum = CRC32C.GetForRMS(stream, checkSumPayload) |> int32 + let computedCheckSum = CRC32C.GetForROS(stream.GetReadOnlySequence().Slice(stream.Position,checkSumPayload)) |> int32 return checkSum <> computedCheckSum } diff --git a/tests/UnitTests/Common/CommandTests.fs b/tests/UnitTests/Common/CommandTests.fs index 7721e321..0c729fbc 100644 --- a/tests/UnitTests/Common/CommandTests.fs +++ b/tests/UnitTests/Common/CommandTests.fs @@ -1,6 +1,7 @@ namespace Pulsar.Client.UnitTests.Common open System +open System.Buffers open System.IO open System.IO.Pipelines open Expecto @@ -118,9 +119,9 @@ module CommandsTests = let crcArrayStart = 8 + commandSize + 6 let crcArray = bytes.AsSpan(crcArrayStart, 4 + medataSize + resultPayload.Length).ToArray() - use crcStream = new MemoryStream(crcArray, 0, crcArray.Length, true, true) + let ros = ReadOnlySequence(crcArray) - let currentCrc32 = CRC32C.GetForMS(crcStream, crcArray.Length) |> int32 + let currentCrc32 = CRC32C.GetForROS(ros) |> int32 magicNumber |> Expect.equal "" MagicNumber crc32 |> Expect.equal "" currentCrc32 diff --git a/tests/UnitTests/Common/HashTests.fs b/tests/UnitTests/Common/HashTests.fs index a60ec222..39534cd0 100644 --- a/tests/UnitTests/Common/HashTests.fs +++ b/tests/UnitTests/Common/HashTests.fs @@ -44,14 +44,7 @@ let crc32cTests = let recycl = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream recycl.Write(input) recycl.Seek(0L, SeekOrigin.Begin) |> ignore - let hash = CRC32C.GetForRMS(recycl, input.Length) - Expect.equal "" 2789859932u hash - } - - test "CRC32Hash MS" { - let input = "Съешь ещё этих мягких французских булок, да выпей чаю" |> System.Text.Encoding.UTF8.GetBytes - let stream = new MemoryStream(input, 0, input.Length, false, true) - let hash = CRC32C.GetForMS(stream, input.Length) + let hash = CRC32C.GetForROS(recycl.GetReadOnlySequence().Slice(0,input.Length)) Expect.equal "" 2789859932u hash } @@ -60,14 +53,7 @@ let crc32cTests = let recycl = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream recycl.Write(input) recycl.Seek(0L, SeekOrigin.Begin) |> ignore - let hash = CRC32C.GetForRMS(recycl, input.Length) - Expect.equal "" 174843280u hash - } - - test "CRC32Hash long MS" { - let input: byte[] = Array.create 750000 1uy - let stream = new MemoryStream(input, 0, input.Length, false, true) - let hash = CRC32C.GetForMS(stream, input.Length) + let hash = CRC32C.GetForROS(recycl.GetReadOnlySequence().Slice(0,input.Length)) Expect.equal "" 174843280u hash }