Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocation-free Write(AndFlush)Async using ValueTask #375

Open
wants to merge 29 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0be4706
custom alloc free custom awaitable
maksimkim Feb 22, 2018
ff944b2
ChannelFuture -> ValueTask
maksimkim Mar 10, 2018
2b6f37d
tls fixes
maksimkim Mar 10, 2018
47ff644
single callback per future + callback execution on executor
maksimkim Mar 10, 2018
4f8f30e
uv changes
maksimkim Mar 10, 2018
c70594a
continuations and recycle are inlined
maksimkim Mar 10, 2018
eef526d
test fixes
maksimkim Mar 11, 2018
8abd878
revert libuv changes
maksimkim Mar 11, 2018
cd0b11e
cleanup
maksimkim Mar 11, 2018
1cef415
update packages
maksimkim Mar 12, 2018
5bed79f
multi continuation support returned back
maksimkim Mar 12, 2018
d0836b0
using nuget.config on pkg restore
maksimkim Mar 12, 2018
ed79cce
add nuget.org source
maksimkim Mar 12, 2018
208e69c
rename
maksimkim Mar 13, 2018
c9d0c8a
execution and sync context preservation
maksimkim Mar 14, 2018
67b306b
context propagation fixes
maksimkim Mar 22, 2018
619bfb8
post rebase fixes
maksimkim Mar 22, 2018
baa5e3a
multicontinuation fix for timeout handler
maksimkim Mar 22, 2018
3428e2f
switch to nuget.org for tasks.extensions package
maksimkim Apr 10, 2018
eb5ab1b
default WriteAndFlushAsync returns Task
maksimkim Apr 11, 2018
4dcb8bb
post rebase fixes
maksimkim Apr 12, 2018
78ec006
remove nuget config
maksimkim Apr 12, 2018
fc447d7
upgrade packages to stable version
maksimkim Jun 5, 2018
03e7b1d
ws fixes
maksimkim Oct 9, 2018
5fab149
bump up package versions
maksimkim Oct 10, 2018
6c99f3a
http test fixes
maksimkim Oct 10, 2018
05e4aa6
fix keep alive implementation
maksimkim Oct 10, 2018
7f08e1d
disable configureawait
maksimkim Oct 10, 2018
c1c9865
package version fixes
maksimkim Oct 10, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .nuget/NuGet.Config

This file was deleted.

9 changes: 5 additions & 4 deletions examples/Telnet.Server/TelnetServerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace Telnet.Server
using System;
using System.Net;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;

public class TelnetServerHandler : SimpleChannelInboundHandler<string>
Expand All @@ -16,7 +18,7 @@ public override void ChannelActive(IChannelHandlerContext contex)
contex.WriteAndFlushAsync(string.Format("It is {0} now !\r\n", DateTime.Now));
}

protected override void ChannelRead0(IChannelHandlerContext contex, string msg)
protected override void ChannelRead0(IChannelHandlerContext context, string msg)
{
// Generate and write a response.
string response;
Expand All @@ -35,11 +37,10 @@ protected override void ChannelRead0(IChannelHandlerContext contex, string msg)
response = "Did you say '" + msg + "'?\r\n";
}

Task wait_close = contex.WriteAndFlushAsync(response);
Task waitClose = context.WriteAndFlushAsync(response);
if (close)
{
Task.WaitAll(wait_close);
contex.CloseAsync();
waitClose.CloseOnComplete(context);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/DotNetty.Buffers/DotNetty.Buffers.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?><Project Sdk="Microsoft.NET.Sdk">
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup Label="NuGet">
<TargetFrameworks>netstandard1.3;net45</TargetFrameworks>
<IsPackable>true</IsPackable>
Expand Down
13 changes: 3 additions & 10 deletions src/DotNetty.Codecs.Http/Cors/CorsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void SetExposeHeaders(IHttpResponse response)

void SetMaxAge(IHttpResponse response) => response.Headers.Set(HttpHeaderNames.AccessControlMaxAge, this.config.MaxAge);

public override Task WriteAsync(IChannelHandlerContext context, object message)
public override ValueTask WriteAsync(IChannelHandlerContext context, object message)
{
if (this.config.IsCorsSupportEnabled && message is IHttpResponse response)
{
Expand All @@ -177,7 +177,7 @@ public override Task WriteAsync(IChannelHandlerContext context, object message)
this.SetExposeHeaders(response);
}
}
return context.WriteAndFlushAsync(message);
return context.WriteAndFlushAsync(message, true);
}

