-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
59 lines (50 loc) · 2.35 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using MassTransit;
namespace GettingStarted
{
/// <summary>
/// Application entry point: builds a generic host that registers MassTransit with an
/// in-memory bus and a Kafka rider consuming <c>KafkaMessage</c> from a single topic.
/// </summary>
public class Program
{
    // Placeholder fallbacks, used only when the matching configuration key is missing or blank.
    // Real broker addresses and credentials should be supplied through configuration
    // rather than edited into this file.
    private const string DefaultBootstrapServers = "host";
    private const string DefaultSaslUsername = "username";
    private const string DefaultSaslPassword = "password";

    // Topic and consumer group the Kafka endpoint is bound to.
    private const string TopicName = "my-topic";
    private const string ConsumerGroupId = "my-kafka-consumer-2";

    /// <summary>
    /// Builds the host from <paramref name="args"/> and runs it until it shuts down.
    /// </summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    /// <returns>A task that completes when the host has stopped.</returns>
    public static async Task Main(string[] args)
    {
        await CreateHostBuilder(args).Build().RunAsync();
    }

    /// <summary>
    /// Creates the host builder and registers MassTransit with the Kafka rider.
    /// </summary>
    /// <remarks>
    /// Broker connection values are read from the host configuration keys
    /// <c>Kafka:BootstrapServers</c>, <c>Kafka:Username</c> and <c>Kafka:Password</c>.
    /// When a key is absent or blank, the placeholder default declared on this class is used.
    /// </remarks>
    /// <param name="args">Command-line arguments passed to <c>Host.CreateDefaultBuilder</c>.</param>
    /// <returns>The configured, not yet built, host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // Looks a value up in the host configuration, falling back when it is not set.
                string Setting(string key, string fallback)
                {
                    var value = hostContext.Configuration[key];
                    return string.IsNullOrWhiteSpace(value) ? fallback : value;
                }

                var bootstrapServers = Setting("Kafka:BootstrapServers", DefaultBootstrapServers);
                var saslUsername = Setting("Kafka:Username", DefaultSaslUsername);
                var saslPassword = Setting("Kafka:Password", DefaultSaslPassword);

                services.AddMassTransit(bus =>
                {
                    // The bus itself is in-memory; Kafka is attached as a rider below.
                    bus.UsingInMemory((context, config) => config.ConfigureEndpoints(context));

                    bus.AddRider(rider =>
                    {
                        rider.AddConsumer<KafkaMessageConsumer>(typeof(KafkaMessageConsumerDefinition));

                        rider.UsingKafka((context, kafka) =>
                        {
                            kafka.Host(bootstrapServers, host =>
                            {
                                host.UseSasl(sasl =>
                                {
                                    sasl.Username = saslUsername;
                                    sasl.Password = saslPassword;
                                    sasl.Mechanism = SaslMechanism.Plain;
                                });

                                host.UseSsl(ssl => ssl.EndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.Https);
                            });

                            // SASL authentication over a TLS-encrypted connection.
                            kafka.SecurityProtocol = SecurityProtocol.SaslSsl;

                            kafka.TopicEndpoint<KafkaMessage>(TopicName, ConsumerGroupId, endpoint =>
                            {
                                endpoint.ConfigureConsumer<KafkaMessageConsumer>(context);
                                endpoint.AutoOffsetReset = AutoOffsetReset.Earliest;
                                endpoint.ConcurrentDeliveryLimit = 100; // Important to set for single-key topics, otherwise consumer will return 1 message per batch

                                // Offset checkpoint thresholds: a message count and a time interval.
                                endpoint.CheckpointMessageCount = 100;
                                endpoint.CheckpointInterval = TimeSpan.FromMinutes(1);
                            });
                        });
                    });
                });
            });
}
}