diff --git a/Tacta.EventStore.Test/Projector/ProjectionProcessorTest.cs b/Tacta.EventStore.Test/Projector/ProjectionProcessorTest.cs index 14a3c6a..e5766c5 100644 --- a/Tacta.EventStore.Test/Projector/ProjectionProcessorTest.cs +++ b/Tacta.EventStore.Test/Projector/ProjectionProcessorTest.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using FluentAssertions; +using Microsoft.Extensions.Logging; using Moq; using Tacta.EventStore.Domain; using Tacta.EventStore.Projector; @@ -28,11 +29,13 @@ public class ProjectionProcessorTest : SqlBaseTest { private readonly Mock _projectionMock; private readonly IEventStoreRepository _eventStoreRepository; + private readonly Mock> _loggerMock; public ProjectionProcessorTest() { _projectionMock = new Mock(); _eventStoreRepository = new EventStoreRepository(ConnectionFactory); + _loggerMock = new Mock>(); } [Fact] @@ -40,7 +43,7 @@ public async Task OnException_ShouldCallInitializeSequenceMethodExactlyOnce() { // Given _projectionMock.Setup(x => x.Initialize()).Callback(() => throw new Exception()); - var processor = new ProjectionProcessor(new List { _projectionMock.Object }, _eventStoreRepository); + var processor = new ProjectionProcessor(new List { _projectionMock.Object }, _eventStoreRepository, _loggerMock.Object); // When var _ = await Record.ExceptionAsync(async () => await processor.Process(100).ConfigureAwait(false)); @@ -54,7 +57,7 @@ public async Task OnTransientSqlException_ShouldCallInitializeSequenceMethodAtLe { // Given _projectionMock.Setup(x => x.Initialize()).Callback(() => throw GenerateRandomTransientSqlException()); - var processor = new ProjectionProcessor(new List { _projectionMock.Object }, _eventStoreRepository); + var processor = new ProjectionProcessor(new List { _projectionMock.Object }, _eventStoreRepository, _loggerMock.Object); // When var _ = await Record.ExceptionAsync(async () => await processor.Process().ConfigureAwait(false)); @@ -84,7 +87,7 @@ public async Task Rebuild_ShouldInvokeRebuildOnRepositories() // Given var (aggregate, events) = CreateFooAggregateWithRegisteredEvents(); await _eventStoreRepository.SaveAsync(aggregate, events); - var processor = new ProjectionProcessor(new List { _projectionMock.Object }, _eventStoreRepository); + var processor = new ProjectionProcessor(new List { _projectionMock.Object }, _eventStoreRepository, _loggerMock.Object); // When await processor.Rebuild(); diff --git a/Tacta.EventStore.Test/Projector/ProjectionTest.cs b/Tacta.EventStore.Test/Projector/ProjectionTest.cs index 9b72a40..3b08712 100644 --- a/Tacta.EventStore.Test/Projector/ProjectionTest.cs +++ b/Tacta.EventStore.Test/Projector/ProjectionTest.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; using Moq; using Tacta.EventStore.Domain; using Tacta.EventStore.Projector; @@ -17,11 +18,13 @@ public class ProjectionTest { private readonly Mock _projectionRepository; private readonly Mock _eventStoreRepository; + private readonly Mock> _logger; public ProjectionTest() { _projectionRepository = new Mock(); _eventStoreRepository = new Mock(); + _logger = new Mock>(); } [Fact] @@ -47,7 +50,7 @@ public async Task OnUserRegistered() // When var userProjection = new UserProjection(_projectionRepository.Object); - var processor = new ProjectionProcessor(new List {userProjection}, _eventStoreRepository.Object); + var processor = new ProjectionProcessor(new List {userProjection}, _eventStoreRepository.Object, _logger.Object); await processor.Process(); // Then @@ -77,7 +80,7 @@ public async Task NoProjectionsAdded_ReturnsZero() // When var userProjection = new UserProjection(_projectionRepository.Object); - var processor = new ProjectionProcessor(new List(), _eventStoreRepository.Object); + var processor = new ProjectionProcessor(new List(), _eventStoreRepository.Object, _logger.Object); await processor.Process(); // Then @@ -120,7 +123,7 @@ public async Task OnUserBanned() // When var userProjection = new UserProjection(_projectionRepository.Object); - var processor = new ProjectionProcessor(new List { userProjection }, _eventStoreRepository.Object); + var processor = new ProjectionProcessor(new List { userProjection }, _eventStoreRepository.Object, _logger.Object); await processor.Process(); // Then @@ -143,7 +146,7 @@ public async Task OnRebuildWithoutProcess() }); var userProjection = new UserProjection(_projectionRepository.Object); - var processor = new ProjectionProcessor(new List { userProjection }, _eventStoreRepository.Object); + var processor = new ProjectionProcessor(new List { userProjection }, _eventStoreRepository.Object, _logger.Object); // When await processor.Rebuild(); @@ -186,7 +189,7 @@ public async Task OnRebuildAndProcess() }); var userProjection = new UserProjection(_projectionRepository.Object); - var processor = new ProjectionProcessor(new List { userProjection }, _eventStoreRepository.Object); + var processor = new ProjectionProcessor(new List { userProjection }, _eventStoreRepository.Object, _logger.Object); _projectionRepository.Setup(p => p.GetSequenceAsync()).ReturnsAsync(() => { diff --git a/Tacta.EventStore.Test/Projector/ResilienceTest.cs b/Tacta.EventStore.Test/Projector/ResilienceTest.cs index ceb97b3..e82084b 100644 --- a/Tacta.EventStore.Test/Projector/ResilienceTest.cs +++ b/Tacta.EventStore.Test/Projector/ResilienceTest.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; using FluentAssertions; +using Microsoft.Extensions.Logging; using Moq; using Tacta.EventStore.Domain; using Tacta.EventStore.Projector; @@ -18,11 +19,13 @@ public class ResilienceTest private IProjection _userProjection; private Mock _eventStoreRepository; private Mock _userProjectionRepository; + private readonly Mock> _logger; public ResilienceTest() { SetupProjectionRepository(); SetupEventStoreRepository(); + _logger = new Mock>(); } [Fact] @@ -33,7 +36,7 @@ public async Task OnTransientException_ShouldBeResilient() // Given _userProjection = new UserProjection(_userProjectionRepository.Object); var projections = new List { _userProjection }; - var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object); + var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object, _logger.Object); // When var numberOfProcessedEvents = await processor.Process().ConfigureAwait(false); @@ -51,7 +54,7 @@ public async Task WhileRebuildInProgress_ProcessShouldWait() // Given _userProjection = new VerySlowUserProjection(_userProjectionRepository.Object); var projections = new List { _userProjection }; - var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object); + var processor = new ProjectionProcessor(projections, _eventStoreRepository.Object, _logger.Object); // When var processTask1 = processor.Process().ConfigureAwait(false); diff --git a/Tacta.EventStore/Projector/ProjectionProcessor.cs b/Tacta.EventStore/Projector/ProjectionProcessor.cs index e8bec2f..267af9d 100644 --- a/Tacta.EventStore/Projector/ProjectionProcessor.cs +++ b/Tacta.EventStore/Projector/ProjectionProcessor.cs @@ -1,10 +1,10 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; using Polly.Retry; using Tacta.EventStore.Domain; using Tacta.EventStore.Repository; @@ -15,17 +15,28 @@ public class ProjectionProcessor : IProjectionProcessor { private readonly IEnumerable _projections; private readonly IEventStoreRepository _eventStoreRepository; + private readonly ILogger _logger; private readonly AsyncRetryPolicy _retryPolicy; private bool _isInitialized; private long _pivot; private readonly SemaphoreSlim _processingSemaphore = new SemaphoreSlim(1, 1); - public ProjectionProcessor(IEnumerable projections, IEventStoreRepository eventStoreRepository) + public ProjectionProcessor( + IEnumerable projections, + IEventStoreRepository eventStoreRepository) { _projections = projections; _eventStoreRepository = eventStoreRepository; _retryPolicy = new SqlServerResiliencePolicyBuilder().WithDefaults().BuildTransientErrorRetryPolicy(); } + + public ProjectionProcessor( + IEnumerable projections, + IEventStoreRepository eventStoreRepository, + ILogger logger) : this(projections, eventStoreRepository) + { + _logger = logger; + } public async Task Status(string service, int refreshRate = 5) { @@ -84,7 +95,7 @@ await _retryPolicy.ExecuteAsync(async () => } catch (Exception ex) { - Debug.WriteLine("Process exception {0}", ex); + _logger?.LogError(ex, "Process exception"); throw; } finally diff --git a/Tacta.EventStore/Tacta.EventStore.csproj b/Tacta.EventStore/Tacta.EventStore.csproj index 0217ec7..d43a385 100644 --- a/Tacta.EventStore/Tacta.EventStore.csproj +++ b/Tacta.EventStore/Tacta.EventStore.csproj @@ -12,10 +12,10 @@ https://tacta.io/ Apache-2.0 - 1.6.2 - 1.6.2 + 1.6.3 + 1.6.3 Tacta EventStore Library - Sequence type changed from int to long + Projection processor logs processing exception Debug;Release AnyCPU @@ -24,6 +24,7 @@ +