static void Forbidden(IChannelHandlerContext ctx, IHttpRequest request)
Expand All @@ -197,15 +197,8 @@ static void Respond(IChannelHandlerContext ctx, IHttpRequest request, IHttpRespo
Task task = ctx.WriteAndFlushAsync(response);
if (!keepAlive)
{
task.ContinueWith(CloseOnComplete, ctx,
TaskContinuationOptions.ExecuteSynchronously);
task.CloseOnComplete(ctx);
}
}

static void CloseOnComplete(Task task, object state)
{
var ctx = (IChannelHandlerContext)state;
ctx.CloseAsync();
}
}
}
6 changes: 3 additions & 3 deletions src/DotNetty.Codecs.Http/HttpClientUpgradeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public HttpClientUpgradeHandler(ISourceCodec sourceCodec, IUpgradeCodec upgradeC
this.upgradeCodec = upgradeCodec;
}

public override Task WriteAsync(IChannelHandlerContext context, object message)
public override ValueTask WriteAsync(IChannelHandlerContext context, object message)
{
if (!(message is IHttpRequest))
{
Expand All @@ -81,14 +81,14 @@ public override Task WriteAsync(IChannelHandlerContext context, object message)

if (this.upgradeRequested)
{
return TaskEx.FromException(new InvalidOperationException("Attempting to write HTTP request with upgrade in progress"));
return new ValueTask(TaskEx.FromException(new InvalidOperationException("Attempting to write HTTP request with upgrade in progress")));
}

this.upgradeRequested = true;
this.SetUpgradeRequestHeaders(context, (IHttpRequest)message);

// Continue writing the request.
Task task = context.WriteAsync(message);
ValueTask task = context.WriteAsync(message);

// Notify that the upgrade request was issued.
context.FireUserEventTriggered(UpgradeEvent.UpgradeIssued);
Expand Down
16 changes: 2 additions & 14 deletions src/DotNetty.Codecs.Http/HttpServerExpectContinueHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,15 @@ public override void ChannelRead(IChannelHandlerContext context, object message)
// the expectation failed so we refuse the request.
IHttpResponse rejection = this.RejectResponse(req);
ReferenceCountUtil.Release(message);
context.WriteAndFlushAsync(rejection)
.ContinueWith(CloseOnFailure, context, TaskContinuationOptions.ExecuteSynchronously);
context.WriteAndFlushAsync(rejection).CloseOnFailure(context);
return;
}

context.WriteAndFlushAsync(accept)
.ContinueWith(CloseOnFailure, context, TaskContinuationOptions.ExecuteSynchronously);
context.WriteAndFlushAsync(accept).CloseOnFailure(context);
req.Headers.Remove(HttpHeaderNames.Expect);
}
base.ChannelRead(context, message);
}
}

static Task CloseOnFailure(Task task, object state)
{
if (task.IsFaulted)
{
var context = (IChannelHandlerContext)state;
return context.CloseAsync();
}
return TaskEx.Completed;
}
}
}
13 changes: 4 additions & 9 deletions src/DotNetty.Codecs.Http/HttpServerKeepAliveHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public override void ChannelRead(IChannelHandlerContext context, object message)
base.ChannelRead(context, message);
}

public override Task WriteAsync(IChannelHandlerContext context, object message)
public override ValueTask WriteAsync(IChannelHandlerContext context, object message)
{
// modify message on way out to add headers if needed
if (message is IHttpResponse response)
Expand All @@ -52,18 +52,13 @@ public override Task WriteAsync(IChannelHandlerContext context, object message)
}
if (message is ILastHttpContent && !this.ShouldKeepAlive())
{
return base.WriteAsync(context, message)
.ContinueWith(CloseOnComplete, context, TaskContinuationOptions.ExecuteSynchronously);
Task task = base.WriteAsync(context, message).AsTask();
task.CloseOnComplete(context.Channel);
return new ValueTask(task);
}
return base.WriteAsync(context, message);
}

