Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Revise Message Sending Functionality #76

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions src/server/message/hub/hub.message.controller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
/* eslint-disable max-classes-per-file */
import {
BadRequestException,
Body, Controller, HttpCode, Param, Post, UseGuards,
} from '@nestjs/common';
import { ArrayNotEmpty, IsObject } from 'class-validator';
import { AuthGuard } from '../../auth/auth.guard';
import { HubMessageProducerService } from './hub.message.producer.service';
import { HubMessageProducerService, MessageProducerError } from './hub.message.producer.service';

class SendMessageBody {
@ArrayNotEmpty()
recipients: Array<string>;

@IsObject()
message: Record<string, any>;
}

/**
* Bundles API endpoints related to message exchange with the central side (hub).
*/
@Controller('messages')
@Controller('analyses/:id/messages')
@UseGuards(AuthGuard)
export class HubMessageController {
private readonly hubMessageProducerService: HubMessageProducerService;
Expand All @@ -16,9 +27,25 @@ export class HubMessageController {
this.hubMessageProducerService = hubMessageProducerService;
}

@Post('/broadcasts/:analysisId')
@Post('/broadcasts')
@HttpCode(201)
async sendNodeBroadcast(@Param('analysisId') analysisId: string, @Body() messagePayload: Record<string, any>) {
async sendNodeBroadcast(@Param('id') analysisId: string, @Body() messagePayload: Record<string, any>) {
return this.hubMessageProducerService.produceNodeBroadcastMessage(analysisId, messagePayload);
}

@Post('/')
@HttpCode(201)
async send(@Param('id') analysisId: string, @Body() data: SendMessageBody) {
return this.hubMessageProducerService.produceMessage(data.recipients, analysisId, data.message)
.catch((err) => {
if (err instanceof MessageProducerError) {
if (err.name === 'INVALID_RECIPIENTS') {
throw new BadRequestException('one or more of the given recipient node ids are invalid', {
cause: err,
description: err.message,
});
}
}
});
}
}
64 changes: 64 additions & 0 deletions src/server/message/hub/hub.message.producer.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,36 @@
/* eslint-disable max-classes-per-file */
import { Injectable } from '@nestjs/common';
import { HubClient } from './hub.client';

type ErrorName =
| 'INVALID_RECIPIENTS';

/**
* Describes an error that occured while producing a message.
*/
export class MessageProducerError extends Error {
override name: ErrorName;

override message: string;

override cause: any;

constructor({
name,
message,
cause,
}: {
name: ErrorName;
message: string;
cause?: any;
}) {
super();
this.name = name;
this.message = message;
this.cause = cause;
}
}

/**
* A service for sending out messages to the central side (hub).
*/
Expand All @@ -20,6 +50,7 @@ export class HubMessageProducerService {
* @returns
*/
async produceNodeBroadcastMessage(analysisId: string, payload: Record<string, any>): Promise<void> {
// Note: If an analysis does not exist this will still output an empty array of nodes!
return this.hubClient.getAnalysisNodes(analysisId)
.then((nodes) => ({
nodeIds: nodes
Expand All @@ -31,4 +62,37 @@ export class HubMessageProducerService {
}))
.then((header) => this.hubClient.sendMessage(header, payload));
}

/**
* Produces a message that is sent to all given recipients. The message will be associated with the given analysis ID.
*
* @param recipientNodeIds identifies all recipients of this message
* @param analysisId identifies the analysis that this message is associated with
* @param payload the actual message that is sent
* @returns
*/
async produceMessage(recipientNodeIds: Array<string>, analysisId: string, payload: Record<string, any>): Promise<void> {
// Note: If an analysis does not exist this will still output an empty array of nodes!
return this.hubClient.getAnalysisNodes(analysisId)
.then((nodes) => nodes
.filter((n) => n.node.robot_id !== undefined && n.node.robot_id !== null)
.map((n) => n.node.robot_id || ''))
.then((paticipatingNodeIds) => {
const invalidRecipients = recipientNodeIds.filter((rn) => !paticipatingNodeIds.includes(rn));
if (invalidRecipients.length > 0) {
throw new MessageProducerError({
name: 'INVALID_RECIPIENTS',
message: `recipient node ids '[${invalidRecipients}]' are invalid for analysis '${analysisId}' since they are no participants`,
});
} else {
return recipientNodeIds;
}
})
.then((recipientNodeIds) => this.hubClient.sendMessage({
nodeIds: recipientNodeIds,
metadata: {
analysisId,
},
}, payload));
}
}