Skip to content

Commit

Permalink
refactor: improve handler args
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 committed Jan 28, 2024
1 parent 66629e9 commit 2791448
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 107 deletions.
2 changes: 1 addition & 1 deletion build.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ export default defineBuildConfig({
rollup: {
inlineDependencies: true,
},
externals: ["@cloudflare/workers-types"],
externals: ["@cloudflare/workers-types", "bun-types"],
});
65 changes: 38 additions & 27 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
// https://bun.sh/docs/api/websockets

// @ts-nocheck
import type * as _bun from "bun-types";
// @ts-expect-error
import type {} from "bun-types";

import { WebSocketMessage } from "../message";
// import { WebSocketError } from "../error";
import { WebSocketPeer } from "../peer";
import { WebSocketError } from "../error";
import { WebSocketPeerBase } from "../peer";
import { defineWebSocketAdapter } from "../adapter";

export const WebSocket = globalThis.WebSocket;

export interface AdapterOptions {}

type WebSocketHandler<T = unknown> = Extract<
Parameters<typeof Bun.serve<T>>[0],
type ContextData = { _peer?: WebSocketPeer };

type WebSocketHandler = Extract<
Parameters<typeof Bun.serve<ContextData>>[0],
{ websocket: any }
>["websocket"];

Expand All @@ -25,52 +27,61 @@ export interface Adapter {

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(handler, opts = {}) => {
const getPeer = (ws: ServerWebSocket) => {
if (ws.data?._peer) {
return ws.data._peer;
}
const peer = new WebSocketPeer({ bun: { ws } });
ws.data = ws.data || {};
ws.data._peer = peer;
return peer;
};

return {
websocket: {
message: (ws, message) => {
handler.onEvent?.("bun:message", ws, message);
const peer = new BunWebSocketPeer(ws);
const peer = getPeer(ws);
handler?.onEvent?.("bun:message", peer, ws, message);
handler.onMessage?.(peer, new WebSocketMessage(message));
},
open: (ws) => {
handler.onEvent?.("bun:open", ws);
const peer = new BunWebSocketPeer(ws);
const peer = getPeer(ws);
handler?.onEvent?.("bun:open", peer, ws);
handler.onOpen?.(peer);
},
close: (ws) => {
handler.onEvent?.("bun:close", ws);
const peer = new BunWebSocketPeer(ws);
const peer = getPeer(ws);
handler?.onEvent?.("bun:close", peer, ws);
handler.onClose?.(peer, 0, "");
},
// TODO
// error: (ws, error) => {
// handler.onEvent?.("bun:error", ws, error);
// const peer = new BunWebSocketPeer(ws);
// handler.onError?.(peer, new WebSocketError(error));
// },
drain: (ws) => {
handler.onEvent?.("bun:drain", ws);
const peer = getPeer(ws);
handler?.onEvent?.("bun:drain", peer);
},
// @ts-expect-error types unavailable but mentioned in docs
error: (ws, error) => {
const peer = getPeer(ws);
handler?.onEvent?.("bun:error", peer, error);
handler.onError?.(peer, new WebSocketError(error));
},
},
};
},
);

