From b7d8b210663f2b700a55ee5364b03b00b9bdfab7 Mon Sep 17 00:00:00 2001 From: andchiind Date: Fri, 22 Nov 2024 09:59:16 +0100 Subject: [PATCH 1/2] Fetch media config from ISAR with API call --- backend/api.test/Mocks/IsarServiceMock.cs | 12 ++++ .../api/Controllers/MediaStreamController.cs | 51 +++++++++++++++ backend/api/EventHandlers/MqttEventHandler.cs | 22 ------- .../api/MQTT/MessageModels/IsarMediaConfig.cs | 25 -------- backend/api/MQTT/MqttService.cs | 5 -- backend/api/MQTT/MqttTopics.cs | 3 - backend/api/Services/IsarService.cs | 62 +++++++++++++++++++ .../api/Services/Models/IsarMediaConfig.cs | 18 ++++++ backend/api/Services/Models/MediaConfig.cs | 10 +-- 9 files changed, 144 insertions(+), 64 deletions(-) create mode 100644 backend/api/Controllers/MediaStreamController.cs delete mode 100644 backend/api/MQTT/MessageModels/IsarMediaConfig.cs create mode 100644 backend/api/Services/Models/IsarMediaConfig.cs diff --git a/backend/api.test/Mocks/IsarServiceMock.cs b/backend/api.test/Mocks/IsarServiceMock.cs index 476769f0..9bd3d797 100644 --- a/backend/api.test/Mocks/IsarServiceMock.cs +++ b/backend/api.test/Mocks/IsarServiceMock.cs @@ -51,5 +51,17 @@ public async Task StartMoveArm(Robot robot, string position) ); return isarServiceMissionResponse; } + + public async Task GetMediaStreamConfig(Robot robot) + { + await Task.Run(() => Thread.Sleep(1)); + return new MediaConfig + { + Url = "mockURL", + Token = "mockToken", + RobotId = robot.Id, + MediaConnectionType = MediaConnectionType.LiveKit + }; + } } } diff --git a/backend/api/Controllers/MediaStreamController.cs b/backend/api/Controllers/MediaStreamController.cs new file mode 100644 index 00000000..963d00d2 --- /dev/null +++ b/backend/api/Controllers/MediaStreamController.cs @@ -0,0 +1,51 @@ +using Api.Controllers.Models; +using Api.Services; +using Api.Services.Models; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; + +namespace Api.Controllers +{ + [ApiController] + [Route("media-stream")] + public class MediaStreamController( + ILogger logger, + IIsarService isarService, + IRobotService robotService + ) : ControllerBase + { + /// + /// Request the config for a new media stream connection from ISAR + /// + /// + /// This query gets a new media stream connection config from ISAR + /// + [HttpGet] + [Authorize(Roles = Role.Any)] + [Route("{id}")] + [ProducesResponseType(typeof(MediaConfig), StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status401Unauthorized)] + [ProducesResponseType(StatusCodes.Status403Forbidden)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task> GetMediaStreamConfig([FromRoute] string id) + { + try + { + var robot = await robotService.ReadById(id); + if (robot == null) + { + return NotFound($"Could not find robot with ID {id}"); + } + + var config = await isarService.GetMediaStreamConfig(robot); + return Ok(config); + } + catch (Exception e) + { + logger.LogError(e, "Error during GET of media stream config from ISAR"); + throw; + } + } + } +} diff --git a/backend/api/EventHandlers/MqttEventHandler.cs b/backend/api/EventHandlers/MqttEventHandler.cs index f668b27f..51b2ba12 100644 --- a/backend/api/EventHandlers/MqttEventHandler.cs +++ b/backend/api/EventHandlers/MqttEventHandler.cs @@ -59,7 +59,6 @@ public override void Subscribe() MqttService.MqttIsarPressureReceived += OnIsarPressureUpdate; MqttService.MqttIsarPoseReceived += OnIsarPoseUpdate; MqttService.MqttIsarCloudHealthReceived += OnIsarCloudHealthUpdate; - MqttService.MqttIsarMediaConfigReceived += OnIsarMediaConfigUpdate; } public override void Unsubscribe() @@ -72,7 +71,6 @@ public override void Unsubscribe() MqttService.MqttIsarPressureReceived -= OnIsarPressureUpdate; MqttService.MqttIsarPoseReceived -= OnIsarPoseUpdate; MqttService.MqttIsarCloudHealthReceived -= OnIsarCloudHealthUpdate; - MqttService.MqttIsarMediaConfigReceived -= OnIsarMediaConfigUpdate; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await stoppingToken; } @@ -538,25 +536,5 @@ private async void OnIsarCloudHealthUpdate(object? sender, MqttReceivedArgs mqtt TeamsMessageService.TriggerTeamsMessageReceived(new TeamsMessageEventArgs(message)); } - - private async void OnIsarMediaConfigUpdate(object? sender, MqttReceivedArgs mqttArgs) - { - var isarTelemetyUpdate = (IsarMediaConfigMessage)mqttArgs.Message; - - var robot = await RobotService.ReadByIsarId(isarTelemetyUpdate.IsarId); - if (robot == null) - { - _logger.LogInformation("Received message from unknown ISAR instance {Id} with robot name {Name}", isarTelemetyUpdate.IsarId, isarTelemetyUpdate.RobotName); - return; - } - await SignalRService.SendMessageAsync("Media stream config received", robot.CurrentInstallation, - new MediaConfig - { - Url = isarTelemetyUpdate.Url, - Token = isarTelemetyUpdate.Token, - RobotId = robot.Id, - MediaConnectionType = isarTelemetyUpdate.MediaConnectionType - }); - } } } diff --git a/backend/api/MQTT/MessageModels/IsarMediaConfig.cs b/backend/api/MQTT/MessageModels/IsarMediaConfig.cs deleted file mode 100644 index 214a4420..00000000 --- a/backend/api/MQTT/MessageModels/IsarMediaConfig.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System.Text.Json.Serialization; -using Api.Services.Models; - -namespace Api.Mqtt.MessageModels -{ -#nullable disable - public class IsarMediaConfigMessage : MqttMessage - { - [JsonPropertyName("robot_name")] - public string RobotName { get; set; } - - [JsonPropertyName("isar_id")] - public string IsarId { get; set; } - - [JsonPropertyName("url")] - public string Url { get; set; } - - [JsonPropertyName("token")] - public string Token { get; set; } - - [JsonPropertyName("mediaConnectionType")] - public MediaConnectionType MediaConnectionType { get; set; } - - } -} diff --git a/backend/api/MQTT/MqttService.cs b/backend/api/MQTT/MqttService.cs index 0a574331..480430f7 100644 --- a/backend/api/MQTT/MqttService.cs +++ b/backend/api/MQTT/MqttService.cs @@ -91,7 +91,6 @@ public MqttService(ILogger logger, IConfiguration config) public static event EventHandler? MqttIsarPressureReceived; public static event EventHandler? MqttIsarPoseReceived; public static event EventHandler? MqttIsarCloudHealthReceived; - public static event EventHandler? MqttIsarMediaConfigReceived; protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -153,9 +152,6 @@ private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs messageRe case Type type when type == typeof(IsarCloudHealthMessage): OnIsarTopicReceived(content); break; - case Type type when type == typeof(IsarMediaConfigMessage): - OnIsarTopicReceived(content); - break; default: _logger.LogWarning( "No callback defined for MQTT message type '{type}'", @@ -305,7 +301,6 @@ private void OnIsarTopicReceived(string content) where T : MqttMessage _ when type == typeof(IsarPressureMessage) => MqttIsarPressureReceived, _ when type == typeof(IsarPoseMessage) => MqttIsarPoseReceived, _ when type == typeof(IsarCloudHealthMessage) => MqttIsarCloudHealthReceived, - _ when type == typeof(IsarMediaConfigMessage) => MqttIsarMediaConfigReceived, _ => throw new NotImplementedException( $"No event defined for message type '{typeof(T).Name}'" diff --git a/backend/api/MQTT/MqttTopics.cs b/backend/api/MQTT/MqttTopics.cs index 9a499cc2..3d6fcbf4 100644 --- a/backend/api/MQTT/MqttTopics.cs +++ b/backend/api/MQTT/MqttTopics.cs @@ -42,9 +42,6 @@ public static class MqttTopics }, { "isar/+/cloud_health", typeof(IsarCloudHealthMessage) - }, - { - "isar/+/media_config", typeof(IsarMediaConfigMessage) } }; diff --git a/backend/api/Services/IsarService.cs b/backend/api/Services/IsarService.cs index 076be91f..e883917a 100644 --- a/backend/api/Services/IsarService.cs +++ b/backend/api/Services/IsarService.cs @@ -17,6 +17,8 @@ public interface IIsarService public Task ResumeMission(Robot robot); public Task StartMoveArm(Robot robot, string armPosition); + + public Task GetMediaStreamConfig(Robot robot); } public class IsarService(IDownstreamApi isarApi, ILogger logger) : IIsarService @@ -275,5 +277,65 @@ private static (string, int) GetErrorDescriptionFoFailedIsarRequest(HttpResponse return (description, (int)statusCode); } + + public async Task GetMediaStreamConfig(Robot robot) + { + string mediaStreamPath = $"/media/media-stream-config"; + var response = await CallApi( + HttpMethod.Get, + robot.IsarUri, + mediaStreamPath + ); + + if (!response.IsSuccessStatusCode) + { + (string message, _) = GetErrorDescriptionFoFailedIsarRequest(response); + string errorResponse = await response.Content.ReadAsStringAsync(); + logger.LogError("{Message}: {ErrorResponse}", message, errorResponse); + throw new ConfigException(message); + } + if (response.Content is null) + { + string errorMessage = "Could not read content from new robot media stream config"; + logger.LogError("{ErrorMessage}", errorMessage); + throw new ConfigException(errorMessage); + } + + IsarMediaConfigMessage? isarMediaConfigResponse; + try + { + isarMediaConfigResponse = await response.Content.ReadFromJsonAsync(); + } + catch (JsonException) + { + string errorMessage = $"Could not parse content from new robot media stream config. {await response.Content.ReadAsStringAsync()}"; + logger.LogError("{ErrorMessage}", errorMessage); + throw new ConfigException(errorMessage); + } + + if (isarMediaConfigResponse == null) + { + string errorMessage = $"Parsing of robot media stream config resulted in empty config. {await response.Content.ReadAsStringAsync()}"; + logger.LogError("{ErrorMessage}", errorMessage); + throw new ConfigException(errorMessage); + } + + bool parseSuccess = Enum.TryParse(isarMediaConfigResponse.MediaConnectionType, out MediaConnectionType connectionType); + + if (!parseSuccess) + { + string errorMessage = $"Could not parse connection type from new robot media stream config. {isarMediaConfigResponse.MediaConnectionType}"; + logger.LogError("{ErrorMessage}", errorMessage); + throw new ConfigException(errorMessage); + } + + return new MediaConfig + { + Url = isarMediaConfigResponse.Url, + Token = isarMediaConfigResponse.Token, + RobotId = robot.Id, + MediaConnectionType = connectionType + }; + } } } diff --git a/backend/api/Services/Models/IsarMediaConfig.cs b/backend/api/Services/Models/IsarMediaConfig.cs new file mode 100644 index 00000000..07f3d3a3 --- /dev/null +++ b/backend/api/Services/Models/IsarMediaConfig.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace Api.Services.Models +{ +#nullable disable + public class IsarMediaConfigMessage + { + [JsonPropertyName("url")] + public string Url { get; set; } + + [JsonPropertyName("token")] + public string Token { get; set; } + + [JsonPropertyName("media_connection_type")] + public string MediaConnectionType { get; set; } + + } +} diff --git a/backend/api/Services/Models/MediaConfig.cs b/backend/api/Services/Models/MediaConfig.cs index 8d5a077f..08804897 100644 --- a/backend/api/Services/Models/MediaConfig.cs +++ b/backend/api/Services/Models/MediaConfig.cs @@ -1,18 +1,10 @@ -using System.Text.Json.Serialization; -namespace Api.Services.Models +namespace Api.Services.Models { public struct MediaConfig { - [JsonPropertyName("url")] public string? Url { get; set; } - - [JsonPropertyName("token")] public string? Token { get; set; } - - [JsonPropertyName("robotId")] public string? RobotId { get; set; } - - [JsonPropertyName("mediaConnectionType")] public MediaConnectionType MediaConnectionType { get; set; } } From 18148c36f5667374f0e682be14e5538c83101763 Mon Sep 17 00:00:00 2001 From: andchiind Date: Mon, 25 Nov 2024 13:13:10 +0100 Subject: [PATCH 2/2] Reconnect to video streams on disconnect --- frontend/src/api/ApiCaller.tsx | 7 + .../Contexts/MediaStreamContext.tsx | 148 ++++++++++++------ .../Pages/MissionPage/MissionPage.tsx | 8 +- .../components/Pages/RobotPage/RobotPage.tsx | 7 +- 4 files changed, 119 insertions(+), 51 deletions(-) diff --git a/frontend/src/api/ApiCaller.tsx b/frontend/src/api/ApiCaller.tsx index 8fd03de9..cac0e2be 100644 --- a/frontend/src/api/ApiCaller.tsx +++ b/frontend/src/api/ApiCaller.tsx @@ -13,6 +13,7 @@ import { MissionDefinition, PlantInfo } from 'models/MissionDefinition' import { MissionDefinitionUpdateForm } from 'models/MissionDefinitionUpdateForm' import { Deck } from 'models/Deck' import { ApiError, isApiError } from './ApiError' +import { MediaStreamConfig } from 'models/VideoStream' /** Implements the request sent to the backend api. */ export class BackendAPICaller { @@ -140,6 +141,12 @@ export class BackendAPICaller { return result.content } + static async getRobotMediaConfig(robotId: string): Promise { + const path: string = 'media-stream/' + robotId + const result = await this.GET(path).catch(BackendAPICaller.handleError('GET', path)) + return result.content + } + static async getMissionRuns(parameters: MissionRunQueryParameters): Promise> { let path: string = 'missions/runs?' diff --git a/frontend/src/components/Contexts/MediaStreamContext.tsx b/frontend/src/components/Contexts/MediaStreamContext.tsx index 118893e8..d201a32c 100644 --- a/frontend/src/components/Contexts/MediaStreamContext.tsx +++ b/frontend/src/components/Contexts/MediaStreamContext.tsx @@ -1,22 +1,19 @@ import { createContext, FC, useContext, useEffect, useState } from 'react' -import { SignalREventLabels, useSignalRContext } from './SignalRContext' -import { useRobotContext } from './RobotContext' -import { - ConnectionState, - RemoteParticipant, - RemoteTrack, - RemoteTrackPublication, - Room, - RoomEvent, -} from 'livekit-client' +import { ConnectionState, Room, RoomEvent } from 'livekit-client' import { MediaConnectionType, MediaStreamConfig } from 'models/VideoStream' +import { BackendAPICaller } from 'api/ApiCaller' type MediaStreamDictionaryType = { - [robotId: string]: MediaStreamConfig & { streams: MediaStreamTrack[] } + [robotId: string]: { isLoading: boolean } & MediaStreamConfig & { streams: MediaStreamTrack[] } +} + +type MediaStreamConfigDictionaryType = { + [robotId: string]: MediaStreamConfig } interface IMediaStreamContext { mediaStreams: MediaStreamDictionaryType + addMediaStreamConfigIfItDoesNotExist: (robotId: string) => void } interface Props { @@ -25,6 +22,7 @@ interface Props { const defaultMediaStreamInterface = { mediaStreams: {}, + addMediaStreamConfigIfItDoesNotExist: (robotId: string) => {}, } const MediaStreamContext = createContext(defaultMediaStreamInterface) @@ -33,76 +31,128 @@ export const MediaStreamProvider: FC = ({ children }) => { const [mediaStreams, setMediaStreams] = useState( defaultMediaStreamInterface.mediaStreams ) - const { registerEvent, connectionReady } = useSignalRContext() - const { enabledRobots } = useRobotContext() + const [cachedConfigs] = useState( + JSON.parse(window.localStorage.getItem('mediaConfigs') ?? '{}') + ) + + useEffect(() => { + // Here we maintain the localstorage with the connection details + let updatedConfigs: MediaStreamConfigDictionaryType = {} + Object.keys(mediaStreams).forEach((robotId) => { + const conf = mediaStreams[robotId] + + if (conf.streams.length === 0 && !conf.isLoading) refreshRobotMediaConfig(robotId) + updatedConfigs[robotId] = { + url: conf.url, + token: conf.token, + mediaConnectionType: conf.mediaConnectionType, + robotId: conf.robotId, + } + }) + window.localStorage.setItem('mediaConfigs', JSON.stringify(updatedConfigs)) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [mediaStreams]) const addTrackToConnection = (newTrack: MediaStreamTrack, robotId: string) => { setMediaStreams((oldStreams) => { - if (!Object.keys(oldStreams).includes(robotId)) { + if ( + !Object.keys(oldStreams).includes(robotId) || + oldStreams[robotId].streams.find((s) => s.id === newTrack.id) + ) { return oldStreams } else { - const newStreams = { ...oldStreams } return { ...oldStreams, - [robotId]: { ...newStreams[robotId], streams: [...oldStreams[robotId].streams, newTrack] }, + [robotId]: { + ...oldStreams[robotId], + streams: [...oldStreams[robotId].streams, newTrack], + isLoading: false, + }, } } }) } - const createLiveKitConnection = async (config: MediaStreamConfig) => { + const createLiveKitConnection = async (config: MediaStreamConfig, cachedConfig: boolean = false) => { const room = new Room() - room.on(RoomEvent.TrackSubscribed, handleTrackSubscribed) - - function handleTrackSubscribed( - track: RemoteTrack, - publication: RemoteTrackPublication, - participant: RemoteParticipant - ) { - addTrackToConnection(track.mediaStreamTrack, config.robotId) - } + + window.addEventListener('unload', async () => room.disconnect()) + + room.on(RoomEvent.TrackSubscribed, (track) => addTrackToConnection(track.mediaStreamTrack, config.robotId)) + room.on(RoomEvent.TrackUnpublished, (e) => { + setMediaStreams((oldStreams) => { + let streamsCopy = { ...oldStreams } + if (!Object.keys(streamsCopy).includes(config.robotId) || streamsCopy[config.robotId].isLoading) + return streamsCopy + + let streamList = streamsCopy[config.robotId].streams + const streamIndex = streamList.findIndex((s) => s.id === e.trackSid) + + if (streamIndex < 0) return streamsCopy + + streamList.splice(streamIndex, 1) + streamsCopy[config.robotId].streams = streamList + + if (streamList.length === 0) room.disconnect() + + return streamsCopy + }) + }) + if (room.state === ConnectionState.Disconnected) { room.connect(config.url, config.token) - .then(() => console.log(JSON.stringify(room.state))) - .catch((error) => console.warn('Error connecting to LiveKit Room, may already be connected:', error)) + .then(() => console.log('LiveKit room status: ', JSON.stringify(room.state))) + .catch((error) => { + if (cachedConfig) refreshRobotMediaConfig(config.robotId) + else console.error('Failed to connect to LiveKit room: ', error) + }) } } - const createMediaConnection = async (config: MediaStreamConfig) => { + const createMediaConnection = async (config: MediaStreamConfig, cachedConfig: boolean = false) => { switch (config.mediaConnectionType) { case MediaConnectionType.LiveKit: - return await createLiveKitConnection(config) + return await createLiveKitConnection(config, cachedConfig) default: console.error('Invalid media connection type received') } return undefined } - // Register a signalR event handler that listens for new media stream connections - useEffect(() => { - if (connectionReady) { - registerEvent(SignalREventLabels.mediaStreamConfigReceived, (username: string, message: string) => { - const newMediaConfig: MediaStreamConfig = JSON.parse(message) - setMediaStreams((oldStreams) => { - if (Object.keys(oldStreams).includes(newMediaConfig.robotId)) { - return oldStreams - } else { - createMediaConnection(newMediaConfig) - return { - ...oldStreams, - [newMediaConfig.robotId]: { ...newMediaConfig, streams: [] }, - } - } - }) - }) + const addConfigToMediaStreams = (conf: MediaStreamConfig, cachedConfig: boolean = false) => { + setMediaStreams((oldStreams) => { + createMediaConnection(conf, cachedConfig) + return { + ...oldStreams, + [conf.robotId]: { ...conf, streams: [], isLoading: true }, + } + }) + } + + const addMediaStreamConfigIfItDoesNotExist = (robotId: string) => { + if (Object.keys(mediaStreams).includes(robotId)) { + const currentStream = mediaStreams[robotId] + if (currentStream.isLoading || currentStream.streams.find((stream) => stream.enabled)) return + } else if (Object.keys(cachedConfigs).includes(robotId)) { + const config = cachedConfigs[robotId] + addConfigToMediaStreams(config, true) + return } - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [registerEvent, connectionReady, enabledRobots]) + + refreshRobotMediaConfig(robotId) + } + + const refreshRobotMediaConfig = (robotId: string) => { + BackendAPICaller.getRobotMediaConfig(robotId) + .then((conf: MediaStreamConfig) => addConfigToMediaStreams(conf)) + .catch((e) => console.error(e)) + } return ( {children} diff --git a/frontend/src/components/Pages/MissionPage/MissionPage.tsx b/frontend/src/components/Pages/MissionPage/MissionPage.tsx index f6fb417e..660ac186 100644 --- a/frontend/src/components/Pages/MissionPage/MissionPage.tsx +++ b/frontend/src/components/Pages/MissionPage/MissionPage.tsx @@ -41,7 +41,13 @@ export const MissionPage = () => { const [videoMediaStreams, setVideoMediaStreams] = useState([]) const [selectedMission, setSelectedMission] = useState() const { registerEvent, connectionReady } = useSignalRContext() - const { mediaStreams } = useMediaStreamContext() + const { mediaStreams, addMediaStreamConfigIfItDoesNotExist } = useMediaStreamContext() + + useEffect(() => { + if (selectedMission && !Object.keys(mediaStreams).includes(selectedMission?.robot.id)) + addMediaStreamConfigIfItDoesNotExist(selectedMission?.robot.id) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [selectedMission]) useEffect(() => { if (connectionReady) { diff --git a/frontend/src/components/Pages/RobotPage/RobotPage.tsx b/frontend/src/components/Pages/RobotPage/RobotPage.tsx index c66dfff6..16f66db7 100644 --- a/frontend/src/components/Pages/RobotPage/RobotPage.tsx +++ b/frontend/src/components/Pages/RobotPage/RobotPage.tsx @@ -60,10 +60,15 @@ export const RobotPage = () => { const { setAlert, setListAlert } = useAlertContext() const { robotId } = useParams() const { enabledRobots } = useRobotContext() - const { mediaStreams } = useMediaStreamContext() + const { mediaStreams, addMediaStreamConfigIfItDoesNotExist } = useMediaStreamContext() const [videoMediaStreams, setVideoMediaStreams] = useState([]) const { ongoingMissions } = useMissionsContext() + useEffect(() => { + if (robotId && !Object.keys(mediaStreams).includes(robotId)) addMediaStreamConfigIfItDoesNotExist(robotId) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [robotId]) + const selectedRobot = enabledRobots.find((robot) => robot.id === robotId) const [isDialogOpen, setIsDialogOpen] = useState(false)