Skip to content

Commit

Permalink
Merge pull request #148 from ainblockchain/feature/platfowner/feature
Browse files Browse the repository at this point in the history
Add typedoc comments for event manager modules
  • Loading branch information
platfowner authored Sep 25, 2023
2 parents 03e40de + fe120e7 commit c12a712
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 31 deletions.
61 changes: 56 additions & 5 deletions src/event-manager/event-callback-manager.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,48 @@
import EventFilter from './event-filter';
import Subscription from './subscription';
import { BlockchainEventTypes, EventConfigType, BlockchainEventCallback, FilterDeletedEventCallback, FilterDeletedEvent } from '../types';
import { BlockchainEventTypes, BlockchainEventConfig, BlockchainEventCallback, BlockchainErrorCallback, FilterDeletedEventCallback, FilterDeletedEvent } from '../types';
import { PushId } from '../ain-db/push-id';
import { FAILED_TO_REGISTER_ERROR_CODE } from '../constants';

/**
* A class for managing event callbacks.
*/
export default class EventCallbackManager {
/** The event filter map from filter ID to event filter. */
private readonly _filters: Map<string, EventFilter>;
/** The subscription map from filter ID to subscription. */
private readonly _filterIdToSubscription: Map<string, Subscription>;

/**
* Creates a new EventCallbackManager object.
*/
constructor() {
this._filters = new Map<string, EventFilter>();
this._filterIdToSubscription = new Map<string, Subscription>();
}

/**
* Builds a filter ID.
* @returns {string} The filter ID built.
*/
buildFilterId() {
return PushId.generate();
}

/**
* Builds a subscription ID.
* @returns {string} The subscription ID built.
*/
buildSubscriptionId() {
return PushId.generate();
}

/**
* Emits a blockchain event to trigger callback functions.
* @param {string} filterId The filter ID.
* @param {BlockchainEventTypes} eventType The blockchain event type.
* @param {any} payload The payload of the event.
*/
emitEvent(filterId: string, eventType: BlockchainEventTypes, payload: any) {
const subscription = this._filterIdToSubscription.get(filterId);
if (!subscription) {
Expand All @@ -33,6 +55,12 @@ export default class EventCallbackManager {
subscription.emit('event', payload);
}

/**
* Emits an error to trigger callback functions.
* @param {string} filterId The filter ID.
* @param {number} code The error code.
* @param {string} errorMessage The error message.
*/
emitError(filterId: string, code: number, errorMessage: string) {
const subscription = this._filterIdToSubscription.get(filterId);
if (!subscription) {
Expand All @@ -47,7 +75,13 @@ export default class EventCallbackManager {
});
}

createFilter(eventTypeStr: string, config: EventConfigType): EventFilter {
/**
* Creates a new EventFilter object and adds it to the event filter map.
* @param {string} eventTypeStr The event type string.
* @param {BlockchainEventConfig} config The blockchain event configuration.
* @returns {EventFilter} The event filter object created.
*/
createFilter(eventTypeStr: string, config: BlockchainEventConfig): EventFilter {
const eventType = eventTypeStr as BlockchainEventTypes;
if (!Object.values(BlockchainEventTypes).includes(eventType) ||
eventType === BlockchainEventTypes.FILTER_DELETED) {
Expand All @@ -62,21 +96,34 @@ export default class EventCallbackManager {
return filter;
}

getFilter(filterId): EventFilter {
/**
* Looks up an event filter with a filter ID.
* @param {string} filterId The filter ID.
* @returns {EventFilter} The event filter looked up.
*/
getFilter(filterId: string): EventFilter {
const filter = this._filters.get(filterId);
if (!filter) {
throw Error(`Non-existent filter ID (${filterId})`);
}
return filter;
}

/**
* Creates a new Subscription object.
* @param {EventFilter} filter The event filter.
* @param {BlockchainEventCallback} eventCallback The blockchain event callback function.
* @param {BlockchainErrorCallback} errorCallback The blockchain error callback function.
* @param {FilterDeletedEventCallback} filterDeletedEventCallback The filter deletion event callback function.
* @returns {Subscription} The subscription object created.
*/
createSubscription(
filter: EventFilter,
eventCallback?: BlockchainEventCallback,
errorCallback?: (error: any) => void,
errorCallback?: BlockchainErrorCallback,
filterDeletedEventCallback: FilterDeletedEventCallback = (payload) => console.log(
`Event filter (id: ${payload.filter_id}) is deleted because of ${payload.reason}`)
) {
): Subscription {
const subscription = new Subscription(filter);
subscription.on(
'filterDeleted', (payload: FilterDeletedEvent) => {
Expand All @@ -94,6 +141,10 @@ export default class EventCallbackManager {
return subscription;
}

/**
* Deletes an event filter.
* @param {string} filterId The event filter ID to delete.
*/
deleteFilter(filterId: string) {
if (!this._filterIdToSubscription.delete(filterId)) {
console.log(`Can't remove the subscription because it can't be found. (${filterId})`);
Expand Down
69 changes: 63 additions & 6 deletions src/event-manager/event-channel-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,36 @@ import {
EventChannelMessage,
BlockchainEventTypes,
EventChannelConnectionOptions,
DisconnectCallback,
DisconnectionCallback,
} from '../types';
import EventFilter from './event-filter';
import EventCallbackManager from './event-callback-manager';

const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000 + 1000; // NOTE: This time must be longer than blockchain event handler heartbeat interval.
const DEFAULT_HANDSHAKE_TIMEOUT_MS = 30000;

/**
* A class for managing event channels and event handling callback functions.
*/
export default class EventChannelClient {
/** The Ain object. */
private readonly _ain: Ain;
/** The event callback manager object. */
private readonly _eventCallbackManager: EventCallbackManager;
/** The web socket client. */
private _wsClient?: WebSocket;
/** The blockchain endpoint URL. */
private _endpointUrl?: string;
/** Whether it's connected or not. */
private _isConnected: boolean;
/** The heartbeat timeout object. */
private _heartbeatTimeout?: ReturnType<typeof setTimeout> | null;

/**
* Creates a new EventChannelClient object.
* @param {Ain} ain The Ain object.
* @param {EventCallbackManager} eventCallbackManager The event callback manager object.
*/
constructor(ain: Ain, eventCallbackManager: EventCallbackManager) {
this._ain = ain;
this._eventCallbackManager = eventCallbackManager;
Expand All @@ -34,7 +48,13 @@ export default class EventChannelClient {
return this._isConnected;
}

connect(connectionOption: EventChannelConnectionOptions, disconnectCallback?: DisconnectCallback) {
/**
* Opens a new event channel.
* @param {EventChannelConnectionOptions} connectionOption The event channel connection options.
* @param {DisconnectionCallback} disconnectionCallback The disconnection callback function.
* @returns {Promise<void>} A promise for the connection success.
*/
connect(connectionOption: EventChannelConnectionOptions, disconnectionCallback?: DisconnectionCallback): Promise<any> {
return new Promise(async (resolve, reject) => {
if (this.isConnected) {
reject(new Error(`Can't connect multiple channels`));
Expand Down Expand Up @@ -86,13 +106,16 @@ export default class EventChannelClient {
});
this._wsClient.on('close', () => {
this.disconnect();
if (disconnectCallback) {
disconnectCallback(this._wsClient);
if (disconnectionCallback) {
disconnectionCallback(this._wsClient);
}
});
})
}

/**
* Closes the current event channel.
*/
disconnect() {
this._isConnected = false;
this._wsClient!.terminate();
Expand All @@ -102,14 +125,22 @@ export default class EventChannelClient {
}
}

/**
* Starts the heartbeat timer for the event channel.
* @param {number} timeoutMs The timeout value in miliseconds.
*/
startHeartbeatTimer(timeoutMs: number) {
this._heartbeatTimeout = setTimeout(() => {
console.log(`Connection timeout! Terminate the connection. All event subscriptions are stopped.`);
this._wsClient!.terminate();
}, timeoutMs);
}

handleEmitEventMessage(messageData) {
/**
* Handles an emit-event message from the event channel.
* @param {any} messageData The payload data of the message.
*/
handleEmitEventMessage(messageData: any) {
const filterId = messageData.filter_id;
if (!filterId) {
throw Error(`Can't find filter ID from message data (${JSON.stringify(messageData, null, 2)})`);
Expand All @@ -126,7 +157,11 @@ export default class EventChannelClient {
this._eventCallbackManager.emitEvent(filterId, eventType, payload);
}

handleEmitErrorMessage(messageData) {
/**
* Handles an emit-error message from the event channel.
* @param {any} messageData The payload data of the message.
*/
handleEmitErrorMessage(messageData: any) {
const code = messageData.code;
if (!code) {
console.log(`Can't find code from message data (${JSON.stringify(messageData, null, 2)})`);
Expand All @@ -145,6 +180,10 @@ export default class EventChannelClient {
this._eventCallbackManager.emitError(filterId, code, errorMessage);
}

/**
* Handles a message from the event channel.
* @param {string} message The message.
*/
handleMessage(message: string) {
try {
const parsedMessage = JSON.parse(message);
Expand All @@ -171,26 +210,44 @@ export default class EventChannelClient {
}
}

/**
* Builds a message to be sent to the event channel.
* @param {EventChannelMessageTypes} messageType The message type.
* @param {any} data The payload data of the msssage.
* @returns
*/
buildMessage(messageType: EventChannelMessageTypes, data: any): EventChannelMessage {
return {
type: messageType,
data: data,
};
}

/**
* Sends a message to the event channel.
* @param {EventChannelMessage} message The message to be sent.
*/
sendMessage(message: EventChannelMessage) {
if (!this._isConnected) {
throw Error(`Failed to send message. Event channel is not connected!`);
}
this._wsClient!.send(JSON.stringify(message));
}

/**
* Sends a register-event-filter messsage to the event channel.
* @param {EventFilter} filter The event filter to register.
*/
registerFilter(filter: EventFilter) {
const filterObj = filter.toObject();
const registerMessage = this.buildMessage(EventChannelMessageTypes.REGISTER_FILTER, filterObj);
this.sendMessage(registerMessage);
}

/**
* Sends a deregister-event-filter messsage to the event channel.
* @param {EventFilter} filter The event filter to deregister.
*/
deregisterFilter(filter: EventFilter) {
const filterObj = filter.toObject();
const deregisterMessage = this.buildMessage(EventChannelMessageTypes.DEREGISTER_FILTER, filterObj);
Expand Down
22 changes: 19 additions & 3 deletions src/event-manager/event-filter.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
import { BlockchainEventTypes, EventConfigType } from '../types';
import { BlockchainEventTypes, BlockchainEventConfig } from '../types';

/**
* A class for filtering blockchain events.
*/
export default class EventFilter {
/** The event filter ID. */
public readonly id: string;
/** The blockchain event type. */
public readonly type: BlockchainEventTypes;
public readonly config: EventConfigType;
/** The blockchain event configuration. */
public readonly config: BlockchainEventConfig;

constructor(id: string, type: BlockchainEventTypes, config: EventConfigType) {
/**
* Creates a new EventFilter object.
* @param {string} id The event filter ID object.
* @param {BlockchainEventTypes} type The blockchain event type value.
* @param {BlockchainEventConfig} config The blockchain event configuration object.
*/
constructor(id: string, type: BlockchainEventTypes, config: BlockchainEventConfig) {
this.id = id;
this.type = type;
this.config = config;
}

/**
* Converts to a javascript object.
* @returns {Object} The javascript object.
*/
toObject() {
return {
id: this.id,
Expand Down
Loading

0 comments on commit c12a712

Please sign in to comment.