Skip to content

Commit

Permalink
Buferless consume
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Oct 4, 2023
1 parent 2ddb151 commit bb4099a
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 78 deletions.
24 changes: 6 additions & 18 deletions src/Pulsar.Client.Proto/Crc32c.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.IO;
using System.Numerics;
using Microsoft.IO;
Expand All @@ -8,19 +9,18 @@ namespace Pulsar.Client.Common
{
internal static class CRC32C
{
internal static uint GetForRMS(RecyclableMemoryStream stream, int size)
internal static uint GetForROS(ReadOnlySequence<byte> stream)
{
var crc = ~0U; //0xFFFFFFFF
var memorySequence = stream.GetReadOnlySequence().Slice(stream.Position);
foreach (var memory in memorySequence)
foreach (var memory in stream)
{
var span = memory.Span;
CrcAlgorithm(ref size, span, ref crc);
CrcAlgorithm(span, ref crc);
}
return crc ^ ~0U; //0xFFFFFFFF
}

private static void CrcAlgorithm(ref int size, ReadOnlySpan<byte> span, ref uint crc)
private static void CrcAlgorithm(ReadOnlySpan<byte> span, ref uint crc)
{
var currentBlockLength = span.Length;
var i = 0;
Expand All @@ -31,24 +31,12 @@ private static void CrcAlgorithm(ref int size, ReadOnlySpan<byte> span, ref uint
crc = BitOperations.Crc32C(crc, batch);
i+=8;
}
size -= i;
while (size > 0 && i < currentBlockLength)
while (i < currentBlockLength)
{
crc = BitOperations.Crc32C(crc, span[i]);
size--;
i++;
}
}

internal static uint GetForMS(MemoryStream stream, int size)
{
var crc = ~0U; //0xFFFFFFFF
var buf = stream.GetBuffer();
var offset = (int) stream.Position;
var span = buf.AsSpan(offset, size);
CrcAlgorithm(ref size, span, ref crc);
return crc ^ ~0U; //0xFFFFFFFF
}
}
}

2 changes: 1 addition & 1 deletion src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMe

// write CRC
temp.Seek(int64 crcPayloadStart, SeekOrigin.Begin) |> ignore
let crc = int32 <| CRC32C.GetForRMS(temp, totalMetadataSize + payloadSize)
let crc = int32 <| CRC32C.GetForROS(temp.GetReadOnlySequence().Slice(temp.Position, totalMetadataSize + payloadSize))
temp.Seek(int64 crcStart, SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian crc)

