Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mqtt message on workflow succeeded #67

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions api/Controllers/WorkflowsControlller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using api.Database;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Authorization;
using api.MQTT;
namespace api.Controllers;

public class WorkflowStartedNotification
Expand All @@ -20,8 +21,10 @@ public class WorkflowExitedNotification

[ApiController]
[Route("[controller]")]
public class WorkflowsController(IInspectionDataService inspectionDataService) : ControllerBase
public class WorkflowsController(IInspectionDataService inspectionDataService, MqttService mqttService) : ControllerBase
{
private readonly IInspectionDataService _inspectionDataService = inspectionDataService;
private readonly MqttService _mqttService = mqttService;

/// <summary>
/// Updates status of inspection data to started
Expand All @@ -33,7 +36,7 @@ public class WorkflowsController(IInspectionDataService inspectionDataService) :
[ProducesResponseType(StatusCodes.Status404NotFound)]
public async Task<ActionResult<InspectionDataResponse>> WorkflowStarted([FromBody] WorkflowStartedNotification notification)
{
var updatedInspectionData = await inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, WorkflowStatus.Started);
var updatedInspectionData = await _inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, WorkflowStatus.Started);
if (updatedInspectionData == null)
{
return NotFound($"Could not find workflow with inspection id {notification.InspectionId}");
Expand Down Expand Up @@ -63,11 +66,21 @@ public async Task<ActionResult<InspectionDataResponse>> WorkflowExited([FromBody
status = WorkflowStatus.ExitFailure;
}

var updatedInspectionData = await inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, status);
var updatedInspectionData = await _inspectionDataService.UpdateAnonymizerWorkflowStatus(notification.InspectionId, status);
if (updatedInspectionData == null)
{
return NotFound($"Could not find workflow with inspection id {notification.InspectionId}");
}

var mqttTopic = $"workflow/{notification.InspectionId}/status_success";
var mqttMessage = new
{
notification.InspectionId,
Status = notification.WorkflowStatus, // "Succeeded"" for success, else fail
Timestamp = DateTime.UtcNow
};

await _mqttService.PublishMessageAsync(mqttTopic, mqttMessage);
return Ok(updatedInspectionData);
}
}
15 changes: 14 additions & 1 deletion api/MQTT/MqttService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using api.MQTT;
using MQTTnet.Protocol;
using api.Utilities;
using MQTTnet;
using MQTTnet.Client;
Expand Down Expand Up @@ -265,5 +265,18 @@ private void OnIsarTopicReceived<T>(string content) where T : MqttMessage
_logger.LogWarning("{msg}", e.Message);
}
}

public async Task PublishMessageAsync<T>(string topic, T message)
{
string payload = JsonSerializer.Serialize(message, serializerOptions);
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
.Build();

await _mqttClient.EnqueueAsync(mqttMessage);
_logger.LogInformation("Published message to topic {Topic}: {Message}", topic, payload);
}
}
}
5 changes: 2 additions & 3 deletions api/MQTT/MqttTopics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ public static class MqttTopics
public static readonly Dictionary<string, Type> TopicsToMessages =
new()
{
{
"isar/+/inspection_result", typeof(IsarInspectionResultMessage)
}
{ "isar/+/inspection_result", typeof(IsarInspectionResultMessage) },
{ "workflow/+/status_success", typeof(WorkflowStatusSuccessMessage) }
};

/// <summary>
Expand Down
18 changes: 18 additions & 0 deletions api/MQTT/WorkflowStatusResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Text.Json.Serialization;

namespace api.MQTT;

/// <summary>
/// Represents the message payload for a successful workflow status update.
/// </summary>
public class WorkflowStatusSuccessMessage : MqttMessage
{
[JsonPropertyName("inspection_id")]
public required string InspectionId { get; set; }

[JsonPropertyName("status")]
public required string Status { get; set; }

[JsonPropertyName("timestamp")]
public required DateTime Timestamp { get; set; }
}
Loading