Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ES-8, ES-9] Enable logging in ProjectionProcessor #22

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Tacta.EventStore.Test/Projector/ProjectionProcessorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Moq;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Projector;
Expand All @@ -28,19 +29,21 @@ public class ProjectionProcessorTest : SqlBaseTest
{
private readonly Mock<IProjection> _projectionMock;
private readonly IEventStoreRepository _eventStoreRepository;
private readonly Mock<ILogger<ProjectionProcessor>> _loggerMock;

public ProjectionProcessorTest()
{
_projectionMock = new Mock<IProjection>();
_eventStoreRepository = new EventStoreRepository(ConnectionFactory);
_loggerMock = new Mock<ILogger<ProjectionProcessor>>();
}

[Fact]
public async Task OnException_ShouldCallInitializeSequenceMethodExactlyOnce()
{
// Given
_projectionMock.Setup(x => x.Initialize()).Callback(() => throw new Exception());
var processor = new ProjectionProcessor(new List<IProjection> { _projectionMock.Object }, _eventStoreRepository);
var processor = new ProjectionProcessor(new List<IProjection> { _projectionMock.Object }, _eventStoreRepository, _loggerMock.Object);

// When
var _ = await Record.ExceptionAsync(async () => await processor.Process(100).ConfigureAwait(false));
Expand All @@ -54,7 +57,7 @@ public async Task OnTransientSqlException_ShouldCallInitializeSequenceMethodAtLe
{
// Given
_projectionMock.Setup(x => x.Initialize()).Callback(() => throw GenerateRandomTransientSqlException());
var processor = new ProjectionProcessor(new List<IProjection> { _projectionMock.Object }, _eventStoreRepository);
var processor = new ProjectionProcessor(new List<IProjection> { _projectionMock.Object }, _eventStoreRepository, _loggerMock.Object);

// When
var _ = await Record.ExceptionAsync(async () => await processor.Process().ConfigureAwait(false));
Expand Down Expand Up @@ -84,7 +87,7 @@ public async Task Rebuild_ShouldInvokeRebuildOnRepositories()
// Given
var (aggregate, events) = CreateFooAggregateWithRegisteredEvents();
await _eventStoreRepository.SaveAsync(aggregate, events);
var processor = new ProjectionProcessor(new List<IProjection> { _projectionMock.Object }, _eventStoreRepository);
var processor = new ProjectionProcessor(new List<IProjection> { _projectionMock.Object }, _eventStoreRepository, _loggerMock.Object);

// When
await processor.Rebuild();
Expand Down
13 changes: 8 additions & 5 deletions Tacta.EventStore.Test/Projector/ProjectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Moq;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Projector;
Expand All @@ -17,11 +18,13 @@ public class ProjectionTest
{
private readonly Mock<IProjectionRepository> _projectionRepository;
private readonly Mock<IEventStoreRepository> _eventStoreRepository;
private readonly Mock<ILogger<ProjectionProcessor>> _logger;

public ProjectionTest()
{
_projectionRepository = new Mock<IProjectionRepository>();
_eventStoreRepository = new Mock<IEventStoreRepository>();
_logger = new Mock<ILogger<ProjectionProcessor>>();
}

[Fact]
Expand All @@ -47,7 +50,7 @@ public async Task OnUserRegistered()

// When
var userProjection = new UserProjection(_projectionRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> {userProjection}, _eventStoreRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> {userProjection}, _eventStoreRepository.Object, _logger.Object);
await processor.Process();

// Then
Expand Down Expand Up @@ -77,7 +80,7 @@ public async Task NoProjectionsAdded_ReturnsZero()

// When
var userProjection = new UserProjection(_projectionRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection>(), _eventStoreRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection>(), _eventStoreRepository.Object, _logger.Object);
await processor.Process();

// Then
Expand Down Expand Up @@ -120,7 +123,7 @@ public async Task OnUserBanned()

// When
var userProjection = new UserProjection(_projectionRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> { userProjection }, _eventStoreRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> { userProjection }, _eventStoreRepository.Object, _logger.Object);
await processor.Process();

// Then
Expand All @@ -143,7 +146,7 @@ public async Task OnRebuildWithoutProcess()
});

var userProjection = new UserProjection(_projectionRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> { userProjection }, _eventStoreRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> { userProjection }, _eventStoreRepository.Object, _logger.Object);

// When
await processor.Rebuild();
Expand Down Expand Up @@ -186,7 +189,7 @@ public async Task OnRebuildAndProcess()
});