Expand Down
4 changes: 2 additions & 2 deletions src/Pulsar.Client/Common/Tools.fs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ let postAndAsyncReply (channel: Channel<'T>) f =
(f tcs) |> channel.Writer.TryWrite |> ignore
tcs.Task

let post (channel: Channel<'T>) =
channel.Writer.TryWrite >> ignore
let post (channel: Channel<'T>) msg =
channel.Writer.TryWrite msg |> ignore

let getSpan (stream: MemoryStream) =
stream.GetBuffer().AsSpan(0, int stream.Length)
Expand Down
79 changes: 41 additions & 38 deletions src/Pulsar.Client/Internal/ClientCnx.fs
Original file line number Diff line number Diff line change
Expand Up @@ -338,24 +338,32 @@ and internal ClientCnx (config: PulsarClientConfiguration,
Log.Logger.LogInformation("{0} sendMb mailbox has stopped normally", prefix))
|> ignore

let readMessage (reader: BinaryReader) (stream: MemoryStream) frameLength =
reader.ReadInt16() |> int16FromBigEndian |> invalidArgIf ((<>) MagicNumber) "Invalid magicNumber" |> ignore
let messageCheckSum = reader.ReadInt32() |> int32FromBigEndian
let metadataPointer = stream.Position
let metadata = Serializer.DeserializeWithLengthPrefix<MessageMetadata>(stream, PrefixStyle.Fixed32BigEndian)
let payloadPointer = stream.Position
let metadataLength = payloadPointer - metadataPointer |> int
let payloadLength = frameLength - (int payloadPointer)
let readMessage (reader: byref<SequenceReader<byte>>) frameLength =
let stream = reader.Sequence
let mutable magicNumber = -1s
if (SequenceReaderExtensions.TryReadBigEndian(&reader, &magicNumber) |> not)
|| magicNumber <> MagicNumber then
raise <| ArgumentException("Invalid magicNumber")
let mutable messageCheckSum = -1
SequenceReaderExtensions.TryReadBigEndian(&reader, &messageCheckSum) |> ignore
let metadataPointer = int reader.Consumed
let mutable metadataLength = -1
SequenceReaderExtensions.TryReadBigEndian(&reader, &metadataLength) |> ignore
let metadata = Serializer.Deserialize<MessageMetadata>(reader.Sequence.Slice(reader.Consumed, metadataLength))
reader.Advance(metadataLength)
let payloadPointer = int reader.Consumed
let payloadLength = frameLength - payloadPointer
let payload = MemoryStreamManager.GetStream("payload", payloadLength)
let payloadBytes = stream.GetBuffer().AsSpan().Slice(int payloadPointer, payloadLength)
payload.Write(payloadBytes)
stream.Seek(metadataPointer, SeekOrigin.Begin) |> ignore
let calculatedCheckSum = CRC32C.GetForMS(stream, metadataLength + payloadLength) |> int32
let payloadBytes = reader.Sequence.Slice(payloadPointer, payloadLength)
for segment in payloadBytes do
payload.Write(segment.Span)
let size = frameLength-metadataPointer
let calculatedCheckSum = CRC32C.GetForROS(stream.Slice(metadataPointer, size)) |> int32
if (messageCheckSum <> calculatedCheckSum) then
Log.Logger.LogError("{0} Invalid checksum. Received: {1} Calculated: {2}", prefix, messageCheckSum, calculatedCheckSum)
(metadata, payload, messageCheckSum = calculatedCheckSum)

let readCommand (command: BaseCommand) reader stream frameLength =
let readCommand (command: BaseCommand) (reader: byref<SequenceReader<byte>>) frameLength =
match command.``type`` with
| BaseCommand.Type.Connected ->
Ok (XCommandConnected command.Connected)
Expand All @@ -364,7 +372,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
| BaseCommand.Type.SendReceipt ->
Ok (XCommandSendReceipt command.SendReceipt)
| BaseCommand.Type.Message ->
let metadata,payload,checksumValid = readMessage reader stream frameLength
let metadata,payload,checksumValid = readMessage &reader frameLength
Ok (XCommandMessage (command.Message, metadata, payload, checksumValid))
| BaseCommand.Type.LookupResponse ->
Ok (XCommandLookupResponse command.lookupTopicResponse)
Expand Down Expand Up @@ -408,29 +416,24 @@ and internal ClientCnx (config: PulsarClientConfiguration,
Result.Error (UnknownCommandType unknownType)

let tryParse (buffer: ReadOnlySequence<byte>) =
let length = int buffer.Length
if (length >= 8) then
let array = ArrayPool.Shared.Rent length
let length = int buffer.Length // at least 8
let mutable reader = SequenceReader<byte>(buffer)
let mutable totalLength = -1
SequenceReaderExtensions.TryReadBigEndian(&reader, &totalLength) |> ignore
let frameLength = totalLength + 4
if (length >= frameLength) then
let mutable baseCommandLength = -1
SequenceReaderExtensions.TryReadBigEndian(&reader, &baseCommandLength) |> ignore
let command = Serializer.Deserialize<BaseCommand>(buffer.Slice(reader.Consumed, baseCommandLength))
reader.Advance baseCommandLength
if Log.Logger.IsEnabled LogLevel.Debug then
Log.Logger.LogDebug("{0} Got message of type {1}", prefix, command.``type``)
let consumed = int64 frameLength |> buffer.GetPosition
try
buffer.CopyTo(Span(array))
use stream = new MemoryStream(array, 0, array.Length, true, true)
use reader = new BinaryReader(stream)
let totalength = reader.ReadInt32() |> int32FromBigEndian
let frameLength = totalength + 4
if (length >= frameLength) then
let command = Serializer.DeserializeWithLengthPrefix<BaseCommand>(stream, PrefixStyle.Fixed32BigEndian)
if Log.Logger.IsEnabled LogLevel.Debug then
Log.Logger.LogDebug("{0} Got message of type {1}", prefix, command.``type``)
let consumed = int64 frameLength |> buffer.GetPosition
try
let wrappedCommand = readCommand command reader stream frameLength
wrappedCommand, consumed
with ex ->
Result.Error (CorruptedCommand ex), consumed
else
Result.Error IncompleteCommand, SequencePosition()
finally
ArrayPool.Shared.Return array
let wrappedCommand = readCommand command &reader frameLength
wrappedCommand, consumed
with ex ->
Result.Error (CorruptedCommand ex), consumed
else
Result.Error IncompleteCommand, SequencePosition()

Expand Down Expand Up @@ -795,15 +798,15 @@ and internal ClientCnx (config: PulsarClientConfiguration,

try
while continueLooping do
let! result = reader.ReadAsync()
let buffer = result.Buffer
let! result = reader.ReadAtLeastAsync(8)
if result.IsCompleted then
if initialConnectionTsc.TrySetException(ConnectException("Unable to initiate connection")) then
Log.Logger.LogWarning("{0} New connection was aborted", prefix)
Log.Logger.LogWarning("{0} Socket was disconnected normally while reading", prefix)
post operationsMb ChannelInactive
continueLooping <- false
else
let buffer = result.Buffer
match tryParse buffer with
| Result.Ok xcmd, consumed ->
handleCommand xcmd
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
stream.Seek((10+cmdSize) |> int64, SeekOrigin.Begin) |> ignore
let checkSum = reader.ReadInt32() |> int32FromBigEndian
let checkSumPayload = (int streamSize) - 14 - cmdSize
let computedCheckSum = CRC32C.GetForRMS(stream, checkSumPayload) |> int32
let computedCheckSum = CRC32C.GetForROS(stream.GetReadOnlySequence().Slice(stream.Position,checkSumPayload)) |> int32
return checkSum <> computedCheckSum
}

Expand Down
5 changes: 3 additions & 2 deletions tests/UnitTests/Common/CommandTests.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Pulsar.Client.UnitTests.Common

open System
open System.Buffers
open System.IO
open System.IO.Pipelines
open Expecto
Expand Down Expand Up @@ -118,9 +119,9 @@ module CommandsTests =

let crcArrayStart = 8 + commandSize + 6
let crcArray = bytes.AsSpan(crcArrayStart, 4 + medataSize + resultPayload.Length).ToArray()
use crcStream = new MemoryStream(crcArray, 0, crcArray.Length, true, true)
let ros = ReadOnlySequence(crcArray)

let currentCrc32 = CRC32C.GetForMS(crcStream, crcArray.Length) |> int32
let currentCrc32 = CRC32C.GetForROS(ros) |> int32

magicNumber |> Expect.equal "" MagicNumber
crc32 |> Expect.equal "" currentCrc32
Expand Down
18 changes: 2 additions & 16 deletions tests/UnitTests/Common/HashTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,7 @@ let crc32cTests =
let recycl = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
recycl.Write(input)
recycl.Seek(0L, SeekOrigin.Begin) |> ignore
let hash = CRC32C.GetForRMS(recycl, input.Length)
Expect.equal "" 2789859932u hash
}

test "CRC32Hash MS" {
let input = "Съешь ещё этих мягких французских булок, да выпей чаю" |> System.Text.Encoding.UTF8.GetBytes
let stream = new MemoryStream(input, 0, input.Length, false, true)
let hash = CRC32C.GetForMS(stream, input.Length)
let hash = CRC32C.GetForROS(recycl.GetReadOnlySequence().Slice(0,input.Length))
Expect.equal "" 2789859932u hash
}

Expand All @@ -60,14 +53,7 @@ let crc32cTests =
let recycl = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
recycl.Write(input)
recycl.Seek(0L, SeekOrigin.Begin) |> ignore
let hash = CRC32C.GetForRMS(recycl, input.Length)
Expect.equal "" 174843280u hash
}

test "CRC32Hash long MS" {
let input: byte[] = Array.create 750000 1uy
let stream = new MemoryStream(input, 0, input.Length, false, true)
let hash = CRC32C.GetForMS(stream, input.Length)
let hash = CRC32C.GetForROS(recycl.GetReadOnlySequence().Slice(0,input.Length))
Expect.equal "" 174843280u hash
}

Expand Down

0 comments on commit bb4099a

Please sign in to comment.