diff --git a/packages/sdk/src/account/account.js b/packages/sdk/src/account/account.js index f3a2a75ed..59709f96a 100644 --- a/packages/sdk/src/account/account.js +++ b/packages/sdk/src/account/account.js @@ -3,7 +3,7 @@ import {atBlockId} from "../build/build-at-block-id.js" import {getAccount} from "../build/build-get-account.js" import {invariant} from "@onflow/util-invariant" import {decodeResponse as decode} from "../decode/decode.js" -import {send} from "../send/send.js" +import {send} from "../transport" /** * @typedef {import("@onflow/typedefs").Account} Account diff --git a/packages/sdk/src/block/block.js b/packages/sdk/src/block/block.js index 6cc92df8e..b039a9334 100644 --- a/packages/sdk/src/block/block.js +++ b/packages/sdk/src/block/block.js @@ -1,4 +1,4 @@ -import {send} from "../send/send.js" +import {send} from "../transport/send/send" import {getBlock} from "../build/build-get-block" import {atBlockHeight} from "../build/build-at-block-height.js" import {atBlockId} from "../build/build-at-block-id.js" diff --git a/packages/sdk/src/contract.test.js b/packages/sdk/src/contract.test.js index 40f95cea8..374257ef5 100644 --- a/packages/sdk/src/contract.test.js +++ b/packages/sdk/src/contract.test.js @@ -2,7 +2,7 @@ import * as root from "./sdk" import * as decode from "./decode/decode.js" import * as encode from "./encode/encode" import * as interaction from "./interaction/interaction" -import * as send from "./send/send.js" +import * as send from "./transport" import * as template from "@onflow/util-template" const interfaceContract = diff --git a/packages/sdk/src/node-version-info/node-version-info.ts b/packages/sdk/src/node-version-info/node-version-info.ts index c905a76cd..76d38e25f 100644 --- a/packages/sdk/src/node-version-info/node-version-info.ts +++ b/packages/sdk/src/node-version-info/node-version-info.ts @@ -1,4 +1,4 @@ -import {send} from "../send/send.js" +import {send} from "../transport/send/send" import {decodeResponse as decode} from "../decode/decode.js" import {getNodeVersionInfo} from "../build/build-get-node-version-info" import {NodeVersionInfo} from "@onflow/typedefs" diff --git a/packages/sdk/src/sdk.ts b/packages/sdk/src/sdk.ts index 9ea40de35..3f3b8550b 100644 --- a/packages/sdk/src/sdk.ts +++ b/packages/sdk/src/sdk.ts @@ -2,7 +2,7 @@ import * as logger from "@onflow/util-logger" // Base export {build} from "./build/build.js" export {resolve} from "./resolve/resolve.js" -export {send} from "./send/send.js" +export {send, subscribe, rawSubscribe} from "./transport" export {decode} from "./decode/sdk-decode.js" export { encodeTransactionPayload, diff --git a/packages/sdk/src/send/send.js b/packages/sdk/src/send/send.js deleted file mode 100644 index eab60a9e4..000000000 --- a/packages/sdk/src/send/send.js +++ /dev/null @@ -1,40 +0,0 @@ -import {Buffer} from "@onflow/rlp" -import {send as defaultSend} from "@onflow/transport-http" -import {initInteraction, pipe} from "../interaction/interaction" -import * as ixModule from "../interaction/interaction" -import {invariant} from "../build/build-invariant.js" -import {response} from "../response/response" -import {config} from "@onflow/config" -import {resolve as defaultResolve} from "../resolve/resolve.js" - -/** - * @description - Sends arbitrary scripts, transactions, and requests to Flow - * @param {Array. | Function} args - An array of functions that take interaction and return interaction - * @param {object} opts - Optional parameters - * @returns {Promise<*>} - A promise that resolves to a response - */ -export const send = async (args = [], opts = {}) => { - const sendFn = await config.first( - ["sdk.transport", "sdk.send"], - opts.send || defaultSend - ) - - invariant( - sendFn, - `Required value for sdk.transport is not defined in config. See: ${"https://github.com/onflow/fcl-js/blob/master/packages/sdk/CHANGELOG.md#0057-alpha1----2022-01-21"}` - ) - - const resolveFn = await config.first( - ["sdk.resolve"], - opts.resolve || defaultResolve - ) - - opts.node = opts.node || (await config().get("accessNode.api")) - - if (Array.isArray(args)) args = pipe(initInteraction(), args) - return sendFn( - await resolveFn(args), - {config, response, ix: ixModule, Buffer}, - opts - ) -} diff --git a/packages/sdk/src/transport/get-transport.test.ts b/packages/sdk/src/transport/get-transport.test.ts new file mode 100644 index 000000000..9e78b8d30 --- /dev/null +++ b/packages/sdk/src/transport/get-transport.test.ts @@ -0,0 +1,127 @@ +import {SdkTransport} from "@onflow/typedefs" +import {getTransport} from "./get-transport" +import {httpTransport} from "@onflow/transport-http" +import {config} from "@onflow/config" + +jest.mock("@onflow/transport-http", () => ({ + httpTransport: { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked, +})) + +describe("getTransport", () => { + beforeEach(() => { + jest.resetAllMocks() + }) + + test("fallback to http transport", async () => { + const transport = await getTransport() + expect(transport).toBe(httpTransport) + }) + + test("override with custom transport", async () => { + const customTransport = { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked + + const transport = await getTransport({transport: customTransport}) + expect(transport).toBe(customTransport) + }) + + test("override with custom send function", async () => { + const customSend = jest.fn() + + const transport = await getTransport({send: customSend}) + expect(transport).toEqual({ + send: customSend, + subscribe: expect.any(Function), + }) + }) + + test("override with both custom transport and send function", async () => { + await expect( + getTransport({ + send: jest.fn(), + transport: { + send: jest.fn(), + subscribe: jest.fn(), + }, + }) + ).rejects.toThrow( + /Cannot provide both "transport" and legacy "send" options/ + ) + }) + + test("transport from global config - sdk.transport", async () => { + const customTransport = { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked + + const tranpsort = await config().overload( + { + "sdk.transport": customTransport, + }, + async () => { + return await getTransport() + } + ) + + expect(tranpsort).toBe(customTransport) + }) + + test("send function from global config - sdk.transport", async () => { + const customSend = jest.fn() + + const transport = await config().overload( + { + "sdk.transport": customSend, + }, + async () => { + return await getTransport() + } + ) + expect(transport).toEqual({ + send: customSend, + subscribe: expect.any(Function), + }) + }) + + test("send function from global config - sdk.send", async () => { + const customSend = jest.fn() + + const transport = await config().overload( + { + "sdk.send": customSend, + }, + async () => { + return await getTransport() + } + ) + + expect(transport).toEqual({ + send: customSend, + subscribe: expect.any(Function), + }) + }) + + test("custom transport has priority over global config", async () => { + const customTransport = { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked + + const transport = await config().overload( + { + "sdk.transport": httpTransport, + }, + async () => { + return await getTransport({transport: customTransport}) + } + ) + + expect(transport).toBe(customTransport) + }) +}) diff --git a/packages/sdk/src/transport/get-transport.ts b/packages/sdk/src/transport/get-transport.ts new file mode 100644 index 000000000..635d6e054 --- /dev/null +++ b/packages/sdk/src/transport/get-transport.ts @@ -0,0 +1,54 @@ +import {config} from "@onflow/config" +import {httpTransport as defaultTransport} from "@onflow/transport-http" +import {SdkTransport} from "@onflow/typedefs" +import {invariant} from "@onflow/util-invariant" + +/** + * Get the SDK transport object, either from the provided override or from the global config. + * @param overrides - Override default configuration with custom transport or send function. + * @returns The SDK transport object. + */ +export async function getTransport( + override: { + send?: SdkTransport.SendFn + transport?: SdkTransport.Transport + } = {} +): Promise { + invariant( + override.send == null || override.transport == null, + `SDK Transport Error: Cannot provide both "transport" and legacy "send" options.` + ) + + const transportOrSend = + override.transport || + override.send || + (await config().first( + ["sdk.transport", "sdk.send"], + defaultTransport + )) + + // Backwards compatibility with legacy send function + if (!isTransportObject(transportOrSend)) { + return { + send: transportOrSend, + subscribe: () => { + throw new Error( + "Subscribe not supported with legacy send function transport, please provide a transport object." + ) + }, + } + } + + return transportOrSend +} + +function isTransportObject( + transport: any +): transport is SdkTransport.Transport { + return ( + transport.send !== undefined && + transport.subscribe !== undefined && + typeof transport.send === "function" && + typeof transport.subscribe === "function" + ) +} diff --git a/packages/sdk/src/transport/index.ts b/packages/sdk/src/transport/index.ts new file mode 100644 index 000000000..cf3d3ae64 --- /dev/null +++ b/packages/sdk/src/transport/index.ts @@ -0,0 +1,3 @@ +export {send} from "./send/send" +export {subscribe} from "./subscribe/subscribe" +export {rawSubscribe} from "./subscribe/raw-subscribe" diff --git a/packages/sdk/src/send/send.test.js b/packages/sdk/src/transport/send/send.test.js similarity index 100% rename from packages/sdk/src/send/send.test.js rename to packages/sdk/src/transport/send/send.test.js diff --git a/packages/sdk/src/transport/send/send.ts b/packages/sdk/src/transport/send/send.ts new file mode 100644 index 000000000..02ce8f1a6 --- /dev/null +++ b/packages/sdk/src/transport/send/send.ts @@ -0,0 +1,41 @@ +import {Buffer} from "@onflow/rlp" +import {initInteraction, pipe} from "../../interaction/interaction" +import * as ixModule from "../../interaction/interaction" +import {invariant} from "../../build/build-invariant" +import {response} from "../../response/response" +import {config} from "@onflow/config" +import {resolve as defaultResolve} from "../../resolve/resolve" +import {getTransport} from "../get-transport" + +/** + * @description - Sends arbitrary scripts, transactions, and requests to Flow + * @param args - An array of functions that take interaction and return interaction + * @param opts - Optional parameters + * @returns - A promise that resolves to a response + */ +export const send = async ( + args: Function | Function[] = [], + opts: any = {} +): Promise => { + const transport = await getTransport(opts) + const sendFn = transport.send.bind(transport) + + invariant( + sendFn, + `Required value for sdk.transport is not defined in config. See: ${"https://github.com/onflow/fcl-js/blob/master/packages/sdk/CHANGELOG.md#0057-alpha1----2022-01-21"}` + ) + + const resolveFn = await config.first( + ["sdk.resolve"], + opts.resolve || defaultResolve + ) + + opts.node = opts.node || (await config().get("accessNode.api")) + + if (Array.isArray(args)) args = pipe(initInteraction(), args as any) as any + return sendFn( + await resolveFn(args), + {config, response, ix: ixModule, Buffer} as any, + opts + ) +} diff --git a/packages/sdk/src/transport/subscribe/raw-subscribe.test.ts b/packages/sdk/src/transport/subscribe/raw-subscribe.test.ts new file mode 100644 index 000000000..bbf134730 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/raw-subscribe.test.ts @@ -0,0 +1,47 @@ +import {config} from "@onflow/config" +import {rawSubscribe} from "./raw-subscribe" +import {SdkTransport} from "@onflow/typedefs" +import {getTransport} from "../get-transport" + +jest.mock("../get-transport") + +describe("subscribe", () => { + let mockTransport: jest.Mocked + let mockSub: jest.Mocked = { + unsubscribe: jest.fn(), + } + + beforeEach(() => { + jest.resetAllMocks() + + mockTransport = { + subscribe: jest.fn().mockReturnValue(mockSub), + send: jest.fn(), + } + jest.mocked(getTransport).mockResolvedValue(mockTransport) + }) + + test("subscribes to a topic and returns subscription from transport", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const sub = await config().overload( + { + "accessNode.api": "http://localhost:8080", + }, + async () => { + return await rawSubscribe({topic, args, onData, onError}) + } + ) + + expect(mockTransport.subscribe).toHaveBeenCalledTimes(1) + expect(mockTransport.subscribe).toHaveBeenCalledWith( + {topic, args, onData: onData, onError}, + {node: "http://localhost:8080"} + ) + + expect(sub).toBe(mockSub) + }) +}) diff --git a/packages/sdk/src/transport/subscribe/raw-subscribe.ts b/packages/sdk/src/transport/subscribe/raw-subscribe.ts new file mode 100644 index 000000000..6956fc649 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/raw-subscribe.ts @@ -0,0 +1,41 @@ +import {config} from "@onflow/config" +import {SdkTransport} from "@onflow/typedefs" +import {getTransport} from "../get-transport" +import {invariant} from "@onflow/util-invariant" +import {SubscribeParams} from "./types" + +/** + * Subscribe to a topic without decoding the data. + * @param params - The parameters for the subscription. + * @param opts - Additional options for the subscription. + * @returns A promise that resolves once the subscription is active. + */ +export async function rawSubscribe( + {topic, args, onData, onError}: SubscribeParams, + opts: { + node?: string + transport?: SdkTransport.Transport + } = {} +) { + const transport = await getTransport(opts) + const node = opts?.node || (await config().get("accessNode.api")) + + invariant( + !!node, + `SDK Send Error: Either opts.node or "accessNode.api" in config must be defined.` + ) + + // Subscribe using the resolved transport + return transport.subscribe( + { + topic, + args, + onData, + onError, + }, + { + node, + ...opts, + } + ) +} diff --git a/packages/sdk/src/transport/subscribe/subscribe.test.ts b/packages/sdk/src/transport/subscribe/subscribe.test.ts new file mode 100644 index 000000000..ad7be7950 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/subscribe.test.ts @@ -0,0 +1,112 @@ +import {SdkTransport} from "@onflow/typedefs" +import {subscribe} from "./subscribe" +import {rawSubscribe} from "./raw-subscribe" + +jest.mock("./raw-subscribe") +const mockRawSubscribe = jest.mocked(rawSubscribe) + +describe("subscribe", () => { + let mockSub: jest.Mocked = { + unsubscribe: jest.fn(), + } + + beforeEach(() => { + jest.resetAllMocks() + mockRawSubscribe.mockResolvedValue(mockSub) + }) + + test("subscribes to a topic and returns a subscription", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const sub = await subscribe({ + topic, + args, + onData, + onError, + }) + + expect(sub).toBe(mockSub) + expect(mockRawSubscribe).toHaveBeenCalledTimes(1) + expect(mockRawSubscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {} + ) + }) + + test("unsubscribes from a subscription", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const sub = await subscribe({ + topic, + args, + onData, + onError, + }) + + sub.unsubscribe() + + expect(mockSub.unsubscribe).toHaveBeenCalledTimes(1) + }) + + test("subscribes to a topic with a node", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const node = "http://localhost:8080" + + const sub = await subscribe( + { + topic, + args, + onData, + onError, + }, + {node} + ) + + expect(sub).toBe(mockSub) + expect(mockRawSubscribe).toHaveBeenCalledTimes(1) + expect(mockRawSubscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {node} + ) + }) + + test("subscribes to a topic with custom node and transport", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const node = "http://localhost:8080" + const transport = { + send: jest.fn(), + subscribe: jest.fn().mockResolvedValue(mockSub), + } as jest.Mocked + + const sub = await subscribe( + { + topic, + args, + onData, + onError, + }, + {node, transport} + ) + + expect(sub).toBe(mockSub) + expect(mockRawSubscribe).toHaveBeenCalledTimes(1) + expect(mockRawSubscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {node, transport} + ) + }) +}) diff --git a/packages/sdk/src/transport/subscribe/subscribe.ts b/packages/sdk/src/transport/subscribe/subscribe.ts new file mode 100644 index 000000000..07112374d --- /dev/null +++ b/packages/sdk/src/transport/subscribe/subscribe.ts @@ -0,0 +1,37 @@ +import {SdkTransport} from "@onflow/typedefs" +import {rawSubscribe} from "./raw-subscribe" +import {decodeResponse} from "../../decode/decode" +import {SubscribeParams} from "./types" + +/** + * Subscribe to a topic and decode the data. + * @param params - The parameters for the subscription. + * @param opts - Additional options for the subscription. + * @returns A promise that resolves when the subscription is active. + */ +export async function subscribe( + {topic, args, onData, onError}: SubscribeParams, + opts: { + node?: string + transport?: SdkTransport.Transport + } = {} +): Promise { + const sub = await rawSubscribe( + { + topic, + args, + onData: data => { + decodeResponse(data) + .then(onData) + .catch(e => { + onError(new Error(`Failed to decode response: ${e.message}`)) + sub.unsubscribe() + }) + }, + onError, + }, + opts + ) + + return sub +} diff --git a/packages/sdk/src/transport/subscribe/types.ts b/packages/sdk/src/transport/subscribe/types.ts new file mode 100644 index 000000000..0069edf74 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/types.ts @@ -0,0 +1,8 @@ +import {SdkTransport} from "@onflow/typedefs" + +export type SubscribeParams = { + topic: T + args: SdkTransport.SubscriptionArguments + onData: (data: SdkTransport.SubscriptionData) => void + onError: (error: Error) => void +} diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts new file mode 100644 index 000000000..939b6c59f --- /dev/null +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -0,0 +1,50 @@ +export interface SubscriptionHandler< + T extends { + Topic: string + Args: any + Data: any + ArgsDto: any + DataDto: any + }, +> { + readonly topic: T["Topic"] + createSubscriber( + initialArgs: T["Args"], + onData: (data: T["Data"]) => void, + onError: (error: Error) => void + ): DataSubscriber +} + +export interface DataSubscriber { + /** + * The callback to call when a data is received + */ + onData(data: DataDto): void + + /** + * The callback to call when an error is received + */ + onError(error: Error): void + + /** + * The arguments to connect or reconnect to the subscription + */ + argsToDto(args: Args): ArgsDto + + /** + * Get the arguments to connect or reconnect to the subscription + */ + get connectionArgs(): Args +} + +export function createSubscriptionHandler< + T extends { + Topic: string + Args: any + Data: any + ArgsDto: any + DataDto: any + }, +>(handler: SubscriptionHandler): SubscriptionHandler { + return handler +} diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts index 50e3ae3d4..9f47a0f5d 100644 --- a/packages/transport-http/src/subscribe/subscribe.ts +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -1,8 +1,13 @@ import {SdkTransport} from "@onflow/typedefs" import {SubscriptionManager} from "./subscription-manager" +const SUBSCRIPTION_HANDLERS: any[] = [] + // Map of SubscriptionManager instances by access node URL -let subscriptionManagerMap: Map = new Map() +let subscriptionManagerMap: Map< + string, + SubscriptionManager +> = new Map() export async function subscribe( { @@ -18,16 +23,17 @@ export async function subscribe( }, opts: {node: string} ): Promise { + // Get the SubscriptionManager instance for the access node, or create a new one + const node = opts.node const manager = - subscriptionManagerMap.get(opts.node) || - new SubscriptionManager({ - node: opts.node, - }) - subscriptionManagerMap.set(opts.node, manager) + subscriptionManagerMap.get(node) || + new SubscriptionManager(SUBSCRIPTION_HANDLERS, {node}) + subscriptionManagerMap.set(node, manager) return manager.subscribe({ topic, args, + // @ts-ignore - TODO: This is temporary until we start implementing the handlers onData, onError, }) diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 48d2b81cc..0d656ebb9 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -12,15 +12,34 @@ import { SubscriptionManagerConfig, } from "./subscription-manager" import {SdkTransport} from "@onflow/typedefs" +import {DataSubscriber, SubscriptionHandler} from "./handlers/types" jest.mock("./websocket", () => ({ WebSocket: mockSocket, })) -describe("WsSubscriptionTransport", () => { +describe("SubscriptionManager", () => { let mockWs: WS + let mockSubscriber: jest.Mocked> + let mockHandler: jest.Mocked> + const mockConnectionArgs = {mock: "connection args"} + beforeEach(() => { + jest.resetAllMocks() + mockWs = new WS("wss://localhost:8080") + mockSubscriber = { + onData: jest.fn(), + onError: jest.fn(), + argsToDto: jest.fn().mockReturnValue(mockConnectionArgs), + get connectionArgs() { + return mockConnectionArgs + }, + } + mockHandler = { + topic: "topic", + createSubscriber: jest.fn().mockReturnValue(mockSubscriber), + } }) afterEach(() => { @@ -28,11 +47,7 @@ describe("WsSubscriptionTransport", () => { }) test("does not connect to the socket when no subscriptions are made", async () => { - const config: SubscriptionManagerConfig = { - node: "wss://localhost:8080", - } - - new SubscriptionManager(config) + new SubscriptionManager([mockHandler], {node: "wss://localhost:8080"}) await new Promise(resolve => setTimeout(resolve, 0)) expect(mockWs.server.clients).toHaveLength(0) @@ -42,7 +57,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -56,7 +71,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -69,7 +84,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -92,7 +107,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -106,7 +121,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -119,7 +134,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -141,9 +156,9 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(1) - expect(onData).toHaveBeenCalledWith({key: "value"}) - expect(onError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.onData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.onData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.onError).toHaveBeenCalledTimes(0) serverPromise = (async () => { const msg = (await mockWs.nextMessage) as string @@ -162,7 +177,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -176,7 +191,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -189,7 +204,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -200,6 +215,12 @@ describe("WsSubscriptionTransport", () => { expect(subscription).toBeDefined() expect(subscription.unsubscribe).toBeInstanceOf(Function) + expect(mockHandler.createSubscriber).toHaveBeenCalledTimes(1) + expect(mockHandler.createSubscriber).toHaveBeenCalledWith( + args, + onData, + onError + ) serverPromise = (async () => { const data = { @@ -211,9 +232,9 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(1) - expect(onData).toHaveBeenCalledWith({key: "value"}) - expect(onError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.onData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.onData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.onError).toHaveBeenCalledTimes(0) // Close the connection and create a new one mockWs.close() @@ -227,7 +248,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -254,7 +275,7 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(2) - expect(onData.mock.calls[1]).toEqual([{key: "value2"}]) + expect(mockSubscriber.onData).toHaveBeenCalledTimes(2) + expect(mockSubscriber.onData.mock.calls[1]).toEqual([{key: "value2"}]) }) }) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index b7ea02436..cc9c3bf3e 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -9,8 +9,9 @@ import { SubscribeMessageResponse, UnsubscribeMessageRequest, } from "./models" -import type {SdkTransport} from "@onflow/typedefs" +import {SdkTransport} from "@onflow/typedefs" import {WebSocket} from "./websocket" +import {DataSubscriber, SubscriptionHandler} from "./handlers/types" import * as logger from "@onflow/util-logger" const WS_OPEN = 1 @@ -19,19 +20,17 @@ type DeepRequired = Required<{ [K in keyof T]: DeepRequired }> -interface SubscriptionInfo { +type InferHandler = T extends SubscriptionHandler ? H : never + +interface SubscriptionInfo { // Internal ID for the subscription id: number // Remote ID assigned by the server used for message routing and unsubscribing remoteId?: string // The topic of the subscription - topic: T - // The checkpoint to resume the subscription from - checkpoint: SdkTransport.SubscriptionArguments - // The callback to call when a data is received - onData: (data: any) => void - // The callback to call when an error occurs - onError: (error: Error) => void + topic: string + // Data provider for the subscription + subscriber: DataSubscriber } export interface SubscriptionManagerConfig { @@ -61,14 +60,15 @@ export interface SubscriptionManagerConfig { } } -export class SubscriptionManager { +export class SubscriptionManager[]> { private counter = 0 - private subscriptions: SubscriptionInfo[] = [] private socket: WebSocket | null = null + private subscriptions: SubscriptionInfo[] = [] private config: DeepRequired private reconnectAttempts = 0 + private handlers: Record> - constructor(config: SubscriptionManagerConfig) { + constructor(handlers: Handlers, config: SubscriptionManagerConfig) { this.config = { ...config, reconnectOptions: { @@ -78,6 +78,15 @@ export class SubscriptionManager { ...config.reconnectOptions, }, } + + // Map data providers by topic + this.handlers = handlers.reduce( + (acc, handler) => { + acc[handler.topic] = handler + return acc + }, + {} as Record> + ) } // Lazy connect to the socket when the first subscription is made @@ -90,21 +99,15 @@ export class SubscriptionManager { this.socket = new WebSocket(this.config.node) this.socket.onmessage = event => { - const data = JSON.parse(event.data) as + const message = JSON.parse(event.data) as | MessageResponse | SubscriptionDataMessage - if ("action" in data) { + if ("action" in message) { // TODO, waiting for AN team to decide what to do here } else { - const sub = this.subscriptions.find(sub => sub.remoteId === data.id) - if (!sub) return - // Update the block height to checkpoint for disconnects - this.updateSubscriptionCheckpoint(sub, data) - - // Call the subscription callback - sub.onData(data.data) + this.handleSubscriptionData(message) } } this.socket.onclose = () => { @@ -157,7 +160,7 @@ export class SubscriptionManager { }) this.subscriptions.forEach(sub => { - sub.onError( + sub.subscriber.onError( new Error( `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}` ) @@ -182,22 +185,28 @@ export class SubscriptionManager { } } - async subscribe(opts: { - topic: T - args: SdkTransport.SubscriptionArguments - onData: (data: SdkTransport.SubscriptionData) => void + async subscribe(opts: { + topic: InferHandler["Topic"] + args: InferHandler["Args"] + onData: (data: InferHandler["Data"]) => void onError: (error: Error) => void }): Promise { // Connect the socket if it's not already open await this.connect() + // Get the data provider for the topic + const topicHandler = this.getHandler(opts.topic) + const subscriber = topicHandler.createSubscriber( + opts.args, + opts.onData, + opts.onError + ) + // Track the subscription locally - const sub: SubscriptionInfo = { + const sub: SubscriptionInfo = { id: this.counter++, topic: opts.topic, - checkpoint: opts.args, - onData: opts.onData, - onError: opts.onError, + subscriber: subscriber, } this.subscriptions.push(sub) @@ -239,14 +248,12 @@ export class SubscriptionManager { } } - private async sendSubscribe( - sub: SubscriptionInfo - ) { + private async sendSubscribe(sub: SubscriptionInfo) { // Send the subscription message const request: SubscribeMessageRequest = { action: Action.SUBSCRIBE, topic: sub.topic, - arguments: sub.checkpoint, + arguments: sub.subscriber.connectionArgs, } this.socket?.send(JSON.stringify(request)) @@ -261,9 +268,7 @@ export class SubscriptionManager { return response } - private async sendUnsubscribe( - sub: SubscriptionInfo - ) { + private async sendUnsubscribe(sub: SubscriptionInfo) { // Send the unsubscribe message if the subscription has a remote id const {remoteId} = sub if (remoteId) { @@ -297,10 +302,25 @@ export class SubscriptionManager { // Update the subscription checkpoint when a message is received // These checkpoints are used to resume subscriptions after disconnects - private updateSubscriptionCheckpoint< + private handleSubscriptionData< T extends SdkTransport.SubscriptionTopic = SdkTransport.SubscriptionTopic, - >(sub: SubscriptionInfo, message: SubscriptionDataMessage) { - // TODO: Will be implemented with each subscription topic + >(message: SubscriptionDataMessage) { + // Get the subscription + const sub = this.subscriptions.find(sub => sub.remoteId === message.id) + if (!sub) { + throw new Error(`No subscription found for id ${message.id}`) + } + + // Send data to the subscriber + sub.subscriber.onData(message.data) + } + + private getHandler(topic: string) { + const handler = this.handlers[topic] + if (!handler) { + throw new Error(`No handler found for topic ${topic}`) + } + return handler } /**