Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ThorstenThiel committed Sep 11, 2024
1 parent 5ff9b8c commit 9068d96
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 21 deletions.
27 changes: 18 additions & 9 deletions src/Benchmark/Bench.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet_GitCompare;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Jobs;
using Fluss;
Expand All @@ -9,8 +10,10 @@

namespace Benchmark;

[SimpleJob(baseline: true)]
[SimpleJob(RuntimeMoniker.Net90)]
[SimpleJob(id: "with_pooled")]
[GitJob(id: "with_list")]
[GitJob("cd630d7", baseline: true, id: "original")]
//[SimpleJob(RuntimeMoniker.Net90)]
[MemoryDiagnoser]
public class Bench
{
Expand Down Expand Up @@ -40,7 +43,7 @@ await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => {
}
});

var unitOfWork = _sp.GetSystemUserUnitOfWork();
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsEqualReadModel, int>(1);
var readModel2 = await unitOfWork.GetReadModel<EventsEqualReadModel, int>(2);
sum += readModel1.GotEvents + readModel2.GotEvents;
Expand All @@ -67,27 +70,27 @@ public void SetupHeavyRead()
}


[Benchmark]
//[Benchmark]
public async Task<int> PublishEventsAndReadReadHeavySingleReadModel()
{
var sum = 0;
for (var j = 0; j < 50000; j++)
{
var unitOfWork = _sp.GetSystemUserUnitOfWork();
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsModEqualReadModel, int>(3);
sum += readModel1.GotEvents;
}

return sum;
}

[Benchmark]
//[Benchmark]
public async Task<int> PublishEventsAndReadReadHeavyMultipleReadModel()
{
var sum = 0;
for (var j = 1; j < 5000; j++)
{
var unitOfWork = _sp.GetSystemUserUnitOfWork();
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsModEqualReadModel, int>(j);
sum += readModel1.GotEvents;
}
Expand Down Expand Up @@ -136,4 +139,10 @@ protected override EventsModEqualReadModel When(EventEnvelope envelope)
_ => this,
};
}
}
}

/*
00:00:00.0000184
00:00:00.0086291
00:00:00.6152374
*/
18 changes: 16 additions & 2 deletions src/Benchmark/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
using Benchmark;
using System.Diagnostics;
using Benchmark;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Bench>();
//BenchmarkRunner.Run<Bench>();

for (var j = 0; j < 10; j++)
{
var watch = Stopwatch.StartNew();
var b = new Bench();
Console.WriteLine(watch.Elapsed);
watch.Restart();
b.Setup();
Console.WriteLine(watch.Elapsed);
watch.Restart();
await b.PublishEventsAndReadMixedReadWrite();
Console.WriteLine(watch.Elapsed);
}
3 changes: 3 additions & 0 deletions src/Fluss.sln.DotSettings.user
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AAsyncValueTaskMethodBuilderT_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FAppData_003FLocal_003FSymbols_003Fsrc_003Fdotnet_003Fruntime_003F3b8b000a0e115700b18265d8ec8c6307056dc94d_003Fsrc_003Flibraries_003FSystem_002EPrivate_002ECoreLib_003Fsrc_003FSystem_003FRuntime_003FCompilerServices_003FAsyncValueTaskMethodBuilderT_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AIExecutionResult_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FAppData_003FRoaming_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F961f60b56be69f9ad0a2b9cdb6837ebe1b7782fbc82c14f7dbed3f0b9c1e67d_003FIExecutionResult_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/Environment/Hierarchy/Build/BuildTool/CustomBuildToolPath/@EntryValue">/home/enterprize1/Downloads/dotnet8/sdk/9.0.100-preview.7.24407.12/MSBuild.dll</s:String>
<s:String x:Key="/Default/Environment/Hierarchy/Build/BuildTool/DotNetCliExePath/@EntryValue">/home/enterprize1/Downloads/dotnet8/dotnet</s:String>
<s:Boolean x:Key="/Default/Environment/Hierarchy/Build/BuildTool/RecentDotNetCliExePaths/=_002Fhome_002Fenterprize1_002FDownloads_002Fdotnet8_002Fdotnet/@EntryIndexedValue">True</s:Boolean>
<s:String x:Key="/Default/Environment/Highlighting/HighlightingSourceSnapshotLocation/@EntryValue">C:\Users\Enterprize1\AppData\Local\JetBrains\Rider2024.2\resharper-host\temp\Rider\vAny\CoverageData\_Fluss.-842573491\Snapshot\snapshot.utdcvr</s:String>

<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=69a3202a_002Daecd_002D488a_002Da2d1_002Df3588261e611/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="Session" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;&#xD;
Expand Down
1 change: 1 addition & 0 deletions src/Fluss/Fluss.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" Version="1.0.82" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss/UnitOfWork/IUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace Fluss;

