From 9068d9684fd29adee42efbb2e4a993fb8c43fa3f Mon Sep 17 00:00:00 2001 From: Thorsten Thiel Date: Wed, 11 Sep 2024 18:52:25 +0200 Subject: [PATCH] WIP --- src/Benchmark/Bench.cs | 27 ++++++++++++------- src/Benchmark/Program.cs | 18 +++++++++++-- src/Fluss.sln.DotSettings.user | 3 +++ src/Fluss/Fluss.csproj | 1 + src/Fluss/UnitOfWork/IUnitOfWork.cs | 4 +-- src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs | 10 ++++--- src/Fluss/UnitOfWork/UnitOfWork.cs | 8 ++++++ src/Fluss/UnitOfWork/UnitOfWorkFactory.cs | 4 +-- .../UnitOfWork/UnitOfWorkRecordingProxy.cs | 9 ++++++- src/Fluss/Validation/RootValidator.cs | 3 ++- 10 files changed, 66 insertions(+), 21 deletions(-) diff --git a/src/Benchmark/Bench.cs b/src/Benchmark/Bench.cs index f1d7c3c..c90a400 100644 --- a/src/Benchmark/Bench.cs +++ b/src/Benchmark/Bench.cs @@ -1,4 +1,5 @@ -using BenchmarkDotNet.Attributes; +using BenchmarkDotNet_GitCompare; +using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Jobs; using Fluss; @@ -9,8 +10,10 @@ namespace Benchmark; -[SimpleJob(baseline: true)] -[SimpleJob(RuntimeMoniker.Net90)] +[SimpleJob(id: "with_pooled")] +[GitJob(id: "with_list")] +[GitJob("cd630d7", baseline: true, id: "original")] +//[SimpleJob(RuntimeMoniker.Net90)] [MemoryDiagnoser] public class Bench { @@ -40,7 +43,7 @@ await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => { } }); - var unitOfWork = _sp.GetSystemUserUnitOfWork(); + using var unitOfWork = _sp.GetSystemUserUnitOfWork(); var readModel1 = await unitOfWork.GetReadModel(1); var readModel2 = await unitOfWork.GetReadModel(2); sum += readModel1.GotEvents + readModel2.GotEvents; @@ -67,13 +70,13 @@ public void SetupHeavyRead() } - [Benchmark] + //[Benchmark] public async Task PublishEventsAndReadReadHeavySingleReadModel() { var sum = 0; for (var j = 0; j < 50000; j++) { - var unitOfWork = _sp.GetSystemUserUnitOfWork(); + using var unitOfWork = _sp.GetSystemUserUnitOfWork(); var readModel1 = await unitOfWork.GetReadModel(3); sum += readModel1.GotEvents; } @@ -81,13 +84,13 @@ public async Task PublishEventsAndReadReadHeavySingleReadModel() return sum; } - [Benchmark] + //[Benchmark] public async Task PublishEventsAndReadReadHeavyMultipleReadModel() { var sum = 0; for (var j = 1; j < 5000; j++) { - var unitOfWork = _sp.GetSystemUserUnitOfWork(); + using var unitOfWork = _sp.GetSystemUserUnitOfWork(); var readModel1 = await unitOfWork.GetReadModel(j); sum += readModel1.GotEvents; } @@ -136,4 +139,10 @@ protected override EventsModEqualReadModel When(EventEnvelope envelope) _ => this, }; } -} \ No newline at end of file +} + +/* + 00:00:00.0000184 + 00:00:00.0086291 + 00:00:00.6152374 +*/ \ No newline at end of file diff --git a/src/Benchmark/Program.cs b/src/Benchmark/Program.cs index d7bd483..0746213 100644 --- a/src/Benchmark/Program.cs +++ b/src/Benchmark/Program.cs @@ -1,4 +1,18 @@ -using Benchmark; +using System.Diagnostics; +using Benchmark; using BenchmarkDotNet.Running; -BenchmarkRunner.Run(); \ No newline at end of file +//BenchmarkRunner.Run(); + +for (var j = 0; j < 10; j++) +{ + var watch = Stopwatch.StartNew(); + var b = new Bench(); + Console.WriteLine(watch.Elapsed); + watch.Restart(); + b.Setup(); + Console.WriteLine(watch.Elapsed); + watch.Restart(); + await b.PublishEventsAndReadMixedReadWrite(); + Console.WriteLine(watch.Elapsed); +} diff --git a/src/Fluss.sln.DotSettings.user b/src/Fluss.sln.DotSettings.user index 448c27b..74d8620 100644 --- a/src/Fluss.sln.DotSettings.user +++ b/src/Fluss.sln.DotSettings.user @@ -1,6 +1,9 @@  ForceIncluded ForceIncluded + /home/enterprize1/Downloads/dotnet8/sdk/9.0.100-preview.7.24407.12/MSBuild.dll + /home/enterprize1/Downloads/dotnet8/dotnet + True C:\Users\Enterprize1\AppData\Local\JetBrains\Rider2024.2\resharper-host\temp\Rider\vAny\CoverageData\_Fluss.-842573491\Snapshot\snapshot.utdcvr <SessionState ContinuousTestingMode="0" IsActive="True" Name="Session" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> diff --git a/src/Fluss/Fluss.csproj b/src/Fluss/Fluss.csproj index 1bbda25..d261872 100644 --- a/src/Fluss/Fluss.csproj +++ b/src/Fluss/Fluss.csproj @@ -10,6 +10,7 @@ + diff --git a/src/Fluss/UnitOfWork/IUnitOfWork.cs b/src/Fluss/UnitOfWork/IUnitOfWork.cs index 6299ea6..144f087 100644 --- a/src/Fluss/UnitOfWork/IUnitOfWork.cs +++ b/src/Fluss/UnitOfWork/IUnitOfWork.cs @@ -4,11 +4,11 @@ namespace Fluss; -public interface IUnitOfWork +public interface IUnitOfWork : IDisposable { ValueTask ConsistentVersion(); IReadOnlyCollection ReadModels { get; } - List PublishedEventEnvelopes { get; } + IReadOnlyCollection PublishedEventEnvelopes { get; } ValueTask GetReadModel(Type tReadModel, object? key, long? at = null); diff --git a/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs b/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs index 8a670b1..62c212a 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Collections.Pooled; using Fluss.Aggregates; using Fluss.Events; // ReSharper disable LoopCanBeConvertedToQuery @@ -7,7 +8,8 @@ namespace Fluss; public partial class UnitOfWork { - public List PublishedEventEnvelopes { get; } = new(); + internal readonly PooledList _publishedEventEnvelopes = []; + public IReadOnlyCollection PublishedEventEnvelopes => _publishedEventEnvelopes; public async ValueTask GetAggregate() where TAggregate : AggregateRoot, new() { @@ -67,8 +69,8 @@ public async ValueTask Publish(Event @event, AggregateRoot? aggregate = null) } await ValidateEventResult(eventEnvelope, aggregate); - - PublishedEventEnvelopes.Add(eventEnvelope); + + _publishedEventEnvelopes.Add(eventEnvelope); } private async ValueTask ValidateEventResult(EventEnvelope envelope, T? aggregate) where T : AggregateRoot @@ -96,7 +98,7 @@ internal async ValueTask CommitInternal() await _eventRepository.Publish(PublishedEventEnvelopes); _consistentVersion += PublishedEventEnvelopes.Count; - PublishedEventEnvelopes.Clear(); + _publishedEventEnvelopes.Clear(); } private async ValueTask UpdateAndApplyPublished(TEventListener eventListener, long? at) diff --git a/src/Fluss/UnitOfWork/UnitOfWork.cs b/src/Fluss/UnitOfWork/UnitOfWork.cs index 8b76dc7..b65e2b6 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.cs @@ -71,4 +71,12 @@ private Guid CurrentUserId() { return _userIdProvider.Get(); } + + public void Dispose() + { + _latestVersionLoader?.Dispose(); + _publishedEventEnvelopes.Dispose(); + + GC.SuppressFinalize(this); + } } diff --git a/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs b/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs index 21869ce..aac3afe 100644 --- a/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs +++ b/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs @@ -23,7 +23,7 @@ public async ValueTask Commit(Func action) await RetryPolicy .ExecuteAsync(async () => { - var unitOfWork = serviceProvider.GetRequiredService(); + using var unitOfWork = serviceProvider.GetRequiredService(); await action(unitOfWork); await unitOfWork.CommitInternal(); }); @@ -36,7 +36,7 @@ public async ValueTask Commit(Func> action) return await RetryPolicy .ExecuteAsync(async () => { - var unitOfWork = serviceProvider.GetRequiredService(); + using var unitOfWork = serviceProvider.GetRequiredService(); var result = await action(unitOfWork); await unitOfWork.CommitInternal(); return result; diff --git a/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs b/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs index 1fb7225..e397c60 100644 --- a/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs +++ b/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs @@ -12,7 +12,7 @@ public ValueTask ConsistentVersion() } public IReadOnlyCollection ReadModels => impl.ReadModels; - public List PublishedEventEnvelopes => impl.PublishedEventEnvelopes; + public IReadOnlyCollection PublishedEventEnvelopes => impl.PublishedEventEnvelopes; public List RecordedListeners { get; } = []; @@ -100,4 +100,11 @@ public async ValueTask IsStillUpToDate(IUnitOfWork unitOfWork, long? at = return false; } } + + public void Dispose() + { + impl.Dispose(); + + GC.SuppressFinalize(this); + } } \ No newline at end of file diff --git a/src/Fluss/Validation/RootValidator.cs b/src/Fluss/Validation/RootValidator.cs index 7c2de68..f12df0f 100644 --- a/src/Fluss/Validation/RootValidator.cs +++ b/src/Fluss/Validation/RootValidator.cs @@ -1,4 +1,5 @@ using System.Reflection; +using Collections.Pooled; using Fluss.Aggregates; using Fluss.Authentication; using Fluss.Events; @@ -65,7 +66,7 @@ public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList)versionedUnitOfWork.PublishedEventEnvelopes).Add(willBePublishedEnvelope); } var type = envelope.Event.GetType();