Skip to content

Commit

Permalink
Pickup some commits not related to Mono from #374 (#410)
Browse files Browse the repository at this point in the history
* Add some missing method to LoggingHandler

* Avoid to alloc an huge error message when the test not failed.

* Update the unittest

* Update Microsoft.NET.Test.Sdk from 15.0.0 to 15.7.2, fix that unable to debug an unittest for the second time.
* Disable parallelization for InternalLoggerFactoryTest.TestMockReturned to avoid an rare test failure.
* Remove `dotnet-xunit` since it's never used and will be discontinued, see https://xunit.github.io/releases/2.4-beta2

* Remove space from filename

* Switch back to `DiscardSomeReadBytes` since it's avaliable

* Rework some logic in TlsHandler

* Make sure TlsHandler.MediationStream works well with different style of aync calls(Still not work for Mono, see #374)
* Rework some logic in #366, now always close TlsHandler.MediationStream in TlsHandler.HandleFailure since it's never exported.

* Workaround to fix issue 'microsoft/vstest#1129'.

* Change the default of TcpServerSocketChannel.Metadata.defaultMaxMessagesPerRead to 1
  • Loading branch information
yyjdelete authored and nayato committed Aug 13, 2018
1 parent 2c0c109 commit 797629a
Show file tree
Hide file tree
Showing 25 changed files with 142 additions and 55 deletions.
5 changes: 5 additions & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<Project>
<Target Name="VSTestIfTestProject">
<CallTarget Targets="VSTest" Condition="'$(IsTestProject)' == 'true'" />
</Target>
</Project>
5 changes: 5 additions & 0 deletions after.DotNetty.sln.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<Project>
<Target Name="VSTest">
<MSBuild Projects="@(ProjectReference)" Targets="VSTestIfTestProject" />
</Target>
</Project>
2 changes: 1 addition & 1 deletion src/DotNetty.Codecs/ByteToMessageDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected void DiscardSomeReadBytes()
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
this.cumulation.DiscardReadBytes(); // todo: use discardSomeReadBytes
this.cumulation.DiscardSomeReadBytes();
}
}

Expand Down
42 changes: 42 additions & 0 deletions src/DotNetty.Handlers/Logging/LoggingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,48 @@ public override void ChannelRead(IChannelHandlerContext ctx, object message)
ctx.FireChannelRead(message);
}

public override void ChannelReadComplete(IChannelHandlerContext ctx)
{
if (this.Logger.IsEnabled(this.InternalLevel))
{
this.Logger.Log(this.InternalLevel, this.Format(ctx, "RECEIVED_COMPLETE"));
}
ctx.FireChannelReadComplete();
}

public override void ChannelWritabilityChanged(IChannelHandlerContext ctx)
{
if (this.Logger.IsEnabled(this.InternalLevel))
{
this.Logger.Log(this.InternalLevel, this.Format(ctx, "WRITABILITY", ctx.Channel.IsWritable));
}
ctx.FireChannelWritabilityChanged();
}

public override void HandlerAdded(IChannelHandlerContext ctx)
{
if (this.Logger.IsEnabled(this.InternalLevel))
{
this.Logger.Log(this.InternalLevel, this.Format(ctx, "HANDLER_ADDED"));
}
}
public override void HandlerRemoved(IChannelHandlerContext ctx)
{
if (this.Logger.IsEnabled(this.InternalLevel))
{
this.Logger.Log(this.InternalLevel, this.Format(ctx, "HANDLER_REMOVED"));
}
}

public override void Read(IChannelHandlerContext ctx)
{
if (this.Logger.IsEnabled(this.InternalLevel))
{
this.Logger.Log(this.InternalLevel, this.Format(ctx, "READ"));
}
ctx.Read();
}

