From d1126c8d82b8b6151988af0b328fb22cbe71a3bd Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Wed, 27 Sep 2023 12:51:57 +0530 Subject: [PATCH 1/6] adding subscription manager --- src/config.ts | 4 + src/subscription-manager.ts | 174 +++++++++++++++++++++++++++++ tests/subscription-manager.spec.ts | 46 ++++++++ tests/utils.ts | 14 +++ 4 files changed, 238 insertions(+) create mode 100644 src/subscription-manager.ts create mode 100644 tests/subscription-manager.spec.ts diff --git a/src/config.ts b/src/config.ts index 4e52380..fe09007 100644 --- a/src/config.ts +++ b/src/config.ts @@ -22,4 +22,8 @@ export const config = { // log level - trace/debug/info/warn/error logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO', + + subscriptionsEnabled: + { on: true, off: false }[process.env.SUBSCRIPTIONS] ?? true, + // where to store persistant data }; diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts new file mode 100644 index 0000000..bf01fda --- /dev/null +++ b/src/subscription-manager.ts @@ -0,0 +1,174 @@ +import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; +import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; +import type { + MessageStore, + SubscriptionRequestReply, +} from '@tbd54566975/dwn-sdk-js'; + +import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; +import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; +import type WebSocket from 'ws'; +import { WebSocketServer } from 'ws'; +import { v4 as uuidv4 } from 'uuid'; + +export class Subscription { + from?: string; + subscriptionId: string; + createdAt: string; + description: string; + filters?: SubscriptionFilter[]; + permissionGrant: PermissionsGrant; + connection: WebSocket; +} + +export interface SubscriptionController { + clear(): Promise; + close(): Promise; + start(): Promise; + subscribe( + request: RegisterSubscriptionRequest, + ): Promise; +} + +export type RegisterSubscriptionRequest = { + from: string; + socket: WebSocket; + filters?: SubscriptionFilter[]; + permissionGrant: PermissionsGrant; + subscriptionRequestMessage: SubscriptionRequest; +}; + +export type RegisterSubscriptionReply = { + reply: SubscriptionRequestReply; + subscriptionId?: string; +}; + +export type defaultSubscriptionChannel = 'event'; + +export type SubscriptionManagerOptions = { + subscriptionChannel: string; + wss: WebSocketServer; + dwn: Dwn; + messageStore: MessageStore; + tenant: string; +}; + +export class SubscriptionManager { + private wss: WebSocketServer; + private dwn: Dwn; + private connections: Map; + private messageStore: MessageStore; + private tenant: string; + options: SubscriptionManagerOptions; + #open: boolean; + + constructor(options?: SubscriptionManagerOptions) { + this.wss = options?.wss || new WebSocketServer(); + this.connections = new Map(); + this.messageStore = options?.messageStore; + this.tenant = options?.tenant; + this.dwn = options?.dwn; + this.options = options; + + this.wss.on('connection', (socket: WebSocket) => { + socket.on('subscribe', async (data) => { + await this.handleSubscribe(socket, data); + }); + }); + } + + async clear(): Promise { + this.wss.removeAllListeners(); + this.connections.clear(); + } + + async close(): Promise { + this.#open = false; + this.connections.clear(); + this.wss.close(); + } + + async open(): Promise { + this.#open = true; + } + + async start(): Promise { + this.open(); + } + + private async createSubscription( + from: string, + request: RegisterSubscriptionRequest, + ): Promise { + return { + from, + subscriptionId: uuidv4(), + createdAt: new Date().toISOString(), + description: 'subscription', + filters: request.filters, + permissionGrant: request.permissionGrant, + connection: request.socket, + }; + } + + async handleSubscribe( + socket: WebSocket, + data: any, + ): Promise { + // parse message + const req = SubscriptionRequest.parse(data); + return await this.subscribe(req, socket); + } + + createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse { + return { + id: uuidv4(), + jsonrpc: '2.0', + result: e, + }; + } + + async subscribe( + req: RegisterSubscriptionRequest, + socket: WebSocket, + ): Promise { + const subscriptionReply = await this.dwn.handleSubscriptionRequest( + this.tenant, + req.subscriptionRequestMessage, + ); + if (subscriptionReply.status.code !== 200) { + return { reply: subscriptionReply }; + } + const subscription = await this.createSubscription(req.from, req); + this.registerSubscription(subscription); + // set up forwarding. + subscriptionReply.subscription.emitter.on( + async (e: EventMessage): Promise => { + const jsonRpcResponse = this.createJSONRPCEvent(e); + const str = JSON.stringify(jsonRpcResponse); + return socket.send(Buffer.from(str)); + }, + ); + } + + private async registerSubscription( + subscription: Subscription, + ): Promise { + if (!this.#open) { + throw new Error("Can't register subscription. It's not opened."); + } + if (this.connections.has(subscription.subscriptionId)) { + throw new Error( + 'Failed to add connection to controller. ID already exists.', + ); + } + this.connections.set(subscription.subscriptionId, subscription); + subscription.connection.on('close', () => { + this.deleteSubscription(subscription.subscriptionId); + }); + } + + private async deleteSubscription(id: string): Promise { + this.connections.delete(id); + } +} diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts new file mode 100644 index 0000000..063d049 --- /dev/null +++ b/tests/subscription-manager.spec.ts @@ -0,0 +1,46 @@ +import { assert } from 'chai'; +import { createProfile } from './utils.js'; +import { Jws } from '@tbd54566975/dwn-sdk-js'; +import type { SubscriptionController } from '../src/subscription-manager.js'; +import { SubscriptionManager } from '../src/subscription-manager.js'; + +describe('Subscription Manager Test', () => { + let subscriptionManager: SubscriptionController; + + // important to follow the `before` and `after` pattern to initialize and clean the stores in tests + // so that different test suites can reuse the same backend store for testing + before(async () => { + subscriptionManager = new SubscriptionManager({}); + }); + + // before each, clear the subscriptions + beforeEach(async () => { + subscriptionManager.clear(); + }); + + // close at the end + after(async () => { + await subscriptionManager.close(); + }); + + it('test subscription manager registration', async () => { + try { + const alice = await createProfile(); + const req = await SubscriptionRequest.create({ + filter: { + eventType: EventType.Operation, + }, + authorizationSignatureInput: Jws.createSignatureInput(alice), + }); + const subscription = await subscriptionManager.subscribe({ + from: alice.did, + subscriptionRequestMessage: req, + permissionGrant: 'asdf', + }); + assert.isDefined(subscription.reply); + assert.isDefined(subscription.subscriptionId); + } catch (error) { + assert.fail(error, undefined, 'failed to register subscription'); + } + }); +}); diff --git a/tests/utils.ts b/tests/utils.ts index f6750e4..d9694bb 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -14,6 +14,7 @@ import { DidKeyResolver, PrivateKeySigner, RecordsWrite, + SubscriptionRequest, } from '@tbd54566975/dwn-sdk-js'; // __filename and __dirname are not defined in ES module scope @@ -67,6 +68,19 @@ export type GenerateProtocolsConfigureOutput = { dataStream: Readable | undefined; }; +export type CreateSubscriptionRequestOverride = {}; + +export async function createSubscriptionRequest( + signer: Profile, + overrides: CreateSubscriptionRequestOverride, +): Promise { + console.log(overrides); + const subscriptionRequest = await SubscriptionRequest.create({ + authorizationSignatureInput: signer.signatureInput, + }); + return subscriptionRequest; +} + export async function createRecordsWriteMessage( signer: Profile, overrides: CreateRecordsWriteOverrides = {}, From 3547c74ac992a6a566d98a4fdb7555fb0410c914 Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Wed, 27 Sep 2023 16:03:44 +0530 Subject: [PATCH 2/6] migrated to recent updates --- src/subscription-manager.ts | 27 ++++---- tests/http-api.spec.ts | 42 ++++++------ tests/subscription-manager.spec.ts | 102 ++++++++++++++++++++++++----- tests/utils.ts | 19 ++---- 4 files changed, 124 insertions(+), 66 deletions(-) diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index bf01fda..2318237 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -1,12 +1,10 @@ import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; -import type { - MessageStore, - SubscriptionRequestReply, -} from '@tbd54566975/dwn-sdk-js'; import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; +import type { MessageStore } from '@tbd54566975/dwn-sdk-js'; import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; +import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; import type WebSocket from 'ws'; import { WebSocketServer } from 'ws'; import { v4 as uuidv4 } from 'uuid'; @@ -34,8 +32,8 @@ export type RegisterSubscriptionRequest = { from: string; socket: WebSocket; filters?: SubscriptionFilter[]; - permissionGrant: PermissionsGrant; - subscriptionRequestMessage: SubscriptionRequest; + permissionGrant?: PermissionsGrant; + request: SubscriptionRequest; }; export type RegisterSubscriptionReply = { @@ -46,8 +44,7 @@ export type RegisterSubscriptionReply = { export type defaultSubscriptionChannel = 'event'; export type SubscriptionManagerOptions = { - subscriptionChannel: string; - wss: WebSocketServer; + wss?: WebSocketServer; dwn: Dwn; messageStore: MessageStore; tenant: string; @@ -116,8 +113,13 @@ export class SubscriptionManager { data: any, ): Promise { // parse message - const req = SubscriptionRequest.parse(data); - return await this.subscribe(req, socket); + const req = await SubscriptionRequest.parse(data); + + return await this.subscribe({ + request: req, + socket: socket, + from: req.author, + }); } createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse { @@ -130,11 +132,10 @@ export class SubscriptionManager { async subscribe( req: RegisterSubscriptionRequest, - socket: WebSocket, ): Promise { const subscriptionReply = await this.dwn.handleSubscriptionRequest( this.tenant, - req.subscriptionRequestMessage, + req.request.message, ); if (subscriptionReply.status.code !== 200) { return { reply: subscriptionReply }; @@ -146,7 +147,7 @@ export class SubscriptionManager { async (e: EventMessage): Promise => { const jsonRpcResponse = this.createJSONRPCEvent(e); const str = JSON.stringify(jsonRpcResponse); - return socket.send(Buffer.from(str)); + return req.socket.send(Buffer.from(str)); }, ); } diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index d0d59ca..72c5f4f 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -1,34 +1,18 @@ -// node.js 18 and earlier, needs globalThis.crypto polyfill -import { webcrypto } from 'node:crypto'; - -if (!globalThis.crypto) { - // @ts-ignore - globalThis.crypto = webcrypto; -} - -import type { Server } from 'http'; -import type { - JsonRpcErrorResponse, - JsonRpcResponse, -} from '../src/lib/json-rpc.js'; - -import fetch from 'node-fetch'; -import request from 'supertest'; - -import { expect } from 'chai'; -import { HttpApi } from '../src/http-api.js'; -import { v4 as uuidv4 } from 'uuid'; import { Cid, DataStream, RecordsQuery, RecordsRead, } from '@tbd54566975/dwn-sdk-js'; -import { clear as clearDwn, dwn } from './test-dwn.js'; import { - createJsonRpcRequest, JsonRpcErrorCodes, + createJsonRpcRequest, +} from '../src/lib/json-rpc.js'; +import type { + JsonRpcErrorResponse, + JsonRpcResponse, } from '../src/lib/json-rpc.js'; +import { clear as clearDwn, dwn } from './test-dwn.js'; import { createProfile, createRecordsWriteMessage, @@ -36,6 +20,20 @@ import { streamHttpRequest, } from './utils.js'; +import { HttpApi } from '../src/http-api.js'; +import type { Server } from 'http'; +import { expect } from 'chai'; +import fetch from 'node-fetch'; +import request from 'supertest'; +import { v4 as uuidv4 } from 'uuid'; +// node.js 18 and earlier, needs globalThis.crypto polyfill +import { webcrypto } from 'node:crypto'; + +if (!globalThis.crypto) { + // @ts-ignore + globalThis.crypto = webcrypto; +} + describe('http api', function () { let httpApi: HttpApi; let server: Server; diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index 063d049..2e92a06 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -1,44 +1,110 @@ -import { assert } from 'chai'; -import { createProfile } from './utils.js'; +import http from 'node:http'; +import { WebSocket, type WebSocketServer } from 'ws'; + +import { + DataStoreLevel, + DidKeyResolver, + Dwn, + EventLogLevel, + MessageStoreLevel, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; + import { Jws } from '@tbd54566975/dwn-sdk-js'; import type { SubscriptionController } from '../src/subscription-manager.js'; import { SubscriptionManager } from '../src/subscription-manager.js'; +import { assert } from 'chai'; +import { createProfile } from './utils.js'; +import type { Profile } from './utils.js'; +import { WsApi } from '../src/ws-api.js'; -describe('Subscription Manager Test', () => { +describe('Subscription Manager Test', async () => { let subscriptionManager: SubscriptionController; + let wsServer: WebSocketServer; + let server: http.Server; + let dataStore: DataStoreLevel; + let eventLog: EventLogLevel; + let messageStore: MessageStoreLevel; + let alice: Profile; + let dwn: Dwn; + let socket: WebSocket; - // important to follow the `before` and `after` pattern to initialize and clean the stores in tests - // so that different test suites can reuse the same backend store for testing before(async () => { - subscriptionManager = new SubscriptionManager({}); + // Setup data stores... + dataStore = new DataStoreLevel({ + blockstoreLocation: 'data/DATASTORE', + }); + eventLog = new EventLogLevel({ location: 'data/EVENTLOG' }); + messageStore = new MessageStoreLevel({ + blockstoreLocation: 'data/MESSAGESTORE', + indexLocation: 'data/INDEX', + }); + + // create profile + alice = await createProfile(); + // create Dwn + dwn = await Dwn.create({ eventLog, dataStore, messageStore }); + + // create listeners... + server = http.createServer(); + server.listen(9002, '127.0.0.1'); + const wsApi = new WsApi(server, dwn); + wsServer = wsApi.start(); + + // create subscription manager... + subscriptionManager = new SubscriptionManager({ + dwn: dwn, + messageStore: messageStore, + tenant: alice.did, + wss: wsServer, + }); + return; }); // before each, clear the subscriptions beforeEach(async () => { subscriptionManager.clear(); + await dataStore.clear(); + await eventLog.clear(); + await messageStore.clear(); }); // close at the end after(async () => { await subscriptionManager.close(); + wsServer.close(); + server.close(); + server.closeAllConnections(); + socket.close(); }); it('test subscription manager registration', async () => { try { - const alice = await createProfile(); + const signer = await DidKeyResolver.generate(); + + // create a subscription request const req = await SubscriptionRequest.create({ - filter: { - eventType: EventType.Operation, - }, - authorizationSignatureInput: Jws.createSignatureInput(alice), + signer: Jws.createSigner(signer), }); - const subscription = await subscriptionManager.subscribe({ - from: alice.did, - subscriptionRequestMessage: req, - permissionGrant: 'asdf', - }); - assert.isDefined(subscription.reply); - assert.isDefined(subscription.subscriptionId); + + // setup a socket connection to wsServer + const socket = new WebSocket(wsServer.address.toString()); + socket.onopen = async (): Promise => { + console.log('sending req', req); + // send a subscription request + // const subscription = await subscriptionManager.subscribe({ + // from: alice.did, + // subscriptionRequestMessage: req, + // permissionGrant: 'asdf', + // }); + socket.send('subscription request'); + return; + }; + + socket.onmessage = (event): Promise => { + console.log('got message', event); + return; + }; } catch (error) { assert.fail(error, undefined, 'failed to register subscription'); } diff --git a/tests/utils.ts b/tests/utils.ts index d9694bb..3ec7872 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -12,10 +12,10 @@ import { Cid, DataStream, DidKeyResolver, - PrivateKeySigner, RecordsWrite, SubscriptionRequest, } from '@tbd54566975/dwn-sdk-js'; +import { Jws } from '@tbd54566975/dwn-sdk-js'; // __filename and __dirname are not defined in ES module scope const __filename = fileURLToPath(import.meta.url); @@ -32,18 +32,11 @@ export type Profile = { export async function createProfile(): Promise { const { did, keyPair, keyId } = await DidKeyResolver.generate(); - - // signer is required by all dwn message classes. it's used to sign messages - const signer = new PrivateKeySigner({ - privateJwk: keyPair.privateJwk, - algorithm: keyPair.privateJwk.alg, - keyId: `${did}#${keyId}`, - }); - + const signer = Jws.createSigner({ keyPair, keyId }); return { - did, - keyPair, - signer, + did: did, + keyPair: keyPair, + signer: signer, }; } @@ -76,7 +69,7 @@ export async function createSubscriptionRequest( ): Promise { console.log(overrides); const subscriptionRequest = await SubscriptionRequest.create({ - authorizationSignatureInput: signer.signatureInput, + signer: signer.signer, }); return subscriptionRequest; } From 2c16e0c6e2c9ef5969f7eb643e7229e27b3697a4 Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Thu, 28 Sep 2023 14:10:03 +0530 Subject: [PATCH 3/6] updated message processing and added handlers to ws-api --- src/json-rpc-handlers/dwn/process-message.ts | 35 +++-- src/lib/json-rpc-router.ts | 7 +- src/subscription-manager.ts | 8 +- src/ws-api.ts | 18 ++- tests/subscription-manager.spec.ts | 136 ++++++++++--------- tests/test-dwn.ts | 8 +- 6 files changed, 125 insertions(+), 87 deletions(-) diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 461f86a..183e83c 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,19 +1,19 @@ -import type { Readable as IsomorphicReadable } from 'readable-stream'; -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; - -import { v4 as uuidv4 } from 'uuid'; -import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; - import { + JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse, - JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; +import type { Readable as IsomorphicReadable } from 'readable-stream'; +import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; +import { v4 as uuidv4 } from 'uuid'; + export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, @@ -36,6 +36,26 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( !dataStream ) { reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); + } else if ( + messageType === + DwnInterfaceName.Subscriptions + DwnMethodName.Request + ) { + reply = (await dwn.processMessage( + target, + message, + )) as SubscriptionRequestReply; + if (!context.subscriptionManager || context.socket) { + throw new Error( + 'setup failure. improper context provided for subscription', + ); + } + const req = { + socket: context.socket, + from: dwnRequest.params?.descriptor, + request: {}, + }; + const subscription = await context.subscriptionManager.subscribe(req); + console.log(subscription); } else { reply = (await dwn.processMessage( target, @@ -43,7 +63,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( dataStream as IsomorphicReadable, )) as RecordsReadReply; } - // RecordsRead messages return record data as a stream to for accommodate large amounts of data let recordDataStream; if (reply?.record?.data !== undefined) { diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 33f7314..eb4d451 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,11 +1,16 @@ +import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; + import type { Dwn } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { SubscriptionController } from '../subscription-manager.js'; +import type { WebSocket } from 'ws'; export type RequestContext = { dwn: Dwn; transport: 'http' | 'ws'; dataStream?: Readable; + socket?: WebSocket; + subscriptionManager?: SubscriptionController; }; export type HandlerResponse = { diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index 2318237..58e02c9 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -2,7 +2,6 @@ import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; -import type { MessageStore } from '@tbd54566975/dwn-sdk-js'; import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; import type WebSocket from 'ws'; @@ -46,7 +45,6 @@ export type defaultSubscriptionChannel = 'event'; export type SubscriptionManagerOptions = { wss?: WebSocketServer; dwn: Dwn; - messageStore: MessageStore; tenant: string; }; @@ -54,7 +52,6 @@ export class SubscriptionManager { private wss: WebSocketServer; private dwn: Dwn; private connections: Map; - private messageStore: MessageStore; private tenant: string; options: SubscriptionManagerOptions; #open: boolean; @@ -62,13 +59,14 @@ export class SubscriptionManager { constructor(options?: SubscriptionManagerOptions) { this.wss = options?.wss || new WebSocketServer(); this.connections = new Map(); - this.messageStore = options?.messageStore; this.tenant = options?.tenant; this.dwn = options?.dwn; this.options = options; this.wss.on('connection', (socket: WebSocket) => { - socket.on('subscribe', async (data) => { + console.log('connected'); + socket.on('message', async (data) => { + console.log('got message...'); await this.handleSubscribe(socket, data); }); }); diff --git a/src/ws-api.ts b/src/ws-api.ts index cfe846b..6809f71 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,7 +1,7 @@ import { base64url } from 'multiformats/bases/base64'; import { v4 as uuidv4 } from 'uuid'; import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js'; -import type { IncomingMessage, Server } from 'http'; +import { type IncomingMessage, type Server } from 'http'; import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws'; import { jsonRpcApi } from './json-rpc-api.js'; @@ -12,6 +12,10 @@ import { JsonRpcErrorCodes, type JsonRpcResponse, } from './lib/json-rpc.js'; +import { + SubscriptionManager, + type SubscriptionController, +} from './subscription-manager.js'; const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; @@ -19,10 +23,16 @@ const HEARTBEAT_INTERVAL = 30_000; export class WsApi { #wsServer: WebSocketServer; dwn: Dwn; + #subscriptionManager: SubscriptionController; constructor(server: Server, dwn: Dwn) { this.dwn = dwn; this.#wsServer = new WebSocketServer({ server }); + this.#subscriptionManager = new SubscriptionManager({ + dwn: dwn, + tenant: 'asdf', + wss: this.#wsServer, + }); } // TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js @@ -63,7 +73,6 @@ export class WsApi { socket.on('message', async function (dataBuffer) { let dwnRequest; - try { // deserialize bytes into JSON object dwnRequest = dataBuffer.toString(); @@ -77,7 +86,6 @@ export class WsApi { const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); return socket.send(responseBuffer); } - dwnRequest = JSON.parse(dwnRequest); } catch (e) { const jsonRpcResponse = createJsonRpcErrorResponse( @@ -85,7 +93,6 @@ export class WsApi { JsonRpcErrorCodes.BadRequest, e.message, ); - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); return socket.send(responseBuffer); } @@ -101,6 +108,7 @@ export class WsApi { transport: 'ws', dataStream: requestDataStream, }; + const { jsonRpcResponse } = await jsonRpcApi.handle( dwnRequest, requestContext, @@ -140,9 +148,7 @@ export class WsApi { #setupWebSocket(): void { this.#wsServer.on('connection', this.#handleConnection.bind(this)); - const heartbeatInterval = this.#setupHeartbeat(); - this.#wsServer.on('close', function close() { clearInterval(heartbeatInterval); }); diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index 2e92a06..9f84fd9 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -1,112 +1,120 @@ import http from 'node:http'; +import type { AddressInfo } from 'ws'; import { WebSocket, type WebSocketServer } from 'ws'; +import { v4 as uuidv4 } from 'uuid'; -import { - DataStoreLevel, - DidKeyResolver, - Dwn, - EventLogLevel, - MessageStoreLevel, - SubscriptionRequest, -} from '@tbd54566975/dwn-sdk-js'; +import { DidKeyResolver, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; import { Jws } from '@tbd54566975/dwn-sdk-js'; -import type { SubscriptionController } from '../src/subscription-manager.js'; -import { SubscriptionManager } from '../src/subscription-manager.js'; import { assert } from 'chai'; import { createProfile } from './utils.js'; import type { Profile } from './utils.js'; import { WsApi } from '../src/ws-api.js'; +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { clear as clearDwn, dwn } from './test-dwn.js'; describe('Subscription Manager Test', async () => { - let subscriptionManager: SubscriptionController; - let wsServer: WebSocketServer; let server: http.Server; - let dataStore: DataStoreLevel; - let eventLog: EventLogLevel; - let messageStore: MessageStoreLevel; + let wsServer: WebSocketServer; let alice: Profile; - let dwn: Dwn; let socket: WebSocket; before(async () => { - // Setup data stores... - dataStore = new DataStoreLevel({ - blockstoreLocation: 'data/DATASTORE', - }); - eventLog = new EventLogLevel({ location: 'data/EVENTLOG' }); - messageStore = new MessageStoreLevel({ - blockstoreLocation: 'data/MESSAGESTORE', - indexLocation: 'data/INDEX', - }); - - // create profile - alice = await createProfile(); - // create Dwn - dwn = await Dwn.create({ eventLog, dataStore, messageStore }); - // create listeners... server = http.createServer(); server.listen(9002, '127.0.0.1'); + const wsApi = new WsApi(server, dwn); wsServer = wsApi.start(); - + alice = await createProfile(); + // starts the ws server // create subscription manager... - subscriptionManager = new SubscriptionManager({ - dwn: dwn, - messageStore: messageStore, - tenant: alice.did, - wss: wsServer, - }); return; }); // before each, clear the subscriptions beforeEach(async () => { - subscriptionManager.clear(); - await dataStore.clear(); - await eventLog.clear(); - await messageStore.clear(); + // subscriptionManager.clear(); + }); + + afterEach(async () => { + await clearDwn(); }); // close at the end after(async () => { - await subscriptionManager.close(); + //await subscriptionManager.close(); wsServer.close(); server.close(); server.closeAllConnections(); - socket.close(); + if (socket) { + socket.close(); + } }); it('test subscription manager registration', async () => { try { const signer = await DidKeyResolver.generate(); - - // create a subscription request const req = await SubscriptionRequest.create({ signer: Jws.createSigner(signer), }); - // setup a socket connection to wsServer - const socket = new WebSocket(wsServer.address.toString()); - socket.onopen = async (): Promise => { - console.log('sending req', req); - // send a subscription request - // const subscription = await subscriptionManager.subscribe({ - // from: alice.did, - // subscriptionRequestMessage: req, - // permissionGrant: 'asdf', - // }); - socket.send('subscription request'); - return; - }; + const port = (wsServer.address() as AddressInfo).port; + const ip = (wsServer.address() as AddressInfo).address; + const addr = `ws://${ip}:${port}`; + const socket = new WebSocket(addr); - socket.onmessage = (event): Promise => { - console.log('got message', event); - return; - }; + const socketPromise = new Promise((resolve, reject) => { + // set up lisetner... + socket.onmessage = (event): Promise => { + try { + console.log('got message'); + resolve(event); + return; + } catch (error) { + reject(error); + } + }; + + socket.onerror = (error): void => { + reject(error); // Reject the promise if there's an error with the socket + }; + + socket.onclose = (event): void => { + if (event.wasClean) { + console.log( + `Connection closed cleanly, code=${event.code}, reason=${event.reason}`, + ); + } else { + console.error(`Connection abruptly closed`); + } + reject(new Error(`Connection closed: ${event.reason}`)); // Reject the promise on socket close + }; + + socket.onopen = async (): Promise => { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest( + requestId, + 'dwn.processMessage', + { + message: req.toJSON(), + target: alice.did, + }, + ); + try { + if (socket.readyState !== WebSocket.OPEN) { + reject(new Error('socket not open')); + } + socket.send(JSON.stringify(dwnRequest)); + } catch (error) { + reject(error); + } + return; + }; + }); + await socketPromise; } catch (error) { - assert.fail(error, undefined, 'failed to register subscription'); + assert.fail(error, undefined, 'failed to register subscription' + error); } }); }); diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index 515ece5..3354982 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,13 +1,15 @@ import { - Dwn, DataStoreLevel, + Dwn, EventLogLevel, MessageStoreLevel, } from '@tbd54566975/dwn-sdk-js'; -const dataStore = new DataStoreLevel({ blockstoreLocation: 'data/DATASTORE' }); +export const dataStore = new DataStoreLevel({ + blockstoreLocation: 'data/DATASTORE', +}); const eventLog = new EventLogLevel({ location: 'data/EVENTLOG' }); -const messageStore = new MessageStoreLevel({ +export const messageStore = new MessageStoreLevel({ blockstoreLocation: 'data/MESSAGESTORE', indexLocation: 'data/INDEX', }); From f3c0b62777f9b136f096c707181de05154297695 Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Mon, 2 Oct 2023 01:23:43 +0530 Subject: [PATCH 4/6] dwn-server test passing --- src/json-rpc-handlers/dwn/process-message.ts | 45 +++++++++++--------- src/subscription-manager.ts | 22 +++++----- src/ws-api.ts | 3 ++ tests/subscription-manager.spec.ts | 7 ++- 4 files changed, 46 insertions(+), 31 deletions(-) diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 183e83c..a9a1c63 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,4 +1,14 @@ -import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; +import { v4 as uuidv4 } from 'uuid'; +import type { Readable as IsomorphicReadable } from 'readable-stream'; +import type { + RecordsReadReply, + SubscriptionRequestReply, +} from '@tbd54566975/dwn-sdk-js'; +import { + DwnInterfaceName, + DwnMethodName, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; import type { HandlerResponse, JsonRpcHandler, @@ -9,11 +19,6 @@ import { createJsonRpcSuccessResponse, } from '../../lib/json-rpc.js'; -import type { Readable as IsomorphicReadable } from 'readable-stream'; -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; -import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; -import { v4 as uuidv4 } from 'uuid'; - export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, @@ -21,16 +26,12 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( const { dwn, dataStream } = context; const { target, message } = dwnRequest.params; const requestId = dwnRequest.id ?? uuidv4(); - try { - let reply; + let reply: any; + const messageType = message?.descriptor?.interface + message?.descriptor?.method; - // When a record is deleted via `RecordsDelete`, the initial RecordsWrite is kept as a tombstone _in addition_ - // to the RecordsDelete message. the data associated to that initial RecordsWrite is deleted. If a record was written - // _and_ deleted before it ever got to dwn-server, we end up in a situation where we still need to process the tombstone - // so that we can process the RecordsDelete. if ( messageType === DwnInterfaceName.Records + DwnMethodName.Write && !dataStream @@ -44,18 +45,25 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( target, message, )) as SubscriptionRequestReply; - if (!context.subscriptionManager || context.socket) { + if (!context.subscriptionManager || !context.socket) { throw new Error( 'setup failure. improper context provided for subscription', ); } + + // FIXME: How to handle subscription requests? + const request = await SubscriptionRequest.create({}); const req = { socket: context.socket, - from: dwnRequest.params?.descriptor, - request: {}, + from: message.descriptor.author, + request: request, }; - const subscription = await context.subscriptionManager.subscribe(req); - console.log(subscription); + reply = await context.subscriptionManager.subscribe(req); + const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { + reply, + }); + const responsePayload: HandlerResponse = { jsonRpcResponse }; + return responsePayload; } else { reply = (await dwn.processMessage( target, @@ -63,7 +71,7 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( dataStream as IsomorphicReadable, )) as RecordsReadReply; } - // RecordsRead messages return record data as a stream to for accommodate large amounts of data + let recordDataStream; if (reply?.record?.data !== undefined) { recordDataStream = reply.record.data; @@ -83,7 +91,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( JsonRpcErrorCodes.InternalError, e.message, ); - return { jsonRpcResponse } as HandlerResponse; } }; diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index 58e02c9..0a93828 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -28,15 +28,15 @@ export interface SubscriptionController { } export type RegisterSubscriptionRequest = { - from: string; - socket: WebSocket; - filters?: SubscriptionFilter[]; - permissionGrant?: PermissionsGrant; - request: SubscriptionRequest; + from: string; // from connection + socket: WebSocket; // socket connection + filters?: SubscriptionFilter[]; // filters, if applicable + permissionGrant?: PermissionsGrant; //permission grant, if applicable + request: SubscriptionRequest; // subscription request }; export type RegisterSubscriptionReply = { - reply: SubscriptionRequestReply; + reply?: SubscriptionRequestReply; subscriptionId?: string; }; @@ -52,21 +52,17 @@ export class SubscriptionManager { private wss: WebSocketServer; private dwn: Dwn; private connections: Map; - private tenant: string; options: SubscriptionManagerOptions; #open: boolean; constructor(options?: SubscriptionManagerOptions) { this.wss = options?.wss || new WebSocketServer(); this.connections = new Map(); - this.tenant = options?.tenant; this.dwn = options?.dwn; this.options = options; this.wss.on('connection', (socket: WebSocket) => { - console.log('connected'); socket.on('message', async (data) => { - console.log('got message...'); await this.handleSubscribe(socket, data); }); }); @@ -132,7 +128,7 @@ export class SubscriptionManager { req: RegisterSubscriptionRequest, ): Promise { const subscriptionReply = await this.dwn.handleSubscriptionRequest( - this.tenant, + req.from, req.request.message, ); if (subscriptionReply.status.code !== 200) { @@ -148,6 +144,10 @@ export class SubscriptionManager { return req.socket.send(Buffer.from(str)); }, ); + return { + reply: subscriptionReply, + subscriptionId: subscription?.subscriptionId, + } as RegisterSubscriptionReply; } private async registerSubscription( diff --git a/src/ws-api.ts b/src/ws-api.ts index 6809f71..3b67b04 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -50,6 +50,7 @@ export class WsApi { */ #handleConnection(socket: WebSocket, _request: IncomingMessage): void { const dwn = this.dwn; + const subscriptionManager = this.#subscriptionManager; socket[SOCKET_ISALIVE_SYMBOL] = true; @@ -107,6 +108,8 @@ export class WsApi { dwn, transport: 'ws', dataStream: requestDataStream, + subscriptionManager: subscriptionManager, + socket: socket, }; const { jsonRpcResponse } = await jsonRpcApi.handle( diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index 9f84fd9..cd3d78e 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -68,7 +68,10 @@ describe('Subscription Manager Test', async () => { // set up lisetner... socket.onmessage = (event): Promise => { try { - console.log('got message'); + const resp = JSON.parse(event.data.toString()); + if (resp.error) { + throw new Error(resp.error.message); + } resolve(event); return; } catch (error) { @@ -92,6 +95,7 @@ describe('Subscription Manager Test', async () => { }; socket.onopen = async (): Promise => { + // on open const requestId = uuidv4(); const dwnRequest = createJsonRpcRequest( requestId, @@ -101,6 +105,7 @@ describe('Subscription Manager Test', async () => { target: alice.did, }, ); + try { if (socket.readyState !== WebSocket.OPEN) { reject(new Error('socket not open')); From 6e02d411c3e6ac99d83e58a18a47724b23f440d7 Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Mon, 2 Oct 2023 11:07:09 +0530 Subject: [PATCH 5/6] updated tests. working now --- src/json-rpc-handlers/dwn/process-message.ts | 1 + src/subscription-manager.ts | 2 + tests/subscription-manager.spec.ts | 48 ++++++++++++++++++-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index a9a1c63..eeef9d1 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -36,6 +36,7 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( messageType === DwnInterfaceName.Records + DwnMethodName.Write && !dataStream ) { + console.log('sending'); reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); } else if ( messageType === diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index 0a93828..d3e1b2d 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -137,8 +137,10 @@ export class SubscriptionManager { const subscription = await this.createSubscription(req.from, req); this.registerSubscription(subscription); // set up forwarding. + // console.log('---------', subscriptionReply.subscription.emitter); subscriptionReply.subscription.emitter.on( async (e: EventMessage): Promise => { + // console.log('got a record', e); const jsonRpcResponse = this.createJSONRPCEvent(e); const str = JSON.stringify(jsonRpcResponse); return req.socket.send(Buffer.from(str)); diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index cd3d78e..11fa11f 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -3,15 +3,23 @@ import type { AddressInfo } from 'ws'; import { WebSocket, type WebSocketServer } from 'ws'; import { v4 as uuidv4 } from 'uuid'; -import { DidKeyResolver, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; +import { + DataStream, + DidKeyResolver, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; import { Jws } from '@tbd54566975/dwn-sdk-js'; import { assert } from 'chai'; -import { createProfile } from './utils.js'; +import { createProfile, createRecordsWriteMessage } from './utils.js'; import type { Profile } from './utils.js'; import { WsApi } from '../src/ws-api.js'; import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { clear as clearDwn, dwn } from './test-dwn.js'; +import { base64url } from 'multiformats/bases/base64'; +import { EventType } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName } from '@tbd54566975/dwn-sdk-js'; +import { DwnMethodName } from '@tbd54566975/dwn-sdk-js'; describe('Subscription Manager Test', async () => { let server: http.Server; @@ -57,12 +65,16 @@ describe('Subscription Manager Test', async () => { const signer = await DidKeyResolver.generate(); const req = await SubscriptionRequest.create({ signer: Jws.createSigner(signer), + filter: { + eventType: EventType.Operation, + }, }); const port = (wsServer.address() as AddressInfo).port; const ip = (wsServer.address() as AddressInfo).address; const addr = `ws://${ip}:${port}`; const socket = new WebSocket(addr); + let receivedCount = 0; const socketPromise = new Promise((resolve, reject) => { // set up lisetner... @@ -72,7 +84,16 @@ describe('Subscription Manager Test', async () => { if (resp.error) { throw new Error(resp.error.message); } - resolve(event); + receivedCount += 1; + if ( + resp.result?.descriptor?.eventDescriptor?.interface === + DwnInterfaceName.Records && + resp.result?.descriptor?.eventDescriptor?.method === + DwnMethodName.Write + ) { + resolve(event); + socket.close(); + } return; } catch (error) { reject(error); @@ -114,10 +135,31 @@ describe('Subscription Manager Test', async () => { } catch (error) { reject(error); } + try { + const { recordsWrite, dataStream } = + await createRecordsWriteMessage(alice); + const dataBytes = await DataStream.toBytes(dataStream); + const encodedData = base64url.baseEncode(dataBytes); + + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest( + requestId, + 'dwn.processMessage', + { + message: recordsWrite.toJSON(), + target: alice.did, + encodedData, + }, + ); + socket.send(JSON.stringify(dwnRequest)); + } catch (error) { + reject(error); + } return; }; }); await socketPromise; + assert.equal(receivedCount, 2, 'received count'); } catch (error) { assert.fail(error, undefined, 'failed to register subscription' + error); } From 800b2850bcca666266ab4fb0e8b8aa24802ddb1c Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Mon, 2 Oct 2023 11:12:26 +0530 Subject: [PATCH 6/6] refactored --- src/json-rpc-handlers/dwn/process-message.ts | 1 - src/subscription-manager.ts | 24 +------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index eeef9d1..a9a1c63 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -36,7 +36,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( messageType === DwnInterfaceName.Records + DwnMethodName.Write && !dataStream ) { - console.log('sending'); reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); } else if ( messageType === diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index d3e1b2d..bb8c64d 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -2,7 +2,7 @@ import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; -import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; +import type { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; import type WebSocket from 'ws'; import { WebSocketServer } from 'ws'; @@ -60,12 +60,6 @@ export class SubscriptionManager { this.connections = new Map(); this.dwn = options?.dwn; this.options = options; - - this.wss.on('connection', (socket: WebSocket) => { - socket.on('message', async (data) => { - await this.handleSubscribe(socket, data); - }); - }); } async clear(): Promise { @@ -102,20 +96,6 @@ export class SubscriptionManager { }; } - async handleSubscribe( - socket: WebSocket, - data: any, - ): Promise { - // parse message - const req = await SubscriptionRequest.parse(data); - - return await this.subscribe({ - request: req, - socket: socket, - from: req.author, - }); - } - createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse { return { id: uuidv4(), @@ -137,10 +117,8 @@ export class SubscriptionManager { const subscription = await this.createSubscription(req.from, req); this.registerSubscription(subscription); // set up forwarding. - // console.log('---------', subscriptionReply.subscription.emitter); subscriptionReply.subscription.emitter.on( async (e: EventMessage): Promise => { - // console.log('got a record', e); const jsonRpcResponse = this.createJSONRPCEvent(e); const str = JSON.stringify(jsonRpcResponse); return req.socket.send(Buffer.from(str));