Skip to content

Commit

Permalink
feat: add self discovery functionality
Browse files Browse the repository at this point in the history
Adds another endpoint that allows to discovers the node within a set
of participating analysis nodes which the message broker belongs to.
This is related to a feature that allows broadcasting a message or
sending a message to arbitrary participating analysis nodes.
  • Loading branch information
DiCanio committed Mar 27, 2024
1 parent cd3527c commit eba2722
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 21 deletions.
43 changes: 38 additions & 5 deletions src/server/discovery/discovery.controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {
Controller, Get, HttpCode, Inject, Param, UseGuards,
BadGatewayException,
Controller, Get, HttpCode, Inject, NotFoundException, Param, UseGuards,
} from '@nestjs/common';
import type { AnalysisNodeDiscoveryResult, DiscoveryService } from './discovery.service';
import { DISCOVERY_SERVICE } from './discovery.service';
import type { AnalysisNodeDiscoveryResult } from './discovery.service';
import { DISCOVERY_SERVICE, DiscoveryError, DiscoveryService } from './discovery.service';
import { AuthGuard } from '../auth/auth.guard';

/**
Expand All @@ -20,7 +21,39 @@ export class DiscoveryController {
// TODO: add ETag later on to allow for caching
@Get('participants')
@HttpCode(200)
async discoverParticipants(@Param('id') analysisId: string): Promise<AnalysisNodeDiscoveryResult[]> {
return this.discoveryService.discoverParticipatingAnalysisNodes(analysisId);
async discoverParticipants(@Param('id') analysisId: string): Promise<void | AnalysisNodeDiscoveryResult[]> {
return this.discoveryService.discoverParticipatingAnalysisNodes(analysisId)
.catch((err) => {
if (err instanceof DiscoveryError) {
if (err.name === 'FAILED_TO_FETCH_ANALYSIS_NODES') {
throw new BadGatewayException('could not fetch analysis nodes', {
cause: err,
description: err.message,
});
}
}
});
}

@Get('participants/self')
@HttpCode(200)
async discoverSelf(@Param('id') analysisId: string): Promise<void | AnalysisNodeDiscoveryResult> {
return this.discoveryService.discoverSelf(analysisId)
.catch((err) => {
if (err instanceof DiscoveryError) {
if (err.name === 'FAILED_TO_FETCH_ANALYSIS_NODES') {
throw new BadGatewayException('could not fetch analysis nodes', {
cause: err,
description: err.message,
});
} else if (err.name === 'SELF_NOT_FOUND') {
// TODO: this can be discussed
throw new NotFoundException('could not find own identity in analysis', {
cause: err,
description: err.message,
});
}
}
});
}
}
7 changes: 5 additions & 2 deletions src/server/discovery/discovery.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ import { DISCOVERY_SERVICE, HubBackedDiscoveryService } from './discovery.servic
},
{
provide: DISCOVERY_SERVICE,
useFactory: async (hubApiClient: HubApiClient) => new HubBackedDiscoveryService(hubApiClient),
inject: [HubApiClient],
useFactory: async (hubApiClient: HubApiClient, configService: ConfigService) => {
const ownNodeId = configService.getOrThrow<string>('hub.auth.robotId');
return new HubBackedDiscoveryService(hubApiClient, ownNodeId);
},
inject: [HubApiClient, ConfigService],
},
],
})
Expand Down
61 changes: 57 additions & 4 deletions src/server/discovery/discovery.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,35 @@
import { HttpException, HttpStatus, Injectable } from '@nestjs/common';
/* eslint-disable max-classes-per-file */
import { Injectable } from '@nestjs/common';
import { APIClient as HubApiClient } from '@privateaim/core';
import type { AnalysisNode, CollectionResourceResponse } from '@privateaim/core';

type ErrorName =
| 'FAILED_TO_FETCH_ANALYSIS_NODES'
| 'SELF_NOT_FOUND';

export class DiscoveryError extends Error {
override name: ErrorName;

override message: string;

override cause: any;

constructor({
name,
message,
cause,
}: {
name: ErrorName;
message: string;
cause?: any;
}) {
super();
this.name = name;
this.message = message;
this.cause = cause;
}
}

/**
* Describes the type of a participating analysis node.
*/
Expand All @@ -26,6 +54,7 @@ export const DISCOVERY_SERVICE = 'DISCOVERY SERVICE';
*/
export interface DiscoveryService {
discoverParticipatingAnalysisNodes(analysisId: string): Promise<AnalysisNodeDiscoveryResult[]>;
discoverSelf(analysisId: string): Promise<AnalysisNodeDiscoveryResult>;
}

