-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
adding subscription handling to dwn server. #68
Changes from 5 commits
d1126c8
3547c74
2c16e0c
f3c0b62
6e02d41
800b285
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,22 @@ | ||
import { v4 as uuidv4 } from 'uuid'; | ||
import type { Readable as IsomorphicReadable } from 'readable-stream'; | ||
import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; | ||
import type { | ||
RecordsReadReply, | ||
SubscriptionRequestReply, | ||
} from '@tbd54566975/dwn-sdk-js'; | ||
import { | ||
DwnInterfaceName, | ||
DwnMethodName, | ||
SubscriptionRequest, | ||
} from '@tbd54566975/dwn-sdk-js'; | ||
import type { | ||
HandlerResponse, | ||
JsonRpcHandler, | ||
} from '../../lib/json-rpc-router.js'; | ||
|
||
import { v4 as uuidv4 } from 'uuid'; | ||
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; | ||
|
||
import { | ||
JsonRpcErrorCodes, | ||
createJsonRpcErrorResponse, | ||
createJsonRpcSuccessResponse, | ||
JsonRpcErrorCodes, | ||
} from '../../lib/json-rpc.js'; | ||
|
||
export const handleDwnProcessMessage: JsonRpcHandler = async ( | ||
|
@@ -21,21 +26,45 @@ | |
const { dwn, dataStream } = context; | ||
const { target, message } = dwnRequest.params; | ||
const requestId = dwnRequest.id ?? uuidv4(); | ||
|
||
try { | ||
let reply; | ||
let reply: any; | ||
|
||
const messageType = | ||
message?.descriptor?.interface + message?.descriptor?.method; | ||
|
||
// When a record is deleted via `RecordsDelete`, the initial RecordsWrite is kept as a tombstone _in addition_ | ||
// to the RecordsDelete message. the data associated to that initial RecordsWrite is deleted. If a record was written | ||
// _and_ deleted before it ever got to dwn-server, we end up in a situation where we still need to process the tombstone | ||
// so that we can process the RecordsDelete. | ||
if ( | ||
messageType === DwnInterfaceName.Records + DwnMethodName.Write && | ||
!dataStream | ||
) { | ||
console.log('sending'); | ||
reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); | ||
} else if ( | ||
messageType === | ||
DwnInterfaceName.Subscriptions + DwnMethodName.Request | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need to handle subscription requests differently than a normal request, because we also need to setup and manage the socket connection, unlike most messages which terminate after the data is sent. |
||
) { | ||
reply = (await dwn.processMessage( | ||
target, | ||
message, | ||
)) as SubscriptionRequestReply; | ||
if (!context.subscriptionManager || !context.socket) { | ||
throw new Error( | ||
'setup failure. improper context provided for subscription', | ||
); | ||
} | ||
|
||
// FIXME: How to handle subscription requests? | ||
const request = await SubscriptionRequest.create({}); | ||
const req = { | ||
socket: context.socket, | ||
from: message.descriptor.author, | ||
request: request, | ||
}; | ||
reply = await context.subscriptionManager.subscribe(req); | ||
const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { | ||
reply, | ||
}); | ||
const responsePayload: HandlerResponse = { jsonRpcResponse }; | ||
return responsePayload; | ||
} else { | ||
reply = (await dwn.processMessage( | ||
target, | ||
|
@@ -44,7 +73,6 @@ | |
)) as RecordsReadReply; | ||
} | ||
|
||
// RecordsRead messages return record data as a stream to for accommodate large amounts of data | ||
let recordDataStream; | ||
if (reply?.record?.data !== undefined) { | ||
recordDataStream = reply.record.data; | ||
|
@@ -64,7 +92,6 @@ | |
JsonRpcErrorCodes.InternalError, | ||
e.message, | ||
); | ||
|
||
return { jsonRpcResponse } as HandlerResponse; | ||
} | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; | ||
import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; | ||
|
||
import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; | ||
import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; | ||
import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; | ||
import type WebSocket from 'ws'; | ||
import { WebSocketServer } from 'ws'; | ||
import { v4 as uuidv4 } from 'uuid'; | ||
|
||
export class Subscription { | ||
from?: string; | ||
subscriptionId: string; | ||
createdAt: string; | ||
description: string; | ||
filters?: SubscriptionFilter[]; | ||
permissionGrant: PermissionsGrant; | ||
connection: WebSocket; | ||
} | ||
|
||
export interface SubscriptionController { | ||
clear(): Promise<void>; | ||
close(): Promise<void>; | ||
start(): Promise<void>; | ||
subscribe( | ||
request: RegisterSubscriptionRequest, | ||
): Promise<RegisterSubscriptionReply>; | ||
} | ||
|
||
export type RegisterSubscriptionRequest = { | ||
from: string; // from connection | ||
socket: WebSocket; // socket connection | ||
filters?: SubscriptionFilter[]; // filters, if applicable | ||
permissionGrant?: PermissionsGrant; //permission grant, if applicable | ||
request: SubscriptionRequest; // subscription request | ||
}; | ||
|
||
export type RegisterSubscriptionReply = { | ||
reply?: SubscriptionRequestReply; | ||
subscriptionId?: string; | ||
}; | ||
|
||
export type defaultSubscriptionChannel = 'event'; | ||
|
||
export type SubscriptionManagerOptions = { | ||
wss?: WebSocketServer; | ||
dwn: Dwn; | ||
tenant: string; | ||
}; | ||
|
||
export class SubscriptionManager { | ||
private wss: WebSocketServer; | ||
private dwn: Dwn; | ||
private connections: Map<string, Subscription>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. keep track of all the subscriptions here. IMPORTANT is to close the subscription when it's finished. |
||
options: SubscriptionManagerOptions; | ||
#open: boolean; | ||
|
||
constructor(options?: SubscriptionManagerOptions) { | ||
this.wss = options?.wss || new WebSocketServer(); | ||
this.connections = new Map(); | ||
this.dwn = options?.dwn; | ||
this.options = options; | ||
|
||
this.wss.on('connection', (socket: WebSocket) => { | ||
socket.on('message', async (data) => { | ||
await this.handleSubscribe(socket, data); | ||
}); | ||
}); | ||
} | ||
|
||
async clear(): Promise<void> { | ||
this.wss.removeAllListeners(); | ||
this.connections.clear(); | ||
} | ||
|
||
async close(): Promise<void> { | ||
this.#open = false; | ||
this.connections.clear(); | ||
this.wss.close(); | ||
} | ||
|
||
async open(): Promise<void> { | ||
this.#open = true; | ||
} | ||
|
||
async start(): Promise<void> { | ||
this.open(); | ||
} | ||
|
||
private async createSubscription( | ||
from: string, | ||
request: RegisterSubscriptionRequest, | ||
): Promise<Subscription> { | ||
return { | ||
from, | ||
subscriptionId: uuidv4(), | ||
createdAt: new Date().toISOString(), | ||
description: 'subscription', | ||
filters: request.filters, | ||
permissionGrant: request.permissionGrant, | ||
connection: request.socket, | ||
}; | ||
} | ||
|
||
async handleSubscribe( | ||
socket: WebSocket, | ||
data: any, | ||
): Promise<RegisterSubscriptionReply> { | ||
// parse message | ||
const req = await SubscriptionRequest.parse(data); | ||
|
||
return await this.subscribe({ | ||
request: req, | ||
socket: socket, | ||
from: req.author, | ||
}); | ||
} | ||
|
||
createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse { | ||
return { | ||
id: uuidv4(), | ||
jsonrpc: '2.0', | ||
result: e, | ||
}; | ||
} | ||
|
||
async subscribe( | ||
req: RegisterSubscriptionRequest, | ||
): Promise<RegisterSubscriptionReply> { | ||
const subscriptionReply = await this.dwn.handleSubscriptionRequest( | ||
req.from, | ||
req.request.message, | ||
); | ||
if (subscriptionReply.status.code !== 200) { | ||
return { reply: subscriptionReply }; | ||
} | ||
const subscription = await this.createSubscription(req.from, req); | ||
this.registerSubscription(subscription); | ||
// set up forwarding. | ||
// console.log('---------', subscriptionReply.subscription.emitter); | ||
subscriptionReply.subscription.emitter.on( | ||
async (e: EventMessage): Promise<void> => { | ||
// console.log('got a record', e); | ||
const jsonRpcResponse = this.createJSONRPCEvent(e); | ||
const str = JSON.stringify(jsonRpcResponse); | ||
return req.socket.send(Buffer.from(str)); | ||
}, | ||
); | ||
return { | ||
reply: subscriptionReply, | ||
subscriptionId: subscription?.subscriptionId, | ||
} as RegisterSubscriptionReply; | ||
} | ||
|
||
private async registerSubscription( | ||
subscription: Subscription, | ||
): Promise<void> { | ||
if (!this.#open) { | ||
throw new Error("Can't register subscription. It's not opened."); | ||
} | ||
if (this.connections.has(subscription.subscriptionId)) { | ||
throw new Error( | ||
'Failed to add connection to controller. ID already exists.', | ||
); | ||
} | ||
this.connections.set(subscription.subscriptionId, subscription); | ||
subscription.connection.on('close', () => { | ||
this.deleteSubscription(subscription.subscriptionId); | ||
}); | ||
} | ||
|
||
private async deleteSubscription(id: string): Promise<void> { | ||
this.connections.delete(id); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please change the environment variable to
DWN_SUBSCRIPTIONS
. I realize not all environment variables currently follow that convention, but I would like to move in that direction.