public override Task WriteAsync(IChannelHandlerContext ctx, object msg)
{
if (this.Logger.IsEnabled(this.InternalLevel))
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Handlers/Tls/SniHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public sealed class SniHandler : ByteToMessageDecoder
bool readPending;

public SniHandler(ServerTlsSniSettings settings)
: this(stream => new SslStream(stream, false), settings)
: this(stream => new SslStream(stream, true), settings)
{
}

Expand Down
99 changes: 63 additions & 36 deletions src/DotNetty.Handlers/Tls/TlsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace DotNetty.Handlers.Tls
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Net.Security;
Expand Down Expand Up @@ -41,7 +42,7 @@ public sealed class TlsHandler : ByteToMessageDecoder
Task<int> pendingSslStreamReadFuture;

public TlsHandler(TlsSettings settings)
: this(stream => new SslStream(stream, false), settings)
: this(stream => new SslStream(stream, true), settings)
{
}

Expand Down Expand Up @@ -69,8 +70,6 @@ public TlsHandler(Func<Stream, SslStream> sslStreamFactory, TlsSettings settings

bool IsServer => this.settings is ServerTlsSettings;

public void Dispose() => this.sslStream?.Dispose();

public override void ChannelActive(IChannelHandlerContext context)
{
base.ChannelActive(context);
Expand Down Expand Up @@ -344,6 +343,9 @@ void Unwrap(IChannelHandlerContext ctx, IByteBuffer packet, int offset, int leng

outputBuffer = this.pendingSslStreamReadBuffer;
outputBufferLength = outputBuffer.WritableBytes;

this.pendingSslStreamReadFuture = null;
this.pendingSslStreamReadBuffer = null;
}
else
{
Expand All @@ -363,17 +365,23 @@ void Unwrap(IChannelHandlerContext ctx, IByteBuffer packet, int offset, int leng
if (!currentReadFuture.IsCompleted)
{
// we did feed the whole current packet to SslStream yet it did not produce any result -> move to the next packet in input
Contract.Assert(this.mediationStream.SourceReadableBytes == 0);

continue;
}

int read = currentReadFuture.Result;

if (read == 0)
{
//Stream closed
return;
}

// Now output the result of previous read and decide whether to do an extra read on the same source or move forward
AddBufferToOutput(outputBuffer, read, output);

currentReadFuture = null;
outputBuffer = null;
if (this.mediationStream.SourceReadableBytes == 0)
{
// we just made a frame available for reading but there was already pending read so SslStream read it out to make further progress there
Expand Down Expand Up @@ -620,6 +628,7 @@ void HandleFailure(Exception cause)
// Release all resources such as internal buffers that SSLEngine
// is managing.

this.mediationStream.Dispose();
try
{
this.sslStream.Dispose();
Expand Down Expand Up @@ -701,14 +710,13 @@ public void ExpandSource(int count)

this.inputLength += count;

TaskCompletionSource<int> promise = this.readCompletionSource;
if (promise == null)
ArraySegment<byte> sslBuffer = this.sslOwnedBuffer;
if (sslBuffer.Array == null)
{
// there is no pending read operation - keep for future
return;
}

ArraySegment<byte> sslBuffer = this.sslOwnedBuffer;
this.sslOwnedBuffer = default(ArraySegment<byte>);

#if NETSTANDARD1_3
this.readByteCount = this.ReadFromInput(sslBuffer.Array, sslBuffer.Offset, sslBuffer.Count);
Expand All @@ -718,29 +726,35 @@ public void ExpandSource(int count)
{
var self = (MediationStream)ms;
TaskCompletionSource<int> p = self.readCompletionSource;
this.readCompletionSource = null;
self.readCompletionSource = null;
p.TrySetResult(self.readByteCount);
},
this)
.RunSynchronously(TaskScheduler.Default);
#else
int read = this.ReadFromInput(sslBuffer.Array, sslBuffer.Offset, sslBuffer.Count);

TaskCompletionSource<int> promise = this.readCompletionSource;
this.readCompletionSource = null;
promise.TrySetResult(read);
this.readCallback?.Invoke(promise.Task);

AsyncCallback callback = this.readCallback;
this.readCallback = null;
callback?.Invoke(promise.Task);
#endif
}

#if NETSTANDARD1_3
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (this.inputLength - this.inputOffset > 0)
if (this.SourceReadableBytes > 0)
{
// we have the bytes available upfront - write out synchronously
int read = this.ReadFromInput(buffer, offset, count);
return Task.FromResult(read);
}

Contract.Assert(this.sslOwnedBuffer.Array == null);
// take note of buffer - we will pass bytes there once available
this.sslOwnedBuffer = new ArraySegment<byte>(buffer, offset, count);
this.readCompletionSource = new TaskCompletionSource<int>();
Expand All @@ -749,13 +763,16 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
#else
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
if (this.inputLength - this.inputOffset > 0)
if (this.SourceReadableBytes > 0)
{
// we have the bytes available upfront - write out synchronously
int read = this.ReadFromInput(buffer, offset, count);
return this.PrepareSyncReadResult(read, state);
var res = this.PrepareSyncReadResult(read, state);
callback?.Invoke(res);
return res;
}

Contract.Assert(this.sslOwnedBuffer.Array == null);
// take note of buffer - we will pass bytes there once available
this.sslOwnedBuffer = new ArraySegment<byte>(buffer, offset, count);
this.readCompletionSource = new TaskCompletionSource<int>(state);
Expand All @@ -771,6 +788,7 @@ public override int EndRead(IAsyncResult asyncResult)
return syncResult.Result;
}

Debug.Assert(this.readCompletionSource == null || this.readCompletionSource.Task == asyncResult);
Contract.Assert(!((Task<int>)asyncResult).IsCanceled);

try
Expand All @@ -782,12 +800,6 @@ public override int EndRead(IAsyncResult asyncResult)
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw; // unreachable
}
finally
{
this.readCompletionSource = null;
this.readCallback = null;
this.sslOwnedBuffer = default(ArraySegment<byte>);
}
}

IAsyncResult PrepareSyncReadResult(int readBytes, object state)
Expand Down Expand Up @@ -817,51 +829,63 @@ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, As
// write+flush completed synchronously (and successfully)
var result = new SynchronousAsyncResult<int>();
result.AsyncState = state;
callback(result);
callback?.Invoke(result);
return result;
default:
this.writeCallback = callback;
var tcs = new TaskCompletionSource(state);
this.writeCompletion = tcs;
task.ContinueWith(WriteCompleteCallback, this, TaskContinuationOptions.ExecuteSynchronously);
return tcs.Task;
if (callback != null || state != task.AsyncState)
{
Contract.Assert(this.writeCompletion == null);
this.writeCallback = callback;
var tcs = new TaskCompletionSource(state);
this.writeCompletion = tcs;
task.ContinueWith(WriteCompleteCallback, this, TaskContinuationOptions.ExecuteSynchronously);
return tcs.Task;
}
else
{
return task;
}
}
}

