Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RecyclableMemoryStream instead of MemoryStream #133

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/Websocket.Client/RequestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace Websocket.Client
{
internal abstract class RequestMessage { }

internal class RequestTextMessage : RequestMessage
{
public string Text { get; }

public RequestTextMessage(string text)
{
Text = text;
}
}

internal class RequestBinaryMessage : RequestMessage
{
public byte[] Data { get; }

public RequestBinaryMessage(byte[] data)
{
Data = data;
}
}

internal class RequestBinarySegmentMessage : RequestMessage
{
public ArraySegment<byte> Data { get; }

public RequestBinarySegmentMessage(ArraySegment<byte> data)
{
Data = data;
}
}
}
35 changes: 26 additions & 9 deletions src/Websocket.Client/ResponseMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.WebSockets;
using System.IO;
using System.Net.WebSockets;

namespace Websocket.Client
{
Expand All @@ -7,22 +8,30 @@ namespace Websocket.Client
/// </summary>
public class ResponseMessage
{
private ResponseMessage(byte[]? binary, string? text, WebSocketMessageType messageType)
private readonly byte[]? _binary;

private ResponseMessage(MemoryStream? memoryStream, byte[]? binary, string? text, WebSocketMessageType messageType)
{
Binary = binary;
Stream = memoryStream;
_binary = binary;
Text = text;
MessageType = messageType;
}

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Text)
/// Received text message (only if type = <see cref="WebSocketMessageType.Text"/>)
/// </summary>
public string? Text { get; }

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Binary)
/// Received text message (only if type = <see cref="WebSocketMessageType.Binary"/>)
/// </summary>
public byte[]? Binary { get; }
public byte[]? Binary => Stream is null ? _binary : Stream.ToArray();

/// <summary>
/// Received stream message (only if type = <see cref="WebSocketMessageType.Binary"/> and <see cref="WebsocketClient.IsStreamDisposedAutomatically"/> = false)
/// </summary>
public MemoryStream? Stream { get; }

/// <summary>
/// Current message type (Text or Binary)
Expand All @@ -47,15 +56,23 @@ public override string ToString()
/// </summary>
public static ResponseMessage TextMessage(string? data)
{
return new ResponseMessage(null, data, WebSocketMessageType.Text);
return new ResponseMessage(null, null, data, WebSocketMessageType.Text);
}

/// <summary>
/// Create binary response message
/// </summary>
public static ResponseMessage BinaryMessage(byte[]? data)
{
return new ResponseMessage(data, null, WebSocketMessageType.Binary);
return new ResponseMessage(null, data, null, WebSocketMessageType.Binary);
}

/// <summary>
/// Create stream response message
/// </summary>
public static ResponseMessage BinaryStreamMessage(MemoryStream? memoryStream)
{
return new ResponseMessage(memoryStream, null, null, WebSocketMessageType.Binary);
}
}
}
}
1 change: 1 addition & 0 deletions src/Websocket.Client/Websocket.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.1" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.3.2" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
</ItemGroup>
Expand Down
138 changes: 124 additions & 14 deletions src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using System;
using Microsoft.Extensions.Logging;
using System;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Websocket.Client
{
public partial class WebsocketClient
{
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
private readonly Channel<RequestMessage> _messagesTextToSendQueue = Channel.CreateUnbounded<RequestMessage>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
Expand All @@ -30,7 +31,7 @@ public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(message);
_messagesTextToSendQueue.Writer.TryWrite(new RequestTextMessage(message));
}

/// <summary>
Expand All @@ -57,6 +58,45 @@ public void Send(ArraySegment<byte> message)
_messagesBinaryToSendQueue.Writer.TryWrite(message);
}

/// <summary>
/// Send text message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Text message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(string message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestTextMessage(message), cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(new ArraySegment<byte>(message), cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(message, cancellationToken);
}

/// <summary>
/// Send text message to the websocket channel.
/// It doesn't use a sending queue,
Expand All @@ -68,7 +108,7 @@ public Task SendInstant(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

return SendInternalSynchronized(message);
return SendInternalSynchronized(new RequestTextMessage(message));
}

/// <summary>
Expand All @@ -83,6 +123,60 @@ public Task SendInstant(byte[] message)
return SendInternalSynchronized(new ArraySegment<byte>(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(byte[] message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinaryMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(ArraySegment<byte> message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinarySegmentMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinaryMessage(message), cancellationToken);
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinarySegmentMessage(message), cancellationToken);
}

/// <summary>
/// Stream/publish fake message (via 'MessageReceived' observable).
/// Use for testing purposes to simulate a server message.
Expand Down Expand Up @@ -188,15 +282,15 @@ private void StartBackgroundThreadForSendingBinary()
_ = Task.Factory.StartNew(_ => SendBinaryFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal?.Token ?? CancellationToken.None);
}

private async Task SendInternalSynchronized(string message)
private async Task SendInternalSynchronized(RequestMessage message)
{
using (await _locker.LockAsync())
{
await SendInternal(message);
}
}

private async Task SendInternal(string message)
private async Task SendInternal(RequestMessage message)
{
if (!IsClientConnected())
{
Expand All @@ -205,10 +299,26 @@ private async Task SendInternal(string message)
}

_logger.LogTrace(L("Sending: {message}"), Name, message);
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);

ReadOnlyMemory<byte> payload;

switch (message)
{
case RequestTextMessage textMessage:
payload = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(textMessage.Text));
break;
case RequestBinaryMessage binaryMessage:
payload = MemoryMarshal.AsMemory<byte>(binaryMessage.Data);
break;
case RequestBinarySegmentMessage segmentMessage:
payload = segmentMessage.Data.AsMemory();
break;
default:
throw new ArgumentException($"Unknown message type: {message.GetType()}");
}

await _client!
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.SendAsync(payload, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
}

Expand All @@ -220,18 +330,18 @@ private async Task SendInternalSynchronized(ArraySegment<byte> message)
}
}

private async Task SendInternal(ArraySegment<byte> message)
private async Task SendInternal(ArraySegment<byte> payload)
{
if (!IsClientConnected())
{
_logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, message.Count);
_logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, payload.Count);
return;
}

_logger.LogTrace(L("Sending binary, length: {length}"), Name, message.Count);
_logger.LogTrace(L("Sending binary, length: {length}"), Name, payload.Count);

await _client!
.SendAsync(message, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None)
.SendAsync(payload, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None)
.ConfigureAwait(false);
}
}
Expand Down
Loading
Loading