var userProjection = new UserProjection(_projectionRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> { userProjection }, _eventStoreRepository.Object);
var processor = new ProjectionProcessor(new List<IProjection> { userProjection }, _eventStoreRepository.Object, _logger.Object);

_projectionRepository.Setup(p => p.GetSequenceAsync()).ReturnsAsync(() =>
{
Expand Down
7 changes: 5 additions & 2 deletions Tacta.EventStore.Test/Projector/ResilienceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Moq;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Projector;
Expand All @@ -18,11 +19,13 @@ public class ResilienceTest
private IProjection _userProjection;
private Mock<IEventStoreRepository> _eventStoreRepository;
private Mock<IProjectionRepository> _userProjectionRepository;
private readonly Mock<ILogger<ProjectionProcessor>> _logger;

public ResilienceTest()
{
SetupProjectionRepository();
SetupEventStoreRepository();
_logger = new Mock<ILogger<ProjectionProcessor>>();
}

[Fact]
Expand All @@ -33,7 +36,7 @@ public async Task OnTransientException_ShouldBeResilient()
// Given
_userProjection = new UserProjection(_userProjectionRepository.Object);
var projections = new List<IProjection> { _userProjection };
var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object);
var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object, _logger.Object);

// When
var numberOfProcessedEvents = await processor.Process().ConfigureAwait(false);
Expand All @@ -51,7 +54,7 @@ public async Task WhileRebuildInProgress_ProcessShouldWait()
// Given
_userProjection = new VerySlowUserProjection(_userProjectionRepository.Object);
var projections = new List<IProjection> { _userProjection };
var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object);
var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object, _logger.Object);

// When
var processTask1 = processor.Process().ConfigureAwait(false);
Expand Down
17 changes: 14 additions & 3 deletions Tacta.EventStore/Projector/ProjectionProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Polly.Retry;
using Tacta.EventStore.Domain;
using Tacta.EventStore.Repository;
Expand All @@ -15,17 +15,28 @@ public class ProjectionProcessor : IProjectionProcessor
{
private readonly IEnumerable<IProjection> _projections;
private readonly IEventStoreRepository _eventStoreRepository;
private readonly ILogger<ProjectionProcessor> _logger;
private readonly AsyncRetryPolicy _retryPolicy;
private bool _isInitialized;
private long _pivot;
private readonly SemaphoreSlim _processingSemaphore = new SemaphoreSlim(1, 1);

public ProjectionProcessor(IEnumerable<IProjection> projections, IEventStoreRepository eventStoreRepository)
public ProjectionProcessor(
IEnumerable<IProjection> projections,
IEventStoreRepository eventStoreRepository)
{
_projections = projections;
_eventStoreRepository = eventStoreRepository;
_retryPolicy = new SqlServerResiliencePolicyBuilder().WithDefaults().BuildTransientErrorRetryPolicy();
}

public ProjectionProcessor(
IEnumerable<IProjection> projections,
IEventStoreRepository eventStoreRepository,
ILogger<ProjectionProcessor> logger) : this(projections, eventStoreRepository)
{
_logger = logger;
}

public async Task<string> Status(string service, int refreshRate = 5)
{
Expand Down Expand Up @@ -84,7 +95,7 @@ await _retryPolicy.ExecuteAsync(async () =>
}
catch (Exception ex)
{
Debug.WriteLine("Process exception {0}", ex);
_logger?.LogError(ex, "Process exception");
throw;
}
finally
Expand Down
7 changes: 4 additions & 3 deletions Tacta.EventStore/Tacta.EventStore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
<PackageIcon></PackageIcon>
<PackageProjectUrl>https://tacta.io/</PackageProjectUrl>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<Version>1.6.2</Version>
<PackageVersion>1.6.2</PackageVersion>
<Version>1.6.3</Version>
<PackageVersion>1.6.3</PackageVersion>
<Title>Tacta EventStore Library</Title>
<PackageReleaseNotes>Sequence type changed from int to long</PackageReleaseNotes>
<PackageReleaseNotes>Projection processor logs processing exception</PackageReleaseNotes>
<Configurations>Debug;Release</Configurations>
<Platforms>AnyCPU</Platforms>
</PropertyGroup>
Expand All @@ -24,6 +24,7 @@
<PackageReference Include="Dapper" Version="2.0.143" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.4" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Polly" Version="7.2.4" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.5" />
Expand Down
Loading