Skip to content

Commit

Permalink
feat: [wip] revise message sending functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
DiCanio committed Mar 26, 2024
1 parent c4de873 commit e3e9efa
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
41 changes: 36 additions & 5 deletions src/server/message/hub/hub.message.controller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
/* eslint-disable max-classes-per-file */
import {
Body, Controller, HttpCode, Param, Post, UseGuards,
BadRequestException,
Body, Controller, HttpCode, NotFoundException, Param, Post, UseGuards,
} from '@nestjs/common';
import { ArrayNotEmpty, IsObject } from 'class-validator';
import { AuthGuard } from '../../auth/auth.guard';
import { HubMessageProducerService } from './hub.message.producer.service';
import { HubMessageProducerService, MessageProducerError } from './hub.message.producer.service';

class SendMessageBody {
@ArrayNotEmpty()
recipients: Array<string>;

@IsObject()
message: Record<string, any>;
}

/**
* Bundles API endpoints related to message exchange with the central side (hub).
*/
@Controller('messages')
@Controller('analyses/:id/messages')
@UseGuards(AuthGuard)
export class HubMessageController {
private readonly hubMessageProducerService: HubMessageProducerService;
Expand All @@ -16,9 +27,29 @@ export class HubMessageController {
this.hubMessageProducerService = hubMessageProducerService;
}

@Post('/broadcasts/:analysisId')
@Post('/broadcasts')
@HttpCode(201)
async sendNodeBroadcast(@Param('analysisId') analysisId: string, @Body() messagePayload: Record<string, any>) {
async sendNodeBroadcast(@Param('id') analysisId: string, @Body() messagePayload: Record<string, any>) {
return this.hubMessageProducerService.produceNodeBroadcastMessage(analysisId, messagePayload);
}

@Post('/')
@HttpCode(201)
async send(@Param('id') analysisId: string, @Body() data: SendMessageBody) {
return this.hubMessageProducerService.produceMessage(data.recipients, analysisId, data.message)
.catch((err) => {
if (err instanceof MessageProducerError) {
if (err.name === 'INVALID_RECIPIENTS') {
throw new BadRequestException('one or more of the given recipient node ids are invalid', {
cause: err,
description: err.message,
});
} else if (err.name === 'ANALYSIS_NOT_FOUND') {
throw new NotFoundException(`analysis '${analysisId}' cannot be found`, {
cause: err,
});
}
}
});
}
}
65 changes: 65 additions & 0 deletions src/server/message/hub/hub.message.producer.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,37 @@
/* eslint-disable max-classes-per-file */
import { Injectable } from '@nestjs/common';
import { HubClient } from './hub.client';

type ErrorName =
| 'INVALID_RECIPIENTS'
| 'ANALYSIS_NOT_FOUND';

/**
* Describes an error that occured while producing a message.
*/
export class MessageProducerError extends Error {
override name: ErrorName;

override message: string;

override cause: any;

constructor({
name,
message,
cause,
}: {
name: ErrorName;
message: string;
cause?: any;
}) {
super();
this.name = name;
this.message = message;
this.cause = cause;
}
}

/**
* A service for sending out messages to the central side (hub).
*/
Expand All @@ -20,6 +51,7 @@ export class HubMessageProducerService {
* @returns
*/
async produceNodeBroadcastMessage(analysisId: string, payload: Record<string, any>): Promise<void> {
// TODO: add analysis not found error handling
return this.hubClient.getAnalysisNodes(analysisId)
.then((nodes) => ({
nodeIds: nodes
Expand All @@ -31,4 +63,37 @@ export class HubMessageProducerService {
}))
.then((header) => this.hubClient.sendMessage(header, payload));
}

/**
* Produces a message that is sent to all given recipients. The message will be associated with the given analysis ID.
*
* @param recipientNodeIds identifies all recipients of this message
* @param analysisId identifies the analysis that this message is associated with
* @param payload the actual message that is sent
* @returns
*/
async produceMessage(recipientNodeIds: Array<string>, analysisId: string, payload: Record<string, any>): Promise<void> {
// TODO: add analysis not found error handling
return this.hubClient.getAnalysisNodes(analysisId)
.then((nodes) => nodes
.filter((n) => n.node.robot_id !== undefined && n.node.robot_id !== null)
.map((n) => n.node.robot_id || ''))
.then((paticipatingNodeIds) => {
const invalidRecipients = recipientNodeIds.filter((rn) => !paticipatingNodeIds.includes(rn));
if (invalidRecipients.length > 0) {
throw new MessageProducerError({
name: 'INVALID_RECIPIENTS',
message: `recipient node ids '[${invalidRecipients}]' are invalid for analysis '${analysisId}' since they are no participants`,
});
} else {
return recipientNodeIds;
}
})
.then((recipientNodeIds) => this.hubClient.sendMessage({
nodeIds: recipientNodeIds,
metadata: {
analysisId,
},
}, payload));
}
}

0 comments on commit e3e9efa

Please sign in to comment.