static Task CloseOnComplete(Task task, object state)
{
var context = (IChannelHandlerContext)state;
return context.CloseAsync();
}

void TrackResponse(IHttpResponse response)
{
if (!IsInformational(response))
Expand Down
50 changes: 22 additions & 28 deletions src/DotNetty.Codecs.Http/HttpServerUpgradeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,34 +254,28 @@ bool Upgrade(IChannelHandlerContext ctx, IFullHttpRequest request)
var upgradeEvent = new UpgradeEvent(upgradeProtocol, request);

IUpgradeCodec finalUpgradeCodec = upgradeCodec;
ctx.WriteAndFlushAsync(upgradeResponse).ContinueWith(t =>
{
try
{
if (t.Status == TaskStatus.RanToCompletion)
{
// Perform the upgrade to the new protocol.
this.sourceCodec.UpgradeFrom(ctx);
finalUpgradeCodec.UpgradeTo(ctx, request);

// Notify that the upgrade has occurred. Retain the event to offset
// the release() in the finally block.
ctx.FireUserEventTriggered(upgradeEvent.Retain());

// Remove this handler from the pipeline.
ctx.Channel.Pipeline.Remove(this);
}
else
{
ctx.Channel.CloseAsync();
}
}
finally
{
// Release the event if the upgrade event wasn't fired.
upgradeEvent.Release();
}
}, TaskContinuationOptions.ExecuteSynchronously);
try
{
Task writeTask = ctx.WriteAndFlushAsync(upgradeResponse);

// Perform the upgrade to the new protocol.
this.sourceCodec.UpgradeFrom(ctx);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finalUpgradeCodec.UpgradeTo(ctx, request);

// Remove this handler from the pipeline.
ctx.Channel.Pipeline.Remove(this);

// Notify that the upgrade has occurred. Retain the event to offset
// the release() in the finally block.
ctx.FireUserEventTriggered(upgradeEvent.Retain());

writeTask.CloseOnFailure(ctx.Channel);
}
finally
{
// Release the event if the upgrade event wasn't fired.
upgradeEvent.Release();
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public WebSocketClientExtensionHandler(params IWebSocketClientExtensionHandshake
this.extensionHandshakers = new List<IWebSocketClientExtensionHandshaker>(extensionHandshakers);
}

public override Task WriteAsync(IChannelHandlerContext ctx, object msg)
public override ValueTask WriteAsync(IChannelHandlerContext ctx, object msg)
{
if (msg is IHttpRequest request && WebSocketExtensionUtil.IsWebsocketUpgrade(request.Headers))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ public class WebSocketServerExtensionHandler : ChannelHandlerAdapter
readonly List<IWebSocketServerExtensionHandshaker> extensionHandshakers;

List<IWebSocketServerExtension> validExtensions;
Action<Task, object> upgradeCompletedContinuation;

public WebSocketServerExtensionHandler(params IWebSocketServerExtensionHandshaker[] extensionHandshakers)
{
Contract.Requires(extensionHandshakers != null && extensionHandshakers.Length > 0);

this.extensionHandshakers = new List<IWebSocketServerExtensionHandshaker>(extensionHandshakers);
this.upgradeCompletedContinuation = this.OnUpgradeCompleted;
}

public override void ChannelRead(IChannelHandlerContext ctx, object msg)
Expand Down Expand Up @@ -67,16 +68,16 @@ public override void ChannelRead(IChannelHandlerContext ctx, object msg)
base.ChannelRead(ctx, msg);
}

public override Task WriteAsync(IChannelHandlerContext ctx, object msg)
public override ValueTask WriteAsync(IChannelHandlerContext ctx, object msg)
{
Action<Task> continuationAction = null;

HttpHeaders responseHeaders;
string headerValue = null;

if (msg is IHttpResponse response
&& WebSocketExtensionUtil.IsWebsocketUpgrade(response.Headers)
&& WebSocketExtensionUtil.IsWebsocketUpgrade(responseHeaders = response.Headers)
&& this.validExtensions != null)
{
string headerValue = null;
if (response.Headers.TryGet(HttpHeaderNames.SecWebsocketExtensions, out ICharSequence value))
if (responseHeaders.TryGet(HttpHeaderNames.SecWebsocketExtensions, out ICharSequence value))
{
headerValue = value?.ToString();
}
Expand All @@ -88,31 +89,33 @@ public override Task WriteAsync(IChannelHandlerContext ctx, object msg)
extensionData.Name, extensionData.Parameters);
}

continuationAction = promise =>
{
if (promise.Status == TaskStatus.RanToCompletion)
{
foreach (IWebSocketServerExtension extension in this.validExtensions)
{
WebSocketExtensionDecoder decoder = extension.NewExtensionDecoder();
WebSocketExtensionEncoder encoder = extension.NewExtensionEncoder();
ctx.Channel.Pipeline.AddAfter(ctx.Name, decoder.GetType().Name, decoder);
ctx.Channel.Pipeline.AddAfter(ctx.Name, encoder.GetType().Name, encoder);
}
}
ctx.Channel.Pipeline.Remove(ctx.Name);
};

if (headerValue != null)
{
response.Headers.Set(HttpHeaderNames.SecWebsocketExtensions, headerValue);
responseHeaders.Set(HttpHeaderNames.SecWebsocketExtensions, headerValue);
}

Task task = base.WriteAsync(ctx, msg).AsTask();
task.ContinueWith(this.upgradeCompletedContinuation, ctx, TaskContinuationOptions.ExecuteSynchronously);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return new ValueTask(task);
}

return continuationAction == null
? base.WriteAsync(ctx, msg)
: base.WriteAsync(ctx, msg)
.ContinueWith(continuationAction, TaskContinuationOptions.ExecuteSynchronously);
return base.WriteAsync(ctx, msg);
}

