-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathMultiEventsConsumer.cs
86 lines (71 loc) · 3.61 KB
/
MultiEventsConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
using Microsoft.Extensions.Caching.Distributed;
namespace MultiEventsConsumer;
/// <summary>
/// Consumes <see cref="DoorOpened"/> and <see cref="DoorClosed"/> events. The last known
/// state of each door (keyed by vehicle id and door kind) is kept in the distributed cache,
/// and a push notification is simulated only when an event actually changes that state.
/// </summary>
/// <param name="cache">Distributed cache used to store the last known door state.</param>
/// <param name="logger">Logger used to report received events and the action taken.</param>
public class MultiEventsConsumer(IDistributedCache cache, ILogger<MultiEventsConsumer> logger) : IEventConsumer<DoorClosed>, IEventConsumer<DoorOpened>
{
    /// <summary>Length of the delay that stands in for sending a push notification.</summary>
    private static readonly TimeSpan SimulationDuration = TimeSpan.FromSeconds(3);

    /// <summary>
    /// Handles a door-opened event: records the door as open and simulates a push
    /// notification, unless the cached state already says the door is open.
    /// </summary>
    /// <param name="context">Context carrying the <see cref="DoorOpened"/> event.</param>
    /// <param name="cancellationToken">Token used to cancel cache access and the simulated send.</param>
    /// <exception cref="ArgumentException">The event's vehicle id is null, empty or whitespace.</exception>
    public async Task ConsumeAsync(EventContext<DoorOpened> context, CancellationToken cancellationToken = default)
    {
        var evt = context.Event;
        var vehicleId = evt.VehicleId;
        var kind = evt.Kind;

        logger.LogInformation("{DoorKind} door for {VehicleId} was opened at {Opened:r}.", kind, vehicleId, evt.Opened);

        if (await TryTransitionAsync(vehicleId, kind, DoorState.Open, cancellationToken))
        {
            await SendPushNotificationAsync(cancellationToken);
        }
        else
        {
            logger.LogInformation("Ignoring door opened event because the door was already opened");
        }
    }

    /// <summary>
    /// Handles a door-closed event: records the door as closed and simulates a push
    /// notification, unless the cached state already says the door is closed.
    /// </summary>
    /// <param name="context">Context carrying the <see cref="DoorClosed"/> event.</param>
    /// <param name="cancellationToken">Token used to cancel cache access and the simulated send.</param>
    /// <exception cref="ArgumentException">The event's vehicle id is null, empty or whitespace.</exception>
    public async Task ConsumeAsync(EventContext<DoorClosed> context, CancellationToken cancellationToken = default)
    {
        var evt = context.Event;
        var vehicleId = evt.VehicleId;
        var kind = evt.Kind;

        // The placeholder is named after the value it carries (the close time) so that
        // structured log sinks do not record it under a misleading "Opened" property.
        logger.LogInformation("{DoorKind} door for {VehicleId} was closed at {Closed:r}.", kind, vehicleId, evt.Closed);

        if (await TryTransitionAsync(vehicleId, kind, DoorState.Closed, cancellationToken))
        {
            await SendPushNotificationAsync(cancellationToken);
        }
        else
        {
            logger.LogInformation("Ignoring door closed event because the door was already closed");
        }
    }

    /// <summary>
    /// Reads the cached state of the door and, if it differs from <paramref name="target"/>
    /// (or no parseable state is cached), overwrites it with <paramref name="target"/>.
    /// </summary>
    /// <returns>
    /// <see langword="true"/> when the state was changed and saved;
    /// <see langword="false"/> when the door was already in the target state.
    /// </returns>
    private async Task<bool> TryTransitionAsync(string? vehicleId, DoorKind kind, DoorState target, CancellationToken cancellationToken)
    {
        // Get the door state from the cache (or database)
        var current = await GetLastDoorStateAsync(vehicleId, kind, cancellationToken);
        if (current == target)
        {
            return false;
        }

        // Save to cache (or database) before the notification is sent
        await SaveDoorStateAsync(vehicleId, kind, target, cancellationToken);
        return true;
    }

    /// <summary>Logs that a push notification is being sent and simulates the send with a delay.</summary>
    private async Task SendPushNotificationAsync(CancellationToken cancellationToken)
    {
        logger.LogInformation("Sending push notification.");
        await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay
    }

    /// <summary>
    /// Builds the lower-cased cache key <c>{vehicleId}/{kind}</c> for a door.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="vehicleId"/> is null, empty or whitespace.</exception>
    private static string MakeCacheKey(string? vehicleId, DoorKind kind)
    {
        if (string.IsNullOrWhiteSpace(vehicleId))
        {
            throw new ArgumentException($"'{nameof(vehicleId)}' cannot be null or whitespace.", nameof(vehicleId));
        }

        return $"{vehicleId}/{kind}".ToLowerInvariant();
    }

    /// <summary>
    /// Reads the last stored state of a door from the cache.
    /// </summary>
    /// <returns>
    /// The stored <see cref="DoorState"/>, or <see langword="null"/> when the cached string
    /// cannot be parsed as a <see cref="DoorState"/> (parsing is case-insensitive).
    /// </returns>
    /// <exception cref="ArgumentException"><paramref name="vehicleId"/> is null, empty or whitespace.</exception>
    private async Task<DoorState?> GetLastDoorStateAsync(string? vehicleId, DoorKind kind, CancellationToken cancellationToken)
    {
        string key = MakeCacheKey(vehicleId, kind);
        var stateStr = await cache.GetStringAsync(key, cancellationToken);
        return Enum.TryParse<DoorState>(stateStr, ignoreCase: true, out var state) ? state : null;
    }

    /// <summary>
    /// Stores the state of a door in the cache as a lower-cased string.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="vehicleId"/> is null, empty or whitespace.</exception>
    private async Task SaveDoorStateAsync(string? vehicleId, DoorKind kind, DoorState state, CancellationToken cancellationToken)
    {
        string key = MakeCacheKey(vehicleId, kind);
        var stateStr = state.ToString().ToLowerInvariant();
        await cache.SetStringAsync(key, stateStr, cancellationToken);
    }
}