From 2fd4b67733bfcf797734a4ec2c6b83bfd5ca6e26 Mon Sep 17 00:00:00 2001 From: Samuel Papineau Date: Thu, 12 Oct 2023 13:37:49 -0400 Subject: [PATCH] Subscribe API endpoint and saving sqlite limit data by ip --- src/fetch/GET.ts | 4 +++- src/sqlite.ts | 14 ++++++++++++++ src/websocket/open.ts | 21 ++++++++++++++++++--- src/websocket/websocketClient.ts | 29 +++++++++++++++++++++++++++++ 4 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 src/websocket/websocketClient.ts diff --git a/src/fetch/GET.ts b/src/fetch/GET.ts index 464da04..91c300c 100644 --- a/src/fetch/GET.ts +++ b/src/fetch/GET.ts @@ -6,6 +6,7 @@ import { db } from "../../index.js"; import { toJSON } from "../http.js"; import { Server } from "bun"; import { logger } from "../logger.js"; +import { websocketClient } from "../websocket/websocketClient.js" export default async function (req: Request, server: Server) { // Bun automatically returns a 101 Switching Protocols @@ -17,7 +18,7 @@ export default async function (req: Request, server: Server) { return; } - const { pathname } = new URL(req.url); + const { pathname, searchParams} = new URL(req.url); if ( pathname === "/") return new Response(banner()) if ( pathname === "/health") return checkHealth(); if ( pathname === "/metrics") return new Response(await prometheus.registry.metrics()); @@ -25,5 +26,6 @@ export default async function (req: Request, server: Server) { if ( pathname === "/moduleHashByChain") return toJSON(sqlite.selectAll(db, "moduleHashByChain")); if ( pathname === "/traceId") return toJSON(sqlite.selectAll(db, "traceId")); if ( pathname === "/chain") return toJSON(sqlite.selectAll(db, "chain")); + if ( pathname === "/subscribe") return new Response(await websocketClient(req.url, searchParams)); return new Response("Not found", { status: 400 }); } \ No newline at end of file diff --git a/src/sqlite.ts b/src/sqlite.ts index 31e65e4..3629cbb 100644 --- a/src/sqlite.ts +++ b/src/sqlite.ts @@ -17,6 +17,7 @@ export function createDb(filename: string) { create(db, "moduleHashByChain"); create(db, "traceId"); create(db, "chain"); + createInc(db, "connection"); return db; } @@ -25,6 +26,11 @@ export function create(db: Database, table: string) { return db.run(`CREATE TABLE IF NOT EXISTS ${table} (key TEXT PRIMARY KEY, value TEXT)`); } +export function createInc(db: Database, table: string) { + if ( !table ) throw new Error("table is required"); + return db.run(`CREATE TABLE IF NOT EXISTS ${table} (key TEXT PRIMARY KEY, value TEXT, timestamp INTEGER)`); +} + export function replace(db: Database, table: string, key: string, value: string|number) { return db.prepare(`REPLACE INTO ${table} (key, value) VALUES (?, ?)`).run(key, value); } @@ -46,6 +52,14 @@ export function exists(db: Database, table: string, key: string ) { return select(db, table, key).length > 0; } +export function increment(db: Database, table: string, key: string, value: number, timestamp: number) { + if ( exists(db, table, key) ) { + return db.prepare(`UPDATE ${table} SET value = value + ?, timestamp = ? WHERE key = ?`).run(value, timestamp, key); + } + return db.prepare(`REPLACE INTO ${table} (key, value, timestamp) VALUES (?, ?, ?)`).run(key, value, timestamp); +} + + // TO-DO: UPDATE (increment/decrement) // UPDATE product SET price = price + 50 WHERE id = 1 // UPDATE {table} SET {column} = {column} + {value} WHERE {condition} \ No newline at end of file diff --git a/src/websocket/open.ts b/src/websocket/open.ts index 01c95ed..8016c72 100644 --- a/src/websocket/open.ts +++ b/src/websocket/open.ts @@ -2,9 +2,24 @@ import { ServerWebSocket } from "bun"; import { logger } from "../logger.js"; import { ServerWebSocketData } from "../../index.js"; import * as prometheus from "../prometheus.js"; +import { increment, count } from "../sqlite.js"; +import { db } from "../../index.js"; export default function (ws: ServerWebSocket) { - prometheus.connection_active.inc(1); - prometheus.connection_open.inc(1); - logger.info('open', {key: ws.data.key, remoteAddress: ws.remoteAddress}); + //set count as limit + const count = true; + try { + if (count === true) { + increment(db, "connection", ws.remoteAddress, 1, Number(Date.now())); + prometheus.connection_active.inc(1); + prometheus.connection_open.inc(1); + logger.info('open', {key: ws.data.key, remoteAddress: ws.remoteAddress}); + } + } + catch(e) { + logger.error(e); + ws.send(JSON.stringify({ status: 429, error: { message: e.message } })); + ws.close(); + } + } diff --git a/src/websocket/websocketClient.ts b/src/websocket/websocketClient.ts new file mode 100644 index 0000000..c22bf11 --- /dev/null +++ b/src/websocket/websocketClient.ts @@ -0,0 +1,29 @@ +import { randomUUID } from "crypto"; + +let payload: Array = [] +export async function websocketClient(url: string, searchParams: URLSearchParams){ + const ws = new WebSocket(url); + let moduleHash = "" + + if (!searchParams.has("moduleHash")) throw new Error("moduleHash is required") + + if (!searchParams.has("chain")) moduleHash = searchParams.get("moduleHash") + if (searchParams.has("chain")) moduleHash = `${searchParams.get("chain")}:${searchParams.get("moduleHash")}`; + + ws.onopen = () => { + console.log("connected"); + ws.send(JSON.stringify({ + id: randomUUID(), + method: "subscribe", + params: { + moduleHash: moduleHash + } + })); + }; + + ws.onmessage = (event) => { + payload.push(String(event.data)) + }; + + return payload +} \ No newline at end of file