From 711907739073ad4c4e92c50ca019da15f0090659 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 25 Nov 2024 15:35:32 -0800 Subject: [PATCH 1/8] Add transport-http subscribe function --- package-lock.json | 105 ++++--- packages/transport-http/package.json | 14 +- packages/transport-http/src/index.ts | 17 ++ packages/transport-http/src/sdk-send-http.ts | 16 -- .../src/{ => send}/combine-urls.test.ts | 0 .../src/{ => send}/combine-urls.ts | 0 .../connect-subscribe-events.test.ts | 0 .../{ => send}/connect-subscribe-events.ts | 0 .../src/{ => send}/connect-ws.test.ts | 2 +- .../src/{ => send}/connect-ws.ts | 2 +- .../src/{ => send}/http-request.js | 0 .../src/{ => send}/http-request.test.js | 0 .../src/{ => send}/send-execute-script.js | 0 .../{ => send}/send-execute-script.test.js | 0 .../src/{ => send}/send-get-account.js | 0 .../src/{ => send}/send-get-account.test.js | 0 .../src/{ => send}/send-get-block-header.js | 0 .../{ => send}/send-get-block-header.test.js | 0 .../src/{ => send}/send-get-block.js | 0 .../src/{ => send}/send-get-block.test.js | 0 .../src/{ => send}/send-get-collection.js | 0 .../{ => send}/send-get-collection.test.js | 0 .../src/{ => send}/send-get-events.js | 0 .../src/{ => send}/send-get-events.test.js | 0 .../{ => send}/send-get-network-parameters.js | 0 .../send-get-network-parameters.test.js | 0 .../send-get-node-version-info.test.ts | 0 .../{ => send}/send-get-node-version-info.ts | 0 .../{ => send}/send-get-transaction-status.js | 0 .../send-get-transaction-status.test.js | 0 .../src/{ => send}/send-get-transaction.js | 0 .../{ => send}/send-get-transaction.test.js | 0 .../src/{ => send}/send-http.ts | 2 +- .../src/{ => send}/send-ping.test.ts | 0 .../src/{ => send}/send-ping.ts | 0 .../src/{ => send}/send-transaction.js | 0 .../src/{ => send}/send-transaction.test.js | 0 .../transport-http/src/{ => send}/utils.js | 0 .../transport-http/src/subscribe/models.ts | 66 +++++ .../transport-http/src/subscribe/subscribe.ts | 34 +++ .../subscribe/subscription-manager.test.ts | 265 +++++++++++++++++ .../src/subscribe/subscription-manager.ts | 272 ++++++++++++++++++ .../src/{ => subscribe}/websocket.ts | 5 +- packages/transport-http/src/transport.ts | 8 + packages/typedefs/src/index.ts | 1 + packages/typedefs/src/sdk-transport/index.ts | 16 ++ .../typedefs/src/sdk-transport/requests.ts | 85 ++++++ .../src/sdk-transport/subscriptions.ts | 51 ++++ 48 files changed, 892 insertions(+), 69 deletions(-) create mode 100644 packages/transport-http/src/index.ts delete mode 100644 packages/transport-http/src/sdk-send-http.ts rename packages/transport-http/src/{ => send}/combine-urls.test.ts (100%) rename packages/transport-http/src/{ => send}/combine-urls.ts (100%) rename packages/transport-http/src/{ => send}/connect-subscribe-events.test.ts (100%) rename packages/transport-http/src/{ => send}/connect-subscribe-events.ts (100%) rename packages/transport-http/src/{ => send}/connect-ws.test.ts (99%) rename packages/transport-http/src/{ => send}/connect-ws.ts (98%) rename packages/transport-http/src/{ => send}/http-request.js (100%) rename packages/transport-http/src/{ => send}/http-request.test.js (100%) rename packages/transport-http/src/{ => send}/send-execute-script.js (100%) rename packages/transport-http/src/{ => send}/send-execute-script.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-account.js (100%) rename packages/transport-http/src/{ => send}/send-get-account.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-block-header.js (100%) rename packages/transport-http/src/{ => send}/send-get-block-header.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-block.js (100%) rename packages/transport-http/src/{ => send}/send-get-block.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-collection.js (100%) rename packages/transport-http/src/{ => send}/send-get-collection.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-events.js (100%) rename packages/transport-http/src/{ => send}/send-get-events.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-network-parameters.js (100%) rename packages/transport-http/src/{ => send}/send-get-network-parameters.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-node-version-info.test.ts (100%) rename packages/transport-http/src/{ => send}/send-get-node-version-info.ts (100%) rename packages/transport-http/src/{ => send}/send-get-transaction-status.js (100%) rename packages/transport-http/src/{ => send}/send-get-transaction-status.test.js (100%) rename packages/transport-http/src/{ => send}/send-get-transaction.js (100%) rename packages/transport-http/src/{ => send}/send-get-transaction.test.js (100%) rename packages/transport-http/src/{ => send}/send-http.ts (98%) rename packages/transport-http/src/{ => send}/send-ping.test.ts (100%) rename packages/transport-http/src/{ => send}/send-ping.ts (100%) rename packages/transport-http/src/{ => send}/send-transaction.js (100%) rename packages/transport-http/src/{ => send}/send-transaction.test.js (100%) rename packages/transport-http/src/{ => send}/utils.js (100%) create mode 100644 packages/transport-http/src/subscribe/models.ts create mode 100644 packages/transport-http/src/subscribe/subscribe.ts create mode 100644 packages/transport-http/src/subscribe/subscription-manager.test.ts create mode 100644 packages/transport-http/src/subscribe/subscription-manager.ts rename packages/transport-http/src/{ => subscribe}/websocket.ts (58%) create mode 100644 packages/transport-http/src/transport.ts create mode 100644 packages/typedefs/src/sdk-transport/index.ts create mode 100644 packages/typedefs/src/sdk-transport/requests.ts create mode 100644 packages/typedefs/src/sdk-transport/subscriptions.ts diff --git a/package-lock.json b/package-lock.json index 641745064..066cef7fb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18893,6 +18893,16 @@ "node": ">=8" } }, + "node_modules/jest-websocket-mock": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/jest-websocket-mock/-/jest-websocket-mock-2.5.0.tgz", + "integrity": "sha512-a+UJGfowNIWvtIKIQBHoEWIUqRxxQHFx4CXT+R5KxxKBtEQ5rS3pPOV/5299sHzqbmeCzxxY5qE4+yfXePePig==", + "dev": true, + "dependencies": { + "jest-diff": "^29.2.0", + "mock-socket": "^9.3.0" + } + }, "node_modules/jest-worker": { "version": "29.7.0", "dev": true, @@ -21658,6 +21668,15 @@ "ufo": "^1.5.4" } }, + "node_modules/mock-socket": { + "version": "9.3.1", + "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-9.3.1.tgz", + "integrity": "sha512-qxBgB7Qa2sEQgHFjj0dSigq7fX4k6Saisd5Nelwp2q8mlbAFh5dHV9JTTlF8viYJLSSWgMCZFUom8PJcMNBoJw==", + "dev": true, + "engines": { + "node": ">= 8" + } + }, "node_modules/modify-values": { "version": "1.0.1", "dev": true, @@ -29023,7 +29042,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/estree": "^1.0.6", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", @@ -29048,16 +29067,16 @@ }, "packages/fcl": { "name": "@onflow/fcl", - "version": "1.13.0-alpha.6", + "version": "1.13.0-alpha.7", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.25.7", "@onflow/config": "1.5.1-alpha.0", - "@onflow/fcl-core": "1.13.0-alpha.4", - "@onflow/fcl-wc": "5.4.1-alpha.4", + "@onflow/fcl-core": "1.13.0-alpha.5", + "@onflow/fcl-wc": "5.5.0-alpha.5", "@onflow/interaction": "0.0.11", "@onflow/rlp": "1.2.3-alpha.0", - "@onflow/sdk": "1.5.4-alpha.1", + "@onflow/sdk": "1.5.4-alpha.2", "@onflow/types": "1.4.1-alpha.0", "@onflow/util-actor": "1.3.4-alpha.0", "@onflow/util-address": "1.2.3-alpha.0", @@ -29074,8 +29093,8 @@ "sha3": "^2.1.4" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", - "@onflow/typedefs": "1.4.0-alpha.1", + "@onflow/fcl-bundle": "1.6.0-alpha.1", + "@onflow/typedefs": "1.4.0-alpha.2", "@types/estree": "^1.0.6", "@types/jest": "^29.5.13", "@types/node": "^18.19.57", @@ -29088,7 +29107,7 @@ }, "packages/fcl-bundle": { "name": "@onflow/fcl-bundle", - "version": "1.5.1-alpha.0", + "version": "1.6.0-alpha.1", "license": "Apache-2.0", "dependencies": { "@babel/plugin-transform-runtime": "^7.25.7", @@ -29122,7 +29141,7 @@ }, "packages/fcl-core": { "name": "@onflow/fcl-core", - "version": "1.13.0-alpha.4", + "version": "1.13.0-alpha.5", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.25.7", @@ -29130,7 +29149,7 @@ "@onflow/config": "1.5.1-alpha.0", "@onflow/interaction": "0.0.11", "@onflow/rlp": "1.2.3-alpha.0", - "@onflow/sdk": "1.5.4-alpha.1", + "@onflow/sdk": "1.5.4-alpha.2", "@onflow/transport-http": "1.10.3-alpha.0", "@onflow/types": "1.4.1-alpha.0", "@onflow/util-actor": "1.3.4-alpha.0", @@ -29144,8 +29163,8 @@ "cross-fetch": "^3.1.8" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", - "@onflow/typedefs": "1.4.0-alpha.1", + "@onflow/fcl-bundle": "1.6.0-alpha.1", + "@onflow/typedefs": "1.4.0-alpha.2", "@types/estree": "^1.0.6", "@types/jest": "^29.5.13", "@types/node": "^18.19.57", @@ -29170,15 +29189,15 @@ }, "packages/fcl-react-native": { "name": "@onflow/fcl-react-native", - "version": "1.9.7-alpha.4", + "version": "1.9.7-alpha.5", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.25.7", "@onflow/config": "1.5.1-alpha.0", - "@onflow/fcl-core": "1.13.0-alpha.4", + "@onflow/fcl-core": "1.13.0-alpha.5", "@onflow/interaction": "0.0.11", "@onflow/rlp": "1.2.3-alpha.0", - "@onflow/sdk": "1.5.4-alpha.1", + "@onflow/sdk": "1.5.4-alpha.2", "@onflow/types": "1.4.1-alpha.0", "@onflow/util-actor": "1.3.4-alpha.0", "@onflow/util-address": "1.2.3-alpha.0", @@ -29190,8 +29209,8 @@ "cross-fetch": "^3.1.8" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", - "@onflow/typedefs": "1.4.0-alpha.1", + "@onflow/fcl-bundle": "1.6.0-alpha.1", + "@onflow/typedefs": "1.4.0-alpha.2", "@types/estree": "^1.0.6", "@types/node": "^18.19.57", "eslint": "^8.57.1", @@ -29222,7 +29241,7 @@ }, "packages/fcl-wc": { "name": "@onflow/fcl-wc", - "version": "5.4.1-alpha.4", + "version": "5.5.0-alpha.5", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.25.7", @@ -29241,8 +29260,8 @@ "devDependencies": { "@babel/plugin-transform-react-jsx": "^7.25.9", "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", - "@onflow/typedefs": "^1.4.0-alpha.1", + "@onflow/fcl-bundle": "1.6.0-alpha.1", + "@onflow/typedefs": "^1.4.0-alpha.2", "autoprefixer": "^10.4.20", "eslint": "^8.57.1", "eslint-plugin-jsdoc": "^46.10.1", @@ -29250,7 +29269,7 @@ "jest-preset-preact": "^4.1.1" }, "peerDependencies": { - "@onflow/fcl-core": "1.13.0-alpha.4" + "@onflow/fcl-core": "1.13.0-alpha.5" } }, "packages/fcl/node_modules/typescript": { @@ -29322,7 +29341,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", @@ -29333,14 +29352,14 @@ }, "packages/sdk": { "name": "@onflow/sdk", - "version": "1.5.4-alpha.1", + "version": "1.5.4-alpha.2", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.25.7", "@onflow/config": "1.5.1-alpha.0", "@onflow/rlp": "1.2.3-alpha.0", "@onflow/transport-http": "1.10.3-alpha.0", - "@onflow/typedefs": "1.4.0-alpha.1", + "@onflow/typedefs": "1.4.0-alpha.2", "@onflow/util-actor": "1.3.4-alpha.0", "@onflow/util-address": "1.2.3-alpha.0", "@onflow/util-invariant": "1.2.4-alpha.0", @@ -29352,7 +29371,7 @@ "uuid": "^9.0.1" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/uuid": "^9.0.8", "eslint": "^8.57.1", "eslint-plugin-jsdoc": "^46.10.1", @@ -29387,8 +29406,8 @@ "@onflow/util-template": "1.2.3-alpha.0" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", - "@onflow/sdk": "1.5.4-alpha.1", + "@onflow/fcl-bundle": "1.6.0-alpha.1", + "@onflow/sdk": "1.5.4-alpha.2", "jest": "^29.7.0" } }, @@ -29409,22 +29428,24 @@ "ws": "^8.18.0" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@onflow/rlp": "1.2.3-alpha.0", - "@onflow/sdk": "1.5.4-alpha.1", + "@onflow/sdk": "1.5.4-alpha.2", "@onflow/types": "1.4.1-alpha.0", - "jest": "^29.7.0" + "jest": "^29.7.0", + "jest-websocket-mock": "^2.5.0", + "mock-socket": "^9.3.1" } }, "packages/typedefs": { "name": "@onflow/typedefs", - "version": "1.4.0-alpha.1", + "version": "1.4.0-alpha.2", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.25.7" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/node": "^18.19.57", "eslint": "^8.57.1", "eslint-plugin-jsdoc": "^46.10.1", @@ -29455,7 +29476,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", @@ -29474,7 +29495,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", @@ -29492,7 +29513,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@onflow/types": "1.4.1-alpha.0", "@types/jest": "^29.5.13", "@types/node": "^18.19.57", @@ -29527,7 +29548,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@onflow/types": "1.4.1-alpha.0", "@types/jest": "^29.5.13", "@types/node": "^18.19.57", @@ -29560,7 +29581,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@onflow/types": "1.4.1-alpha.0", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", @@ -29579,7 +29600,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", @@ -29605,7 +29626,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", @@ -29622,7 +29643,7 @@ "@babel/runtime": "^7.25.7" }, "devDependencies": { - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "jest": "^29.7.0" } }, @@ -29636,7 +29657,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", @@ -29654,7 +29675,7 @@ }, "devDependencies": { "@babel/preset-typescript": "^7.25.7", - "@onflow/fcl-bundle": "1.5.1-alpha.0", + "@onflow/fcl-bundle": "1.6.0-alpha.1", "@types/jest": "^29.5.13", "@typescript-eslint/eslint-plugin": "^6.21.0", "@typescript-eslint/parser": "^6.21.0", diff --git a/packages/transport-http/package.json b/packages/transport-http/package.json index 4cca54899..49a31549e 100644 --- a/packages/transport-http/package.json +++ b/packages/transport-http/package.json @@ -17,13 +17,15 @@ "@onflow/rlp": "1.2.3-alpha.0", "@onflow/sdk": "1.5.4-alpha.2", "@onflow/types": "1.4.1-alpha.0", - "jest": "^29.7.0" + "jest": "^29.7.0", + "jest-websocket-mock": "^2.5.0", + "mock-socket": "^9.3.1" }, - "source": "src/sdk-send-http.ts", - "main": "dist/sdk-send-http.js", - "module": "dist/sdk-send-http.module.js", - "unpkg": "dist/sdk-send-http.umd.js", - "types": "types/sdk-send-http.d.ts", + "source": "src/index.ts", + "main": "dist/index.js", + "module": "dist/index.module.js", + "unpkg": "dist/index.umd.js", + "types": "types/index.d.ts", "scripts": { "alpha": "npm publish --tag alpha", "prepublishOnly": "npm test && npm run build", diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts new file mode 100644 index 000000000..bbabf100b --- /dev/null +++ b/packages/transport-http/src/index.ts @@ -0,0 +1,17 @@ +export {sendExecuteScript} from "./send/send-execute-script" +export {sendGetAccount} from "./send/send-get-account" +export {sendGetBlockHeader} from "./send/send-get-block-header" +export {sendGetBlock} from "./send/send-get-block" +export {sendGetCollection} from "./send/send-get-collection" +export {sendGetEvents} from "./send/send-get-events" +export {sendGetTransaction} from "./send/send-get-transaction" +export {sendGetTransactionStatus} from "./send/send-get-transaction-status" +export {sendPing} from "./send/send-ping" +export {sendTransaction} from "./send/send-transaction" +export {sendGetNetworkParameters} from "./send/send-get-network-parameters" +export {sendGetNodeVersionInfo} from "./send/send-get-node-version-info" +export {connectSubscribeEvents} from "./send/connect-subscribe-events" +export {send} from "./send/send-http" +export {WebsocketError} from "./send/connect-ws" +export {HTTPRequestError} from "./send/http-request.js" +export {httpTransport} from "./transport" diff --git a/packages/transport-http/src/sdk-send-http.ts b/packages/transport-http/src/sdk-send-http.ts deleted file mode 100644 index 08a5f76df..000000000 --- a/packages/transport-http/src/sdk-send-http.ts +++ /dev/null @@ -1,16 +0,0 @@ -export {sendExecuteScript} from "./send-execute-script" -export {sendGetAccount} from "./send-get-account" -export {sendGetBlockHeader} from "./send-get-block-header" -export {sendGetBlock} from "./send-get-block" -export {sendGetCollection} from "./send-get-collection" -export {sendGetEvents} from "./send-get-events" -export {sendGetTransaction} from "./send-get-transaction" -export {sendGetTransactionStatus} from "./send-get-transaction-status" -export {sendPing} from "./send-ping" -export {sendTransaction} from "./send-transaction" -export {sendGetNetworkParameters} from "./send-get-network-parameters" -export {sendGetNodeVersionInfo} from "./send-get-node-version-info" -export {connectSubscribeEvents} from "./connect-subscribe-events" -export {send} from "./send-http" -export {WebsocketError} from "./connect-ws" -export {HTTPRequestError} from "./http-request.js" diff --git a/packages/transport-http/src/combine-urls.test.ts b/packages/transport-http/src/send/combine-urls.test.ts similarity index 100% rename from packages/transport-http/src/combine-urls.test.ts rename to packages/transport-http/src/send/combine-urls.test.ts diff --git a/packages/transport-http/src/combine-urls.ts b/packages/transport-http/src/send/combine-urls.ts similarity index 100% rename from packages/transport-http/src/combine-urls.ts rename to packages/transport-http/src/send/combine-urls.ts diff --git a/packages/transport-http/src/connect-subscribe-events.test.ts b/packages/transport-http/src/send/connect-subscribe-events.test.ts similarity index 100% rename from packages/transport-http/src/connect-subscribe-events.test.ts rename to packages/transport-http/src/send/connect-subscribe-events.test.ts diff --git a/packages/transport-http/src/connect-subscribe-events.ts b/packages/transport-http/src/send/connect-subscribe-events.ts similarity index 100% rename from packages/transport-http/src/connect-subscribe-events.ts rename to packages/transport-http/src/send/connect-subscribe-events.ts diff --git a/packages/transport-http/src/connect-ws.test.ts b/packages/transport-http/src/send/connect-ws.test.ts similarity index 99% rename from packages/transport-http/src/connect-ws.test.ts rename to packages/transport-http/src/send/connect-ws.test.ts index a7218e9a1..c2cc19948 100644 --- a/packages/transport-http/src/connect-ws.test.ts +++ b/packages/transport-http/src/send/connect-ws.test.ts @@ -1,5 +1,5 @@ import {buildConnectionUrl, connectWs} from "./connect-ws" -import * as WebSocketModule from "./websocket" +import * as WebSocketModule from "../subscribe/websocket" describe("connectWs", () => { describe("buildConnectionUrl", () => { diff --git a/packages/transport-http/src/connect-ws.ts b/packages/transport-http/src/send/connect-ws.ts similarity index 98% rename from packages/transport-http/src/connect-ws.ts rename to packages/transport-http/src/send/connect-ws.ts index b9f373791..a58d6c672 100644 --- a/packages/transport-http/src/connect-ws.ts +++ b/packages/transport-http/src/send/connect-ws.ts @@ -1,7 +1,7 @@ import {EventEmitter} from "events" import {safeParseJSON} from "./utils" import {StreamConnection} from "@onflow/typedefs" -import {WebSocket} from "./websocket" +import {WebSocket} from "../subscribe/websocket" export class WebsocketError extends Error { code?: number diff --git a/packages/transport-http/src/http-request.js b/packages/transport-http/src/send/http-request.js similarity index 100% rename from packages/transport-http/src/http-request.js rename to packages/transport-http/src/send/http-request.js diff --git a/packages/transport-http/src/http-request.test.js b/packages/transport-http/src/send/http-request.test.js similarity index 100% rename from packages/transport-http/src/http-request.test.js rename to packages/transport-http/src/send/http-request.test.js diff --git a/packages/transport-http/src/send-execute-script.js b/packages/transport-http/src/send/send-execute-script.js similarity index 100% rename from packages/transport-http/src/send-execute-script.js rename to packages/transport-http/src/send/send-execute-script.js diff --git a/packages/transport-http/src/send-execute-script.test.js b/packages/transport-http/src/send/send-execute-script.test.js similarity index 100% rename from packages/transport-http/src/send-execute-script.test.js rename to packages/transport-http/src/send/send-execute-script.test.js diff --git a/packages/transport-http/src/send-get-account.js b/packages/transport-http/src/send/send-get-account.js similarity index 100% rename from packages/transport-http/src/send-get-account.js rename to packages/transport-http/src/send/send-get-account.js diff --git a/packages/transport-http/src/send-get-account.test.js b/packages/transport-http/src/send/send-get-account.test.js similarity index 100% rename from packages/transport-http/src/send-get-account.test.js rename to packages/transport-http/src/send/send-get-account.test.js diff --git a/packages/transport-http/src/send-get-block-header.js b/packages/transport-http/src/send/send-get-block-header.js similarity index 100% rename from packages/transport-http/src/send-get-block-header.js rename to packages/transport-http/src/send/send-get-block-header.js diff --git a/packages/transport-http/src/send-get-block-header.test.js b/packages/transport-http/src/send/send-get-block-header.test.js similarity index 100% rename from packages/transport-http/src/send-get-block-header.test.js rename to packages/transport-http/src/send/send-get-block-header.test.js diff --git a/packages/transport-http/src/send-get-block.js b/packages/transport-http/src/send/send-get-block.js similarity index 100% rename from packages/transport-http/src/send-get-block.js rename to packages/transport-http/src/send/send-get-block.js diff --git a/packages/transport-http/src/send-get-block.test.js b/packages/transport-http/src/send/send-get-block.test.js similarity index 100% rename from packages/transport-http/src/send-get-block.test.js rename to packages/transport-http/src/send/send-get-block.test.js diff --git a/packages/transport-http/src/send-get-collection.js b/packages/transport-http/src/send/send-get-collection.js similarity index 100% rename from packages/transport-http/src/send-get-collection.js rename to packages/transport-http/src/send/send-get-collection.js diff --git a/packages/transport-http/src/send-get-collection.test.js b/packages/transport-http/src/send/send-get-collection.test.js similarity index 100% rename from packages/transport-http/src/send-get-collection.test.js rename to packages/transport-http/src/send/send-get-collection.test.js diff --git a/packages/transport-http/src/send-get-events.js b/packages/transport-http/src/send/send-get-events.js similarity index 100% rename from packages/transport-http/src/send-get-events.js rename to packages/transport-http/src/send/send-get-events.js diff --git a/packages/transport-http/src/send-get-events.test.js b/packages/transport-http/src/send/send-get-events.test.js similarity index 100% rename from packages/transport-http/src/send-get-events.test.js rename to packages/transport-http/src/send/send-get-events.test.js diff --git a/packages/transport-http/src/send-get-network-parameters.js b/packages/transport-http/src/send/send-get-network-parameters.js similarity index 100% rename from packages/transport-http/src/send-get-network-parameters.js rename to packages/transport-http/src/send/send-get-network-parameters.js diff --git a/packages/transport-http/src/send-get-network-parameters.test.js b/packages/transport-http/src/send/send-get-network-parameters.test.js similarity index 100% rename from packages/transport-http/src/send-get-network-parameters.test.js rename to packages/transport-http/src/send/send-get-network-parameters.test.js diff --git a/packages/transport-http/src/send-get-node-version-info.test.ts b/packages/transport-http/src/send/send-get-node-version-info.test.ts similarity index 100% rename from packages/transport-http/src/send-get-node-version-info.test.ts rename to packages/transport-http/src/send/send-get-node-version-info.test.ts diff --git a/packages/transport-http/src/send-get-node-version-info.ts b/packages/transport-http/src/send/send-get-node-version-info.ts similarity index 100% rename from packages/transport-http/src/send-get-node-version-info.ts rename to packages/transport-http/src/send/send-get-node-version-info.ts diff --git a/packages/transport-http/src/send-get-transaction-status.js b/packages/transport-http/src/send/send-get-transaction-status.js similarity index 100% rename from packages/transport-http/src/send-get-transaction-status.js rename to packages/transport-http/src/send/send-get-transaction-status.js diff --git a/packages/transport-http/src/send-get-transaction-status.test.js b/packages/transport-http/src/send/send-get-transaction-status.test.js similarity index 100% rename from packages/transport-http/src/send-get-transaction-status.test.js rename to packages/transport-http/src/send/send-get-transaction-status.test.js diff --git a/packages/transport-http/src/send-get-transaction.js b/packages/transport-http/src/send/send-get-transaction.js similarity index 100% rename from packages/transport-http/src/send-get-transaction.js rename to packages/transport-http/src/send/send-get-transaction.js diff --git a/packages/transport-http/src/send-get-transaction.test.js b/packages/transport-http/src/send/send-get-transaction.test.js similarity index 100% rename from packages/transport-http/src/send-get-transaction.test.js rename to packages/transport-http/src/send/send-get-transaction.test.js diff --git a/packages/transport-http/src/send-http.ts b/packages/transport-http/src/send/send-http.ts similarity index 98% rename from packages/transport-http/src/send-http.ts rename to packages/transport-http/src/send/send-http.ts index 3ea87d3f9..7589a7883 100644 --- a/packages/transport-http/src/send-http.ts +++ b/packages/transport-http/src/send/send-http.ts @@ -9,7 +9,7 @@ import {connectSubscribeEvents} from "./connect-subscribe-events.js" import {sendGetBlock} from "./send-get-block.js" import {sendGetBlockHeader} from "./send-get-block-header.js" import {sendGetCollection} from "./send-get-collection.js" -import {sendPing, ISendPingContext} from "./send-ping" +import {sendPing, ISendPingContext} from "./send-ping.js" import {sendGetNetworkParameters} from "./send-get-network-parameters.js" import {Interaction} from "@onflow/typedefs" import {sendGetNodeVersionInfo} from "./send-get-node-version-info.js" diff --git a/packages/transport-http/src/send-ping.test.ts b/packages/transport-http/src/send/send-ping.test.ts similarity index 100% rename from packages/transport-http/src/send-ping.test.ts rename to packages/transport-http/src/send/send-ping.test.ts diff --git a/packages/transport-http/src/send-ping.ts b/packages/transport-http/src/send/send-ping.ts similarity index 100% rename from packages/transport-http/src/send-ping.ts rename to packages/transport-http/src/send/send-ping.ts diff --git a/packages/transport-http/src/send-transaction.js b/packages/transport-http/src/send/send-transaction.js similarity index 100% rename from packages/transport-http/src/send-transaction.js rename to packages/transport-http/src/send/send-transaction.js diff --git a/packages/transport-http/src/send-transaction.test.js b/packages/transport-http/src/send/send-transaction.test.js similarity index 100% rename from packages/transport-http/src/send-transaction.test.js rename to packages/transport-http/src/send/send-transaction.test.js diff --git a/packages/transport-http/src/utils.js b/packages/transport-http/src/send/utils.js similarity index 100% rename from packages/transport-http/src/utils.js rename to packages/transport-http/src/send/utils.js diff --git a/packages/transport-http/src/subscribe/models.ts b/packages/transport-http/src/subscribe/models.ts new file mode 100644 index 000000000..48d6aad07 --- /dev/null +++ b/packages/transport-http/src/subscribe/models.ts @@ -0,0 +1,66 @@ +export enum Action { + LIST_SUBSCRIPTIONS = "list_subscriptions", + SUBSCRIBE = "subscribe", + UNSUBSCRIBE = "unsubscribe", +} +export interface BaseMessageRequest { + action: Action +} + +export interface BaseMessageResponse { + action?: Action + success: boolean + error_message?: string +} + +export interface ListSubscriptionsMessageRequest extends BaseMessageRequest { + action: Action.LIST_SUBSCRIPTIONS +} + +export interface ListSubscriptionsMessageResponse extends BaseMessageResponse { + action: Action.LIST_SUBSCRIPTIONS + subscriptions?: SubscriptionEntry[] +} + +export interface SubscribeMessageRequest extends BaseMessageRequest { + action: Action.SUBSCRIBE + topic: string + arguments: Record +} + +export interface SubscribeMessageResponse extends BaseMessageResponse { + action: Action.SUBSCRIBE + topic: string + id: string +} + +export interface UnsubscribeMessageRequest extends BaseMessageRequest { + action: Action.UNSUBSCRIBE + id: string +} + +export type UnsubscribeMessageResponse = BaseMessageResponse & { + action: Action.UNSUBSCRIBE + id: string +} + +export type SubscriptionEntry = { + id: string + topic: string + arguments: Record +} + +export type MessageRequest = + | ListSubscriptionsMessageRequest + | SubscribeMessageRequest + | UnsubscribeMessageRequest + +export type MessageResponse = + | ListSubscriptionsMessageResponse + | SubscribeMessageResponse + | UnsubscribeMessageResponse + +export type SubscriptionDataMessage = { + id: string + data: any +} diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts new file mode 100644 index 000000000..50e3ae3d4 --- /dev/null +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -0,0 +1,34 @@ +import {SdkTransport} from "@onflow/typedefs" +import {SubscriptionManager} from "./subscription-manager" + +// Map of SubscriptionManager instances by access node URL +let subscriptionManagerMap: Map = new Map() + +export async function subscribe( + { + topic, + args, + onData, + onError, + }: { + topic: T + args: SdkTransport.SubscriptionArguments + onData: (data: SdkTransport.SubscriptionData) => void + onError: (error: Error) => void + }, + opts: {node: string} +): Promise { + const manager = + subscriptionManagerMap.get(opts.node) || + new SubscriptionManager({ + node: opts.node, + }) + subscriptionManagerMap.set(opts.node, manager) + + return manager.subscribe({ + topic, + args, + onData, + onError, + }) +} diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts new file mode 100644 index 000000000..4ce60cc57 --- /dev/null +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -0,0 +1,265 @@ +import WS from "jest-websocket-mock" +import {WebSocket as mockSocket} from "mock-socket" +import { + Action, + SubscribeMessageRequest, + SubscribeMessageResponse, + SubscriptionDataMessage, + UnsubscribeMessageRequest, +} from "./models" +import {SubscriptionManager} from "./subscription-manager" +import {SdkTransport} from "@onflow/typedefs" + +jest.mock("./websocket", () => ({ + WebSocket: mockSocket, +})) + +describe("WsSubscriptionTransport", () => { + let mockWs: WS + beforeEach(() => { + mockWs = new WS("wss://localhost:8080") + }) + + afterEach(() => { + WS.clean() + }) + + test("does not connect to the socket when no subscriptions are made", async () => { + const config = { + node: "wss://localhost:8080", + reconnectInterval: 1000, + reconnectAttempts: 10, + } + + new SubscriptionManager(config) + + await new Promise(resolve => setTimeout(resolve, 0)) + expect(mockWs.server.clients).toHaveLength(0) + }) + + test("disconnects from the socket when the last subscription is removed", async () => { + const config = { + node: "wss://localhost:8080", + reconnectInterval: 1000, + reconnectAttempts: 10, + } + const streamController = new SubscriptionManager(config) + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {key: "value"} as any + const onData = jest.fn() + const onError = jest.fn() + + let serverPromise = (async () => { + await mockWs.connected + + const msg = (await mockWs.nextMessage) as string + const data = JSON.parse(msg) as SubscribeMessageRequest + expect(data).toEqual({ + action: "subscribe", + topic, + arguments: args, + }) + + const response: SubscribeMessageResponse = { + id: "id", + action: Action.SUBSCRIBE, + success: true, + topic, + } + mockWs.send(JSON.stringify(response)) + })() + + const [subscription] = await Promise.all([ + streamController.subscribe({ + topic, + args, + onData, + onError, + }), + serverPromise, + ]) + + expect(subscription).toBeDefined() + expect(subscription.unsubscribe).toBeInstanceOf(Function) + + subscription.unsubscribe() + await new Promise(resolve => setTimeout(resolve, 0)) + + await mockWs.closed + expect(mockWs.server.clients).toHaveLength(0) + }) + + test("subscribes, receives data, and unsubscribes", async () => { + const config = { + node: "wss://localhost:8080", + reconnectInterval: 1000, + reconnectAttempts: 10, + } + const streamController = new SubscriptionManager(config) + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {key: "value"} as any + const onData = jest.fn() + const onError = jest.fn() + + let serverPromise = (async () => { + await mockWs.connected + + const msg = (await mockWs.nextMessage) as string + const data = JSON.parse(msg) as SubscribeMessageRequest + expect(data).toEqual({ + action: "subscribe", + topic, + arguments: args, + }) + + const response: SubscribeMessageResponse = { + id: "id", + action: Action.SUBSCRIBE, + success: true, + topic, + } + mockWs.send(JSON.stringify(response)) + })() + + const [subscription] = await Promise.all([ + streamController.subscribe({ + topic, + args, + onData, + onError, + }), + serverPromise, + ]) + + expect(subscription).toBeDefined() + expect(subscription.unsubscribe).toBeInstanceOf(Function) + + serverPromise = (async () => { + const data = { + id: "id", + data: {key: "value"}, + } as SubscriptionDataMessage + mockWs.send(JSON.stringify(data)) + })() + + await serverPromise + + expect(onData).toHaveBeenCalledTimes(1) + expect(onData).toHaveBeenCalledWith({key: "value"}) + expect(onError).toHaveBeenCalledTimes(0) + + serverPromise = (async () => { + const msg = (await mockWs.nextMessage) as string + const data = JSON.parse(msg) as UnsubscribeMessageRequest + expect(data).toEqual({ + action: "unsubscribe", + id: "id", + }) + })() + + subscription.unsubscribe() + await serverPromise + }) + + test("reconnects to stream on close", async () => { + const config = { + node: "wss://localhost:8080", + reconnectInterval: 1000, + reconnectAttempts: 1, + } + const streamController = new SubscriptionManager(config) + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {key: "value"} as any + const onData = jest.fn() + const onError = jest.fn() + + let serverPromise = (async () => { + await mockWs.connected + + const msg = (await mockWs.nextMessage) as string + const data = JSON.parse(msg) as SubscribeMessageRequest + expect(data).toEqual({ + action: "subscribe", + topic, + arguments: args, + }) + + const response: SubscribeMessageResponse = { + id: "id1", + action: Action.SUBSCRIBE, + success: true, + topic, + } + mockWs.send(JSON.stringify(response)) + })() + + const [subscription] = await Promise.all([ + streamController.subscribe({ + topic, + args, + onData, + onError, + }), + serverPromise, + ]) + + expect(subscription).toBeDefined() + expect(subscription.unsubscribe).toBeInstanceOf(Function) + + serverPromise = (async () => { + const data = { + id: "id1", + data: {key: "value"}, + } as SubscriptionDataMessage + mockWs.send(JSON.stringify(data)) + })() + + await serverPromise + + expect(onData).toHaveBeenCalledTimes(1) + expect(onData).toHaveBeenCalledWith({key: "value"}) + expect(onError).toHaveBeenCalledTimes(0) + + // Close the connection and create a new one + mockWs.close() + mockWs = new WS("wss://localhost:8080") + + serverPromise = (async () => { + await mockWs.connected + + const msg = (await mockWs.nextMessage) as string + const data = JSON.parse(msg) as SubscribeMessageRequest + expect(data).toEqual({ + action: "subscribe", + topic, + arguments: args, + }) + + const response: SubscribeMessageResponse = { + id: "id2", + action: Action.SUBSCRIBE, + success: true, + topic, + } + mockWs.send(JSON.stringify(response)) + })() + + await serverPromise + + // Wait for client to register the new subscription + await new Promise(resolve => setTimeout(resolve, 0)) + + serverPromise = (async () => { + const data = { + id: "id2", + data: {key: "value2"}, + } as SubscriptionDataMessage + mockWs.send(JSON.stringify(data)) + })() + + await serverPromise + + expect(onData).toHaveBeenCalledTimes(2) + expect(onData.mock.calls[1]).toEqual([{key: "value2"}]) + }) +}) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts new file mode 100644 index 000000000..2c7ffe328 --- /dev/null +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -0,0 +1,272 @@ +import { + Action, + MessageResponse, + SubscriptionDataMessage, + UnsubscribeMessageResponse, +} from "./models" +import { + SubscribeMessageRequest, + SubscribeMessageResponse, + UnsubscribeMessageRequest, +} from "./models" +import type {SdkTransport} from "@onflow/typedefs" +import {WebSocket} from "./websocket" + +const WS_OPEN = 1 + +interface SubscriptionInfo { + // Internal ID for the subscription + id: number + // Remote ID assigned by the server used for message routing and unsubscribing + remoteId?: string + // The topic of the subscription + topic: T + // The checkpoint to resume the subscription from + checkpoint: SdkTransport.SubscriptionArguments + // The callback to call when a data is received + onData: (data: any) => void + // The callback to call when an error occurs + onError: (error: Error) => void +} + +interface WsTransportConfig { + /** + * The URL of the node to connect to + */ + node: string + /** + * The interval in milliseconds to wait before reconnecting + * @default 1000 + */ + reconnectInterval?: number + /** + * The number of reconnection attempts before giving up + * @default 10 + */ + reconnectAttempts?: number +} + +export class SubscriptionManager { + private counter = 0 + private subscriptions: SubscriptionInfo[] = [] + private socket: WebSocket | null = null + private config: Required + private reconnectAttempts = 0 + + constructor(config: WsTransportConfig) { + this.config = { + reconnectInterval: 1000, + reconnectAttempts: 10, + ...config, + } + } + + // Lazy connect to the socket when the first subscription is made + private async connect() { + return new Promise(resolve => { + // If the socket is already open, do nothing + if (this.socket?.readyState === WS_OPEN) { + return + } + + this.socket = new WebSocket(this.config.node) + this.socket.onmessage = event => { + const data = JSON.parse(event.data) as + | MessageResponse + | SubscriptionDataMessage + + if ("action" in data) { + // TODO, waiting for AN team to decide what to do here + } else { + const sub = this.subscriptions.find(sub => sub.remoteId === data.id) + if (!sub) return + + // Update the block height to checkpoint for disconnects + this.updateSubscriptionCheckpoint(sub, data) + + // Call the subscription callback + sub.onData(data.data) + } + } + this.socket.onclose = () => { + void this.reconnect() + } + this.socket.onerror = e => { + console.error(`WebSocket error: ${e}`) + this.reconnect() + } + + this.socket.onopen = () => { + // Restore subscriptions + Promise.all( + this.subscriptions.map(async sub => { + const response = await this.sendSubscribe(sub) + sub.remoteId = response.id + }) + ).then(() => { + resolve() + }) + } + }) + } + + private async reconnect() { + // Clear the socket + this.socket = null + + // If there are no subscriptions, do nothing + if (this.subscriptions.length === 0) { + return + } + + // Clear all remote ids + this.subscriptions.forEach(sub => { + delete sub.remoteId + }) + + // Validate the number of reconnection attempts + if (this.reconnectAttempts >= this.config.reconnectAttempts) { + this.subscriptions.forEach(sub => { + sub.onError( + new Error( + `Failed to reconnect to the server after ${this.reconnectAttempts} attempts` + ) + ) + }) + this.subscriptions = [] + this.reconnectAttempts = 0 + return + } + + // Delay the reconnection + await new Promise(resolve => + setTimeout(resolve, this.config.reconnectInterval) + ) + + // Try to reconnect + this.reconnectAttempts++ + await this.connect() + this.reconnectAttempts = 0 + } + + async subscribe(opts: { + topic: T + args: SdkTransport.SubscriptionArguments + onData: (data: SdkTransport.SubscriptionData) => void + onError: (error: Error) => void + }): Promise { + // Connect the socket if it's not already open + await this.connect() + + // Track the subscription locally + const sub: SubscriptionInfo = { + id: this.counter++, + topic: opts.topic, + checkpoint: opts.args, + onData: opts.onData, + onError: opts.onError, + } + this.subscriptions.push(sub) + + // Send the subscribe message + const response = await this.sendSubscribe(sub) + + if (!response.success) { + throw new Error( + `Failed to subscribe to topic ${sub.topic}, error message: ${response.error_message}` + ) + } + + // Update the subscription with the remote id + sub.remoteId = response.id + + return { + unsubscribe: () => this.unsubscribe(sub.id), + } + } + + private unsubscribe(id: number): void { + // Get the subscription + const sub = this.subscriptions.find(sub => sub.id === id) + if (!sub) return + + // Send the unsubscribe message + this.sendUnsubscribe(sub).catch(e => { + console.error( + `Failed to unsubscribe from topic ${sub.topic}, error: ${e}` + ) + }) + + // Remove the subscription + this.subscriptions = this.subscriptions.filter(sub => sub.id !== id) + + // Close the socket if there are no more subscriptions + if (this.subscriptions.length === 0) { + this.socket?.close() + } + } + + private async sendSubscribe( + sub: SubscriptionInfo + ) { + // Send the subscription message + const request: SubscribeMessageRequest = { + action: Action.SUBSCRIBE, + topic: sub.topic, + arguments: sub.checkpoint, + } + this.socket?.send(JSON.stringify(request)) + + const response: SubscribeMessageResponse = await this.waitForResponse() + + if (!response.success) { + throw new Error( + `Failed to subscribe to topic ${sub.topic}, error message: ${response.error_message}` + ) + } + + return response + } + + private async sendUnsubscribe( + sub: SubscriptionInfo + ) { + // Send the unsubscribe message if the subscription has a remote id + const {remoteId} = sub + if (remoteId) { + const request: UnsubscribeMessageRequest = { + action: Action.UNSUBSCRIBE, + id: remoteId, + } + this.socket?.send(JSON.stringify(request)) + + const response: UnsubscribeMessageResponse = await this.waitForResponse() + + if (!response.success) { + throw new Error( + `Failed to unsubscribe from topic ${sub.topic}, error message: ${response.error_message}` + ) + } + } + } + + private async waitForResponse(): Promise { + // TODO: NOOP, waiting for AN team to decide what to do here, this is a placeholder + return new Promise(resolve => { + this.socket?.addEventListener("message", event => { + const data = JSON.parse(event.data) as T + if (data.action) { + resolve(data) + } + }) + }) + } + + // Update the subscription checkpoint when a message is received + // These checkpoints are used to resume subscriptions after disconnects + private updateSubscriptionCheckpoint< + T extends SdkTransport.SubscriptionTopic = SdkTransport.SubscriptionTopic, + >(sub: SubscriptionInfo, message: SubscriptionDataMessage) { + // TODO: Will be implemented with each subscription topic + } +} diff --git a/packages/transport-http/src/websocket.ts b/packages/transport-http/src/subscribe/websocket.ts similarity index 58% rename from packages/transport-http/src/websocket.ts rename to packages/transport-http/src/subscribe/websocket.ts index 370efcede..996c99553 100644 --- a/packages/transport-http/src/websocket.ts +++ b/packages/transport-http/src/subscribe/websocket.ts @@ -1,6 +1,7 @@ import _WebSocket from "isomorphic-ws" -export const WebSocket = _WebSocket as new ( +export const WebSocket = _WebSocket as (new ( url: string | URL, protocols?: string | string[] | undefined -) => WebSocket +) => WebSocket) & + WebSocket diff --git a/packages/transport-http/src/transport.ts b/packages/transport-http/src/transport.ts new file mode 100644 index 000000000..c0484d062 --- /dev/null +++ b/packages/transport-http/src/transport.ts @@ -0,0 +1,8 @@ +import {SdkTransport} from "@onflow/typedefs" +import {send} from "./send/send-http" +import {subscribe} from "./subscribe/subscribe" + +export const httpTransport: SdkTransport.Transport = { + send, + subscribe, +} diff --git a/packages/typedefs/src/index.ts b/packages/typedefs/src/index.ts index 4f4082674..b9d516de3 100644 --- a/packages/typedefs/src/index.ts +++ b/packages/typedefs/src/index.ts @@ -459,3 +459,4 @@ export type EventStream = StreamConnection<{ export * from "./interaction" export * from "./fvm-errors" +export * as SdkTransport from "./sdk-transport" diff --git a/packages/typedefs/src/sdk-transport/index.ts b/packages/typedefs/src/sdk-transport/index.ts new file mode 100644 index 000000000..51b71a706 --- /dev/null +++ b/packages/typedefs/src/sdk-transport/index.ts @@ -0,0 +1,16 @@ +import {SendFn} from "./requests" +import {SubscribeFn} from "./subscriptions" + +export type Transport = { + send: SendFn + subscribe: SubscribeFn +} + +export type TransportConfig = { + node: string +} + +export type TransportFactory = (config: TransportConfig) => Transport + +export * from "./subscriptions" +export * from "./requests" diff --git a/packages/typedefs/src/sdk-transport/requests.ts b/packages/typedefs/src/sdk-transport/requests.ts new file mode 100644 index 000000000..07a0b0665 --- /dev/null +++ b/packages/typedefs/src/sdk-transport/requests.ts @@ -0,0 +1,85 @@ +import {Interaction} from "../interaction" + +interface InteractionModule { + isTransaction: (ix: Interaction) => boolean + isGetTransactionStatus: (ix: Interaction) => boolean + isGetTransaction: (ix: Interaction) => boolean + isScript: (ix: Interaction) => boolean + isGetAccount: (ix: Interaction) => boolean + isGetEvents: (ix: Interaction) => boolean + isGetBlock: (ix: Interaction) => boolean + isGetBlockHeader: (ix: Interaction) => boolean + isGetCollection: (ix: Interaction) => boolean + isPing: (ix: Interaction) => boolean + isGetNetworkParameters: (ix: Interaction) => boolean + isSubscribeEvents?: (ix: Interaction) => boolean + isGetNodeVersionInfo?: (ix: Interaction) => boolean +} +interface IContext { + ix: InteractionModule +} +interface IOptsCommon { + node?: string +} + +interface IOpts extends IOptsCommon { + sendTransaction?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetTransactionStatus?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetTransaction?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendExecuteScript?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetAccount?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetEvents?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetBlockHeader?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetCollection?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendPing?: (ix: Interaction, context: IContext, opts: IOptsCommon) => void + sendGetBlock?: (ix: Interaction, context: IContext, opts: IOptsCommon) => void + sendGetNetworkParameters?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + connectSubscribeEvents?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void + sendGetNodeVersionInfo?: ( + ix: Interaction, + context: IContext, + opts: IOptsCommon + ) => void +} + +export type SendFn = (ix: Interaction, context: IContext, opts: IOpts) => void diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts new file mode 100644 index 000000000..cc4c06d58 --- /dev/null +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -0,0 +1,51 @@ +type SchemaItem = { + args: TArgs + data: TData +} + +export enum SubscriptionTopic { + EVENTS = "events", + BLOCKS = "blocks", +} + +export type SubscriptionSchema = { + [SubscriptionTopic.EVENTS]: SchemaItem< + { + startBlock: number + endBlock: number + }, + { + type: string + data: any + } + > + [SubscriptionTopic.BLOCKS]: SchemaItem< + { + startBlock: number + endBlock: number + }, + { + type: string + data: any + } + > +} + +export type SubscriptionArguments = + SubscriptionSchema[T]["args"] +export type SubscriptionData = + SubscriptionSchema[T]["data"] + +export type Subscription = { + unsubscribe: () => void +} + +export type SubscribeFn = ( + params: { + topic: T + args: SubscriptionArguments + onData: (data: SubscriptionData) => void + onError: (error: Error) => void + }, + opts: {node: string} +) => Promise From 9c19d7df60a940b1150c16f5b9c4775884572b5c Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 25 Nov 2024 15:44:04 -0800 Subject: [PATCH 2/8] switch to placeholder topic --- .../src/sdk-transport/subscriptions.ts | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index cc4c06d58..68a9cab04 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -4,29 +4,14 @@ type SchemaItem = { } export enum SubscriptionTopic { - EVENTS = "events", - BLOCKS = "blocks", + PLACEHOLDER = "PLACEHOLDER", } export type SubscriptionSchema = { - [SubscriptionTopic.EVENTS]: SchemaItem< + [SubscriptionTopic.PLACEHOLDER]: SchemaItem< + {}, { - startBlock: number - endBlock: number - }, - { - type: string - data: any - } - > - [SubscriptionTopic.BLOCKS]: SchemaItem< - { - startBlock: number - endBlock: number - }, - { - type: string - data: any + placeholder: string } > } From 62528dd7cf63197f197e9004ddca8f01ab7c910c Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 25 Nov 2024 15:44:06 -0800 Subject: [PATCH 3/8] f --- packages/typedefs/src/sdk-transport/subscriptions.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index 68a9cab04..53240d183 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -3,6 +3,7 @@ type SchemaItem = { data: TData } +// TODO: PLACEHOLDER - Replace with actual subscription topics export enum SubscriptionTopic { PLACEHOLDER = "PLACEHOLDER", } From 5fed33dab2939f8d467a55f1cdc79c7e144c0de6 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 25 Nov 2024 16:23:28 -0800 Subject: [PATCH 4/8] remove transportfactory --- packages/typedefs/src/sdk-transport/index.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/typedefs/src/sdk-transport/index.ts b/packages/typedefs/src/sdk-transport/index.ts index 51b71a706..915962252 100644 --- a/packages/typedefs/src/sdk-transport/index.ts +++ b/packages/typedefs/src/sdk-transport/index.ts @@ -6,11 +6,5 @@ export type Transport = { subscribe: SubscribeFn } -export type TransportConfig = { - node: string -} - -export type TransportFactory = (config: TransportConfig) => Transport - export * from "./subscriptions" export * from "./requests" From 65761158300cc949530e8686c3a3a1e2424002db Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 25 Nov 2024 16:32:41 -0800 Subject: [PATCH 5/8] apply feedback --- .../src/subscribe/subscription-manager.ts | 55 ++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index 2c7ffe328..c65450b1d 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -11,6 +11,7 @@ import { } from "./models" import type {SdkTransport} from "@onflow/typedefs" import {WebSocket} from "./websocket" +import * as logger from "@onflow/util-logger" const WS_OPEN = 1 @@ -35,13 +36,13 @@ interface WsTransportConfig { */ node: string /** - * The interval in milliseconds to wait before reconnecting - * @default 1000 + * Starting interval for reconnection attempts in milliseconds, exponential backoff is applied + * @default 500 */ reconnectInterval?: number /** * The number of reconnection attempts before giving up - * @default 10 + * @default 5 */ reconnectAttempts?: number } @@ -55,8 +56,8 @@ export class SubscriptionManager { constructor(config: WsTransportConfig) { this.config = { - reconnectInterval: 1000, - reconnectAttempts: 10, + reconnectInterval: 500, + reconnectAttempts: 5, ...config, } } @@ -92,8 +93,7 @@ export class SubscriptionManager { void this.reconnect() } this.socket.onerror = e => { - console.error(`WebSocket error: ${e}`) - this.reconnect() + this.reconnect(e) } this.socket.onopen = () => { @@ -110,7 +110,7 @@ export class SubscriptionManager { }) } - private async reconnect() { + private async reconnect(error?: any) { // Clear the socket this.socket = null @@ -126,27 +126,36 @@ export class SubscriptionManager { // Validate the number of reconnection attempts if (this.reconnectAttempts >= this.config.reconnectAttempts) { + logger.log({ + level: logger.LEVELS.error, + title: "WebSocket Error", + message: `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}`, + }) + this.subscriptions.forEach(sub => { sub.onError( new Error( - `Failed to reconnect to the server after ${this.reconnectAttempts} attempts` + `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}` ) ) }) this.subscriptions = [] this.reconnectAttempts = 0 - return - } + } else { + logger.log({ + level: logger.LEVELS.warn, + title: "WebSocket Error", + message: `WebSocket error, reconnecting in ${this.backoffInterval}ms: ${error}`, + }) - // Delay the reconnection - await new Promise(resolve => - setTimeout(resolve, this.config.reconnectInterval) - ) + // Delay the reconnection + await new Promise(resolve => setTimeout(resolve, this.backoffInterval)) - // Try to reconnect - this.reconnectAttempts++ - await this.connect() - this.reconnectAttempts = 0 + // Try to reconnect + this.reconnectAttempts++ + await this.connect() + this.reconnectAttempts = 0 + } } async subscribe(opts: { @@ -269,4 +278,12 @@ export class SubscriptionManager { >(sub: SubscriptionInfo, message: SubscriptionDataMessage) { // TODO: Will be implemented with each subscription topic } + + /** + * Calculate the backoff interval for reconnection attempts + * @returns The backoff interval in milliseconds + */ + private get backoffInterval() { + return this.config.reconnectInterval * (this.reconnectAttempts ^ 2) + } } From 206fa5d8f131fdbe2bc09847f215e6bdeb4de269 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 25 Nov 2024 16:34:10 -0800 Subject: [PATCH 6/8] Catch subscription restoration error --- .../src/subscribe/subscription-manager.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index c65450b1d..f12355bf1 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -64,7 +64,7 @@ export class SubscriptionManager { // Lazy connect to the socket when the first subscription is made private async connect() { - return new Promise(resolve => { + return new Promise((resolve, reject) => { // If the socket is already open, do nothing if (this.socket?.readyState === WS_OPEN) { return @@ -103,9 +103,13 @@ export class SubscriptionManager { const response = await this.sendSubscribe(sub) sub.remoteId = response.id }) - ).then(() => { - resolve() - }) + ) + .then(() => { + resolve() + }) + .catch(e => { + reject(new Error(`Failed to restore subscriptions: ${e}`)) + }) } }) } From 3bb0ff29945dcd630bf3ce3f31ad550095499704 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Tue, 26 Nov 2024 10:10:46 -0800 Subject: [PATCH 7/8] fix reconnect api --- .../src/subscribe/subscription-manager.ts | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index f12355bf1..043aa0f6d 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -15,6 +15,10 @@ import * as logger from "@onflow/util-logger" const WS_OPEN = 1 +type DeepRequired = Required<{ + [K in keyof T]: DeepRequired +}> + interface SubscriptionInfo { // Internal ID for the subscription id: number @@ -36,29 +40,43 @@ interface WsTransportConfig { */ node: string /** - * Starting interval for reconnection attempts in milliseconds, exponential backoff is applied - * @default 500 - */ - reconnectInterval?: number - /** - * The number of reconnection attempts before giving up - * @default 5 + * Options for reconnecting to the server */ - reconnectAttempts?: number + reconnectOptions?: { + /** + * The initial delay in milliseconds before reconnecting + * @default 500 + */ + initialReconnectDelay?: number + /** + * The maximum number of reconnection attempts + * @default 5 + */ + reconnectAttempts?: number + /** + * The maximum delay in milliseconds between reconnection attempts + * @default 5000 + */ + maxReconnectDelay?: number + } } export class SubscriptionManager { private counter = 0 private subscriptions: SubscriptionInfo[] = [] private socket: WebSocket | null = null - private config: Required + private config: DeepRequired private reconnectAttempts = 0 constructor(config: WsTransportConfig) { this.config = { - reconnectInterval: 500, - reconnectAttempts: 5, ...config, + reconnectOptions: { + initialReconnectDelay: 500, + reconnectAttempts: 5, + maxReconnectDelay: 5000, + ...config.reconnectOptions, + }, } } @@ -129,7 +147,9 @@ export class SubscriptionManager { }) // Validate the number of reconnection attempts - if (this.reconnectAttempts >= this.config.reconnectAttempts) { + if ( + this.reconnectAttempts >= this.config.reconnectOptions.reconnectAttempts + ) { logger.log({ level: logger.LEVELS.error, title: "WebSocket Error", @@ -288,6 +308,10 @@ export class SubscriptionManager { * @returns The backoff interval in milliseconds */ private get backoffInterval() { - return this.config.reconnectInterval * (this.reconnectAttempts ^ 2) + return Math.min( + this.config.reconnectOptions.maxReconnectDelay, + this.config.reconnectOptions.initialReconnectDelay * + 2 ** this.reconnectAttempts + ) } } From 0773a8a9b55ceb6aa74ad7feb3c07d52229b0fb1 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Tue, 26 Nov 2024 10:12:22 -0800 Subject: [PATCH 8/8] fix tests --- .../subscribe/subscription-manager.test.ts | 21 +++++++------------ .../src/subscribe/subscription-manager.ts | 6 +++--- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 4ce60cc57..48d2b81cc 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -7,7 +7,10 @@ import { SubscriptionDataMessage, UnsubscribeMessageRequest, } from "./models" -import {SubscriptionManager} from "./subscription-manager" +import { + SubscriptionManager, + SubscriptionManagerConfig, +} from "./subscription-manager" import {SdkTransport} from "@onflow/typedefs" jest.mock("./websocket", () => ({ @@ -25,10 +28,8 @@ describe("WsSubscriptionTransport", () => { }) test("does not connect to the socket when no subscriptions are made", async () => { - const config = { + const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", - reconnectInterval: 1000, - reconnectAttempts: 10, } new SubscriptionManager(config) @@ -38,10 +39,8 @@ describe("WsSubscriptionTransport", () => { }) test("disconnects from the socket when the last subscription is removed", async () => { - const config = { + const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", - reconnectInterval: 1000, - reconnectAttempts: 10, } const streamController = new SubscriptionManager(config) const topic = "topic" as SdkTransport.SubscriptionTopic @@ -90,10 +89,8 @@ describe("WsSubscriptionTransport", () => { }) test("subscribes, receives data, and unsubscribes", async () => { - const config = { + const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", - reconnectInterval: 1000, - reconnectAttempts: 10, } const streamController = new SubscriptionManager(config) const topic = "topic" as SdkTransport.SubscriptionTopic @@ -162,10 +159,8 @@ describe("WsSubscriptionTransport", () => { }) test("reconnects to stream on close", async () => { - const config = { + const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", - reconnectInterval: 1000, - reconnectAttempts: 1, } const streamController = new SubscriptionManager(config) const topic = "topic" as SdkTransport.SubscriptionTopic diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index 043aa0f6d..b7ea02436 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -34,7 +34,7 @@ interface SubscriptionInfo { onError: (error: Error) => void } -interface WsTransportConfig { +export interface SubscriptionManagerConfig { /** * The URL of the node to connect to */ @@ -65,10 +65,10 @@ export class SubscriptionManager { private counter = 0 private subscriptions: SubscriptionInfo[] = [] private socket: WebSocket | null = null - private config: DeepRequired + private config: DeepRequired private reconnectAttempts = 0 - constructor(config: WsTransportConfig) { + constructor(config: SubscriptionManagerConfig) { this.config = { ...config, reconnectOptions: {