From f9da1c48a25ccc0c19614476f5bddcf433a421c0 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Fri, 24 Nov 2023 13:02:48 -0500 Subject: [PATCH] refactor messages --- src/banner.ts | 37 ----------------------- src/fetch/GET.ts | 10 +++---- src/fetch/POST.ts | 4 +-- src/{ => fetch}/health.ts | 2 +- src/fetch/messages.ts | 61 ++++++++++++++++++++++++++++++++++++++ src/recentMessages.ts | 62 --------------------------------------- 6 files changed, 68 insertions(+), 108 deletions(-) delete mode 100644 src/banner.ts rename src/{ => fetch}/health.ts (78%) create mode 100644 src/fetch/messages.ts delete mode 100644 src/recentMessages.ts diff --git a/src/banner.ts b/src/banner.ts deleted file mode 100644 index e0885e9..0000000 --- a/src/banner.ts +++ /dev/null @@ -1,37 +0,0 @@ -import pkg from "../package.json" assert { type: "json" }; - -// https://fsymbols.com/generators/carty/ -export function banner() { - let text =` - - ░██╗░░░░░░░██╗███████╗██████╗░░██████╗░█████╗░░█████╗░██╗░░██╗███████╗████████╗░██████╗ - ░██║░░██╗░░██║██╔════╝██╔══██╗██╔════╝██╔══██╗██╔══██╗██║░██╔╝██╔════╝╚══██╔══╝██╔════╝ - ░╚██╗████╗██╔╝█████╗░░██████╦╝╚█████╗░██║░░██║██║░░╚═╝█████═╝░█████╗░░░░░██║░░░╚█████╗░ - ░░████╔═████║░██╔══╝░░██╔══██╗░╚═══██╗██║░░██║██║░░██╗██╔═██╗░██╔══╝░░░░░██║░░░░╚═══██╗ - ░░╚██╔╝░╚██╔╝░███████╗██████╦╝██████╔╝╚█████╔╝╚█████╔╝██║░╚██╗███████╗░░░██║░░░██████╔╝ - ░░░╚═╝░░░╚═╝░░╚══════╝╚═════╝░╚═════╝░░╚════╝░░╚════╝░╚═╝░░╚═╝╚══════╝░░░╚═╝░░░╚═════╝░ - -` - text += ` 🚀 ${pkg.description} v${pkg.version} - - Documentation: ${pkg.homepage} - - HTTP GET - /health - /metrics - /moduleHash - /moduleHashByChain - /traceId - /chain - - HTTP POST (Ed25519 signature) - / {timestamp, signature, body} - / {"message": "PING"} - - WebSocket - Message {"method": "ping"} - Message {"method": "time"} - Message {"method": "subscribe", "params": {moduleHash, chain}} -` - return text; -} \ No newline at end of file diff --git a/src/fetch/GET.ts b/src/fetch/GET.ts index c86bdad..8f7f4f1 100644 --- a/src/fetch/GET.ts +++ b/src/fetch/GET.ts @@ -1,4 +1,3 @@ -import { checkHealth } from "../health.js"; import * as prometheus from "../prometheus.js"; import * as sqlite from "../sqlite.js"; import { db } from "../../index.js"; @@ -8,8 +7,9 @@ import openapi from "./openapi.js"; import swaggerHtml from "../../swagger/index.html" import swaggerFavicon from "../../swagger/favicon.png" import { toFile, toJSON, toText } from "./cors.js"; -import { recentMessagesEndpoint } from "../recentMessages.js"; +import { selectMessages } from "./messages.js"; import { DEFAULT_RECENT_MESSAGES_LIMIT } from "../config.js"; +import { checkHealth } from "./health.js"; export default async function (req: Request, server: Server) { const { pathname, searchParams} = new URL(req.url); @@ -24,12 +24,10 @@ export default async function (req: Request, server: Server) { let limit = Number(searchParams.get("limit")); let sort = searchParams.get("sort"); - //error handling - + // error handling if (limit === null || limit === 0) limit = DEFAULT_RECENT_MESSAGES_LIMIT; if (isNaN(Number(limit))) return toText("limit must be a number", 400 ); if (sort === null) sort = "desc"; - console.log(distinct) if (distinct !== "true" && distinct !== null) return toText("distinct must be set to true if declared", 400 ); if (distinct === "true" && chain) return toText("chain cannot be set if distinct is set to true", 400 ); if (sort !== "asc" && sort !== "desc") return toText("sort must be asc or desc", 400 ); @@ -48,7 +46,7 @@ export default async function (req: Request, server: Server) { if ( pathname === "/traceId") return toJSON(sqlite.selectAll(db, "traceId")); if ( pathname === "/chain") return toJSON(sqlite.selectAll(db, "chain")); if ( pathname === "/openapi") return toJSON(openapi); - if ( pathname === "/recentMessages") return recentMessagesEndpoint(db, chain, moduleHash, limit, distinct, sort); + if ( pathname === "/messages") return selectMessages(db, chain, moduleHash, limit, distinct, sort); return toText("Not found", 400 ); } \ No newline at end of file diff --git a/src/fetch/POST.ts b/src/fetch/POST.ts index 19c7a87..914b938 100644 --- a/src/fetch/POST.ts +++ b/src/fetch/POST.ts @@ -6,7 +6,7 @@ import { verify } from "../verify.js"; import { PUBLIC_KEY } from "../config.js"; import { Server } from "bun"; import { toText } from "./cors.js"; -import { recentMessages } from "../recentMessages.js"; +import { insertMessages } from "./messages.js"; export default async function (req: Request, server: Server) { // get headers and body from POST request @@ -87,7 +87,7 @@ export default async function (req: Request, server: Server) { sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp); //Set timestamp as key to filter recent messages - recentMessages( db, traceId, JSON.stringify(json), chain ); + insertMessages( db, traceId, JSON.stringify(json), chain ); return toText("OK"); } \ No newline at end of file diff --git a/src/health.ts b/src/fetch/health.ts similarity index 78% rename from src/health.ts rename to src/fetch/health.ts index 18b17da..c44c614 100644 --- a/src/health.ts +++ b/src/fetch/health.ts @@ -1,4 +1,4 @@ -import { toText } from "./fetch/cors.js"; +import { toText } from "./cors.js"; // https://github.com/pinax-network/substreams-sink-websockets/issues/2#issuecomment-1746121519 export function checkHealth() { diff --git a/src/fetch/messages.ts b/src/fetch/messages.ts new file mode 100644 index 0000000..c790dfc --- /dev/null +++ b/src/fetch/messages.ts @@ -0,0 +1,61 @@ +import * as sqlite from "../sqlite.js"; +import { RECENT_MESSAGES_LIMIT } from "../config.js"; +import { toJSON } from "../http.js"; +import Database from "bun:sqlite"; + +export function insertMessages(db: Database, traceId: string, timestamp: string, chain?: string) { + const dbLength = sqlite.count(db, "messages"); + + if (dbLength >= RECENT_MESSAGES_LIMIT) { + let oldest = sqlite.selectAll(db, "messages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; + console.log("oldest", oldest) + + // update messages + sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp); + sqlite.deleteRow(db, "messages", oldest.key); + + // update messagesByChain + if (chain) { + oldest = sqlite.selectAll(db, "messagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; + console.log(oldest) + sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); + sqlite.deleteRow(db, "messagesByChain", `${oldest.key}`); + } + return; + } + // add messages if tables not full + sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp); + + if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); +} + +export function selectMessages(db: Database, chain?: string, moduleHash?: string, limit?: number, distinct?: string, sortBy?: string) { + + let messages = sqlite.selectAllRecent(db, "messages", "*", sortBy, limit); + + if (distinct) messages = selectDistinct(distinct, messages, db, chain, sortBy, limit); + if (chain) messages = sqlite.selectAllRecent(db, "messagesByChain", "*", sortBy, limit).filter((message: any) => message.value.includes(chain)); + if (moduleHash) messages = messages.filter((message: any) => message.value.includes(moduleHash)); + + return toJSON(messages); +} + +export function selectDistinct(distinct?: string, messages?: any, db?: any, chain?: string, sortBy?: string, limit?: number) { + let chainList = sqlite.selectAll(db, "chain"); + if (distinct === "true") { + + let distinctChain = []; + for (let i = 0; i < chainList.length; i++) { + let chainName = chainList[i].key; + + messages = sqlite.selectAllRecent(db, "messagesByChain", "*", sortBy, limit) + messages = messages.filter((message: any) => message.value.includes(chainName)); + + distinctChain.push(messages[0]); + chainList.slice(i, 1); + } + let result = distinctChain.filter((notNull) => notNull !== undefined) + return result; + } + return; +} diff --git a/src/recentMessages.ts b/src/recentMessages.ts deleted file mode 100644 index 198fa11..0000000 --- a/src/recentMessages.ts +++ /dev/null @@ -1,62 +0,0 @@ -import * as sqlite from "../src/sqlite.js"; -import { RECENT_MESSAGES_LIMIT } from "./config.js"; -import { toJSON, toText } from "./http.js"; - -export function recentMessages(db: any, traceId: string, timestamp: string, chain?: string) { - const dbLength = sqlite.count(db, "recentMessages"); - - if (dbLength >= RECENT_MESSAGES_LIMIT) { - let oldest = sqlite.selectAll(db, "recentMessages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; - console.log("oldest", oldest) - - //update recentMessages - sqlite.replaceRecent(db, "recentMessages", String(Date.now()), `${traceId}`, timestamp); - sqlite.deleteRow(db, "recentMessages", oldest.key); - - //update recentMessagesByChain - if (chain) { - oldest = sqlite.selectAll(db, "recentMessagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; - console.log(oldest) - sqlite.replaceRecent(db, "recentMessagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); - sqlite.deleteRow(db, "recentMessagesByChain", `${oldest.key}`); - } - return; - } - //add messages if tables not full - sqlite.replaceRecent(db, "recentMessages", String(Date.now()), `${traceId}`, timestamp); - - if (chain) sqlite.replaceRecent(db, "recentMessagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); - return; -} - - -export function recentMessagesEndpoint(db: any, chain?: string, moduleHash?: string, limit?: number, distinct?: string, sortBy?: string) { - - let messages = sqlite.selectAllRecent(db, "recentMessages", "*", sortBy, limit); - - if (distinct) messages = fetchDistinct(distinct, messages, db, chain, sortBy, limit); - if (chain) messages = sqlite.selectAllRecent(db, "recentMessagesByChain", "*", sortBy, limit).filter((message: any) => message.value.includes(chain)); - if (moduleHash) messages = messages.filter((message: any) => message.value.includes(moduleHash)); - - return toJSON(messages); -} - -function fetchDistinct(distinct?: string, messages?: any, db?: any, chain?: string, sortBy?: string, limit?: number) { - let chainList = sqlite.selectAll(db, "chain"); - if (distinct === "true") { - - let distinctChain = []; - for (let i = 0; i < chainList.length; i++) { - let chainName = chainList[i].key; - - messages = sqlite.selectAllRecent(db, "recentMessagesByChain", "*", sortBy, limit) - messages = messages.filter((message: any) => message.value.includes(chainName)); - - distinctChain.push(messages[0]); - chainList.slice(i, 1); - } - let result = distinctChain.filter((notNull) => notNull !== undefined) - return result; - } - return; -}