Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code review for the notifications aggregator service. #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
74 changes: 74 additions & 0 deletions src/server/NotificationServiceHTTPServer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import * as http from 'http';
import { NotificationServiceHTTPServer } from './NotificationServiceHTTPServer';

describe('NotificationServiceHTTPServer', () => {
let server: NotificationServiceHTTPServer;

beforeEach(() => {
server = new NotificationServiceHTTPServer(8080, console);
});

afterEach(() => {
// Clean up any resources used by the server
server.server.close();
});

it('should set up the server and connect to the cache service', async () => {
await server.setupServer(8080);

expect(server.server).toBeInstanceOf(http.Server);
expect(server.cacheService).toBeDefined();
});

it('should handle incoming requests', async () => {
const request: http.IncomingMessage = {} as http.IncomingMessage;
const response: http.ServerResponse = {} as http.ServerResponse;

await server.request_handler(request, response);

// Add your assertions here
});

it('should handle notification POST requests', async () => {
const request: http.IncomingMessage = {} as http.IncomingMessage;
const response: http.ServerResponse = {} as http.ServerResponse;

await server.handleNotificationPostRequest(request, response);

// Add your assertions here
});

it('should handle client GET requests', async () => {
const request: http.IncomingMessage = {} as http.IncomingMessage;
const response: http.ServerResponse = {} as http.ServerResponse;

await server.handleClientGetRequest(request, response);

// Add your assertions here
});

it('should handle notification DELETE requests', async () => {
const request: http.IncomingMessage = {} as http.IncomingMessage;
const response: http.ServerResponse = {} as http.ServerResponse;

await server.handleNotificationDeleteRequest(request, response);

// Add your assertions here
});

it('should send a message to the WebSocket server', () => {
const message = 'Hello, WebSocket server!';

server.send_to_websocket_server(message);

// Add your assertions here
});

it('should connect to the WebSocket server', async () => {
const wss_url = 'ws://localhost:8081';

await server.connect_to_websocket_server(wss_url);

// Add your assertions here
});
});
26 changes: 17 additions & 9 deletions src/server/NotificationServiceHTTPServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import { WebSocketServerHandler } from './WebSocketServerHandler';
* @class NotificationServiceHTTPServer
*/
export class NotificationServiceHTTPServer {
private readonly cacheService: CacheService;
private readonly server: http.Server;
public cacheService: CacheService;
public server: http.Server;
public connection: any;
public client: any;
public logger: any;
Expand All @@ -23,7 +23,6 @@ export class NotificationServiceHTTPServer {
/**
* Creates an instance of NotificationServiceHTTPServer.
* @param {number} port - The port where the HTTP server will listen.
* @param {string[]} pod_url - The location of the Solid Pod from which the notifications are retrieved.
* @param {*} logger - The logger object.
* @memberof NotificationServiceHTTPServer
*/
Expand All @@ -40,7 +39,7 @@ export class NotificationServiceHTTPServer {
this.websocket_handler = new WebSocketServerHandler(this.websocket_server);
this.setupServer(port);
this.connect_to_websocket_server('ws://localhost:8085/');
this.websocket_handler.handle_communication(this.cacheService);
this.websocket_handler.handle_communication();

}
/**
Expand All @@ -49,7 +48,7 @@ export class NotificationServiceHTTPServer {
* @param {number} port - The port where the HTTP server will listen.
* @memberof NotificationServiceHTTPServer
*/
private async setupServer(port: number) {
public async setupServer(port: number) {
this.server.listen(port, () => {
this.logger.info(`Server listening on port ${port}`);
});
Expand Down Expand Up @@ -85,7 +84,7 @@ export class NotificationServiceHTTPServer {
* @returns {Promise<void>}
* @memberof NotificationServiceHTTPServer
*/
private async handleNotificationPostRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
public async handleNotificationPostRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
let body = '';
request.on('data', (chunk) => {
body += chunk.toString();
Expand Down Expand Up @@ -134,7 +133,7 @@ export class NotificationServiceHTTPServer {
* @returns {Promise<void>}
* @memberof NotificationServiceHTTPServer
*/
private async handleClientGetRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
public async handleClientGetRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
this.logger.info(`GET request received for ${request.url}`)
console.log(`GET request received for ${request.url}`);
const parsed_url = url.parse(request.url!, true);
Expand All @@ -156,7 +155,7 @@ export class NotificationServiceHTTPServer {
* @returns {Promise<void>} - A promise which responses nothing.
* @memberof NotificationServiceHTTPServer
*/
private async handleNotificationDeleteRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
public async handleNotificationDeleteRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
const parsed_url = url.parse(request.url!, true);
const query_parameters = parsed_url.query;
const event_time = query_parameters.event_time as string | undefined || 'Anonymous';
Expand All @@ -165,6 +164,11 @@ export class NotificationServiceHTTPServer {
response.end('OK');
}

/**
* Sends a message to the WebSocket server.
* @param {string} message - The message to send.
* @memberof NotificationServiceHTTPServer
*/
public send_to_websocket_server(message: string) {
if (this.connection.connected) {
this.connection.sendUTF(message);
Expand All @@ -175,7 +179,11 @@ export class NotificationServiceHTTPServer {
});
}
}

/**
* Connects to the WebSocket server.
* @param {string} wss_url - The URL of the WebSocket server.
* @memberof NotificationServiceHTTPServer
*/
public async connect_to_websocket_server(wss_url: string) {
this.client.connect(wss_url, 'solid-stream-notifications-aggregator');
this.client.on('connect', (connection: typeof websocket.connection) => {
Expand Down
53 changes: 53 additions & 0 deletions src/server/WebSocketServerHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// import { WebSocketServerHandler } from './WebSocketServerHandler';

// describe('WebSocketServerHandler', () => {
// let websocketServerHandler: WebSocketServerHandler;

// beforeEach(() => {
// // Create a mock WebSocket server
// const websocketServerMock: any = {
// on: jest.fn(),
// };

// websocketServerHandler = new WebSocketServerHandler(websocketServerMock);
// });

// it('should handle communication for the WebSocket server', async () => {
// // Mock WebSocket server events
// const connectCallback = websocketServerHandler.websocket_server.on.mock.calls[0][1];
// const requestCallback = websocketServerHandler.websocket_server.on.mock.calls[1][1];

// // Mock WebSocket connection and message events
// const connectionMock: any = {
// on: jest.fn(),
// };
// const messageMock: any = {
// type: 'utf8',
// utf8Data: JSON.stringify({ subscribe: ['stream1', 'stream2'] }),
// };

// // Call the handle_communication method
// await websocketServerHandler.handle_communication();

// // Simulate WebSocket server events
// connectCallback();
// requestCallback({ accept: () => connectionMock });

// // Simulate WebSocket connection message event
// connectionMock.on.mock.calls[0][1](messageMock);

// // Expectations
// expect(websocketServerHandler.subscribe_notification.subscribe).toHaveBeenCalledTimes(2);
// expect(websocketServerHandler.set_connections).toHaveBeenCalledTimes(2);
// expect(connectionMock.send).toHaveBeenCalledTimes(1);
// });

// it('should set connections for the WebSocket server', () => {
// const subscribedStream = 'stream1';
// const connectionMock: any = {};

// websocketServerHandler.set_connections(subscribedStream, connectionMock);

// expect(websocketServerHandler.websocket_connections.get(subscribedStream)).toBe(connectionMock);
// });
// });
77 changes: 54 additions & 23 deletions src/server/WebSocketServerHandler.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,84 @@
import * as WebSocket from 'websocket';
import { CacheService } from '../service/CacheService';
import { SubscribeNotification } from '../service/SubscribeNotification';

/**
* This class is used to handle the WebSocket server.
* @class WebSocketServerHandler
*/
export class WebSocketServerHandler {

public websocket_server: any;
public websocket_connections: Map<string, WebSocket>;
public websocket_connections: Map<string, WebSocket[]>;
public subscribe_notification: SubscribeNotification;


/**
* Creates an instance of WebSocketServerHandler.
* @param {WebSocket.server} websocket_server - The WebSocket server.
* @memberof WebSocketServerHandler
*/
constructor(websocket_server: WebSocket.server) {
this.websocket_server = websocket_server;
this.websocket_connections = new Map<string, WebSocket>();
this.websocket_connections = new Map<string, WebSocket[]>();
this.subscribe_notification = new SubscribeNotification();
}

public async handle_communication(cache_service: CacheService) {
/**
* Handles the communication for the WebSocket server, including subscribing to the notification server and sending messages to the client.
* @memberof WebSocketServerHandler
*/
public async handle_communication() {
console.log(`Handling the communication for the WebSocket server.`);
this.websocket_server.on('connect', (connection: any) => {
console.log(`Connection received from the client with address: ${connection.remoteAddress}`);
this.websocket_server.on('connect', () => {
console.log(`Connected to the WebSocket server.`);
});

this.websocket_server.on('request', (request: any) => {
const connection = request.accept('solid-stream-notifications-aggregator', request.origin);
connection.on('message', (message: any) => {
connection.on('message', async (message: any) => {
if (message.type === 'utf8') {
const message_utf8 = message.utf8Data;
const ws_message = JSON.parse(message_utf8);
if (Object.keys(ws_message).includes('subscribe')) {
console.log(`Received a subscribe message from the client.`);
let stream_to_subscribe = ws_message.subscribe;
for (let stream of stream_to_subscribe){
this.subscribe_notification.subscribe(stream);
console.log(`Subscribed to the stream: ${stream}`);
this.set_connections(stream, connection);
const streams_to_subscribe = ws_message.subscribe;
for (const stream_to_subscribe of streams_to_subscribe){
this.set_connections(stream_to_subscribe, connection);
}
}
else if (Object.keys(ws_message).includes('event')) {
let connection = this.websocket_connections.get(ws_message.stream);
if (connection !== undefined) {
connection.send(JSON.stringify(ws_message));
else if (Object.keys(ws_message).includes('event')){
console.log(`Received an event message from the notification server.`);
for (const [stream, connection] of this.websocket_connections){
if(ws_message.stream === stream){
for(const conn of connection){
conn.send(JSON.stringify(ws_message));
}
}
}
}
}
});
connection.on('close', (reasonCode: number, description: string) => {
console.log(`Peer ${connection.remoteAddress} disconnected.`);
console.log(`Reason code: ${reasonCode}`);
console.log(`Description: ${description}`);

});
});
}

/**
* Sets the connections for the WebSocket server's Map.
* @param {string} subscribed_stream - The stream to subscribe to.
* @param {WebSocket} connection - The WebSocket connection.
* @memberof WebSocketServerHandler
*/
public set_connections(subscribed_stream: string, connection: WebSocket): void {
this.websocket_connections.set(subscribed_stream, connection);
if (!this.websocket_connections.has(subscribed_stream)) {
this.subscribe_notification.subscribe(subscribed_stream);
this.websocket_connections.set(subscribed_stream, [connection]);
}
else {
const connections = this.websocket_connections.get(subscribed_stream);
if (connections !== undefined) {
connections.push(connection);
this.websocket_connections.set(subscribed_stream, connections);
}

}
}
}
11 changes: 0 additions & 11 deletions src/service/CacheService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,4 @@ describe('CacheService', () => {
expect(is_disconnected).toBe(true);
});

it('should_describe_the_cache', async () => {
const status = await cacheService.get_status();
expect(status).toBe('wait');
});

it('test_number_of_runs', async () => {
await cacheService.set('key', 'value');
for (let i = 0; i < 10000; i++) {
console.log(await cacheService.get('key'));
}
});
});
Loading