From f8c4a606b7354daff0390c963863c7d3a9c489f9 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Wed, 8 Nov 2023 20:46:15 -0800 Subject: [PATCH] [WIP] Event streaming --- package-lock.json | 188 ++++++++---------- .../sdk/src/build/build-subscribe-events.js | 23 +++ .../src/build/build-subscribe-events.test.js | 26 +++ packages/sdk/src/build/build-subscription.js | 8 + .../sdk/src/build/build-subscription.test.js | 20 ++ packages/sdk/src/decode/decode.js | 2 + packages/sdk/src/interaction/interaction.js | 116 ++++++----- packages/sdk/src/response/response.js | 3 +- packages/transport-http/src/http-request.js | 9 +- packages/transport-http/src/send-http.js | 3 + .../src/send-subscribe-events.js | 73 +++++++ .../src/send-subscribe-events.test.js | 102 ++++++++++ packages/transport-http/src/subscribe-ws.js | 36 ++++ packages/transport-http/src/utils.js | 7 + 14 files changed, 450 insertions(+), 166 deletions(-) create mode 100644 packages/sdk/src/build/build-subscribe-events.js create mode 100644 packages/sdk/src/build/build-subscribe-events.test.js create mode 100644 packages/sdk/src/build/build-subscription.js create mode 100644 packages/sdk/src/build/build-subscription.test.js create mode 100644 packages/transport-http/src/send-subscribe-events.js create mode 100644 packages/transport-http/src/send-subscribe-events.test.js create mode 100644 packages/transport-http/src/subscribe-ws.js create mode 100644 packages/transport-http/src/utils.js diff --git a/package-lock.json b/package-lock.json index a99a790d8..a1597fc80 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19575,19 +19575,19 @@ }, "packages/config": { "name": "@onflow/config", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/util-actor": "^1.3.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0", - "@onflow/util-logger": "^1.3.0-typescript.0", + "@onflow/util-actor": "^1.3.0", + "@onflow/util-invariant": "^1.2.0", + "@onflow/util-logger": "^1.3.0", "eslint": "^8.34.0", "eslint-plugin-jsdoc": "^40.0.0" }, "devDependencies": { "@babel/preset-typescript": "^7.22.11", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/estree": "^1.0.1", "@types/jest": "^29.5.4", "@typescript-eslint/eslint-plugin": "^6.5.0", @@ -19705,27 +19705,27 @@ }, "packages/fcl": { "name": "@onflow/fcl", - "version": "1.8.0-typescript.0", + "version": "1.8.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/config": "^1.2.0-typescript.0", + "@onflow/config": "^1.2.0", "@onflow/interaction": "0.0.11", - "@onflow/rlp": "^1.2.0-typescript.0", - "@onflow/sdk": "^1.3.0-typescript.0", - "@onflow/types": "^1.2.0-typescript.0", - "@onflow/util-actor": "^1.3.0-typescript.0", - "@onflow/util-address": "^1.2.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0", - "@onflow/util-logger": "^1.3.0-typescript.0", + "@onflow/rlp": "^1.2.0", + "@onflow/sdk": "^1.3.0", + "@onflow/types": "^1.2.0", + "@onflow/util-actor": "^1.3.0", + "@onflow/util-address": "^1.2.0", + "@onflow/util-invariant": "^1.2.0", + "@onflow/util-logger": "^1.3.0", "@onflow/util-semver": "^1.0.0", - "@onflow/util-template": "^1.2.0-typescript.0", - "@onflow/util-uid": "^1.2.0-typescript.0", + "@onflow/util-template": "^1.2.0", + "@onflow/util-uid": "^1.2.0", "cross-fetch": "^3.1.6" }, "devDependencies": { - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/typedefs": "^1.2.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/typedefs": "^1.2.0", "@types/estree": "^1.0.1", "@types/node": "^18.13.0", "eslint": "^8.35.0", @@ -19761,7 +19761,7 @@ }, "packages/fcl-bundle": { "name": "@onflow/fcl-bundle", - "version": "1.4.0-typescript.0", + "version": "1.4.0", "license": "Apache-2.0", "dependencies": { "@babel/plugin-transform-runtime": "^7.18.2", @@ -19791,25 +19791,25 @@ }, "packages/fcl-wc": { "name": "@onflow/fcl-wc", - "version": "5.0.0-typescript.0", + "version": "5.0.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.9", - "@onflow/config": "^1.2.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0", - "@onflow/util-logger": "^1.3.0-typescript.0", + "@onflow/config": "^1.2.0", + "@onflow/util-invariant": "^1.2.0", + "@onflow/util-logger": "^1.3.0", "@walletconnect/modal": "^2.4.7", "@walletconnect/sign-client": "^2.8.1", "@walletconnect/types": "^2.8.1", "@walletconnect/utils": "^2.8.1" }, "devDependencies": { - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "better-sqlite3": "^7.6.2", "jest": "^29.5.0" }, "peerDependencies": { - "@onflow/fcl": "^1.8.0-typescript.0" + "@onflow/fcl": "^1.8.0" } }, "packages/fcl/node_modules/eslint-plugin-jsdoc": { @@ -20348,7 +20348,7 @@ }, "packages/rlp": { "name": "@onflow/rlp", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "MPL-2.0", "dependencies": { "@babel/runtime": "^7.18.6", @@ -20356,7 +20356,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", @@ -20454,25 +20454,25 @@ }, "packages/sdk": { "name": "@onflow/sdk", - "version": "1.3.0-typescript.0", + "version": "1.3.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/config": "^1.2.0-typescript.0", - "@onflow/rlp": "^1.2.0-typescript.0", - "@onflow/transport-http": "^1.8.0-typescript.0", - "@onflow/util-actor": "^1.3.0-typescript.0", - "@onflow/util-address": "^1.2.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0", - "@onflow/util-logger": "^1.3.0-typescript.0", - "@onflow/util-template": "^1.2.0-typescript.0", + "@onflow/config": "^1.2.0", + "@onflow/rlp": "^1.2.0", + "@onflow/transport-http": "^1.8.0", + "@onflow/util-actor": "^1.3.0", + "@onflow/util-address": "^1.2.0", + "@onflow/util-invariant": "^1.2.0", + "@onflow/util-logger": "^1.3.0", + "@onflow/util-template": "^1.2.0", "deepmerge": "^4.2.2", "sha3": "^2.1.4", "uuid": "^9.0.1" }, "devDependencies": { - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/typedefs": "^1.2.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/typedefs": "^1.2.0", "eslint": "^8.35.0", "eslint-plugin-jsdoc": "^40.0.1", "jest": "^29.5.0", @@ -20560,21 +20560,21 @@ }, "packages/transport-grpc": { "name": "@onflow/transport-grpc", - "version": "1.3.0-typescript.0", + "version": "1.3.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", "@improbable-eng/grpc-web": "^0.14.0", "@improbable-eng/grpc-web-node-http-transport": "^0.14.0", "@onflow/protobuf": "^1.2.1", - "@onflow/rlp": "^1.2.0-typescript.0", - "@onflow/util-address": "^1.2.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0", - "@onflow/util-template": "^1.2.0-typescript.0" + "@onflow/rlp": "^1.2.0", + "@onflow/util-address": "^1.2.0", + "@onflow/util-invariant": "^1.2.0", + "@onflow/util-template": "^1.2.0" }, "devDependencies": { - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/sdk": "^1.3.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/sdk": "^1.3.0", "jest": "^29.5.0" } }, @@ -20602,34 +20602,34 @@ }, "packages/transport-http": { "name": "@onflow/transport-http", - "version": "1.8.0-typescript.0", + "version": "1.8.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/util-address": "^1.2.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0", - "@onflow/util-logger": "^1.3.0-typescript.0", - "@onflow/util-template": "^1.2.0-typescript.0", + "@onflow/util-address": "^1.2.0", + "@onflow/util-invariant": "^1.2.0", + "@onflow/util-logger": "^1.3.0", + "@onflow/util-template": "^1.2.0", "abort-controller": "^3.0.0", "cross-fetch": "^3.1.6" }, "devDependencies": { - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/rlp": "^1.2.0-typescript.0", - "@onflow/sdk": "^1.3.0-typescript.0", - "@onflow/types": "^1.2.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/rlp": "^1.2.0", + "@onflow/sdk": "^1.3.0", + "@onflow/types": "^1.2.0", "jest": "^29.5.0" } }, "packages/typedefs": { "name": "@onflow/typedefs", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6" }, "devDependencies": { - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/node": "^18.13.0", "eslint": "^8.33.0", "eslint-plugin-jsdoc": "^39.7.5", @@ -20652,15 +20652,15 @@ }, "packages/types": { "name": "@onflow/types", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/util-logger": "^1.3.0-typescript.0" + "@onflow/util-logger": "^1.3.0" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", @@ -20682,21 +20682,6 @@ "node": ">=16" } }, - "packages/types/node_modules/@onflow/util-logger": { - "version": "1.3.0", - "license": "Apache-2.0", - "dependencies": { - "@babel/runtime": "^7.18.6" - }, - "peerDependencies": { - "@onflow/util-config": ">1.1.1" - }, - "peerDependenciesMeta": { - "@onflow/util-config": { - "optional": true - } - } - }, "packages/types/node_modules/comment-parser": { "version": "1.4.0", "dev": true, @@ -20773,7 +20758,7 @@ }, "packages/util-actor": { "name": "@onflow/util-actor", - "version": "1.3.0-typescript.0", + "version": "1.3.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", @@ -20781,7 +20766,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", @@ -20879,15 +20864,15 @@ }, "packages/util-address": { "name": "@onflow/util-address", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/types": "^1.2.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/types": "^1.2.0", "@types/jest": "^29.5.3", "@types/node": "^18.13.0", "@typescript-eslint/eslint-plugin": "^6.4.0", @@ -20999,17 +20984,17 @@ }, "packages/util-encode-key": { "name": "@onflow/util-encode-key", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/rlp": "^1.2.0-typescript.0", - "@onflow/util-invariant": "^1.2.0-typescript.0" + "@onflow/rlp": "^1.2.0", + "@onflow/util-invariant": "^1.2.0" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/types": "^1.2.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/types": "^1.2.0", "@types/jest": "^29.5.3", "@types/node": "^18.13.0", "@typescript-eslint/eslint-plugin": "^6.4.0", @@ -21121,15 +21106,15 @@ }, "packages/util-invariant": { "name": "@onflow/util-invariant", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", - "@onflow/types": "^1.2.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", + "@onflow/types": "^1.2.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", @@ -21227,14 +21212,14 @@ }, "packages/util-logger": { "name": "@onflow/util-logger", - "version": "1.3.0-typescript.0", + "version": "1.3.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", @@ -21352,15 +21337,15 @@ }, "packages/util-template": { "name": "@onflow/util-template", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6", - "@onflow/util-logger": "^1.3.0-typescript.0" + "@onflow/util-logger": "^1.3.0" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", @@ -21382,21 +21367,6 @@ "node": ">=16" } }, - "packages/util-template/node_modules/@onflow/util-logger": { - "version": "1.3.0", - "license": "Apache-2.0", - "dependencies": { - "@babel/runtime": "^7.18.6" - }, - "peerDependencies": { - "@onflow/util-config": ">1.1.1" - }, - "peerDependenciesMeta": { - "@onflow/util-config": { - "optional": true - } - } - }, "packages/util-template/node_modules/comment-parser": { "version": "1.4.0", "dev": true, @@ -21473,14 +21443,14 @@ }, "packages/util-uid": { "name": "@onflow/util-uid", - "version": "1.2.0-typescript.0", + "version": "1.2.0", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.18.6" }, "devDependencies": { "@babel/preset-typescript": "^7.22.5", - "@onflow/fcl-bundle": "^1.4.0-typescript.0", + "@onflow/fcl-bundle": "^1.4.0", "@types/jest": "^29.5.3", "@typescript-eslint/eslint-plugin": "^6.4.0", "@typescript-eslint/parser": "^6.4.0", diff --git a/packages/sdk/src/build/build-subscribe-events.js b/packages/sdk/src/build/build-subscribe-events.js new file mode 100644 index 000000000..1f21693c3 --- /dev/null +++ b/packages/sdk/src/build/build-subscribe-events.js @@ -0,0 +1,23 @@ +import {pipe, Ok, makeSubscribeEvents} from "../interaction/interaction.js" + +export function subscribeEvents({ + startBlockId, + startHeight, + eventTypes, + addresses, + contracts, + heartbeatInterval, +}) { + return pipe([ + makeSubscribeEvents, + ix => { + ix.subscribeEvents.startBlockId = startBlockId + ix.subscribeEvents.startHeight = startHeight + ix.subscribeEvents.eventTypes = eventTypes + ix.subscribeEvents.addresses = addresses + ix.subscribeEvents.contracts = contracts + ix.subscribeEvents.heartbeatInterval = heartbeatInterval + return Ok(ix) + }, + ]) +} diff --git a/packages/sdk/src/build/build-subscribe-events.test.js b/packages/sdk/src/build/build-subscribe-events.test.js new file mode 100644 index 000000000..3d4dec385 --- /dev/null +++ b/packages/sdk/src/build/build-subscribe-events.test.js @@ -0,0 +1,26 @@ +import {interaction} from "../interaction/interaction.js" +import {subscribeEvents} from "./build-subscribe-events.js" + +describe("Subscribe Events", () => { + test("Subscribe Events", async () => { + let ix = await subscribeEvents({ + startBlockId: "abc123", + startHeight: 1, + eventTypes: ["A.7e60df042a9c0868.FlowToken.TokensWithdrawn"], + addresses: ["0x1", "0x2"], + contracts: ["A.7e60df042a9c0868.FlowToken"], + heartbeatInterval: 1000, + })(interaction()) + + expect(ix.subscribeEvents.startBlockId).toBe("abc123") + expect(ix.subscribeEvents.startHeight).toBe(1) + expect(ix.subscribeEvents.eventTypes).toEqual([ + "A.7e60df042a9c0868.FlowToken.TokensWithdrawn", + ]) + expect(ix.subscribeEvents.addresses).toEqual(["0x1", "0x2"]) + expect(ix.subscribeEvents.contracts).toEqual([ + "A.7e60df042a9c0868.FlowToken", + ]) + expect(ix.subscribeEvents.heartbeatInterval).toBe(1000) + }) +}) diff --git a/packages/sdk/src/build/build-subscription.js b/packages/sdk/src/build/build-subscription.js new file mode 100644 index 000000000..63f65b8d8 --- /dev/null +++ b/packages/sdk/src/build/build-subscription.js @@ -0,0 +1,8 @@ +export function subscription({onData, onError, onComplete}) { + return ix => { + ix.subscription.onData = onData + ix.subscription.onError = onError + ix.subscription.onComplete = onComplete + return ix + } +} diff --git a/packages/sdk/src/build/build-subscription.test.js b/packages/sdk/src/build/build-subscription.test.js new file mode 100644 index 000000000..a13d65303 --- /dev/null +++ b/packages/sdk/src/build/build-subscription.test.js @@ -0,0 +1,20 @@ +import {interaction} from "../interaction/interaction.js" +import {subscription} from "./build-subscription.js" + +describe("Build Subscription", () => { + test("build subscription", async () => { + const onData = () => {} + const onError = () => {} + const onComplete = () => {} + + let ix = await subscription({ + onData, + onError, + onComplete, + })(interaction()) + + expect(ix.subscription.onData).toBe(onData) + expect(ix.subscription.onError).toBe(onError) + expect(ix.subscription.onComplete).toBe(onComplete) + }) +}) diff --git a/packages/sdk/src/decode/decode.js b/packages/sdk/src/decode/decode.js index 0d1199876..c005ea987 100644 --- a/packages/sdk/src/decode/decode.js +++ b/packages/sdk/src/decode/decode.js @@ -223,6 +223,8 @@ export const decodeResponse = async (response, customDecoders = {}) => { return { chainId: chainIdMap[response.networkParameters.chainId], } + } else if (response.unsubscribeCallback) { + return response.unsubscribeCallback } return null diff --git a/packages/sdk/src/interaction/interaction.js b/packages/sdk/src/interaction/interaction.js index 703ba18f6..518c5504c 100644 --- a/packages/sdk/src/interaction/interaction.js +++ b/packages/sdk/src/interaction/interaction.js @@ -7,6 +7,7 @@ export const TRANSACTION /* */ = "TRANSACTION" export const GET_TRANSACTION_STATUS /* */ = "GET_TRANSACTION_STATUS" export const GET_ACCOUNT /* */ = "GET_ACCOUNT" export const GET_EVENTS /* */ = "GET_EVENTS" +export const SUBSCRIBE_EVENTS /* */ = "SUBSCRIBE_EVENTS" export const PING /* */ = "PING" export const GET_TRANSACTION /* */ = "GET_TRANSACTION" export const GET_BLOCK /* */ = "GET_BLOCK" @@ -89,6 +90,14 @@ const IX = `{ "end":null, "blockIds":[] }, + "subscribeEvents": { + "startBlockId":null, + "startHeight":null, + "eventTypes":null, + "addresses":null, + "contracts":null, + "heartbeatInterval":null + }, "transaction": { "id":null }, @@ -102,6 +111,11 @@ const IX = `{ }, "collection": { "id":null + }, + "subscription": { + "onData":null, + "onError":null, + "onComplete":null } }` @@ -147,52 +161,54 @@ const prepAccountKeyId = acct => { } } -export const prepAccount = (acct, opts = {}) => ix => { - invariant( - typeof acct === "function" || typeof acct === "object", - "prepAccount must be passed an authorization function or an account object" - ) - invariant(opts.role != null, "Account must have a role") - - const ACCOUNT = JSON.parse(ACCT) - const role = opts.role - const tempId = uuidv4() - - if (acct.authorization && isFn(acct.authorization)) - acct = {resolve: acct.authorization} - if (!acct.authorization && isFn(acct)) acct = {resolve: acct} - - const resolve = acct.resolve - if (resolve) - acct.resolve = (acct, ...rest) => - [resolve, prepAccountKeyId].reduce( - async (d, fn) => fn(await d, ...rest), - acct - ) - acct = prepAccountKeyId(acct) - - ix.accounts[tempId] = { - ...ACCOUNT, - tempId, - ...acct, - role: { - ...ACCOUNT.role, - ...(typeof acct.role === "object" ? acct.role : {}), - [role]: true, - }, - } - - if (role === AUTHORIZER) { - ix.authorizations.push(tempId) - } else if (role === PAYER) { - ix.payer.push(tempId) - } else { - ix[role] = tempId +export const prepAccount = + (acct, opts = {}) => + ix => { + invariant( + typeof acct === "function" || typeof acct === "object", + "prepAccount must be passed an authorization function or an account object" + ) + invariant(opts.role != null, "Account must have a role") + + const ACCOUNT = JSON.parse(ACCT) + const role = opts.role + const tempId = uuidv4() + + if (acct.authorization && isFn(acct.authorization)) + acct = {resolve: acct.authorization} + if (!acct.authorization && isFn(acct)) acct = {resolve: acct} + + const resolve = acct.resolve + if (resolve) + acct.resolve = (acct, ...rest) => + [resolve, prepAccountKeyId].reduce( + async (d, fn) => fn(await d, ...rest), + acct + ) + acct = prepAccountKeyId(acct) + + ix.accounts[tempId] = { + ...ACCOUNT, + tempId, + ...acct, + role: { + ...ACCOUNT.role, + ...(typeof acct.role === "object" ? acct.role : {}), + [role]: true, + }, + } + + if (role === AUTHORIZER) { + ix.authorizations.push(tempId) + } else if (role === PAYER) { + ix.payer.push(tempId) + } else { + ix[role] = tempId + } + + return ix } - return ix -} - export const makeArgument = arg => ix => { let tempId = uuidv4() ix.message.arguments.push(tempId) @@ -217,6 +233,7 @@ export const makeGetTransactionStatus /* */ = makeIx(GET_TRANSACTION_STATUS) export const makeGetTransaction /* */ = makeIx(GET_TRANSACTION) export const makeGetAccount /* */ = makeIx(GET_ACCOUNT) export const makeGetEvents /* */ = makeIx(GET_EVENTS) +export const makeSubscribeEvents /* */ = makeIx(SUBSCRIBE_EVENTS) export const makePing /* */ = makeIx(PING) export const makeGetBlock /* */ = makeIx(GET_BLOCK) export const makeGetBlockHeader /* */ = makeIx(GET_BLOCK_HEADER) @@ -232,6 +249,7 @@ export const isGetTransactionStatus /* */ = is(GET_TRANSACTION_STATUS) export const isGetTransaction /* */ = is(GET_TRANSACTION) export const isGetAccount /* */ = is(GET_ACCOUNT) export const isGetEvents /* */ = is(GET_EVENTS) +export const isSubscribeEvents /* */ = is(SUBSCRIBE_EVENTS) export const isPing /* */ = is(PING) export const isGetBlock /* */ = is(GET_BLOCK) export const isGetBlockHeader /* */ = is(GET_BLOCK_HEADER) @@ -287,10 +305,12 @@ export const put = (key, value) => ix => { return Ok(ix) } -export const update = (key, fn = identity) => ix => { - ix.assigns[key] = fn(ix.assigns[key], ix) - return Ok(ix) -} +export const update = + (key, fn = identity) => + ix => { + ix.assigns[key] = fn(ix.assigns[key], ix) + return Ok(ix) + } export const destroy = key => ix => { delete ix.assigns[key] diff --git a/packages/sdk/src/response/response.js b/packages/sdk/src/response/response.js index e5e19e385..2d5a774c0 100644 --- a/packages/sdk/src/response/response.js +++ b/packages/sdk/src/response/response.js @@ -10,7 +10,8 @@ const DEFAULT_RESPONSE = `{ "blockHeader":null, "latestBlock":null, "collection":null, - "networkParameters":null + "networkParameters":null, + "unsubscribeCallback":null, }` export const response = () => JSON.parse(DEFAULT_RESPONSE) diff --git a/packages/transport-http/src/http-request.js b/packages/transport-http/src/http-request.js index 04805d20a..397d1d841 100644 --- a/packages/transport-http/src/http-request.js +++ b/packages/transport-http/src/http-request.js @@ -1,5 +1,6 @@ import * as logger from "@onflow/util-logger" import fetchTransport from "cross-fetch" +import {safeParseJSON} from "./utils" const AbortController = globalThis.AbortController || require("abort-controller") @@ -161,11 +162,3 @@ export async function httpRequest({ // Keep retrying request until server available or max attempts exceeded return await requestLoop() } - -function safeParseJSON(data) { - try { - return JSON.parse(data) - } catch { - return null - } -} diff --git a/packages/transport-http/src/send-http.js b/packages/transport-http/src/send-http.js index ce26a5acf..f4334f88e 100644 --- a/packages/transport-http/src/send-http.js +++ b/packages/transport-http/src/send-http.js @@ -5,6 +5,7 @@ import {sendGetTransaction} from "./send-get-transaction.js" import {sendExecuteScript} from "./send-execute-script.js" import {sendGetAccount} from "./send-get-account.js" import {sendGetEvents} from "./send-get-events.js" +import {sendSubscribeEvents} from "./send-subscribe-events.js" import {sendGetBlock} from "./send-get-block.js" import {sendGetBlockHeader} from "./send-get-block-header.js" import {sendGetCollection} from "./send-get-collection.js" @@ -34,6 +35,8 @@ export const send = async (ix, context = {}, opts = {}) => { return opts.sendGetAccount ? opts.sendGetAccount(ix, context, opts) : sendGetAccount(ix, context, opts) case context.ix.isGetEvents(ix): return opts.sendGetEvents ? opts.sendGetEvents(ix, context, opts) : sendGetEvents(ix, context, opts) + case context.ix.isSubscribeEvents(ix): + return opts.sendSubscribeEvents ? opts.sendSubscribeEvents(ix, context, opts) : sendSubscribeEvents(ix, context, opts) case context.ix.isGetBlock(ix): return opts.sendGetBlock ? opts.sendGetBlock(ix, context, opts) : sendGetBlock(ix, context, opts) case context.ix.isGetBlockHeader(ix): diff --git a/packages/transport-http/src/send-subscribe-events.js b/packages/transport-http/src/send-subscribe-events.js new file mode 100644 index 000000000..4e3f27aad --- /dev/null +++ b/packages/transport-http/src/send-subscribe-events.js @@ -0,0 +1,73 @@ +import {invariant} from "@onflow/util-invariant" +import {subscribeWs as defaultSubscribeWs} from "./subscribe-ws" + +function constructData(ix, context, data) { + let ret = context.response() + ret.tag = ix.tag + + ret.events = data.events + ? data.events.map(event => ({ + blockId: data.BlockId, + blockHeight: Number(data.Height), + blockTimestamp: data.Timestamp, + type: event.Type, + transactionId: event.TransactionId, + transactionIndex: Number(event.TransactionIndex), + eventIndex: Number(event.EventIndex), + payload: JSON.parse( + context.Buffer.from(event.Payload, "base64").toString() + ), + })) + : [] + + return ret +} + +function constructResponse(ix, context, callback) { + let ret = context.response() + ret.tag = ix.tag + + ret.unsubscribeCallback = callback + + return ret +} + +export async function sendSubscribeEvents(ix, context = {}, opts = {}) { + invariant(opts.node, `SDK Send Get Events Error: opts.node must be defined.`) + invariant( + context.response, + `SDK Send Get Events Error: context.response must be defined.` + ) + invariant( + context.Buffer, + `SDK Send Get Events Error: context.Buffer must be defined.` + ) + + ix = await ix + + const subscribeWs = opts.subscribeWs || defaultSubscribeWs + const {onData, onError, onComplete} = ix.subscription + + const unsubscribe = subscribeWs({ + hostname: opts.node, + path: `/v1/subscribe_events`, + params: { + start_block_id: ix.subscribeEvents.startBlockId, + start_height: ix.subscribeEvents.startHeight, + event_types: ix.subscribeEvents.eventTypes, + addresses: ix.subscribeEvents.addresses, + contracts: ix.subscribeEvents.contracts, + heartbeat_interval: ix.subscribeEvents.heartbeatInterval, + }, + onData: handleData, + onError, + onComplete, + }) + + function handleData(data) { + const resp = constructData(ix, context, data) + onData(resp) + } + + return constructResponse(ix, context, unsubscribe) +} diff --git a/packages/transport-http/src/send-subscribe-events.test.js b/packages/transport-http/src/send-subscribe-events.test.js new file mode 100644 index 000000000..078db3d08 --- /dev/null +++ b/packages/transport-http/src/send-subscribe-events.test.js @@ -0,0 +1,102 @@ +import {sendSubscribeEvents} from "./send-subscribe-events" +import {Buffer} from "@onflow/rlp" +import { + build, + subscribeEvents, + subscription, + resolve, + response as responseADT, +} from "@onflow/sdk" + +describe("Subscribe Events", () => { + let subscribeWsMock + + let onData + let onError + let onComplete + + let unsubscribe + let response + + beforeEach(async () => { + subscribeWsMock = jest.fn() + + onData = jest.fn() + onError = jest.fn() + onComplete = jest.fn() + + unsubscribe = jest.fn() + + subscribeWsMock.mockReturnValue({ + unsubscribeCallback: unsubscribe, + }) + + const response = await sendSubscribeEvents( + await resolve( + await build([ + subscribeEvents({ + startBlockId: "abc123", + startHeight: 1, + eventTypes: ["A.7e60df042a9c0868.FlowToken.TokensWithdrawn"], + addresses: ["0x1", "0x2"], + contracts: ["A.7e60df042a9c0868.FlowToken"], + heartbeatInterval: 1000, + }), + subscription({ + onData, + onError, + onComplete, + }), + ]) + ), + { + response: responseADT, + Buffer, + }, + { + subscribeWs: subscribeWsMock, + node: "localhost", + } + ) + }) + + test("calls subscribeWs with correct params", async () => { + expect(subscribeWsMock.mock.calls.length).toEqual(1) + + const httpRequestMockArgs = httpRequestMock.mock.calls[0] + + expect(httpRequestMockArgs.length).toEqual(1) + + const valueSent = httpRequestMock.mock.calls[0][0] + + expect(valueSent.hostname).toBe("localhost") + expect(valueSent.path).toBe("/v1/subscribe_events") + expect(valueSent.params).toEqual({ + start_block_id: "abc123", + start_height: 1, + event_types: ["A.7e60df042a9c0868.FlowToken.TokensWithdrawn"], + addresses: ["0x1", "0x2"], + contracts: ["A.7e60df042a9c0868.FlowToken"], + heartbeat_interval: 1000, + }) + }) + + test("calls onData with response ADT formatted data", async () => { + const data = {foo: "bar"} + + // Similate WS message + subscribeWsMock.mock.calls[0][0].onData(data) + + expect(onData.mock.calls.length).toEqual(1) + expect(onData.mock.calls[0][0]).toEqual({ + tag: "SubscribeEvents", + data: data, + }) + }) + + test("calling response.unsubscribeCallback calls subscribeWs unsubscribe", async () => { + response.unsubscribeCallback() + + expect(unsubscribe.mock.calls.length).toEqual(1) + }) +}) diff --git a/packages/transport-http/src/subscribe-ws.js b/packages/transport-http/src/subscribe-ws.js new file mode 100644 index 000000000..bd6ce893f --- /dev/null +++ b/packages/transport-http/src/subscribe-ws.js @@ -0,0 +1,36 @@ +import {safeParseJSON} from "./utils" + +// TODO: Implement retries +export function subscribeWs({ + hostname, + path, + params, + onData, + onClose, + onError, + retryLimit = 5, + retryIntervalMs = 1000, +}) { + const url = new URL(path, hostname) + url.search = new URLSearchParams(params) + var ws = new WebSocket(url) + + ws.onmessage = function (e) { + const data = safeParseJSON(e.data) + if (data) { + onData(data) + } + } + + ws.onclose = function (e) { + onClose(e) + } + + ws.onerror = function (err) { + onError(err) + } + + return () => { + ws.close() + } +} diff --git a/packages/transport-http/src/utils.js b/packages/transport-http/src/utils.js new file mode 100644 index 000000000..75fc84ace --- /dev/null +++ b/packages/transport-http/src/utils.js @@ -0,0 +1,7 @@ +export function safeParseJSON(data) { + try { + return JSON.parse(data) + } catch { + return null + } +}