void OnUpgradeCompleted(Task task, object state)
{
var ctx = (IChannelHandlerContext)state;
if (task.Status == TaskStatus.RanToCompletion)
{
foreach (IWebSocketServerExtension extension in this.validExtensions)
{
WebSocketExtensionDecoder decoder = extension.NewExtensionDecoder();
WebSocketExtensionEncoder encoder = extension.NewExtensionEncoder();
ctx.Channel.Pipeline.AddAfter(ctx.Name, decoder.GetType().Name, decoder);
ctx.Channel.Pipeline.AddAfter(ctx.Name, encoder.GetType().Name, encoder);
}
}
ctx.Channel.Pipeline.Remove(ctx.Name);
}
}
}
2 changes: 1 addition & 1 deletion src/DotNetty.Codecs.Mqtt/MqttDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ static void DecodeConnectPacket(IByteBuffer buffer, ConnectPacket packet, ref in
{
var connAckPacket = new ConnAckPacket();
connAckPacket.ReturnCode = ConnectReturnCode.RefusedUnacceptableProtocolVersion;
context.WriteAndFlushAsync(connAckPacket);
context.WriteAndFlushAsync(connAckPacket, false);
throw new DecoderException($"Unexpected protocol level. Expected: {Util.ProtocolLevel}. Actual: {packet.ProtocolLevel}");
}

Expand Down
3 changes: 1 addition & 2 deletions src/DotNetty.Codecs/Compression/JZlibEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ Task FinishEncode(IChannelHandlerContext context)
this.z.next_out = null;
}

return context.WriteAndFlushAsync(footer)
.ContinueWith(_ => context.CloseAsync());
return context.WriteAndFlushAsync(footer).CloseOnComplete(context);
}

public override void HandlerAdded(IChannelHandlerContext context) => this.ctx = context;
Expand Down
6 changes: 5 additions & 1 deletion src/DotNetty.Codecs/DotNetty.Codecs.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?><Project Sdk="Microsoft.NET.Sdk">
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard1.3;net45</TargetFrameworks>
<IsPackable>true</IsPackable>
Expand Down Expand Up @@ -44,4 +45,7 @@
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.1" />
</ItemGroup>
</Project>
Loading