function extractNodeType(analysisNode: AnalysisNode): NodeType {
Expand All @@ -46,8 +75,11 @@ function extractNodeType(analysisNode: AnalysisNode): NodeType {
export class HubBackedDiscoveryService implements DiscoveryService {
private readonly hubApiClient: HubApiClient;

constructor(hubApiClient: HubApiClient) {
private readonly myNodeId: string;

constructor(hubApiClient: HubApiClient, myNodeId: string) {
this.hubApiClient = hubApiClient;
this.myNodeId = myNodeId;
}

/**
Expand All @@ -72,10 +104,31 @@ export class HubBackedDiscoveryService implements DiscoveryService {
nodeType: extractNodeType(analysisNode),
})))
.catch((error) => {
// TODO: maybe introduce another error type and translate to an HTTP error at the controller level.
throw new HttpException('analysis nodes cannot be fetched from central side (hub)', HttpStatus.BAD_GATEWAY, {
throw new DiscoveryError({
name: 'FAILED_TO_FETCH_ANALYSIS_NODES',
message: `nodes for analysis '${analysisId}' cannot be fetched from central side (hub)`,
cause: error,
});
});
}

/**
* Discovers the participating node within an analysis that this message broker belongs to.
*
* @param analysisId Identifies the analysis.
* @returns A promise of the participating analysis node that this message broker belongs to.
*/
async discoverSelf(analysisId: string): Promise<AnalysisNodeDiscoveryResult> {
return this.discoverParticipatingAnalysisNodes(analysisId)
.then((nodes) => {
const selfNode = nodes.find((n) => n.nodeId === this.myNodeId);
if (selfNode !== undefined) {
return selfNode;
}
throw new DiscoveryError({
name: 'SELF_NOT_FOUND',
message: `cannot determine own identity for analysis '${analysisId}'`,
});
});
}
}
72 changes: 69 additions & 3 deletions test/unit/server/discovery/discovery.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import request from 'supertest';
import { Test } from '@nestjs/testing';
import type { INestApplication } from '@nestjs/common';
import { HttpStatus, type INestApplication } from '@nestjs/common';
import { when } from 'jest-when';
import { APIClient } from '@authup/core';
import { DISCOVERY_SERVICE, NodeType } from '../../../../src/server/discovery/discovery.service';
import { DISCOVERY_SERVICE, DiscoveryError, NodeType } from '../../../../src/server/discovery/discovery.service';
import type { DiscoveryService } from '../../../../src/server/discovery/discovery.service';
import { AuthGuard } from '../../../../src/server/auth/auth.guard';
import { DiscoveryController } from '../../../../src/server/discovery/discovery.controller';

describe('Discovery Controller', () => {
const mockedDiscoveryService: DiscoveryService = {
discoverParticipatingAnalysisNodes: jest.fn(),
discoverSelf: jest.fn(),
};
let app: INestApplication;

Expand Down Expand Up @@ -63,7 +64,7 @@ describe('Discovery Controller', () => {

await request(app.getHttpServer())
.get(`/analyses/${testAnalysisId}/participants`)
.expect(200)
.expect(HttpStatus.OK)
.then((res) => {
expect(res.body).toHaveLength(2);
expect(res.body[0].nodeId).toBe('foo');
Expand All @@ -72,4 +73,69 @@ describe('Discovery Controller', () => {
expect(res.body[1].nodeType).toBe(NodeType.DEFAULT);
});
});

it('/GET participants should return 502 if participating analysis nodes cannot be fetched from central side', async () => {
const testAnalysisId: string = '7e8cd08c-f536-4a44-bbf3-7d235e9cebc9';

when(mockedDiscoveryService.discoverParticipatingAnalysisNodes).calledWith(testAnalysisId)
.mockRejectedValue(new DiscoveryError({
name: 'FAILED_TO_FETCH_ANALYSIS_NODES',
message: 'something describing the error',
}));

await request(app.getHttpServer())
.get(`/analyses/${testAnalysisId}/participants`)
.expect(HttpStatus.BAD_GATEWAY);
});

it('/GET self should get participation information about the message broker itself from the underlying service', async () => {
const testAnalysisId: string = '7e8cd08c-f536-4a44-bbf3-7d235e9cebc9';

when(mockedDiscoveryService.discoverSelf).calledWith(testAnalysisId)
.mockResolvedValue({
nodeId: 'foo',
nodeType: NodeType.DEFAULT,
});

await request(app.getHttpServer())
.get(`/analyses/${testAnalysisId}/participants/self`)
.expect(HttpStatus.OK)
.then((res) => {
expect(res.body).toBeInstanceOf(Object);
expect(res.body.nodeId).toBe('foo');
expect(res.body.nodeType).toBe(NodeType.DEFAULT);
});
});

it('/GET self should return 404 if self cannot be found', async () => {
const testAnalysisId: string = '7e8cd08c-f536-4a44-bbf3-7d235e9cebc9';

when(mockedDiscoveryService.discoverSelf).calledWith(testAnalysisId)
.mockRejectedValue(
new DiscoveryError({
name: 'SELF_NOT_FOUND',
message: 'something describing the error',
}),
);

await request(app.getHttpServer())
.get(`/analyses/${testAnalysisId}/participants/self`)
.expect(HttpStatus.NOT_FOUND);
});

it('/GET self should return 502 if participating analysis nodes cannot be fetched from central side', async () => {
const testAnalysisId: string = '7e8cd08c-f536-4a44-bbf3-7d235e9cebc9';

when(mockedDiscoveryService.discoverSelf).calledWith(testAnalysisId)
.mockRejectedValue(
new DiscoveryError({
name: 'FAILED_TO_FETCH_ANALYSIS_NODES',
message: 'something describing the error',
}),
);

await request(app.getHttpServer())
.get(`/analyses/${testAnalysisId}/participants/self`)
.expect(HttpStatus.BAD_GATEWAY);
});
});
Loading

0 comments on commit eba2722

Please sign in to comment.