static void HandleChannelWriteComplete(Task writeTask, object state)
{
var self = (MediationStream)state;

AsyncCallback callback = self.writeCallback;
self.writeCallback = null;

var promise = self.writeCompletion;
self.writeCompletion = null;

switch (writeTask.Status)
{
case TaskStatus.RanToCompletion:
self.writeCompletion.TryComplete();
promise.TryComplete();
break;
case TaskStatus.Canceled:
self.writeCompletion.TrySetCanceled();
promise.TrySetCanceled();
break;
case TaskStatus.Faulted:
self.writeCompletion.TrySetException(writeTask.Exception);
promise.TrySetException(writeTask.Exception);
break;
default:
throw new ArgumentOutOfRangeException("Unexpected task status: " + writeTask.Status);
}

self.writeCallback?.Invoke(self.writeCompletion.Task);
callback?.Invoke(promise.Task);
}

public override void EndWrite(IAsyncResult asyncResult)
{
this.writeCallback = null;
this.writeCompletion = null;

if (asyncResult is SynchronousAsyncResult<int>)
{
return;
}

try
{
((Task<int>)asyncResult).Wait();
((Task)asyncResult).Wait();
}
catch (AggregateException ex)
{
Expand All @@ -876,7 +900,7 @@ int ReadFromInput(byte[] destination, int destinationOffset, int destinationCapa
Contract.Assert(destination != null);

byte[] source = this.input;
int readableBytes = this.inputLength - this.inputOffset;
int readableBytes = this.SourceReadableBytes;
int length = Math.Min(readableBytes, destinationCapacity);
Buffer.BlockCopy(source, this.inputStartOffset + this.inputOffset, destination, destinationOffset, length);
this.inputOffset += length;
Expand All @@ -894,8 +918,11 @@ protected override void Dispose(bool disposing)
if (disposing)
{
TaskCompletionSource<int> p = this.readCompletionSource;
this.readCompletionSource = null;
p?.TrySetResult(0);
if (p != null)
{
this.readCompletionSource = null;
p.TrySetResult(0);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace DotNetty.Transport.Channels.Sockets
public class TcpServerSocketChannel : AbstractSocketChannel, IServerSocketChannel
{
static readonly IInternalLogger Logger = InternalLoggerFactory.GetInstance<TcpServerSocketChannel>();
static readonly ChannelMetadata METADATA = new ChannelMetadata(false, 16);
static readonly ChannelMetadata METADATA = new ChannelMetadata(false);

static readonly Action<object, object> ReadCompletedSyncCallback = OnReadCompletedSync;

Expand Down
2 changes: 1 addition & 1 deletion test/DotNetty.Buffers.Tests/DotNetty.Buffers.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="Moq" Version="4.7.99" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="Moq" Version="4.7.99" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="Moq" Version="4.7.99" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="Moq" Version="4.7.99" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
Expand Down
Loading

0 comments on commit 797629a

Please sign in to comment.