public interface IUnitOfWork
public interface IUnitOfWork : IDisposable
{
ValueTask<long> ConsistentVersion();
IReadOnlyCollection<EventListener> ReadModels { get; }
List<EventEnvelope> PublishedEventEnvelopes { get; }
IReadOnlyCollection<EventEnvelope> PublishedEventEnvelopes { get; }

ValueTask<IReadModel> GetReadModel(Type tReadModel, object? key, long? at = null);

Expand Down
10 changes: 6 additions & 4 deletions src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using Collections.Pooled;
using Fluss.Aggregates;
using Fluss.Events;
// ReSharper disable LoopCanBeConvertedToQuery
Expand All @@ -7,7 +8,8 @@ namespace Fluss;

public partial class UnitOfWork
{
public List<EventEnvelope> PublishedEventEnvelopes { get; } = new();
internal readonly PooledList<EventEnvelope> _publishedEventEnvelopes = [];
public IReadOnlyCollection<EventEnvelope> PublishedEventEnvelopes => _publishedEventEnvelopes;

public async ValueTask<TAggregate> GetAggregate<TAggregate>() where TAggregate : AggregateRoot, new()
{
Expand Down Expand Up @@ -67,8 +69,8 @@ public async ValueTask Publish(Event @event, AggregateRoot? aggregate = null)
}

await ValidateEventResult(eventEnvelope, aggregate);

PublishedEventEnvelopes.Add(eventEnvelope);
_publishedEventEnvelopes.Add(eventEnvelope);
}

private async ValueTask ValidateEventResult<T>(EventEnvelope envelope, T? aggregate) where T : AggregateRoot
Expand Down Expand Up @@ -96,7 +98,7 @@ internal async ValueTask CommitInternal()

await _eventRepository.Publish(PublishedEventEnvelopes);
_consistentVersion += PublishedEventEnvelopes.Count;
PublishedEventEnvelopes.Clear();
_publishedEventEnvelopes.Clear();
}

private async ValueTask<TEventListener> UpdateAndApplyPublished<TEventListener>(TEventListener eventListener, long? at)
Expand Down
8 changes: 8 additions & 0 deletions src/Fluss/UnitOfWork/UnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@ private Guid CurrentUserId()
{
return _userIdProvider.Get();
}

public void Dispose()
{
_latestVersionLoader?.Dispose();
_publishedEventEnvelopes.Dispose();

GC.SuppressFinalize(this);
}
}
4 changes: 2 additions & 2 deletions src/Fluss/UnitOfWork/UnitOfWorkFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async ValueTask Commit(Func<IWriteUnitOfWork, ValueTask> action)
await RetryPolicy
.ExecuteAsync(async () =>
{
var unitOfWork = serviceProvider.GetRequiredService<UnitOfWork>();
using var unitOfWork = serviceProvider.GetRequiredService<UnitOfWork>();
await action(unitOfWork);
await unitOfWork.CommitInternal();
});
Expand All @@ -36,7 +36,7 @@ public async ValueTask<T> Commit<T>(Func<IWriteUnitOfWork, ValueTask<T>> action)
return await RetryPolicy
.ExecuteAsync(async () =>
{
var unitOfWork = serviceProvider.GetRequiredService<UnitOfWork>();
using var unitOfWork = serviceProvider.GetRequiredService<UnitOfWork>();
var result = await action(unitOfWork);
await unitOfWork.CommitInternal();
return result;
Expand Down
9 changes: 8 additions & 1 deletion src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public ValueTask<long> ConsistentVersion()
}

public IReadOnlyCollection<EventListener> ReadModels => impl.ReadModels;
public List<EventEnvelope> PublishedEventEnvelopes => impl.PublishedEventEnvelopes;
public IReadOnlyCollection<EventEnvelope> PublishedEventEnvelopes => impl.PublishedEventEnvelopes;

public List<EventListener> RecordedListeners { get; } = [];

Expand Down Expand Up @@ -100,4 +100,11 @@ public async ValueTask<bool> IsStillUpToDate(IUnitOfWork unitOfWork, long? at =
return false;
}
}

public void Dispose()
{
impl.Dispose();

GC.SuppressFinalize(this);
}
}
3 changes: 2 additions & 1 deletion src/Fluss/Validation/RootValidator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Reflection;
using Collections.Pooled;
using Fluss.Aggregates;
using Fluss.Authentication;
using Fluss.Events;
Expand Down Expand Up @@ -65,7 +66,7 @@ public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList<EventEnvel
var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - willBePublishedEnvelopes.Count - 1);
foreach (var willBePublishedEnvelope in willBePublishedEnvelopes)
{
versionedUnitOfWork.PublishedEventEnvelopes.Add(willBePublishedEnvelope);
((PooledList<EventEnvelope>)versionedUnitOfWork.PublishedEventEnvelopes).Add(willBePublishedEnvelope);
}

var type = envelope.Event.GetType();
Expand Down

0 comments on commit 9068d96

Please sign in to comment.