From 59fccb4039d8c437184776607354b8d2ba5adc8f Mon Sep 17 00:00:00 2001 From: Ola Alstad Date: Tue, 3 Dec 2024 12:08:07 +0100 Subject: [PATCH] Add mqtt message on workflow succeeded --- api/Controllers/WorkflowsControlller.cs | 19 ++++++++++++++++--- api/MQTT/MqttService.cs | 15 ++++++++++++++- api/MQTT/MqttTopics.cs | 5 ++--- api/MQTT/WorkflowStatusResult.cs | 18 ++++++++++++++++++ 4 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 api/MQTT/WorkflowStatusResult.cs diff --git a/api/Controllers/WorkflowsControlller.cs b/api/Controllers/WorkflowsControlller.cs index 8c5d60b..ab7eecf 100644 --- a/api/Controllers/WorkflowsControlller.cs +++ b/api/Controllers/WorkflowsControlller.cs @@ -3,6 +3,7 @@ using api.Database; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Authorization; +using api.MQTT; namespace api.Controllers; public class WorkflowStartedNotification @@ -20,8 +21,10 @@ public class WorkflowExitedNotification [ApiController] [Route("[controller]")] -public class WorkflowsController(IInspectionDataService inspectionDataService) : ControllerBase +public class WorkflowsController(IInspectionDataService inspectionDataService, MqttService mqttService) : ControllerBase { + private readonly IInspectionDataService _inspectionDataService = inspectionDataService; + private readonly MqttService _mqttService = mqttService; /// /// Updates status of inspection data to started @@ -33,7 +36,7 @@ public class WorkflowsController(IInspectionDataService inspectionDataService) : [ProducesResponseType(StatusCodes.Status404NotFound)] public async Task> WorkflowStarted([FromBody] WorkflowStartedNotification notification) { - var updatedInspectionData = await inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, WorkflowStatus.Started); + var updatedInspectionData = await _inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, WorkflowStatus.Started); if (updatedInspectionData == null) { return NotFound($"Could not find workflow with inspection id {notification.InspectionId}"); @@ -63,11 +66,21 @@ public async Task> WorkflowExited([FromBody status = WorkflowStatus.ExitFailure; } - var updatedInspectionData = await inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, status); + var updatedInspectionData = await _inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, status); if (updatedInspectionData == null) { return NotFound($"Could not find workflow with inspection id {notification.InspectionId}"); } + + var mqttTopic = $"workflow/{notification.InspectionId}/status_success"; + var mqttMessage = new + { + notification.InspectionId, + Status = notification.WorkflowStatus, // "Succeeded"" for success, else fail + Timestamp = DateTime.UtcNow + }; + + await _mqttService.PublishMessageAsync(mqttTopic, mqttMessage); return Ok(updatedInspectionData); } } diff --git a/api/MQTT/MqttService.cs b/api/MQTT/MqttService.cs index 4c2eaaa..91d3e19 100644 --- a/api/MQTT/MqttService.cs +++ b/api/MQTT/MqttService.cs @@ -1,7 +1,7 @@ using System.Text; using System.Text.Json; using System.Text.Json.Serialization; -using api.MQTT; +using MQTTnet.Protocol; using api.Utilities; using MQTTnet; using MQTTnet.Client; @@ -265,5 +265,18 @@ private void OnIsarTopicReceived(string content) where T : MqttMessage _logger.LogWarning("{msg}", e.Message); } } + + public async Task PublishMessageAsync(string topic, T message) + { + string payload = JsonSerializer.Serialize(message, serializerOptions); + var mqttMessage = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(payload) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce) + .Build(); + + await _mqttClient.EnqueueAsync(mqttMessage); + _logger.LogInformation("Published message to topic {Topic}: {Message}", topic, payload); + } } } diff --git a/api/MQTT/MqttTopics.cs b/api/MQTT/MqttTopics.cs index 648c2fb..9f00306 100644 --- a/api/MQTT/MqttTopics.cs +++ b/api/MQTT/MqttTopics.cs @@ -12,9 +12,8 @@ public static class MqttTopics public static readonly Dictionary TopicsToMessages = new() { - { - "isar/+/inspection_result", typeof(IsarInspectionResultMessage) - } + { "isar/+/inspection_result", typeof(IsarInspectionResultMessage) }, + { "workflow/+/status_success", typeof(WorkflowStatusSuccessMessage) } }; /// diff --git a/api/MQTT/WorkflowStatusResult.cs b/api/MQTT/WorkflowStatusResult.cs new file mode 100644 index 0000000..de46406 --- /dev/null +++ b/api/MQTT/WorkflowStatusResult.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace api.MQTT; + +/// +/// Represents the message payload for a successful workflow status update. +/// +public class WorkflowStatusSuccessMessage : MqttMessage +{ + [JsonPropertyName("inspection_id")] + public required string InspectionId { get; set; } + + [JsonPropertyName("status")] + public required string Status { get; set; } + + [JsonPropertyName("timestamp")] + public required DateTime Timestamp { get; set; } +}