Skip to content

Commit

Permalink
Add Platform Logging (#5)
Browse files Browse the repository at this point in the history
Co-authored-by: Artem Leshchev <[email protected]>
  • Loading branch information
aleshchev and Artem Leshchev authored May 9, 2024
1 parent ce1f53c commit 338dac3
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 53 deletions.
10 changes: 10 additions & 0 deletions src/Bss.Platform.Logging/Bss.Platform.Logging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Luxoft.Bss.Platform.Logging</PackageId>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.ApplicationInsights" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions"/>
<PackageReference Include="Serilog.AspNetCore" />
</ItemGroup>
</Project>
22 changes: 22 additions & 0 deletions src/Bss.Platform.Logging/DependencyInjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Bss.Platform.Logging.Sinks;

using Microsoft.ApplicationInsights;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

using Serilog;

namespace Bss.Platform.Logging;

public static class DependencyInjection
{
public static void AddPlatformLogging(this IHostBuilder builder) =>
builder
.UseSerilog(
(context, services, configuration) =>
configuration
.ReadFrom.Configuration(context.Configuration)
.ReadFrom.Services(services)
.WriteTo.Sink(new ApplicationInsightsSink(services.GetService<TelemetryClient>())),
true);
}
19 changes: 19 additions & 0 deletions src/Bss.Platform.Logging/Sinks/ApplicationInsightsSink.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Microsoft.ApplicationInsights;

using Serilog.Core;
using Serilog.Events;

namespace Bss.Platform.Logging.Sinks;

public class ApplicationInsightsSink(TelemetryClient? telemetryClient) : ILogEventSink
{
public void Emit(LogEvent logEvent)
{
if (telemetryClient is null || logEvent.Exception == null)
{
return;
}

telemetryClient.TrackException(logEvent.Exception, logEvent.Properties.ToDictionary(x => x.Key, x => x.Value.ToString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

namespace Bss.Platform.RabbitMq.Consumer.Services;

internal record ConcurrentConsumer(IRabbitMqMessageReader MessageReader) : IRabbitMqConsumer
internal class ConcurrentConsumer(IRabbitMqMessageReader messageReader) : IRabbitMqConsumer
{
public async Task ConsumeAsync(IModel channel, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
await this.MessageReader.ReadAsync(channel, token);
await messageReader.ReadAsync(channel, token);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

namespace Bss.Platform.RabbitMq.Consumer.Services;

internal record ConsumerInitializer(IOptions<RabbitMqConsumerSettings> Options) : IRabbitMqInitializer
internal class ConsumerInitializer(IOptions<RabbitMqConsumerSettings> options) : IRabbitMqInitializer
{
public void Initialize(IModel model)
{
var consumerSettings = this.Options.Value;
var consumerSettings = options.Value;

model.ExchangeDeclare(consumerSettings.Exchange, ExchangeType.Topic, true);
model.ExchangeDeclare(consumerSettings.DeadLetterExchange, ExchangeType.Fanout, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@

namespace Bss.Platform.RabbitMq.Consumer.Services;

internal record DeadLetterProcessor(
IRabbitMqClient Client,
ILogger<DeadLetterProcessor> Logger,
IOptions<RabbitMqConsumerSettings> ConsumerSettings)
internal class DeadLetterProcessor(
IRabbitMqClient client,
ILogger<DeadLetterProcessor> logger,
IOptions<RabbitMqConsumerSettings> consumerSettings)
: IDeadLetterProcessor
{
public async Task<DeadLetterDecision> ProcessAsync(string message, string routingKey, Exception? exception, CancellationToken token)
{
try
{
using var connection = await this.Client.TryConnectAsync(this.ConsumerSettings.Value.ConnectionAttemptCount, token);
using var connection = await client.TryConnectAsync(consumerSettings.Value.ConnectionAttemptCount, token);
if (connection == null)
{
throw new Exception("Failed to open connection");
Expand All @@ -35,17 +35,17 @@ public async Task<DeadLetterDecision> ProcessAsync(string message, string routin
properties.Headers = new Dictionary<string, object>
{
{ "routingKey", routingKey },
{ "queue", this.ConsumerSettings.Value.Queue },
{ "queue", consumerSettings.Value.Queue },
{ "error", exception?.GetBaseException().Message ?? "unknown exception" },
{ "stacktrace", exception?.StackTrace ?? "missing stacktrace" }
};

channel.BasicPublish(this.ConsumerSettings.Value.DeadLetterExchange, string.Empty, properties, Encoding.UTF8.GetBytes(message));
channel.BasicPublish(consumerSettings.Value.DeadLetterExchange, string.Empty, properties, Encoding.UTF8.GetBytes(message));
return DeadLetterDecision.RemoveFromQueue;
}
catch (Exception e)
{
this.Logger.LogError(e, "Failed to process dead letter with routing key '{RoutingKey}'", routingKey);
logger.LogError(e, "Failed to process dead letter with routing key '{RoutingKey}'", routingKey);
return DeadLetterDecision.Requeue;
}
}
Expand Down
26 changes: 13 additions & 13 deletions src/Bss.Platform.RabbitMq.Consumer/Services/MessageReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@

namespace Bss.Platform.RabbitMq.Consumer.Services;

internal record MessageReader(
IRabbitMqMessageProcessor MessageProcessor,
IDeadLetterProcessor DeadLetterProcessor,
ILogger<MessageReader> Logger,
IOptions<RabbitMqConsumerSettings> ConsumerSettings)
internal class MessageReader(
IRabbitMqMessageProcessor messageProcessor,
IDeadLetterProcessor deadLetterProcessor,
ILogger<MessageReader> logger,
IOptions<RabbitMqConsumerSettings> consumerSettings)
: IRabbitMqMessageReader
{
public async Task ReadAsync(IModel channel, CancellationToken token)
{
var result = channel.BasicGet(this.ConsumerSettings.Value.Queue, false);
var result = channel.BasicGet(consumerSettings.Value.Queue, false);
if (result is null)
{
await Delay(this.ConsumerSettings.Value.ReceiveMessageDelayMilliseconds, token);
await Delay(consumerSettings.Value.ReceiveMessageDelayMilliseconds, token);
return;
}

Expand All @@ -37,10 +37,10 @@ private async Task ProcessAsync(BasicGetResult message, IModel channel, Cancella
var result = await Policy
.Handle<Exception>()
.WaitAndRetryAsync(
this.ConsumerSettings.Value.FailedMessageRetryCount,
_ => TimeSpan.FromMilliseconds(this.ConsumerSettings.Value.RejectMessageDelayMilliseconds))
consumerSettings.Value.FailedMessageRetryCount,
_ => TimeSpan.FromMilliseconds(consumerSettings.Value.RejectMessageDelayMilliseconds))
.ExecuteAndCaptureAsync(
innerToken => this.MessageProcessor.ProcessAsync(
innerToken => messageProcessor.ProcessAsync(
message.BasicProperties,
message.RoutingKey,
GetMessageBody(message),
Expand All @@ -62,7 +62,7 @@ private async Task HandleProcessResultAsync(BasicGetResult message, IModel chann
}
else
{
var deadLetteringResult = await this.DeadLetterProcessor.ProcessAsync(
var deadLetteringResult = await deadLetterProcessor.ProcessAsync(
GetMessageBody(message),
message.RoutingKey,
result.FinalException,
Expand All @@ -79,9 +79,9 @@ private async Task HandleProcessResultAsync(BasicGetResult message, IModel chann
}
catch (Exception ex)
{
this.Logger.LogError(ex, "Failed to deadLetter message with routing key {RoutingKey}", message.RoutingKey);
logger.LogError(ex, "Failed to deadLetter message with routing key {RoutingKey}", message.RoutingKey);

await Delay(this.ConsumerSettings.Value.ReceiveMessageDelayMilliseconds, token);
await Delay(consumerSettings.Value.ReceiveMessageDelayMilliseconds, token);

channel.BasicNack(message.DeliveryTag, false, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Bss.Platform.RabbitMq.Consumer.Services;

internal record MsSqlLockService(IOptions<RabbitMqConsumerSettings> ConsumerSettings) : IRabbitMqConsumerLockService
internal class MsSqlLockService(IOptions<RabbitMqConsumerSettings> consumerSettings) : IRabbitMqConsumerLockService
{
public bool TryObtainLock(SqlConnection connection)
{
Expand Down Expand Up @@ -51,5 +51,5 @@ public void TryReleaseLock(SqlConnection connection)
}
}

private string GetLockName() => $"{this.ConsumerSettings.Value.Queue}_Consumer_Lock";
private string GetLockName() => $"{consumerSettings.Value.Queue}_Consumer_Lock";
}
34 changes: 17 additions & 17 deletions src/Bss.Platform.RabbitMq.Consumer/Services/SynchronizedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

namespace Bss.Platform.RabbitMq.Consumer.Services;

internal record SynchronizedConsumer(
SqlConnectionStringProvider ConnectionStringProvider,
IRabbitMqConsumerLockService LockService,
ILogger<SynchronizedConsumer> Logger,
IRabbitMqMessageReader MessageReader,
IOptions<RabbitMqConsumerSettings> ConsumerSettings)
internal class SynchronizedConsumer(
SqlConnectionStringProvider connectionStringProvider,
IRabbitMqConsumerLockService lockService,
ILogger<SynchronizedConsumer> logger,
IRabbitMqMessageReader messageReader,
IOptions<RabbitMqConsumerSettings> consumerSettings)
: IRabbitMqConsumer
{
private SqlConnection? connection;
Expand All @@ -29,19 +29,19 @@ public async Task ConsumeAsync(IModel channel, CancellationToken token)
{
if (await this.GetLock(token))
{
await this.MessageReader.ReadAsync(channel, token);
await messageReader.ReadAsync(channel, token);
}
else
{
await this.CloseConnectionAsync();
await Delay(this.ConsumerSettings.Value.InactiveConsumerSleepMilliseconds, token);
await Delay(consumerSettings.Value.InactiveConsumerSleepMilliseconds, token);
}
}
catch (Exception e)
{
this.Logger.LogError(e, "Consuming error");
logger.LogError(e, "Consuming error");
await this.CloseConnectionAsync();
await Delay(this.ConsumerSettings.Value.InactiveConsumerSleepMilliseconds, token);
await Delay(consumerSettings.Value.InactiveConsumerSleepMilliseconds, token);
}
}
}
Expand All @@ -50,7 +50,7 @@ public void Dispose()
{
if (this.connection is not null)
{
this.LockService.TryReleaseLock(this.connection);
lockService.TryReleaseLock(this.connection);
}

this.connection?.Close();
Expand All @@ -59,19 +59,19 @@ public void Dispose()

private async Task<bool> GetLock(CancellationToken token)
{
if (this.lockObtainedDate?.AddMilliseconds(this.ConsumerSettings.Value.ActiveConsumerRefreshMilliseconds) >= DateTime.Now)
if (this.lockObtainedDate?.AddMilliseconds(consumerSettings.Value.ActiveConsumerRefreshMilliseconds) >= DateTime.Now)
{
return true;
}

await this.OpenConnectionAsync(token);
if (!this.LockService.TryObtainLock(this.connection!))
if (!lockService.TryObtainLock(this.connection!))
{
return false;
}

this.lockObtainedDate = DateTime.Now;
this.Logger.LogDebug("Current consumer is active");
logger.LogDebug("Current consumer is active");

return true;
}
Expand All @@ -80,7 +80,7 @@ private async Task OpenConnectionAsync(CancellationToken token)
{
await this.CloseConnectionAsync();

this.connection = new SqlConnection(this.ConnectionStringProvider.ConnectionString);
this.connection = new SqlConnection(connectionStringProvider.ConnectionString);
await this.connection.OpenAsync(token);
}

Expand All @@ -90,13 +90,13 @@ private async Task CloseConnectionAsync()
{
if (this.connection is not null)
{
this.LockService.TryReleaseLock(this.connection);
lockService.TryReleaseLock(this.connection);
await this.connection!.CloseAsync();
}
}
catch (Exception e)
{
this.Logger.LogError(e, "Failed to close connection");
logger.LogError(e, "Failed to close connection");
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Bss.Platform.RabbitMq/Services/RabbitMqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@

namespace Bss.Platform.RabbitMq.Services;

public record RabbitMqClient(IOptions<RabbitMqServerSettings> Options, ILogger<RabbitMqClient> Logger) : IRabbitMqClient
public class RabbitMqClient(IOptions<RabbitMqServerSettings> options, ILogger<RabbitMqClient> logger) : IRabbitMqClient
{
private const int RetryConnectDelay = 5000;

public Task<IConnection?> TryConnectAsync(int? attempts, CancellationToken token = default)
{
var serverSettings = this.Options.Value;
var serverSettings = options.Value;
var factory = new ConnectionFactory
{
HostName = serverSettings.Host,
Expand Down Expand Up @@ -58,5 +58,5 @@ private AsyncRetryPolicy CreateRetryPolicy(int? attempts)
(ex, _) => this.LogConnectionError(ex));
}

private void LogConnectionError(Exception exception) => this.Logger.LogError(exception, "Could not connect to RabbitMQ server");
private void LogConnectionError(Exception exception) => logger.LogError(exception, "Could not connect to RabbitMQ server");
}
9 changes: 9 additions & 0 deletions src/Bss.Platform.sln
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{C53E37ED
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests.Unit", "Tests.Unit\Tests.Unit.csproj", "{16BA5D44-4C8F-4EF6-9633-C4502ED91A22}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Logging", "Logging", "{D07584C3-F6D5-472C-8515-7D38F4C54F11}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Bss.Platform.Logging", "Bss.Platform.Logging\Bss.Platform.Logging.csproj", "{2417B26C-C12D-4E63-8996-395DD78F81A1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -24,6 +28,7 @@ Global
{EF6932F4-67BD-422D-B2D8-0DA4CE98181A} = {0F54786D-AB46-46AC-88DF-BEB789A62C1F}
{5B028860-35C4-43CF-AA63-830A66D83E46} = {1E012CC2-9AD1-491A-9D6B-8E3215347B97}
{16BA5D44-4C8F-4EF6-9633-C4502ED91A22} = {C53E37ED-7EFD-4383-9D89-A3CD4B8ED2AB}
{2417B26C-C12D-4E63-8996-395DD78F81A1} = {D07584C3-F6D5-472C-8515-7D38F4C54F11}
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{860BFBD8-26EA-44F9-980E-21B828FC8F72}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand All @@ -42,5 +47,9 @@ Global
{16BA5D44-4C8F-4EF6-9633-C4502ED91A22}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16BA5D44-4C8F-4EF6-9633-C4502ED91A22}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16BA5D44-4C8F-4EF6-9633-C4502ED91A22}.Release|Any CPU.Build.0 = Release|Any CPU
{2417B26C-C12D-4E63-8996-395DD78F81A1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2417B26C-C12D-4E63-8996-395DD78F81A1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2417B26C-C12D-4E63-8996-395DD78F81A1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2417B26C-C12D-4E63-8996-395DD78F81A1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
8 changes: 5 additions & 3 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Microsoft.ApplicationInsights" Version="2.22.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.2.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageVersion Include="NHibernate" Version="5.5.0" />
<PackageVersion Include="NHibernate" Version="5.5.1" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="Polly" Version="8.3.1" />
<PackageVersion Include="Serilog.AspNetCore" Version="8.0.1" />
</ItemGroup>
<ItemGroup>
<!--Internal needs-->
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageVersion Include="xunit" Version="2.7.1" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.8" />
<PackageVersion Include="xunit" Version="2.8.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.0" />
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions src/__SolutionItems/CommonAssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
[assembly: AssemblyCompany("Luxoft")]
[assembly: AssemblyCopyright("Copyright © Luxoft 2024")]

[assembly: AssemblyVersion("1.0.1.0")]
[assembly: AssemblyFileVersion("1.0.1.0")]
[assembly: AssemblyInformationalVersion("1.0.1.0")]
[assembly: AssemblyVersion("1.1.0.0")]
[assembly: AssemblyFileVersion("1.1.0.0")]
[assembly: AssemblyInformationalVersion("1.1.0.0")]

#if DEBUG
[assembly: AssemblyConfiguration("Debug")]
Expand Down

0 comments on commit 338dac3

Please sign in to comment.