class BunWebSocketPeer extends WebSocketPeer {
constructor(private _ws: ServerWebSocket) {
super();
}

class WebSocketPeer extends WebSocketPeerBase<{
bun: { ws: ServerWebSocket };
}> {
get id() {
return this._ws.remoteAddress;
return this.ctx.bun.ws.remoteAddress;
}

get readyState() {
return this._ws.readyState as any;
return this.ctx.bun.ws.readyState as any;
}

send(message: string | ArrayBuffer) {
this._ws.send(message);
this.ctx.bun.ws.send(message);
return 0;
}
}
33 changes: 20 additions & 13 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import type * as _cf from "@cloudflare/workers-types";

import { WebSocketPeer } from "../peer";
import { WebSocketPeerBase } from "../peer";
import { defineWebSocketAdapter } from "../adapter.js";
import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
Expand All @@ -25,30 +25,36 @@ export interface Adapter {
export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(handler, opts = {}) => {
const handleUpgrade = (
req: _cf.Request,
request: _cf.Request,
env: Env,
context: _cf.ExecutionContext,
) => {
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];

const peer = new CloudflareWebSocketPeer(client, server);
const peer = new CloudflareWebSocketPeer({
cloudflare: { client, server, request, env, context },
});

server.accept();

// open event is not fired by cloudflare!
handler.onEvent?.("cloudflare:accept", peer);
handler.onOpen?.(peer);

server.addEventListener("message", (event) => {
handler?.onEvent?.("cloudflare:message", peer, event);
handler.onMessage?.(peer, new WebSocketMessage(event.data));
});

server.addEventListener("error", (event) => {
handler?.onEvent?.("cloudflare:error", peer, event);
handler.onError?.(peer, new WebSocketError(event.error));
});

server.addEventListener("close", (event) => {
handler?.onEvent?.("cloudflare:close", peer, event);
handler.onClose?.(peer, event.code, event.reason);
});

Expand All @@ -65,24 +71,25 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
},
);

class CloudflareWebSocketPeer extends WebSocketPeer {
constructor(
private _client: _cf.WebSocket,
private _server: _cf.WebSocket,
) {
super();
}

class CloudflareWebSocketPeer extends WebSocketPeerBase<{
cloudflare: {
client: _cf.WebSocket;
server: _cf.WebSocket;
request: _cf.Request;
env: Env;
context: _cf.ExecutionContext;
};
}> {
get id() {
return undefined;
}

get readyState() {
return this._client.readyState as -1 | 0 | 1 | 2 | 3;
return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
}

send(message: string | ArrayBuffer) {
this._server.send(message);
this.ctx.cloudflare.server.send(message);
return 0;
}
}
42 changes: 18 additions & 24 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,40 @@
// https://deno.land/api?s=Deno.upgradeWebSocket
// https://examples.deno.land/http-server-websocket

// @ts-nocheck
import type * as _deno from "../../types/lib.deno.d.ts";
import "../../types/lib.deno.d.ts";

import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
import { WebSocketPeer } from "../peer";
import { WebSocketPeerBase } from "../peer";
import { defineWebSocketAdapter } from "../adapter.js";

export const WebSocket = globalThis.WebSocket;

export interface AdapterOptions {}

export interface Adapter {
handleUpgrade(req: Deno.Request): Response;
handleUpgrade(req: Request): Response;
}

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(handler, opts = {}) => {
const handleUpgrade = (req: Request) => {
const upgrade = Deno.upgradeWebSocket(req);
const handleUpgrade = (request: Request) => {
const upgrade = Deno.upgradeWebSocket(request);
const peer = new DenoWebSocketPeer({
deno: { ws: upgrade.socket, request },
});
upgrade.socket.addEventListener("open", () => {
handler.onEvent?.("deno:open", upgrade.socket);
const peer = new DenoWebSocketPeer(upgrade.socket);
handler.onEvent?.("deno:open", peer);
handler.onOpen?.(peer);
});
upgrade.socket.addEventListener("message", (event) => {
handler.onEvent?.("deno:message", upgrade.socket, event);
const peer = new DenoWebSocketPeer(upgrade.socket);
handler.onEvent?.("deno:message", peer, event);
handler.onMessage?.(peer, new WebSocketMessage(event.data));
});
upgrade.socket.addEventListener("close", () => {
handler.onEvent?.("deno:close", upgrade.socket);
const peer = new DenoWebSocketPeer(upgrade.socket);
handler.onEvent?.("deno:close", peer);
handler.onClose?.(peer, 0, "");
});
upgrade.socket.addEventListener("error", (error) => {
handler.onEvent?.("deno:error", upgrade.socket, error);
const peer = new DenoWebSocketPeer(upgrade.socket);
handler.onEvent?.("deno:error", peer, error);
handler.onError?.(peer, new WebSocketError(error));
});
return upgrade.response;
Expand All @@ -51,21 +47,19 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
},
);

class DenoWebSocketPeer extends WebSocketPeer {
constructor(private _ws: DenoWebSocketPeer) {
super();
}

class DenoWebSocketPeer extends WebSocketPeerBase<{
deno: { ws: any; request: Request };
}> {
get id() {
return this._ws.remoteAddress;
return this.ctx.deno.ws.remoteAddress;
}

get readyState() {
return this._ws.readyState as any;
return this.ctx.deno.ws.readyState as -1 | 0 | 1 | 2 | 3;
}

send(message: string | ArrayBuffer) {
this._ws.send(message);
this.ctx.deno.ws.send(message);
return 0;
}
}
Loading

0 comments on commit 2791448

Please sign in to comment.