From 0a0aba607f5ca77c036ff1c46c891714beb6f71a Mon Sep 17 00:00:00 2001 From: Andrew El Date: Mon, 11 Mar 2024 12:48:03 -0400 Subject: [PATCH 1/5] ActorGraphInterpreter Caching --- .../Fusing/ActorGraphInterpreter.cs | 217 +++++++++++++++--- 1 file changed, 189 insertions(+), 28 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index dba48031db8..14093030037 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -6,9 +6,11 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using Akka.Actor; using Akka.Annotations; using Akka.Event; @@ -21,6 +23,39 @@ // ReSharper disable MemberHidesStaticFromOuterClass namespace Akka.Streams.Implementation.Fusing { + [InternalApi] + public sealed class ImperfectPerformantPool + { + private readonly int _maxItems; + private readonly ConcurrentQueue _itemSet; + private int _curr = 0; + + public ImperfectPerformantPool(int? maxItems = null) + { + _maxItems = maxItems ?? Environment.ProcessorCount; + _itemSet = new ConcurrentQueue(); + } + + public void TryAdd(T item) + { + if (_curr + 1 <= _maxItems) + { + _curr = _curr + 1; + _itemSet.Enqueue(item); + } + } + + public bool TryGetValue(out T item) + { + if (_itemSet.TryDequeue(out item)) + { + _curr = _curr - 1; + return true; + } + + return false; + } + } /// /// INTERNAL API /// @@ -98,11 +133,63 @@ public override string ToString() => "GraphModule\n" + [InternalApi] public sealed class GraphInterpreterShell { + internal sealed class BoxedRequestMore : ActorGraphInterpreter.IBoundaryEvent + { + public ActorGraphInterpreter.RequestMore Boxed { get; private set; } + + public BoxedRequestMore() + { + + } + public BoxedRequestMore SetBox(ActorGraphInterpreter.RequestMore value) + { + Boxed = value; + return this; + } + + public GraphInterpreterShell Shell => Boxed.Shell; + } + internal sealed class BoxedOnNext : ActorGraphInterpreter.IBoundaryEvent + { + public ActorGraphInterpreter.OnNext Boxed { get; private set; } + + public BoxedOnNext() + { + + } + public BoxedOnNext SetBox(ActorGraphInterpreter.OnNext value) + { + Boxed = value; + return this; + } + + public GraphInterpreterShell Shell => Boxed.Shell; + } + private sealed class BoxedAsyncInput : ActorGraphInterpreter.IBoundaryEvent + { + public ActorGraphInterpreter.AsyncInput Boxed { get; private set; } + public BoxedAsyncInput() + { + + } + public BoxedAsyncInput SetBox(ActorGraphInterpreter.AsyncInput value) + { + Boxed = value; + return this; + } + + public GraphInterpreterShell Shell => Boxed.Shell; + } private readonly GraphAssembly _assembly; private readonly Connection[] _connections; private readonly GraphStageLogic[] _logics; private readonly Shape _shape; private readonly ActorMaterializerSettings _settings; + + private readonly ImperfectPerformantPool _onNextPool = + new ImperfectPerformantPool(); + private readonly ImperfectPerformantPool _requestMorePool = + new ImperfectPerformantPool(); /// /// TBD /// @@ -138,6 +225,8 @@ public sealed class GraphInterpreterShell private bool _interpreterCompleted; private readonly ActorGraphInterpreter.Resume _resume; + private readonly ImperfectPerformantPool + _asyncInputPool = new ImperfectPerformantPool(); /// /// TBD /// @@ -156,8 +245,8 @@ public GraphInterpreterShell(GraphAssembly assembly, Connection[] connections, G _settings = settings; Materializer = materializer; - _inputs = new ActorGraphInterpreter.BatchingActorInputBoundary[shape.Inlets.Count()]; - _outputs = new ActorGraphInterpreter.IActorOutputBoundary[shape.Outlets.Count()]; + _inputs = new ActorGraphInterpreter.BatchingActorInputBoundary[shape.Inlets.Length]; + _outputs = new ActorGraphInterpreter.IActorOutputBoundary[shape.Outlets.Length]; _subscribersPending = _inputs.Length; _publishersPending = _outputs.Length; _shellEventLimit = settings.MaxInputBufferSize * (assembly.Inlets.Length + assembly.Outlets.Length); @@ -264,30 +353,38 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) // Cases that are most likely on the hot path, in decreasing order of frequency switch (e) { - case ActorGraphInterpreter.OnNext onNext: - if (IsDebug) Console.WriteLine($"{Interpreter.Name} OnNext {onNext.Event} id={onNext.Id}"); - _inputs[onNext.Id].OnNext(onNext.Event); - return RunBatch(eventLimit); - - case ActorGraphInterpreter.RequestMore requestMore: - if (IsDebug) Console.WriteLine($"{Interpreter.Name} Request {requestMore.Demand} id={requestMore.Id}"); - _outputs[requestMore.Id].RequestMore(requestMore.Demand); - return RunBatch(eventLimit); - + case BoxedOnNext bOn: + var asO = bOn.Boxed; + bOn.SetBox(default); + _onNextPool.TryAdd(bOn); + return RunOnNextMeth(asO); + break; + // Case unused: + //case ActorGraphInterpreter.OnNext onNext: + // return RunOnNextMeth(onNext); + + //case ActorGraphInterpreter.RequestMore requestMore: + // return RunRequestMore(requestMore); + case BoxedRequestMore bRm: + var asR = bRm.Boxed; + bRm.SetBox(default); + _requestMorePool.TryAdd(bRm); + return RunRequestMore(asR); + case ActorGraphInterpreter.Resume _: if (IsDebug) Console.WriteLine($"{Interpreter.Name} Resume"); if (Interpreter.IsSuspended) return RunBatch(eventLimit); return eventLimit; - - case ActorGraphInterpreter.AsyncInput asyncInput: - Interpreter.RunAsyncInput(asyncInput.Logic, asyncInput.Event, asyncInput.Handler); - if (eventLimit == 1 && _interpreter.IsSuspended) - { - SendResume(true); - return 0; - } - return RunBatch(eventLimit - 1); + + case BoxedAsyncInput bAi: + var asI = bAi.Boxed; + bAi.SetBox(default); + _asyncInputPool.TryAdd(bAi); + return RunAsyncInputMeth(asI); + + //case ActorGraphInterpreter.AsyncInput asyncInput: + // return RunAsyncInputMeth(asyncInput); case ActorGraphInterpreter.OnError onError: if (IsDebug) Console.WriteLine($"{Interpreter.Name} OnError id={onError.Id}"); @@ -320,7 +417,32 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) return eventLimit; } + int RunRequestMore(ActorGraphInterpreter.RequestMore requestMore1) + { + if (IsDebug) Console.WriteLine($"{Interpreter.Name} Request {requestMore1.Demand} id={requestMore1.Id}"); + _outputs[requestMore1.Id].RequestMore(requestMore1.Demand); + return RunBatch(eventLimit); + } + + int RunOnNextMeth(ActorGraphInterpreter.OnNext onNext1) + { + if (IsDebug) Console.WriteLine($"{Interpreter.Name} OnNext {onNext1.Event} id={onNext1.Id}"); + _inputs[onNext1.Id].OnNext(onNext1.Event); + return RunBatch(eventLimit); + } + return eventLimit; + + int RunAsyncInputMeth(ActorGraphInterpreter.AsyncInput asyncInput) + { + Interpreter.RunAsyncInput(asyncInput.Logic, asyncInput.Event, asyncInput.Handler); + if (eventLimit == 1 && _interpreter.IsSuspended) + { + SendResume(true); + return 0; + } + return RunBatch(eventLimit - 1); + } } #pragma warning restore CS0162 @@ -411,12 +533,25 @@ private GraphInterpreter GetInterpreter() return new GraphInterpreter(_assembly, Materializer, Log, _logics, _connections, (logic, @event, handler) => { - var asyncInput = new ActorGraphInterpreter.AsyncInput(this, logic, @event, handler); + if (_asyncInputPool.TryGetValue(out var bAi) == false) + { + bAi = new BoxedAsyncInput(); + } var currentInterpreter = CurrentInterpreterOrNull; - if (currentInterpreter == null || !Equals(currentInterpreter.Context, Self)) - Self.Tell(new ActorGraphInterpreter.AsyncInput(this, logic, @event, handler)); + if (currentInterpreter == null || + !Equals(currentInterpreter.Context, Self)) + { + Self.Tell(bAi.SetBox( + new ActorGraphInterpreter.AsyncInput(this, + logic, @event, handler))); + //Self.Tell(new ActorGraphInterpreter.AsyncInput(this, + // logic, @event, handler)); + } else - _enqueueToShortCircuit(asyncInput); + _enqueueToShortCircuit(bAi.SetBox( + new ActorGraphInterpreter.AsyncInput(this, + logic, @event, handler))); + //_enqueueToShortCircuit( new ActorGraphInterpreter.AsyncInput(this, logic, @event, handler)); }, _settings.IsFuzzingMode, Self); } @@ -430,6 +565,26 @@ private ILoggingAdapter GetLogger() /// /// TBD public override string ToString() => $"GraphInterpreterShell\n {_assembly.ToString().Replace("\n", "\n ")}"; + + internal BoxedOnNext GetNextBuffer() + { + if (_onNextPool.TryGetValue(out var rV) == false) + { + rV = new BoxedOnNext(); + } + + return rV; + } + + internal BoxedRequestMore GetNextRequestMore() + { + if (_requestMorePool.TryGetValue(out var rV) == false) + { + rV = new BoxedRequestMore(); + } + + return rV; + } } /// @@ -736,7 +891,7 @@ public AsyncInput(GraphInterpreterShell shell, GraphStageLogic logic, object @ev /// /// TBD /// - public readonly struct Resume : IBoundaryEvent + public class Resume : IBoundaryEvent { /// /// TBD @@ -828,7 +983,12 @@ public BoundarySubscription(IActorRef parent, GraphInterpreterShell shell, int i /// TBD /// /// TBD - public void Request(long elements) => _parent.Tell(new RequestMore(_shell, _id, elements)); + public void Request(long elements) + { + var bRm = _shell.GetNextRequestMore(); + _parent.Tell(bRm.SetBox(new RequestMore(_shell, _id, elements))); + //_parent.Tell(new RequestMore(_shell, _id, elements)); + } /// /// TBD @@ -899,7 +1059,8 @@ public void OnError(Exception cause) public void OnNext(T element) { ReactiveStreamsCompliance.RequireNonNullElement(element); - _parent.Tell(new OnNext(_shell, _id, element)); + _parent.Tell(_shell.GetNextBuffer().SetBox(new OnNext(_shell, _id, element))); + //_parent.Tell(new OnNext(_shell, _id, element)); } } From 9241f06a5b9a7fba3fcfc7c6066393a4ffdda4a1 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 16 Mar 2024 14:59:13 -0400 Subject: [PATCH 2/5] Benchmarks --- .../Streams/SelectAsyncBenchmarks.cs | 119 +++++++++ .../Streams/UnfoldAsyncBenchmarks.cs | 122 ++++++++++ .../Streams/UnfoldResourceAsyncBenchmarks.cs | 230 ++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs create mode 100644 src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs create mode 100644 src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs diff --git a/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs new file mode 100644 index 00000000000..c942e16d4d9 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs @@ -0,0 +1,119 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class SelectAsyncBenchmarks +{ + public readonly struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncCh; + private Task selectAsyncSyncStub; + private Channel asyncChSync; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncCh = Channel.CreateUnbounded(); + + asyncChSync = Channel.CreateUnbounded(); + + + selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader) + .SelectAsync(4, a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + } + + return Task.FromResult(NotUsed.Instance); + }).RunWith(Sink.Ignore(), materializer); + + + selectAsyncStub = Source.ChannelReader(asyncCh.Reader) + .SelectAsync(4, async a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + //await Task.Yield(); + await Task.Delay(0); + } + + return NotUsed.Instance; + }).RunWith(Sink.Ignore(), materializer); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + } + + [Benchmark] + public async Task RunSelectAsync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task RunSelectAsyncSync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs new file mode 100644 index 00000000000..5bf3bf0504c --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs @@ -0,0 +1,122 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class UnfoldAsyncBenchmarks +{ + public readonly struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncNoYieldCh; + private Task selectValueTaskAsyncStub; + private Task unfoldAsyncSyncStub; + private Task selectAsyncValueTaskSyncStub; + private Channel asyncYieldCh; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncNoYieldCh = Channel.CreateUnbounded(); + + asyncYieldCh = Channel.CreateUnbounded(); + + + unfoldAsyncSyncStub = Source.UnfoldAsync,int>(asyncYieldCh.Reader, async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return (r, -1); + } + else + { + return (r, i.IntValue); + } + }) + .RunWith(Sink.Ignore(), materializer); + + selectAsyncStub = Source.UnfoldAsync,int>(asyncNoYieldCh.Reader,async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return (r, -1); + } + else + { + return (r, a.IntValue); + } + }).RunWith(Sink.Ignore(), materializer); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + } + + [Benchmark] + public async Task UnfoldAsyncYieldInConsume() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task UnfoldAsyncYieldInPush() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs new file mode 100644 index 00000000000..2bec31825fe --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs @@ -0,0 +1,230 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation.Fusing; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class UnfoldResourceAsyncBenchmarks +{ + + public readonly struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncNoYieldCh; + private Task unfoldAsyncSyncStub; + private Channel asyncYieldCh; + private Channel straightCh; + private Task straightTask; + private CancellationTokenSource straightChTokenSource; + private Channel straightYieldCh; + private Task straightYieldTask; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncNoYieldCh = Channel.CreateUnbounded(); + + asyncYieldCh = Channel.CreateUnbounded(); + + + unfoldAsyncSyncStub = Source.UnfoldResourceAsync>(()=> Task.FromResult(asyncYieldCh.Reader), async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return -1; + } + else + { + return i.IntValue; + } + }, (r)=> Task.FromResult(Done.Instance)) + .RunWith(Sink.Ignore(), materializer); + + selectAsyncStub = Source.UnfoldResourceAsync>(()=>Task.FromResult(asyncNoYieldCh.Reader),async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return -1; + } + else + { + //await Task.Yield(); + // await Task.Delay(0); + return a.IntValue; + } + }, (r)=> Task.FromResult(Done.Instance) ).RunWith(Sink.Ignore(), materializer); + + + straightChTokenSource = new CancellationTokenSource(); + straightCh = Channel.CreateUnbounded(); + + + straightTask = Task.Run(async () => + { + static async IAsyncEnumerable GetEnumerator( + ChannelReader reader, CancellationToken token) + { + while (token.IsCancellationRequested == false) + { + await Task.Yield(); + var a = await reader.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + yield return -1; + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + yield return a.IntValue; + } + } + } + var r = straightCh.Reader; + await foreach (var v in GetEnumerator(r,straightChTokenSource.Token)) + { + + } + }); + + straightYieldCh = Channel.CreateUnbounded(); + + + straightYieldTask = Task.Run(async () => + { + static async IAsyncEnumerable GetEnumerator( + ChannelReader reader, CancellationToken token) + { + while (token.IsCancellationRequested == false) + { + var a = await reader.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + yield return -1; + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + yield return a.IntValue; + } + } + } + var r = straightYieldCh.Reader; + await foreach (var v in GetEnumerator(r,straightChTokenSource.Token)) + { + + } + }); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + straightChTokenSource.Cancel(); + } + + [Benchmark] + public async Task UnfoldResourceAsyncNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldResourceAsyncWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + + [Benchmark] + public async Task StraightChannelReadNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + straightCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + straightCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task StraightChannelReadWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + straightYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + straightYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file From cf8d3c5c06b8a85baf1ccc0d53d0337a2b154940 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 16 Mar 2024 15:42:13 -0400 Subject: [PATCH 3/5] GenBox --- .../Fusing/ActorGraphInterpreter.cs | 75 ++++++------------- 1 file changed, 23 insertions(+), 52 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index 14093030037..5b16e90ec28 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -133,46 +133,17 @@ public override string ToString() => "GraphModule\n" + [InternalApi] public sealed class GraphInterpreterShell { - internal sealed class BoxedRequestMore : ActorGraphInterpreter.IBoundaryEvent + internal sealed class + BoxedBoundaryEvent : ActorGraphInterpreter.IBoundaryEvent + where T : struct, ActorGraphInterpreter.IBoundaryEvent { - public ActorGraphInterpreter.RequestMore Boxed { get; private set; } + public T Boxed { get; private set; } - public BoxedRequestMore() + public BoxedBoundaryEvent() { } - public BoxedRequestMore SetBox(ActorGraphInterpreter.RequestMore value) - { - Boxed = value; - return this; - } - - public GraphInterpreterShell Shell => Boxed.Shell; - } - internal sealed class BoxedOnNext : ActorGraphInterpreter.IBoundaryEvent - { - public ActorGraphInterpreter.OnNext Boxed { get; private set; } - - public BoxedOnNext() - { - - } - public BoxedOnNext SetBox(ActorGraphInterpreter.OnNext value) - { - Boxed = value; - return this; - } - - public GraphInterpreterShell Shell => Boxed.Shell; - } - private sealed class BoxedAsyncInput : ActorGraphInterpreter.IBoundaryEvent - { - public ActorGraphInterpreter.AsyncInput Boxed { get; private set; } - public BoxedAsyncInput() - { - - } - public BoxedAsyncInput SetBox(ActorGraphInterpreter.AsyncInput value) + public BoxedBoundaryEvent SetBox(T value) { Boxed = value; return this; @@ -180,16 +151,20 @@ public BoxedAsyncInput SetBox(ActorGraphInterpreter.AsyncInput value) public GraphInterpreterShell Shell => Boxed.Shell; } + private readonly GraphAssembly _assembly; private readonly Connection[] _connections; private readonly GraphStageLogic[] _logics; private readonly Shape _shape; private readonly ActorMaterializerSettings _settings; - private readonly ImperfectPerformantPool _onNextPool = - new ImperfectPerformantPool(); - private readonly ImperfectPerformantPool _requestMorePool = - new ImperfectPerformantPool(); + private readonly ImperfectPerformantPool> _onNextPool = + new ImperfectPerformantPool>(); + private readonly ImperfectPerformantPool> _requestMorePool = + new ImperfectPerformantPool>(); + private readonly ImperfectPerformantPool> + _asyncInputPool = new ImperfectPerformantPool>(); + private readonly ActorGraphInterpreter.Resume _resume; /// /// TBD /// @@ -214,7 +189,6 @@ public BoxedAsyncInput SetBox(ActorGraphInterpreter.AsyncInput value) private readonly int _abortLimit; private readonly ActorGraphInterpreter.BatchingActorInputBoundary[] _inputs; private readonly ActorGraphInterpreter.IActorOutputBoundary[] _outputs; - private ILoggingAdapter _log; private GraphInterpreter _interpreter; private int _subscribersPending; @@ -223,10 +197,7 @@ public BoxedAsyncInput SetBox(ActorGraphInterpreter.AsyncInput value) private bool _waitingForShutdown; private Action _enqueueToShortCircuit; private bool _interpreterCompleted; - private readonly ActorGraphInterpreter.Resume _resume; - - private readonly ImperfectPerformantPool - _asyncInputPool = new ImperfectPerformantPool(); + /// /// TBD /// @@ -353,7 +324,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) // Cases that are most likely on the hot path, in decreasing order of frequency switch (e) { - case BoxedOnNext bOn: + case BoxedBoundaryEvent bOn: var asO = bOn.Boxed; bOn.SetBox(default); _onNextPool.TryAdd(bOn); @@ -365,7 +336,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) //case ActorGraphInterpreter.RequestMore requestMore: // return RunRequestMore(requestMore); - case BoxedRequestMore bRm: + case BoxedBoundaryEvent bRm: var asR = bRm.Boxed; bRm.SetBox(default); _requestMorePool.TryAdd(bRm); @@ -377,7 +348,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) return RunBatch(eventLimit); return eventLimit; - case BoxedAsyncInput bAi: + case BoxedBoundaryEvent bAi: var asI = bAi.Boxed; bAi.SetBox(default); _asyncInputPool.TryAdd(bAi); @@ -535,7 +506,7 @@ private GraphInterpreter GetInterpreter() { if (_asyncInputPool.TryGetValue(out var bAi) == false) { - bAi = new BoxedAsyncInput(); + bAi = new(); } var currentInterpreter = CurrentInterpreterOrNull; if (currentInterpreter == null || @@ -566,21 +537,21 @@ private ILoggingAdapter GetLogger() /// TBD public override string ToString() => $"GraphInterpreterShell\n {_assembly.ToString().Replace("\n", "\n ")}"; - internal BoxedOnNext GetNextBuffer() + internal BoxedBoundaryEvent GetNextBuffer() { if (_onNextPool.TryGetValue(out var rV) == false) { - rV = new BoxedOnNext(); + rV = new BoxedBoundaryEvent(); } return rV; } - internal BoxedRequestMore GetNextRequestMore() + internal BoxedBoundaryEvent GetNextRequestMore() { if (_requestMorePool.TryGetValue(out var rV) == false) { - rV = new BoxedRequestMore(); + rV = new(); } return rV; From e10e7196b1573b4ca2eddc1d44974cf2cb0cf029 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 16 Mar 2024 17:13:27 -0400 Subject: [PATCH 4/5] ObjectPool --- .../Fusing/ActorGraphInterpreter.cs | 63 ++--- .../Implementation/Fusing/ObjectPoolStuff.cs | 224 ++++++++++++++++++ 2 files changed, 238 insertions(+), 49 deletions(-) create mode 100644 src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index 5b16e90ec28..d0b3806566d 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -10,7 +10,6 @@ using System.Collections.Generic; using System.Linq; using System.Text; -using System.Threading; using Akka.Actor; using Akka.Annotations; using Akka.Event; @@ -23,39 +22,6 @@ // ReSharper disable MemberHidesStaticFromOuterClass namespace Akka.Streams.Implementation.Fusing { - [InternalApi] - public sealed class ImperfectPerformantPool - { - private readonly int _maxItems; - private readonly ConcurrentQueue _itemSet; - private int _curr = 0; - - public ImperfectPerformantPool(int? maxItems = null) - { - _maxItems = maxItems ?? Environment.ProcessorCount; - _itemSet = new ConcurrentQueue(); - } - - public void TryAdd(T item) - { - if (_curr + 1 <= _maxItems) - { - _curr = _curr + 1; - _itemSet.Enqueue(item); - } - } - - public bool TryGetValue(out T item) - { - if (_itemSet.TryDequeue(out item)) - { - _curr = _curr - 1; - return true; - } - - return false; - } - } /// /// INTERNAL API /// @@ -138,7 +104,6 @@ internal sealed class where T : struct, ActorGraphInterpreter.IBoundaryEvent { public T Boxed { get; private set; } - public BoxedBoundaryEvent() { @@ -158,12 +123,12 @@ public BoxedBoundaryEvent SetBox(T value) private readonly Shape _shape; private readonly ActorMaterializerSettings _settings; - private readonly ImperfectPerformantPool> _onNextPool = - new ImperfectPerformantPool>(); - private readonly ImperfectPerformantPool> _requestMorePool = - new ImperfectPerformantPool>(); - private readonly ImperfectPerformantPool> - _asyncInputPool = new ImperfectPerformantPool>(); + private readonly ObjectPoolStuff.ObjectPoolV2 _onNextPool = + new (Environment.ProcessorCount*2); + private readonly ObjectPoolStuff.ObjectPoolV2 _requestMorePool = + new (Environment.ProcessorCount*2); + private readonly ObjectPoolStuff.ObjectPoolV2 + _asyncInputPool = new (Environment.ProcessorCount*2); private readonly ActorGraphInterpreter.Resume _resume; /// /// TBD @@ -327,7 +292,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) case BoxedBoundaryEvent bOn: var asO = bOn.Boxed; bOn.SetBox(default); - _onNextPool.TryAdd(bOn); + _onNextPool.TryPush(bOn); return RunOnNextMeth(asO); break; // Case unused: @@ -339,7 +304,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) case BoxedBoundaryEvent bRm: var asR = bRm.Boxed; bRm.SetBox(default); - _requestMorePool.TryAdd(bRm); + _requestMorePool.TryPush(bRm); return RunRequestMore(asR); case ActorGraphInterpreter.Resume _: @@ -351,7 +316,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) case BoxedBoundaryEvent bAi: var asI = bAi.Boxed; bAi.SetBox(default); - _asyncInputPool.TryAdd(bAi); + _asyncInputPool.TryPush(bAi); return RunAsyncInputMeth(asI); //case ActorGraphInterpreter.AsyncInput asyncInput: @@ -504,7 +469,7 @@ private GraphInterpreter GetInterpreter() return new GraphInterpreter(_assembly, Materializer, Log, _logics, _connections, (logic, @event, handler) => { - if (_asyncInputPool.TryGetValue(out var bAi) == false) + if (_asyncInputPool.TryPop(out var bAi) == false) { bAi = new(); } @@ -539,7 +504,7 @@ private ILoggingAdapter GetLogger() internal BoxedBoundaryEvent GetNextBuffer() { - if (_onNextPool.TryGetValue(out var rV) == false) + if (_onNextPool.TryPop(out var rV) == false) { rV = new BoxedBoundaryEvent(); } @@ -549,7 +514,7 @@ private ILoggingAdapter GetLogger() internal BoxedBoundaryEvent GetNextRequestMore() { - if (_requestMorePool.TryGetValue(out var rV) == false) + if (_requestMorePool.TryPop(out var rV) == false) { rV = new(); } @@ -1620,9 +1585,9 @@ protected override bool Receive(object message) { switch (message) { - case IBoundaryEvent _: + case IBoundaryEvent ba: _currentLimit = _eventLimit; - ProcessEvent((IBoundaryEvent)message); + ProcessEvent(ba); if (_shortCircuitBuffer != null) ShortCircuitBatch(); return true; diff --git a/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs b/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs new file mode 100644 index 00000000000..d2c233370ba --- /dev/null +++ b/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs @@ -0,0 +1,224 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using Akka.Annotations; + +namespace Akka.Streams.Implementation.Fusing; + +/// +/// This is an object pool based on +/// +[InternalApi] +internal sealed class ObjectPoolStuff +{ + /* + internal interface IObjectPoolNode + { + ref T? NextNode { get; } + } + + internal sealed class ObjectPool + { + private static int typeId = -1; // Increment by IdentityGenerator + + private readonly object _gate = new object(); + private readonly int _poolLimit; + private object[] _poolNodes = new object[4]; // ObjectPool[] + + // pool-limit per type. + public ObjectPool(int poolLimit) + { + _poolLimit = poolLimit; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryRent([NotNullWhen(true)] out T? value) + where T : class, IObjectPoolNode + { + // poolNodes is grow only, safe to access indexer with no-lock + var id = IdentityGenerator.Identity; + if (id < _poolNodes.Length && + _poolNodes[id] is ObjectPool pool) + { + return pool.TryPop(out value); + } + + Grow(id); + value = default; + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool Return(T value) + where T : class, IObjectPoolNode + { + var id = IdentityGenerator.Identity; + if (id < _poolNodes.Length && + _poolNodes[id] is ObjectPool pool) + { + return pool.TryPush(value); + } + + return false; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private void Grow(int id) + where T : class, IObjectPoolNode + { + lock (_gate) + { + if (_poolNodes.Length <= id) + { + Array.Resize(ref _poolNodes, + Math.Max(_poolNodes.Length * 2, id + 1)); + _poolNodes[id] = new ObjectPool(_poolLimit); + } + else if (_poolNodes[id] == null) + { + _poolNodes[id] = new ObjectPool(_poolLimit); + } + else + { + // other thread already created new ObjectPool so do nothing. + } + } + } + + // avoid for Dictionary lookup cost. + private static class IdentityGenerator + { +#pragma warning disable SA1401 + public static int Identity; +#pragma warning restore SA1401 + + static IdentityGenerator() + { + Identity = Interlocked.Increment(ref typeId); + } + } + } +*/ + + internal sealed class ObjectPoolV2 + where T : struct, ActorGraphInterpreter.IBoundaryEvent + { + private readonly int _limit; + private readonly ConcurrentQueue> _items = new(); + //private readonly decimal _softLimit = default; + //private readonly decimal _softLimit2 = default; + //private readonly decimal _softLimit3 = default; + //private readonly decimal _softLimit4 = default; + private int _size; + //private readonly decimal _hardLimit1 = default; + //private readonly decimal _hardLimit2 = default; + //private readonly decimal _hardLimit3 = default; + //private readonly decimal _hardLimit4 = default; + public ObjectPoolV2(int size) + { + _limit = size+3; + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryPop([NotNullWhen(true)] out GraphInterpreterShell.BoxedBoundaryEvent? result) + { + // Instead of lock, use CompareExchange gate. + // In a worst case, missed cached object(create new one) but it's not a big deal. + if (_items.TryDequeue(out result)) + { + Interlocked.Decrement(ref _size); + return true; + } + + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void TryPush(GraphInterpreterShell.BoxedBoundaryEvent item) + { + if (Interlocked.Increment(ref _size) <= _limit) + { + _items.Enqueue(item); + } + else + { + Interlocked.Decrement(ref _size); + } + } + } + + /* + internal sealed class ObjectPool + where T : class, IObjectPoolNode + { + private readonly int _limit; + private int _gate; + private int _size; + private T? _root; + + public ObjectPool(int limit) + { + _limit = limit; + } + + public int Size => _size; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryPop([NotNullWhen(true)] out T? result) + { + // Instead of lock, use CompareExchange gate. + // In a worst case, missed cached object(create new one) but it's not a big deal. + if (Interlocked.CompareExchange(ref _gate, 1, 0) == 0) + { + var v = _root; + if (!(v is null)) + { + ref var nextNode = ref v.NextNode; + _root = nextNode; + nextNode = null; + _size--; + result = v; + Volatile.Write(ref _gate, 0); + return true; + } + + Volatile.Write(ref _gate, 0); + } + + result = default; + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool TryPush(T item) + { + if (Interlocked.CompareExchange(ref _gate, 1, 0) == 0) + { + if (_size < _limit) + { + item.NextNode = _root; + _root = item; + _size++; + Volatile.Write(ref _gate, 0); + return true; + } + else + { + Volatile.Write(ref _gate, 0); + } + } + + return false; + } + } + */ +} \ No newline at end of file From cec75720b4e363212567eda1dc8bb7f234f11480 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sun, 17 Mar 2024 16:47:33 -0400 Subject: [PATCH 5/5] Make ObjectPool less specific --- .../Fusing/ActorGraphInterpreter.cs | 15 ++++++--------- .../Implementation/Fusing/ObjectPoolStuff.cs | 7 +++---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index d0b3806566d..412279e57df 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -123,11 +123,11 @@ public BoxedBoundaryEvent SetBox(T value) private readonly Shape _shape; private readonly ActorMaterializerSettings _settings; - private readonly ObjectPoolStuff.ObjectPoolV2 _onNextPool = + private readonly ObjectPoolStuff.ObjectPoolV2> _onNextPool = new (Environment.ProcessorCount*2); - private readonly ObjectPoolStuff.ObjectPoolV2 _requestMorePool = + private readonly ObjectPoolStuff.ObjectPoolV2> _requestMorePool = new (Environment.ProcessorCount*2); - private readonly ObjectPoolStuff.ObjectPoolV2 + private readonly ObjectPoolStuff.ObjectPoolV2> _asyncInputPool = new (Environment.ProcessorCount*2); private readonly ActorGraphInterpreter.Resume _resume; /// @@ -291,8 +291,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) { case BoxedBoundaryEvent bOn: var asO = bOn.Boxed; - bOn.SetBox(default); - _onNextPool.TryPush(bOn); + _onNextPool.TryPush(bOn.SetBox(default)); return RunOnNextMeth(asO); break; // Case unused: @@ -303,8 +302,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) // return RunRequestMore(requestMore); case BoxedBoundaryEvent bRm: var asR = bRm.Boxed; - bRm.SetBox(default); - _requestMorePool.TryPush(bRm); + _requestMorePool.TryPush(bRm.SetBox(default)); return RunRequestMore(asR); case ActorGraphInterpreter.Resume _: @@ -315,8 +313,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) case BoxedBoundaryEvent bAi: var asI = bAi.Boxed; - bAi.SetBox(default); - _asyncInputPool.TryPush(bAi); + _asyncInputPool.TryPush(bAi.SetBox(default)); return RunAsyncInputMeth(asI); //case ActorGraphInterpreter.AsyncInput asyncInput: diff --git a/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs b/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs index d2c233370ba..aac0b1027ac 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ObjectPoolStuff.cs @@ -111,10 +111,9 @@ static IdentityGenerator() */ internal sealed class ObjectPoolV2 - where T : struct, ActorGraphInterpreter.IBoundaryEvent { private readonly int _limit; - private readonly ConcurrentQueue> _items = new(); + private readonly ConcurrentQueue _items = new(); //private readonly decimal _softLimit = default; //private readonly decimal _softLimit2 = default; //private readonly decimal _softLimit3 = default; @@ -129,7 +128,7 @@ public ObjectPoolV2(int size) _limit = size+3; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryPop([NotNullWhen(true)] out GraphInterpreterShell.BoxedBoundaryEvent? result) + public bool TryPop([NotNullWhen(true)] out T? result) { // Instead of lock, use CompareExchange gate. // In a worst case, missed cached object(create new one) but it's not a big deal. @@ -143,7 +142,7 @@ public bool TryPop([NotNullWhen(true)] out GraphInterpreterShell.BoxedBoundaryEv } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void TryPush(GraphInterpreterShell.BoxedBoundaryEvent item) + public void TryPush(T item) { if (Interlocked.Increment(ref _size) <= _limit) {