From aad4ac88f1853fd876507249821315ba36cea93b Mon Sep 17 00:00:00 2001 From: Alexander Twrdik <6052859+DiCanio@users.noreply.github.com> Date: Mon, 25 Mar 2024 20:24:19 +0100 Subject: [PATCH] feat: [wip] revise message sending functionality --- .../message/hub/hub.message.controller.ts | 41 ++++++++++-- .../hub/hub.message.producer.service.ts | 65 +++++++++++++++++++ 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/src/server/message/hub/hub.message.controller.ts b/src/server/message/hub/hub.message.controller.ts index b3cb69d..6e3c534 100644 --- a/src/server/message/hub/hub.message.controller.ts +++ b/src/server/message/hub/hub.message.controller.ts @@ -1,13 +1,24 @@ +/* eslint-disable max-classes-per-file */ import { - Body, Controller, HttpCode, Param, Post, UseGuards, + BadRequestException, + Body, Controller, HttpCode, NotFoundException, Param, Post, UseGuards, } from '@nestjs/common'; +import { ArrayNotEmpty, IsObject } from 'class-validator'; import { AuthGuard } from '../../auth/auth.guard'; -import { HubMessageProducerService } from './hub.message.producer.service'; +import { HubMessageProducerService, MessageProducerError } from './hub.message.producer.service'; + +class SendMessageBody { + @ArrayNotEmpty() + recipients: Array; + + @IsObject() + message: Record; +} /** * Bundles API endpoints related to message exchange with the central side (hub). */ -@Controller('messages') +@Controller('analyses/:id/messages') @UseGuards(AuthGuard) export class HubMessageController { private readonly hubMessageProducerService: HubMessageProducerService; @@ -16,9 +27,29 @@ export class HubMessageController { this.hubMessageProducerService = hubMessageProducerService; } - @Post('/broadcasts/:analysisId') + @Post('/broadcasts') @HttpCode(201) - async sendNodeBroadcast(@Param('analysisId') analysisId: string, @Body() messagePayload: Record) { + async sendNodeBroadcast(@Param('id') analysisId: string, @Body() messagePayload: Record) { return this.hubMessageProducerService.produceNodeBroadcastMessage(analysisId, messagePayload); } + + @Post('/') + @HttpCode(201) + async send(@Param('id') analysisId: string, @Body() data: SendMessageBody) { + return this.hubMessageProducerService.produceMessage(data.recipients, analysisId, data.message) + .catch((err) => { + if (err instanceof MessageProducerError) { + if (err.name === 'INVALID_RECIPIENTS') { + throw new BadRequestException('one or more of the given recipient node ids are invalid', { + cause: err, + description: err.message, + }); + } else if (err.name === 'ANALYSIS_NOT_FOUND') { + throw new NotFoundException(`analysis '${analysisId}' cannot be found`, { + cause: err, + }); + } + } + }); + } } diff --git a/src/server/message/hub/hub.message.producer.service.ts b/src/server/message/hub/hub.message.producer.service.ts index 87b627f..84534e0 100644 --- a/src/server/message/hub/hub.message.producer.service.ts +++ b/src/server/message/hub/hub.message.producer.service.ts @@ -1,6 +1,37 @@ +/* eslint-disable max-classes-per-file */ import { Injectable } from '@nestjs/common'; import { HubClient } from './hub.client'; +type ErrorName = + | 'INVALID_RECIPIENTS' + | 'ANALYSIS_NOT_FOUND'; + +/** + * Describes an error that occured while producing a message. + */ +export class MessageProducerError extends Error { + override name: ErrorName; + + override message: string; + + override cause: any; + + constructor({ + name, + message, + cause, + }: { + name: ErrorName; + message: string; + cause?: any; + }) { + super(); + this.name = name; + this.message = message; + this.cause = cause; + } +} + /** * A service for sending out messages to the central side (hub). */ @@ -20,6 +51,7 @@ export class HubMessageProducerService { * @returns */ async produceNodeBroadcastMessage(analysisId: string, payload: Record): Promise { + // TODO: add analysis not found error handling return this.hubClient.getAnalysisNodes(analysisId) .then((nodes) => ({ nodeIds: nodes @@ -31,4 +63,37 @@ export class HubMessageProducerService { })) .then((header) => this.hubClient.sendMessage(header, payload)); } + + /** + * Produces a message that is sent to all given recipients. The message will be associated with the given analysis ID. + * + * @param recipientNodeIds identifies all recipients of this message + * @param analysisId identifies the analysis that this message is associated with + * @param payload the actual message that is sent + * @returns + */ + async produceMessage(recipientNodeIds: Array, analysisId: string, payload: Record): Promise { + // TODO: add analysis not found error handling + return this.hubClient.getAnalysisNodes(analysisId) + .then((nodes) => nodes + .filter((n) => n.node.robot_id !== undefined && n.node.robot_id !== null) + .map((n) => n.node.robot_id || '')) + .then((paticipatingNodeIds) => { + const invalidRecipients = recipientNodeIds.filter((rn) => !paticipatingNodeIds.includes(rn)); + if (invalidRecipients.length > 0) { + throw new MessageProducerError({ + name: 'INVALID_RECIPIENTS', + message: `recipient node ids '[${invalidRecipients}]' are invalid for analysis '${analysisId}' since they are no participants`, + }); + } else { + return recipientNodeIds; + } + }) + .then((recipientNodeIds) => this.hubClient.sendMessage({ + nodeIds: recipientNodeIds, + metadata: { + analysisId, + }, + }, payload)); + } }