Skip to content

Commit

Permalink
Add subscription status message after subscribe/unsubscribe when usin…
Browse files Browse the repository at this point in the history
…g IoTDB

    added tests
    send subscription status message after subscribe/unsubscribe

Signed-off-by: Christian Muehlbauer <[email protected]>
  • Loading branch information
chrizmc committed Dec 2, 2024
1 parent c76f57a commit 21e519b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 8 deletions.
4 changes: 2 additions & 2 deletions cdsp/information-layer/handlers/src/HandlerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ export abstract class HandlerBase {
}

/**
* Generic function to create or remove a subscription message.
* Generic function to create a subscription status message.
* @param type - Type of subscription message.
* @param message - The original message from client.
* @param status - The status of the subscription.
* @returns - The transformed message.
*/
protected createSubscribeMessage(
protected createSubscribeStatusMessage(
type: "subscribe" | "unsubscribe",
message: Pick<Message, "id" | "tree" | "uuid">,
status: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class IoTDBHandler extends HandlerBase {
throw new Error("Invalid database configuration.");
}
this.session = new Session();
this.subscriptionSimulator = new SubscriptionSimulator(this.session, this.sendMessageToClient, this.createUpdateMessage);
this.subscriptionSimulator = new SubscriptionSimulator(this.session, this.sendMessageToClient, this.createUpdateMessage, this.createSubscribeStatusMessage);
}

async authenticateAndConnect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jest.mock("../config/database-params", () => ({
}));

import { SubscriptionSimulator } from './SubscriptionSimulator';
import { WebSocketWithId, Message } from '../../../utils/data-types';
import { WebSocketWithId, Message, MessageBase } from '../../../utils/data-types';
import { Session } from './Session';
import { SessionDataSet } from "../utils/SessionDataSet";
import { transformSessionDataSet } from "../utils/database-helper";
Expand All @@ -35,8 +35,19 @@ describe('SubscriptionSimulator', () => {
beforeEach(() => {
mockSession = new Session();

const createSubscribeStatusMessageMock = jest.fn<
MessageBase,
["subscribe" | "unsubscribe", Pick<Message, "id" | "tree" | "uuid">, string]
>((type, message, status) => ({
type, // Properly narrowed type
id: message.id,
tree: message.tree,
uuid: message.uuid,
status,
}));

sendMessageToClientMock = jest.fn();
simulator = new SubscriptionSimulator(mockSession, sendMessageToClientMock, jest.fn());
simulator = new SubscriptionSimulator(mockSession, sendMessageToClientMock, jest.fn(), createSubscribeStatusMessageMock);

mockWebSocket = { id: 'ws1' } as WebSocketWithId;
mockMessage = { id: 'vehicle123', tree: 'VSS' } as Message;
Expand All @@ -56,6 +67,28 @@ describe('SubscriptionSimulator', () => {
* subscribe
*/

test('calls sendMessageToClient if subscription was successful', () => {

// Act: Call subscribe with a new WebSocket and message key
simulator.subscribe(mockMessage, mockWebSocket);

// Assert: Verify sendMessageToClient was called
expect(sendMessageToClientMock).toHaveBeenCalled();

// Capture the actual message passed to sendMessageToClient
const [webSocketArg, messageArg] = sendMessageToClientMock.mock.calls[0];

// Check the WebSocket argument
expect(webSocketArg).toBe(mockWebSocket);

// Check the message content
expect(messageArg).toEqual(
expect.objectContaining({
status: "succeed",
tree: "VSS",
})
);
});

test('calls sendMessageToClient if subscription with same WebSocket already exists', () => {
// Arrange: Manually add a subscription to simulate the "already exists" condition
Expand Down Expand Up @@ -116,6 +149,31 @@ describe('SubscriptionSimulator', () => {
/*
* unsubscribe
*/
test('calls sendMessageToClient if unsubscribe was successful', () => {

// Arrange: Add a subscription with same key and same websocket
simulator['subscriptions'].set(mockKey, [mockWebSocket]);

// Act: Call subscribe with a new WebSocket and message key
simulator.unsubscribe(mockMessage, mockWebSocket);

// Assert: Verify sendMessageToClient was called
expect(sendMessageToClientMock).toHaveBeenCalled();

// Capture the actual message passed to sendMessageToClient
const [webSocketArg, messageArg] = sendMessageToClientMock.mock.calls[0];

// Check the WebSocket argument
expect(webSocketArg).toBe(mockWebSocket);

// Check the message content
expect(messageArg).toEqual(
expect.objectContaining({
status: "succeed",
tree: "VSS",
})
);
});

test('remove websocket from existing subscription if websocket and subscription exist', () => {
// Arrange: Add a subscription with same key and same websocket + 1 other websocket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ export class SubscriptionSimulator {
private subscriptions: SubscriptionMap = new Map();
private sendMessageToClient: ( ws: WebSocketWithId, message: Message | MessageBase | ErrorMessage) => void;
private createUpdateMessage: (id: string, tree: string, uuid: string, nodes: Array<{ name: string; value: any }>) => Message;
private createSubscribeStatusMessage: (type: "subscribe" | "unsubscribe", message: Pick<Message, "id" | "tree" | "uuid">, status: string) => MessageBase;

constructor(session: Session,
sendMessageToClient: (ws: WebSocketWithId, message: Message | MessageBase | ErrorMessage) => void,
createUpdateMessage: (id: string, tree: string, uuid: string, nodes: Array<{ name: string; value: any }>) => Message)
createUpdateMessage: (id: string, tree: string, uuid: string, nodes: Array<{ name: string; value: any }>) => Message,
createSubscribeStatusMessage: (type: "subscribe" | "unsubscribe", message: Pick<Message, "id" | "tree" | "uuid">, status: string) => MessageBase)
{
this.session = session;
this.notifyDatabaseChanges = this.notifyDatabaseChanges.bind(this);
this.sendMessageToClient = sendMessageToClient;
this.createUpdateMessage = createUpdateMessage;
this.createSubscribeStatusMessage = createSubscribeStatusMessage;
}

/**
Expand Down Expand Up @@ -66,6 +69,12 @@ export class SubscriptionSimulator {
this.timeIntervalLowerLimit = Date.now();
logMessage("started timer");
}

this.sendMessageToClient(
wsOfNewSubscription,
this.createSubscribeStatusMessage("subscribe", message, "succeed")
);

logMessage("subscribed");
this.logSubscriptions();
}
Expand Down Expand Up @@ -104,6 +113,12 @@ export class SubscriptionSimulator {
}

this.removeTimerIfNoSubscription();

this.sendMessageToClient(
wsOfSubscriptionToBeDeleted,
this.createSubscribeStatusMessage("unsubscribe", message, "succeed")
);

logMessage("unsubscribed");
this.logSubscriptions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ export class RealmDBHandler extends HandlerBase {

this.sendMessageToClient(
ws,
this.createSubscribeMessage("subscribe", message, "succeed")
this.createSubscribeStatusMessage("subscribe", message, "succeed")
);

logWithColor(
Expand Down Expand Up @@ -275,7 +275,7 @@ export class RealmDBHandler extends HandlerBase {

this.sendMessageToClient(
ws,
this.createSubscribeMessage("unsubscribe", message, "succeed")
this.createSubscribeStatusMessage("unsubscribe", message, "succeed")
);
} else {
this.sendMessageToClient(
Expand Down

0 comments on commit 21e519b

Please sign in to comment.