diff --git a/packages/transport-http/src/subscriptions/sub-types/EventsSubscription.ts b/packages/transport-http/src/subscriptions/sub-types/EventsSubscription.ts deleted file mode 100644 index 9c70da028..000000000 --- a/packages/transport-http/src/subscriptions/sub-types/EventsSubscription.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { WebsocketSubscription } from "./WebsocketSubscription"; - -export class EventsSubscription implements WebsocketSubscription { - id: number - prremoteId?: string - private topic: string - private checkpointArgs: any - private callback: (data: any) => void - - constructor(opts: { - id: number - remoteId?: string - topic: string - checkpointArgs: any - callback: (data: any) => void - }) { - this.id = opts.id - this.remoteId = opts.remoteId - this.topic = opts.topic - this.checkpointArgs = opts.checkpointArgs - this.callback = opts.callback - - const test = new WebsocketSubscription({ - id: 1, - remoteId: "1", - topic: "1", - checkpointArgs: "1", - callback: () => {} - }) - } - - handleMessage(data: any): void { - this.callback(data) - } - - connect(): void { -} \ No newline at end of file diff --git a/packages/transport-http/src/subscriptions/sub-types/WebsocketSubscription.ts b/packages/transport-http/src/subscriptions/sub-types/WebsocketSubscription.ts deleted file mode 100644 index 0ceddc4a0..000000000 --- a/packages/transport-http/src/subscriptions/sub-types/WebsocketSubscription.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { SubscriptionTopic } from "../types" - -export interface WebsocketSubscriptionInfo { - id: number - remoteId?: string - topic: SubscriptionTopic - checkpointArgs: any - callback: (data: any) => void -} - -export abstract class WebsocketSubscription { - private id: number - private remoteId?: string - private topic: T - private checkpointArgs: any - private callback: (data: any) => void - - constructor(opts: WebsocketSubscription) { - this.id = opts.id - this.remoteId = opts.remoteId - this.topic = opts.topic - this.callback = opts.callback - } - - abstract handleMessage(data: any): void - - abstract connect(): void -} \ No newline at end of file diff --git a/packages/transport-http/src/subscriptions/subscription-manager.ts b/packages/transport-http/src/subscriptions/subscription-manager.ts index baaa35fe0..d27bc9d50 100644 --- a/packages/transport-http/src/subscriptions/subscription-manager.ts +++ b/packages/transport-http/src/subscriptions/subscription-manager.ts @@ -14,18 +14,24 @@ import { import {WebSocket} from "../websocket" interface SubscriptionInfo { - // Internal ID for the subscription - id: number - // Remote ID assigned by the server used for message routing and unsubscribing - remoteId?: string - // The topic of the subscription - topic: T - // The checkpoint to resume the subscription from - checkpoint: SubscriptionArguments - // The callback to call when a message is received - callback: (data: any) => void - // Function to update the checkpoint when a message is received - updateCheckpoint: (data: any) => void + // Internal ID for the subscription + id: number + // Remote ID assigned by the server used for message routing and unsubscribing + remoteId?: string + // The topic of the subscription + topic: T + // The checkpoint to resume the subscription from + checkpoint: SubscriptionArguments + // The callback to call when a data is received + onData: (data: any) => void + // The callback to call when an error occurs + onError: (error: Error) => void +} + +interface WebsocketTransportConfig { + hostname: string + reconnectInterval: number + reconnectAttempts: number } export class WebsocketTransport implements SubscriptionTransport { @@ -35,125 +41,154 @@ export class WebsocketTransport implements SubscriptionTransport { // WebSocket is only opened when there are active subscriptions private socket: WebSocket | null = null - constructor(private hostname: string) {} + private config: WebsocketTransportConfig + + private reconnectAttempts = 0 + + constructor(config: WebsocketTransportConfig) { + this.config = { + hostname: config.hostname, + reconnectInterval: config.reconnectInterval || 1000, + reconnectAttempts: config.reconnectAttempts || 10, + } + } // Lazy connect to the socket when the first subscription is made connect() { - // If the socket is already open, do nothing - if (this.socket?.readyState === WebSocket.OPEN) { - return - } + return new Promise((resolve) => { + // If the socket is already open, do nothing + if (this.socket?.readyState === WebSocket.OPEN) { + return + } - // Restore subscriptions if the socket was closed - if (this.socket?.readyState === WebSocket.CLOSED) { - this.subscriptions.forEach(sub => { - this.socket?.send(JSON.stringify(sub)) - }) - } + // Restore subscriptions if the socket was closed + if (this.socket?.readyState === WebSocket.CLOSED) { + this.subscriptions.forEach(sub => { + this.socket?.send(JSON.stringify(sub)) + }) + } + + this.socket = new WebSocket(this.config.hostname) + this.socket.onmessage = event => { + const data = JSON.parse(event.data) as + | MessageResponse + | SubscriptionDataMessage + + if ("action" in data) { + // TODO, waiting for AN team to decide what to do here + } else { + const sub = this.subscriptions.find(sub => sub.remoteId === data.id) + if (!sub) return - this.socket = new WebSocket(this.hostname) - this.socket.onmessage = event => { - const data = JSON.parse(event.data) as - | MessageResponse - | SubscriptionDataMessage - - if ("action" in data) { - // TODO, waiting for AN team to decide what to do here - } else { - const sub = this.subscriptions.find(sub => sub.remoteId === data.id) - if (!sub) return - - // Update the block height to checkpoint for disconnects - this.updateSubscriptionCheckpoint(sub, data) - - // Call the subscription callback - sub.callback(data.data) + // Update the block height to checkpoint for disconnects + this.updateSubscriptionCheckpoint(sub, data) + + // Call the subscription callback + sub.onData(data.data) + } } + this.socket.onclose = () => { + void this.reconnect() + } + this.socket.onerror = e => { + console.error(`WebSocket error: ${e}`) + this.reconnect() + } + + this.socket.onopen = () => { + // Restore subscriptions + Promise.all(this.subscriptions.map(async sub => { + const response = await this.sendSubscribe(sub) + sub.remoteId = response.id + })).then(() => { + resolve() + }) + } + }) + } + + private async reconnect() { + // Clear the socket + this.socket = null + + // If there are no subscriptions, do nothing + if (this.subscriptions.length === 0) { + return } - this.socket.onclose = () => { - this.subscriptions.forEach((_, id) => { - this.unsubscribe(id) - }) - this.socket?.close() - } - this.socket.onerror = () => {} - this.socket.onopen = () => { + // Clear all remote ids + this.subscriptions.forEach(sub => { + delete sub.remoteId + }) + + // Validate the number of reconnection attempts + if (this.reconnectAttempts >= this.config.reconnectAttempts) { this.subscriptions.forEach(sub => { - this.socket?.send(JSON.stringify(sub)) + sub.onError(new Error(`Failed to reconnect to the server after ${this.reconnectAttempts} attempts`)) }) + this.subscriptions = [] + this.reconnectAttempts = 0 + return } + + // Delay the reconnection + await new Promise(resolve => + setTimeout(resolve, this.config.reconnectInterval) + ) + + // Try to reconnect + this.reconnectAttempts++ + await this.connect() + this.reconnectAttempts = 0 } - async subscribe( - topic: T, - args: SubscriptionArguments, - callback: (data: SubscriptionResponse) => void - ): Promise { + async subscribe(opts: { + topic: T + args: SubscriptionArguments + onData: (data: SubscriptionResponse) => void + onError: (error: Error) => void + }): Promise { // Connect the socket if it's not already open this.connect() // Track the subscription locally - const subscription: SubscriptionInfo = { + const sub: SubscriptionInfo = { id: this.counter++, - topic, - checkpoint: args, - callback, + topic: opts.topic, + checkpoint: opts.args, + onData: opts.onData, + onError: opts.onError, } - this.subscriptions.push(subscription) + this.subscriptions.push(sub) - // Send the subscription message - const request: SubscribeMessageRequest = { - action: "subscribe", - topic, - arguments: args, - } - this.socket?.send(JSON.stringify(request)) - - // Add to the response queue - const response = await this.waitForResponse() + // Send the subscribe message + const response = await this.sendSubscribe(sub) if (!response.success) { throw new Error( - `Failed to subscribe to topic ${topic}, error message: ${response.error_message}` + `Failed to subscribe to topic ${sub.topic}, error message: ${response.error_message}` ) } // Update the subscription with the remote id - subscription.remoteId = response.id + sub.remoteId = response.id return { - unsubscribe: () => this.unsubscribe(subscription.id) + unsubscribe: () => this.unsubscribe(sub.id), } } private unsubscribe(id: number): void { // Get the subscription const subscription = this.subscriptions.find(sub => sub.id === id) + if (!subscription) return - if (!subscription) { - return - } - - // Send the unsubscribe message if the subscription has a remote id - const { remoteId } = subscription - if (remoteId) { - (async () => { - const request: UnsubscribeMessageRequest = { - action: "unsubscribe", - id: remoteId, - } - this.socket?.send(JSON.stringify(request)) - - const response = await this.waitForResponse() - - if (!response.success) { - throw new Error( - `Failed to unsubscribe from topic ${subscription.topic}, error message: ${response.error_message}` - ) - } - })().catch(console.error) - } + // Send the unsubscribe message + this.sendUnsubscribe(subscription).catch(e => { + console.error( + `Failed to unsubscribe from topic ${subscription.topic}, error: ${e}` + ) + }) // Remove the subscription this.subscriptions = this.subscriptions.filter(sub => sub.id !== id) @@ -164,6 +199,46 @@ export class WebsocketTransport implements SubscriptionTransport { } } + private async sendSubscribe(sub: SubscriptionInfo) { + // Send the subscription message + const request: SubscribeMessageRequest = { + action: "subscribe", + topic: sub.topic, + arguments: sub.checkpoint, + } + this.socket?.send(JSON.stringify(request)) + + const response = await this.waitForResponse() + + if (!response.success) { + throw new Error( + `Failed to subscribe to topic ${sub.topic}, error message: ${response.error_message}` + ) + } + + return response + } + + private async sendUnsubscribe(sub: SubscriptionInfo) { + // Send the unsubscribe message if the subscription has a remote id + const {remoteId} = sub + if (remoteId) { + const request: UnsubscribeMessageRequest = { + action: "unsubscribe", + id: remoteId, + } + this.socket?.send(JSON.stringify(request)) + + const response = await this.waitForResponse() + + if (!response.success) { + throw new Error( + `Failed to unsubscribe from topic ${sub.topic}, error message: ${response.error_message}` + ) + } + } + } + private async waitForResponse() { return new Promise(resolve => { // TODO: NOOP, waiting for AN team to decide what to do here @@ -172,14 +247,10 @@ export class WebsocketTransport implements SubscriptionTransport { // Update the subscription checkpoint when a message is received // These checkpoints are used to resume subscriptions after disconnects - private updateSubscriptionCheckpoint(sub: SubscriptionInfo, message: SubscriptionDataMessage) { - switch (sub.topic) { - case "events": - sub.checkpoint.blockHeight = message.data.block_height - break - case "blocks": - sub.checkpoint.blockHeight = message.data.block_height - break - } + private updateSubscriptionCheckpoint( + sub: SubscriptionInfo, + message: SubscriptionDataMessage + ) { + // TODO: Will be implemented in the future } } diff --git a/packages/transport-http/src/subscriptions/subscription-router.ts b/packages/transport-http/src/subscriptions/subscription-router.ts deleted file mode 100644 index 2c2cb5d5c..000000000 --- a/packages/transport-http/src/subscriptions/subscription-router.ts +++ /dev/null @@ -1,18 +0,0 @@ -export class SubscriptionRouter { - constructor(transport) { - this.transport = transport; - this.subscriptions = new Map(); - } - async subscribe(topic, args, callback) { - const subscription = await this.transport.subscribe(topic, args, callback); - this.subscriptions.set(subscription, callback); - return subscription; - } - unsubscribe(subscription) { - const callback = this.subscriptions.get(subscription); - if (callback) { - this.subscriptions.delete(subscription); - subscription.unsubscribe(); - } - } -} \ No newline at end of file diff --git a/packages/transport-http/src/subscriptions/types.ts b/packages/transport-http/src/subscriptions/types.ts index 375436436..82d9b844f 100644 --- a/packages/transport-http/src/subscriptions/types.ts +++ b/packages/transport-http/src/subscriptions/types.ts @@ -36,9 +36,10 @@ export type Subscription = { } export type SubscriptionTransport = { - subscribe: ( - topic: T, - args: SubscriptionArguments, - callback: (data: SubscriptionResponse) => void - ) => Promise + subscribe: (opts: { + topic: T + args: SubscriptionArguments + onData: (data: SubscriptionResponse) => void + onError: (error: Error) => void + }) => Promise }