diff --git a/src/Websocket.Client/RequestMessage.cs b/src/Websocket.Client/RequestMessage.cs new file mode 100644 index 0000000..6c6cede --- /dev/null +++ b/src/Websocket.Client/RequestMessage.cs @@ -0,0 +1,36 @@ +using System; + +namespace Websocket.Client +{ + internal abstract class RequestMessage { } + + internal class RequestTextMessage : RequestMessage + { + public string Text { get; } + + public RequestTextMessage(string text) + { + Text = text; + } + } + + internal class RequestBinaryMessage : RequestMessage + { + public byte[] Data { get; } + + public RequestBinaryMessage(byte[] data) + { + Data = data; + } + } + + internal class RequestBinarySegmentMessage : RequestMessage + { + public ArraySegment Data { get; } + + public RequestBinarySegmentMessage(ArraySegment data) + { + Data = data; + } + } +} diff --git a/src/Websocket.Client/ResponseMessage.cs b/src/Websocket.Client/ResponseMessage.cs index c2eba25..2c1eaf0 100644 --- a/src/Websocket.Client/ResponseMessage.cs +++ b/src/Websocket.Client/ResponseMessage.cs @@ -1,4 +1,5 @@ -using System.Net.WebSockets; +using System.IO; +using System.Net.WebSockets; namespace Websocket.Client { @@ -7,22 +8,30 @@ namespace Websocket.Client /// public class ResponseMessage { - private ResponseMessage(byte[]? binary, string? text, WebSocketMessageType messageType) + private readonly byte[]? _binary; + + private ResponseMessage(MemoryStream? memoryStream, byte[]? binary, string? text, WebSocketMessageType messageType) { - Binary = binary; + Stream = memoryStream; + _binary = binary; Text = text; MessageType = messageType; } /// - /// Received text message (only if type = WebSocketMessageType.Text) + /// Received text message (only if type = ) /// public string? Text { get; } /// - /// Received text message (only if type = WebSocketMessageType.Binary) + /// Received text message (only if type = ) /// - public byte[]? Binary { get; } + public byte[]? Binary => Stream is null ? _binary : Stream.ToArray(); + + /// + /// Received stream message (only if type = and = false) + /// + public MemoryStream? Stream { get; } /// /// Current message type (Text or Binary) @@ -47,7 +56,7 @@ public override string ToString() /// public static ResponseMessage TextMessage(string? data) { - return new ResponseMessage(null, data, WebSocketMessageType.Text); + return new ResponseMessage(null, null, data, WebSocketMessageType.Text); } /// @@ -55,7 +64,15 @@ public static ResponseMessage TextMessage(string? data) /// public static ResponseMessage BinaryMessage(byte[]? data) { - return new ResponseMessage(data, null, WebSocketMessageType.Binary); + return new ResponseMessage(null, data, null, WebSocketMessageType.Binary); + } + + /// + /// Create stream response message + /// + public static ResponseMessage BinaryStreamMessage(MemoryStream? memoryStream) + { + return new ResponseMessage(memoryStream, null, null, WebSocketMessageType.Binary); } } -} \ No newline at end of file +} diff --git a/src/Websocket.Client/Websocket.Client.csproj b/src/Websocket.Client/Websocket.Client.csproj index 3d39e4d..6ac713b 100644 --- a/src/Websocket.Client/Websocket.Client.csproj +++ b/src/Websocket.Client/Websocket.Client.csproj @@ -29,6 +29,7 @@ + diff --git a/src/Websocket.Client/WebsocketClient.Sending.cs b/src/Websocket.Client/WebsocketClient.Sending.cs index b8f5cb4..0e5e260 100644 --- a/src/Websocket.Client/WebsocketClient.Sending.cs +++ b/src/Websocket.Client/WebsocketClient.Sending.cs @@ -1,15 +1,16 @@ -using System; +using Microsoft.Extensions.Logging; +using System; using System.Net.WebSockets; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; namespace Websocket.Client { public partial class WebsocketClient { - private readonly Channel _messagesTextToSendQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() + private readonly Channel _messagesTextToSendQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false @@ -30,7 +31,7 @@ public void Send(string message) { Validations.Validations.ValidateInput(message, nameof(message)); - _messagesTextToSendQueue.Writer.TryWrite(message); + _messagesTextToSendQueue.Writer.TryWrite(new RequestTextMessage(message)); } /// @@ -57,6 +58,45 @@ public void Send(ArraySegment message) _messagesBinaryToSendQueue.Writer.TryWrite(message); } + /// + /// Send text message to the websocket channel. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Text message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendAsync(string message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesTextToSendQueue.Writer.WriteAsync(new RequestTextMessage(message), cancellationToken); + } + + /// + /// Send binary message to the websocket channel. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Binary message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendAsync(byte[] message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesBinaryToSendQueue.Writer.WriteAsync(new ArraySegment(message), cancellationToken); + } + + /// + /// Send binary message to the websocket channel. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Binary message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendAsync(ArraySegment message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesBinaryToSendQueue.Writer.WriteAsync(message, cancellationToken); + } + /// /// Send text message to the websocket channel. /// It doesn't use a sending queue, @@ -68,7 +108,7 @@ public Task SendInstant(string message) { Validations.Validations.ValidateInput(message, nameof(message)); - return SendInternalSynchronized(message); + return SendInternalSynchronized(new RequestTextMessage(message)); } /// @@ -83,6 +123,60 @@ public Task SendInstant(byte[] message) return SendInternalSynchronized(new ArraySegment(message)); } + /// + /// Send already converted text message to the websocket channel. + /// Use this method to avoid double serialization of the text message. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Message to be sent + public void SendTextAsBinary(byte[] message) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + _messagesTextToSendQueue.Writer.TryWrite(new RequestBinaryMessage(message)); + } + + /// + /// Send already converted text message to the websocket channel. + /// Use this method to avoid double serialization of the text message. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Message to be sent + public void SendTextAsBinary(ArraySegment message) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + _messagesTextToSendQueue.Writer.TryWrite(new RequestBinarySegmentMessage(message)); + } + + /// + /// Send already converted text message to the websocket channel. + /// Use this method to avoid double serialization of the text message. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendTextAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinaryMessage(message), cancellationToken); + } + + /// + /// Send already converted text message to the websocket channel. + /// Use this method to avoid double serialization of the text message. + /// It inserts the message to the queue and actual sending is done on an other thread + /// + /// Message to be sent + /// The cancellationToken enables graceful cancellation of asynchronous operations + public ValueTask SendTextAsBinaryAsync(ArraySegment message, CancellationToken cancellationToken = default) + { + Validations.Validations.ValidateInput(message, nameof(message)); + + return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinarySegmentMessage(message), cancellationToken); + } + /// /// Stream/publish fake message (via 'MessageReceived' observable). /// Use for testing purposes to simulate a server message. @@ -188,7 +282,7 @@ private void StartBackgroundThreadForSendingBinary() _ = Task.Factory.StartNew(_ => SendBinaryFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal?.Token ?? CancellationToken.None); } - private async Task SendInternalSynchronized(string message) + private async Task SendInternalSynchronized(RequestMessage message) { using (await _locker.LockAsync()) { @@ -196,7 +290,7 @@ private async Task SendInternalSynchronized(string message) } } - private async Task SendInternal(string message) + private async Task SendInternal(RequestMessage message) { if (!IsClientConnected()) { @@ -205,10 +299,26 @@ private async Task SendInternal(string message) } _logger.LogTrace(L("Sending: {message}"), Name, message); - var buffer = GetEncoding().GetBytes(message); - var messageSegment = new ArraySegment(buffer); + + ReadOnlyMemory payload; + + switch (message) + { + case RequestTextMessage textMessage: + payload = MemoryMarshal.AsMemory(GetEncoding().GetBytes(textMessage.Text)); + break; + case RequestBinaryMessage binaryMessage: + payload = MemoryMarshal.AsMemory(binaryMessage.Data); + break; + case RequestBinarySegmentMessage segmentMessage: + payload = segmentMessage.Data.AsMemory(); + break; + default: + throw new ArgumentException($"Unknown message type: {message.GetType()}"); + } + await _client! - .SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None) + .SendAsync(payload, WebSocketMessageType.Text, true, _cancellation?.Token ?? CancellationToken.None) .ConfigureAwait(false); } @@ -220,18 +330,18 @@ private async Task SendInternalSynchronized(ArraySegment message) } } - private async Task SendInternal(ArraySegment message) + private async Task SendInternal(ArraySegment payload) { if (!IsClientConnected()) { - _logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, message.Count); + _logger.LogDebug(L("Client is not connected to server, cannot send binary, length: {length}"), Name, payload.Count); return; } - _logger.LogTrace(L("Sending binary, length: {length}"), Name, message.Count); + _logger.LogTrace(L("Sending binary, length: {length}"), Name, payload.Count); await _client! - .SendAsync(message, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None) + .SendAsync(payload, WebSocketMessageType.Binary, true, _cancellation?.Token ?? CancellationToken.None) .ConfigureAwait(false); } } diff --git a/src/Websocket.Client/WebsocketClient.cs b/src/Websocket.Client/WebsocketClient.cs index e0c2baa..4d036d9 100644 --- a/src/Websocket.Client/WebsocketClient.cs +++ b/src/Websocket.Client/WebsocketClient.cs @@ -1,14 +1,14 @@ using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.IO; using System; using System.IO; -using System.Linq; using System.Net.WebSockets; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Logging.Abstractions; using Websocket.Client.Exceptions; using Websocket.Client.Threading; @@ -22,6 +22,7 @@ public partial class WebsocketClient : IWebsocketClient private readonly ILogger _logger; private readonly WebsocketAsyncLock _locker = new WebsocketAsyncLock(); private readonly Func> _connectionFactory; + private static readonly RecyclableMemoryStreamManager _memoryStreamManager = new RecyclableMemoryStreamManager(); private Uri _url; private Timer? _lastChanceTimer; @@ -179,6 +180,15 @@ public bool IsReconnectionEnabled /// public bool IsTextMessageConversionEnabled { get; set; } = true; + /// + /// Enable or disable automatic of the + /// after sending data (only available for binary response). + /// Setting value to false allows you to access the stream directly. + /// However, keep in mind that you need to handle the dispose yourself. + /// Default: true + /// + public bool IsStreamDisposedAutomatically { get; set; } = true; + /// public Encoding? MessageEncoding { get; set; } @@ -443,71 +453,30 @@ private async Task Listen(WebSocket client, CancellationToken token) { // define buffer here and reuse, to avoid more allocation const int chunkSize = 1024 * 4; - var buffer = new ArraySegment(new byte[chunkSize]); + var buffer = new Memory(new byte[chunkSize]); do { - WebSocketReceiveResult result; - byte[]? resultArrayWithTrailing = null; - var resultArraySize = 0; - var isResultArrayCloned = false; - MemoryStream? ms = null; + ValueWebSocketReceiveResult result; + var ms = _memoryStreamManager.GetStream() as RecyclableMemoryStream; while (true) { result = await client.ReceiveAsync(buffer, token); - var currentChunk = buffer.Array; - var currentChunkSize = result.Count; - - var isFirstChunk = resultArrayWithTrailing == null; - if (isFirstChunk) - { - // first chunk, use buffer as reference, do not allocate anything - resultArraySize += currentChunkSize; - resultArrayWithTrailing = currentChunk; - isResultArrayCloned = false; - } - else if (currentChunk == null) - { - // weird chunk, do nothing - } - else - { - // received more chunks, lets merge them via memory stream - if (ms == null) - { - // create memory stream and insert first chunk - ms = new MemoryStream(); - ms.Write(resultArrayWithTrailing!, 0, resultArraySize); - } - - // insert current chunk - ms.Write(currentChunk, buffer.Offset, currentChunkSize); - } + ms!.Write(buffer[..result.Count].Span); if (result.EndOfMessage) - { break; - } - - if (isResultArrayCloned) - continue; - - // we got more chunks incoming, need to clone first chunk - resultArrayWithTrailing = resultArrayWithTrailing?.ToArray(); - isResultArrayCloned = true; } - ms?.Seek(0, SeekOrigin.Begin); + ms.Seek(0, SeekOrigin.Begin); ResponseMessage message; + bool shouldDisposeStream = true; + if (result.MessageType == WebSocketMessageType.Text && IsTextMessageConversionEnabled) { - var data = ms != null ? - GetEncoding().GetString(ms.ToArray()) : - resultArrayWithTrailing != null ? - GetEncoding().GetString(resultArrayWithTrailing, 0, resultArraySize) : - null; + var data = GetEncoding().GetString(ms.ToArray()); message = ResponseMessage.TextMessage(data); } @@ -547,18 +516,19 @@ await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing", } else { - if (ms != null) + if (IsStreamDisposedAutomatically) { message = ResponseMessage.BinaryMessage(ms.ToArray()); } else { - Array.Resize(ref resultArrayWithTrailing, resultArraySize); - message = ResponseMessage.BinaryMessage(resultArrayWithTrailing); + message = ResponseMessage.BinaryStreamMessage(ms); + shouldDisposeStream = false; } } - ms?.Dispose(); + if (shouldDisposeStream) + ms.Dispose(); _logger.LogTrace(L("Received: {message}"), Name, message); _lastReceivedMsg = DateTime.UtcNow;