Skip to content

Commit

Permalink
refactor(kafka): use GetCertPath from helper
Browse files Browse the repository at this point in the history
  • Loading branch information
pogromistik committed Jun 19, 2024
1 parent 47acf3b commit 71d73ae
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/Sitko.Core.Kafka/KafkaConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers.DistributionStrategies;
using Sitko.Core.App.Helpers;
using SecurityProtocol = KafkaFlow.Configuration.SecurityProtocol;

namespace Sitko.Core.Kafka;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void Build(IKafkaConfigurationBuilder builder, KafkaModuleOptions options
information.SecurityProtocol = options.SecurityProtocol;
if (information.SecurityProtocol == SecurityProtocol.SaslSsl)
{
information.SslCaLocation = options.GetSaslCertPath();
information.SslCaLocation = CertHelper.GetCertPath(options.SaslCertBase64);
}
});
}
Expand Down
5 changes: 3 additions & 2 deletions src/Sitko.Core.Kafka/KafkaConsumerOffsetsEnsurer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Sitko.Core.App.Helpers;

namespace Sitko.Core.Kafka;

Expand Down Expand Up @@ -93,7 +94,7 @@ await adminClient.CreateTopicsAsync(new[]
consumerConfig.SecurityProtocol = (SecurityProtocol?)options.SecurityProtocol;
if (consumerConfig.SecurityProtocol == SecurityProtocol.SaslSsl)
{
consumerConfig.SslCaLocation = options.GetSaslCertPath();
consumerConfig.SslCaLocation = CertHelper.GetCertPath(options.SaslCertBase64);
}
}
var cts = new CancellationTokenSource();
Expand Down Expand Up @@ -160,7 +161,7 @@ private IAdminClient GetAdminClient(KafkaModuleOptions options)
adminClientConfig.SecurityProtocol = (SecurityProtocol?)options.SecurityProtocol;
if (adminClientConfig.SecurityProtocol == SecurityProtocol.SaslSsl)
{
adminClientConfig.SslCaLocation = options.GetSaslCertPath();
adminClientConfig.SslCaLocation = CertHelper.GetCertPath(options.SaslCertBase64);
}
}

Expand Down
8 changes: 0 additions & 8 deletions src/Sitko.Core.Kafka/KafkaModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ public class KafkaModuleOptions : BaseModuleOptions
public bool EnableIdempotence { get; set; } = true;
public bool SocketNagleDisable { get; set; } = true;
public Acks Acks { get; set; } = Acks.All;

public string GetSaslCertPath()
{
var cert = Convert.FromBase64String(SaslCertBase64);
var path = Path.GetTempFileName();
File.WriteAllBytes(path, cert);
return path;
}
}

public class KafkaModuleOptionsValidator : AbstractValidator<KafkaModuleOptions>
Expand Down

0 comments on commit 71d73ae

Please sign in to comment.