Skip to content

Commit

Permalink
multi continuation support returned back
Browse files Browse the repository at this point in the history
  • Loading branch information
maksimkim committed Mar 14, 2018
1 parent 2d38d0a commit e1e30c1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
81 changes: 51 additions & 30 deletions src/DotNetty.Common/Concurrency/AbstractPromise.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;
Expand All @@ -19,9 +18,9 @@ public abstract class AbstractPromise : IPromise, IValueTaskSource
static readonly Exception CompletedNoException = new Exception();

protected Exception exception;

Action<object> callback;
object callbackState;
int callbackCount;
(Action<object>, object)[] callbacks;

public bool TryComplete() => this.TryComplete0(CompletedNoException);

Expand All @@ -35,7 +34,7 @@ protected virtual bool TryComplete0(Exception exception)
{
// Set the exception object to the exception passed in or a sentinel value
this.exception = exception;
this.TryExecuteCallback();
this.TryExecuteCallbacks();
return true;
}

Expand Down Expand Up @@ -69,32 +68,34 @@ public virtual void GetResult(short token)
if (this.exception == null)
{
throw new InvalidOperationException("Attempt to get result on not yet completed promise");
//ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
}

this.IsCompletedOrThrow();
/*
// Change the state from to be canceled -> observed
if (_writerAwaitable.ObserveCancelation())
{
result._resultFlags |= ResultFlags.Canceled;
}
if (_readerCompletion.IsCompletedOrThrow())
{
result._resultFlags |= ResultFlags.Completed;
}
*/
}

public virtual void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
this.callback = continuation;
this.callbackState = state;
//todo: context preservation
if (this.callbacks == null)
{
this.callbacks = new (Action<object>, object)[1];
}

int newIndex = this.callbackCount;
this.callbackCount++;

if (newIndex == this.callbacks.Length)
{
var newArray = new (Action<object>, object)[this.callbacks.Length * 2];
Array.Copy(this.callbacks, newArray, this.callbacks.Length);
this.callbacks = newArray;
}

this.callbacks[newIndex] = (continuation, state);

if (this.exception != null)
{
this.TryExecuteCallback();
this.TryExecuteCallbacks();
}
}

Expand All @@ -119,29 +120,49 @@ bool IsCompletedOrThrow()
[MethodImpl(MethodImplOptions.NoInlining)]
void ThrowLatchedException() => ExceptionDispatchInfo.Capture(this.exception).Throw();

bool TryExecuteCallback()
bool TryExecuteCallbacks()
{
if (this.callback == null)
if (this.callbackCount == 0 || this.callbacks == null)
{
return false;
}

try
List<Exception> exceptions = null;

for (int i = 0; i < this.callbackCount; i++)
{
this.callback(this.callbackState);
return true;
try
{
(Action<object> callback, object state) = this.callbacks[i];
callback(state);
}
catch (Exception ex)
{
if (exceptions == null)
{
exceptions = new List<Exception>();
}

exceptions.Add(ex);
}
}
finally

if (exceptions == null)
{
this.ClearCallback();
return true;
}

throw new AggregateException(exceptions);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ClearCallback()
protected void ClearCallbacks()
{
this.callback = null;
this.callbackState = null;
if (this.callbackCount > 0)
{
this.callbackCount = 0;
Array.Clear(this.callbacks, 0, this.callbacks.Length);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected void Init()
protected virtual void Recycle()
{
this.exception = null;
this.ClearCallback();
this.ClearCallbacks();
this.recycled = true;
this.handle.Release(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ public IPromise Remove()
{
return null;
}
//TaskCompletionSource promise = write.Promise;
ReferenceCountUtil.SafeRelease(write.Messages);
this.Recycle(write, true);
return write;
Expand Down

0 comments on commit e1e30c1

Please sign in to comment.