diff --git a/packages/transport-http/src/sdk-send-http.ts b/packages/transport-http/src/sdk-send-http.ts index 08a5f76df..4240f310b 100644 --- a/packages/transport-http/src/sdk-send-http.ts +++ b/packages/transport-http/src/sdk-send-http.ts @@ -12,5 +12,6 @@ export {sendGetNetworkParameters} from "./send-get-network-parameters" export {sendGetNodeVersionInfo} from "./send-get-node-version-info" export {connectSubscribeEvents} from "./connect-subscribe-events" export {send} from "./send-http" +export {WsSubscriptionTransport as SubscriptionTransport} from "./subscriptions/ws-transport" export {WebsocketError} from "./connect-ws" export {HTTPRequestError} from "./http-request.js" diff --git a/packages/transport-http/src/subscriptions/types.ts b/packages/transport-http/src/subscriptions/types.ts deleted file mode 100644 index 82d9b844f..000000000 --- a/packages/transport-http/src/subscriptions/types.ts +++ /dev/null @@ -1,45 +0,0 @@ -export enum SubscriptionTopic { - EVENTS = "events", - BLOCKS = "blocks", -} - -export type SubscriptionSchema = { - [SubscriptionTopic.EVENTS]: { - args: { - startBlock: number - endBlock: number - } - response: { - type: string - data: any - } - } - [SubscriptionTopic.BLOCKS]: { - args: { - startBlock: number - endBlock: number - } - response: { - type: string - data: any - } - } -} - -export type SubscriptionArguments = - SubscriptionSchema[T]["args"] -export type SubscriptionResponse = - SubscriptionSchema[T]["response"] - -export type Subscription = { - unsubscribe: () => void -} - -export type SubscriptionTransport = { - subscribe: (opts: { - topic: T - args: SubscriptionArguments - onData: (data: SubscriptionResponse) => void - onError: (error: Error) => void - }) => Promise -} diff --git a/packages/transport-http/src/subscriptions/stream-controller.test.ts b/packages/transport-http/src/subscriptions/ws-transport.test.ts similarity index 92% rename from packages/transport-http/src/subscriptions/stream-controller.test.ts rename to packages/transport-http/src/subscriptions/ws-transport.test.ts index a88bc8275..3b02d033f 100644 --- a/packages/transport-http/src/subscriptions/stream-controller.test.ts +++ b/packages/transport-http/src/subscriptions/ws-transport.test.ts @@ -6,14 +6,14 @@ import { SubscriptionDataMessage, UnsubscribeMessageRequest, } from "./models" -import {StreamController} from "./stream-controller" -import {SubscriptionTopic} from "./types" +import {WsSubscriptionTransport} from "./ws-transport" +import {Subscription} from "@onflow/typedefs" jest.mock("../websocket", () => ({ WebSocket: mockSocket, })) -describe("StreamController", () => { +describe("WsSubscriptionTransport", () => { let mockWs: WS beforeEach(() => { mockWs = new WS("wss://localhost:8080") @@ -29,7 +29,7 @@ describe("StreamController", () => { reconnectInterval: 1000, reconnectAttempts: 10, } - new StreamController(config) + new WsSubscriptionTransport(config) await new Promise(resolve => setTimeout(resolve, 0)) expect(mockWs.server.clients).toHaveLength(0) @@ -41,8 +41,8 @@ describe("StreamController", () => { reconnectInterval: 1000, reconnectAttempts: 10, } - const streamController = new StreamController(config) - const topic = "topic" as SubscriptionTopic + const streamController = new WsSubscriptionTransport(config) + const topic = "topic" as Subscription.Topic const args = {key: "value"} as any const onData = jest.fn() const onError = jest.fn() @@ -93,8 +93,8 @@ describe("StreamController", () => { reconnectInterval: 1000, reconnectAttempts: 10, } - const streamController = new StreamController(config) - const topic = "topic" as SubscriptionTopic + const streamController = new WsSubscriptionTransport(config) + const topic = "topic" as Subscription.Topic const args = {key: "value"} as any const onData = jest.fn() const onError = jest.fn() @@ -165,8 +165,8 @@ describe("StreamController", () => { reconnectInterval: 1000, reconnectAttempts: 1, } - const streamController = new StreamController(config) - const topic = "topic" as SubscriptionTopic + const streamController = new WsSubscriptionTransport(config) + const topic = "topic" as Subscription.Topic const args = {key: "value"} as any const onData = jest.fn() const onError = jest.fn() diff --git a/packages/transport-http/src/subscriptions/stream-controller.ts b/packages/transport-http/src/subscriptions/ws-transport.ts similarity index 90% rename from packages/transport-http/src/subscriptions/stream-controller.ts rename to packages/transport-http/src/subscriptions/ws-transport.ts index ed0b09b74..12306da5a 100644 --- a/packages/transport-http/src/subscriptions/stream-controller.ts +++ b/packages/transport-http/src/subscriptions/ws-transport.ts @@ -10,16 +10,12 @@ import { } from "./models" import { Subscription, - SubscriptionArguments, - SubscriptionResponse, - SubscriptionTopic, - SubscriptionTransport, -} from "./types" +} from "@onflow/typedefs" import {WebSocket} from "../websocket" const WS_OPEN = 1 -interface SubscriptionInfo { +interface SubscriptionInfo { // Internal ID for the subscription id: number // Remote ID assigned by the server used for message routing and unsubscribing @@ -27,27 +23,27 @@ interface SubscriptionInfo { // The topic of the subscription topic: T // The checkpoint to resume the subscription from - checkpoint: SubscriptionArguments + checkpoint: Subscription.Arguments // The callback to call when a data is received onData: (data: any) => void // The callback to call when an error occurs onError: (error: Error) => void } -interface StreamControllerConfig { +interface SubscriptionTransportConfig { hostname: string reconnectInterval: number reconnectAttempts: number } -export class StreamController implements SubscriptionTransport { +export class WsSubscriptionTransport implements Subscription.Transport { private counter = 0 - private subscriptions: SubscriptionInfo[] = [] + private subscriptions: SubscriptionInfo[] = [] private socket: WebSocket | null = null - private config: StreamControllerConfig + private config: SubscriptionTransportConfig private reconnectAttempts = 0 - constructor(config: StreamControllerConfig) { + constructor(config: SubscriptionTransportConfig) { this.config = { hostname: config.hostname, reconnectInterval: config.reconnectInterval || 2000, @@ -143,12 +139,12 @@ export class StreamController implements SubscriptionTransport { this.reconnectAttempts = 0 } - async subscribe(opts: { + async subscribe(opts: { topic: T - args: SubscriptionArguments - onData: (data: SubscriptionResponse) => void + args: Subscription.Arguments + onData: (data: Subscription.Response) => void onError: (error: Error) => void - }): Promise { + }): Promise { // Connect the socket if it's not already open await this.connect() @@ -200,7 +196,7 @@ export class StreamController implements SubscriptionTransport { } } - private async sendSubscribe(sub: SubscriptionInfo) { + private async sendSubscribe(sub: SubscriptionInfo) { // Send the subscription message const request: SubscribeMessageRequest = { action: "subscribe", @@ -220,7 +216,7 @@ export class StreamController implements SubscriptionTransport { return response } - private async sendUnsubscribe(sub: SubscriptionInfo) { + private async sendUnsubscribe(sub: SubscriptionInfo) { // Send the unsubscribe message if the subscription has a remote id const {remoteId} = sub if (remoteId) { @@ -255,7 +251,7 @@ export class StreamController implements SubscriptionTransport { // Update the subscription checkpoint when a message is received // These checkpoints are used to resume subscriptions after disconnects private updateSubscriptionCheckpoint< - T extends SubscriptionTopic = SubscriptionTopic, + T extends Subscription.Topic = Subscription.Topic, >(sub: SubscriptionInfo, message: SubscriptionDataMessage) { // TODO: Will be implemented with each subscription topic } diff --git a/packages/typedefs/src/index.ts b/packages/typedefs/src/index.ts index 4f4082674..0e3c78d06 100644 --- a/packages/typedefs/src/index.ts +++ b/packages/typedefs/src/index.ts @@ -459,3 +459,4 @@ export type EventStream = StreamConnection<{ export * from "./interaction" export * from "./fvm-errors" +export * as Subscription from "./subscriptions" diff --git a/packages/typedefs/src/subscriptions.ts b/packages/typedefs/src/subscriptions.ts new file mode 100644 index 000000000..27403aeab --- /dev/null +++ b/packages/typedefs/src/subscriptions.ts @@ -0,0 +1,50 @@ +export enum Topic { + EVENTS = "events", + BLOCKS = "blocks", +} + +export type Schema = { + [Topic.EVENTS]: SchemaItem< + { + startBlock: number + endBlock: number + }, + { + type: string + data: any + } + > + [Topic.BLOCKS]: SchemaItem< + { + startBlock: number + endBlock: number + }, + { + type: string + data: any + } + > +} + +export type SchemaItem = { + args: TArgs + response: TResponse +} + +export type Arguments = + Schema[T]["args"] +export type Response = + Schema[T]["response"] + +export type Instance = { + unsubscribe: () => void +} + +export type Transport = { + subscribe: (opts: { + topic: T + args: Arguments + onData: (data: Response) => void + onError: (error: Error) => void + }) => Promise +}