diff --git a/src/banner.ts b/src/banner.ts deleted file mode 100644 index e0885e9..0000000 --- a/src/banner.ts +++ /dev/null @@ -1,37 +0,0 @@ -import pkg from "../package.json" assert { type: "json" }; - -// https://fsymbols.com/generators/carty/ -export function banner() { - let text =` - - ░██╗░░░░░░░██╗███████╗██████╗░░██████╗░█████╗░░█████╗░██╗░░██╗███████╗████████╗░██████╗ - ░██║░░██╗░░██║██╔════╝██╔══██╗██╔════╝██╔══██╗██╔══██╗██║░██╔╝██╔════╝╚══██╔══╝██╔════╝ - ░╚██╗████╗██╔╝█████╗░░██████╦╝╚█████╗░██║░░██║██║░░╚═╝█████═╝░█████╗░░░░░██║░░░╚█████╗░ - ░░████╔═████║░██╔══╝░░██╔══██╗░╚═══██╗██║░░██║██║░░██╗██╔═██╗░██╔══╝░░░░░██║░░░░╚═══██╗ - ░░╚██╔╝░╚██╔╝░███████╗██████╦╝██████╔╝╚█████╔╝╚█████╔╝██║░╚██╗███████╗░░░██║░░░██████╔╝ - ░░░╚═╝░░░╚═╝░░╚══════╝╚═════╝░╚═════╝░░╚════╝░░╚════╝░╚═╝░░╚═╝╚══════╝░░░╚═╝░░░╚═════╝░ - -` - text += ` 🚀 ${pkg.description} v${pkg.version} - - Documentation: ${pkg.homepage} - - HTTP GET - /health - /metrics - /moduleHash - /moduleHashByChain - /traceId - /chain - - HTTP POST (Ed25519 signature) - / {timestamp, signature, body} - / {"message": "PING"} - - WebSocket - Message {"method": "ping"} - Message {"method": "time"} - Message {"method": "subscribe", "params": {moduleHash, chain}} -` - return text; -} \ No newline at end of file diff --git a/src/fetch/GET.ts b/src/fetch/GET.ts index c86bdad..d82c7e7 100644 --- a/src/fetch/GET.ts +++ b/src/fetch/GET.ts @@ -1,4 +1,3 @@ -import { checkHealth } from "../health.js"; import * as prometheus from "../prometheus.js"; import * as sqlite from "../sqlite.js"; import { db } from "../../index.js"; @@ -8,8 +7,8 @@ import openapi from "./openapi.js"; import swaggerHtml from "../../swagger/index.html" import swaggerFavicon from "../../swagger/favicon.png" import { toFile, toJSON, toText } from "./cors.js"; -import { recentMessagesEndpoint } from "../recentMessages.js"; -import { DEFAULT_RECENT_MESSAGES_LIMIT } from "../config.js"; +import { handleMessages } from "./messages.js"; +import { checkHealth } from "./health.js"; export default async function (req: Request, server: Server) { const { pathname, searchParams} = new URL(req.url); @@ -19,21 +18,7 @@ export default async function (req: Request, server: Server) { const key = req.headers.get("sec-websocket-key") const chain = searchParams.get("chain") const moduleHash = searchParams.get("moduleHash"); - const distinct = searchParams.get("distinct"); const success = server.upgrade(req, {data: {key, chain, moduleHash}}); - let limit = Number(searchParams.get("limit")); - let sort = searchParams.get("sort"); - - //error handling - - if (limit === null || limit === 0) limit = DEFAULT_RECENT_MESSAGES_LIMIT; - if (isNaN(Number(limit))) return toText("limit must be a number", 400 ); - if (sort === null) sort = "desc"; - console.log(distinct) - if (distinct !== "true" && distinct !== null) return toText("distinct must be set to true if declared", 400 ); - if (distinct === "true" && chain) return toText("chain cannot be set if distinct is set to true", 400 ); - if (sort !== "asc" && sort !== "desc") return toText("sort must be asc or desc", 400 ); - if (success) { logger.info('upgrade', {key, chain, moduleHash}); return; @@ -48,7 +33,7 @@ export default async function (req: Request, server: Server) { if ( pathname === "/traceId") return toJSON(sqlite.selectAll(db, "traceId")); if ( pathname === "/chain") return toJSON(sqlite.selectAll(db, "chain")); if ( pathname === "/openapi") return toJSON(openapi); - if ( pathname === "/recentMessages") return recentMessagesEndpoint(db, chain, moduleHash, limit, distinct, sort); + if ( pathname === "/messages") return handleMessages(req); return toText("Not found", 400 ); } \ No newline at end of file diff --git a/src/fetch/POST.ts b/src/fetch/POST.ts index 19c7a87..914b938 100644 --- a/src/fetch/POST.ts +++ b/src/fetch/POST.ts @@ -6,7 +6,7 @@ import { verify } from "../verify.js"; import { PUBLIC_KEY } from "../config.js"; import { Server } from "bun"; import { toText } from "./cors.js"; -import { recentMessages } from "../recentMessages.js"; +import { insertMessages } from "./messages.js"; export default async function (req: Request, server: Server) { // get headers and body from POST request @@ -87,7 +87,7 @@ export default async function (req: Request, server: Server) { sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp); //Set timestamp as key to filter recent messages - recentMessages( db, traceId, JSON.stringify(json), chain ); + insertMessages( db, traceId, JSON.stringify(json), chain ); return toText("OK"); } \ No newline at end of file diff --git a/src/health.ts b/src/fetch/health.ts similarity index 78% rename from src/health.ts rename to src/fetch/health.ts index 18b17da..c44c614 100644 --- a/src/health.ts +++ b/src/fetch/health.ts @@ -1,4 +1,4 @@ -import { toText } from "./fetch/cors.js"; +import { toText } from "./cors.js"; // https://github.com/pinax-network/substreams-sink-websockets/issues/2#issuecomment-1746121519 export function checkHealth() { diff --git a/src/fetch/messages.ts b/src/fetch/messages.ts new file mode 100644 index 0000000..48bfb31 --- /dev/null +++ b/src/fetch/messages.ts @@ -0,0 +1,90 @@ +import * as sqlite from "../sqlite.js"; +import { DEFAULT_RECENT_MESSAGES_LIMIT, RECENT_MESSAGES_LIMIT } from "../config.js"; +import Database from "bun:sqlite"; +import { db } from "../../index.js"; +import { toJSON } from "./cors.js"; + +export function parseLimit(searchParams: URLSearchParams) { + const value = searchParams.get("limit"); + if (!value) return DEFAULT_RECENT_MESSAGES_LIMIT; + const limit = Number(value); + if (isNaN(limit)) throw new Error("limit must be a number"); + return limit; +} + +export function parseSort(searchParams: URLSearchParams) { + const value = searchParams.get("sort"); + if (!value) return "desc"; + if (!["asc", "desc"].includes(value)) throw new Error("sort must be asc or desc"); + return value; +} + +export function handleMessages(req: Request) { + const { searchParams} = new URL(req.url); + // const distinct = searchParams.get("distinct"); + const limit = parseLimit(searchParams); + const sort = parseSort(searchParams); + const chain = searchParams.get("chain") + const moduleHash = searchParams.get("moduleHash"); + + // // error handling + // if (distinct !== "true" && distinct !== null) return toText("distinct must be set to true if declared", 400 ); + // if (distinct === "true" && chain) return toText("chain cannot be set if distinct is set to true", 400 ); + + selectMessages(db, limit, sort, chain, moduleHash); +} + +export function insertMessages(db: Database, traceId: string, timestamp: string, chain?: string) { + const dbLength = sqlite.count(db, "messages"); + + if (dbLength >= RECENT_MESSAGES_LIMIT) { + let oldest = sqlite.selectAll(db, "messages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; + + // update messages + sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp); + sqlite.deleteRow(db, "messages", oldest.key); + + // update messagesByChain + if (chain) { + oldest = sqlite.selectAll(db, "messagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; + sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); + sqlite.deleteRow(db, "messagesByChain", `${oldest.key}`); + } + return; + } + // add messages if tables not full + sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp); + + if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); +} + +export function selectMessages(db: Database, limit: number, sortBy: string, chain?: string, moduleHash?: string,) { + + let messages = sqlite.selectAllRecent(db, "messages", "*", sortBy, limit); + + // if (distinct) messages = selectDistinct(distinct, messages, db, chain, sortBy, limit); + if (chain) messages = sqlite.selectAllRecent(db, "messagesByChain", "*", sortBy, limit).filter((message: any) => message.value.includes(chain)); + if (moduleHash) messages = messages.filter((message: any) => message.value.includes(moduleHash)); + + return toJSON(messages); +} + +// export function selectDistinct(distinct?: string, messages?: any, db?: any, chain?: string, sortBy?: string, limit?: number) { +// let chainList = sqlite.selectAll(db, "chain"); +// if (distinct === "true") { + +// let distinctChain = []; +// for (let i = 0; i < chainList.length; i++) { +// let chainName = chainList[i].key; + +// messages = sqlite.selectAllRecent(db, "messagesByChain", "*", sortBy, limit) +// messages = messages.filter((message: any) => message.value.includes(chainName)); + +// distinctChain.push(messages[0]); +// chainList.slice(i, 1); +// } +// let result = distinctChain.filter((notNull) => notNull !== undefined) +// return result; +// } +// return; +// } diff --git a/src/http.ts b/src/http.ts deleted file mode 100644 index 1cd5af8..0000000 --- a/src/http.ts +++ /dev/null @@ -1,10 +0,0 @@ -export function toText(data: any, status = 200) { - const headers = { 'Content-Type': 'text/plain; charset=utf-8' }; - return new Response(data, { status, headers }); -} - -export function toJSON(data: any, status = 200) { - const body = JSON.stringify(data, null, 2); - const headers = { 'Content-Type': 'application/json' }; - return new Response(body, { status, headers }); -} \ No newline at end of file diff --git a/src/recentMessages.ts b/src/recentMessages.ts deleted file mode 100644 index 71a44f7..0000000 --- a/src/recentMessages.ts +++ /dev/null @@ -1,60 +0,0 @@ -import * as sqlite from "../src/sqlite.js"; -import { RECENT_MESSAGES_LIMIT } from "./config.js"; -import { toJSON, toText } from "./http.js"; - -export function recentMessages(db: any, traceId: string, timestamp: string, chain?: string) { - const dbLength = sqlite.count(db, "recentMessages"); - - if (dbLength >= RECENT_MESSAGES_LIMIT) { - let oldest = sqlite.selectAll(db, "recentMessages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; - - //update recentMessages - sqlite.replaceRecent(db, "recentMessages", String(Date.now()), `${traceId}`, timestamp); - sqlite.deleteRow(db, "recentMessages", oldest.key); - - //update recentMessagesByChain - if (chain) { - oldest = sqlite.selectAll(db, "recentMessagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; - sqlite.replaceRecent(db, "recentMessagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); - sqlite.deleteRow(db, "recentMessagesByChain", `${oldest.key}`); - } - return; - } - //add messages if tables not full - sqlite.replaceRecent(db, "recentMessages", String(Date.now()), `${traceId}`, timestamp); - - if (chain) sqlite.replaceRecent(db, "recentMessagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); - return; -} - - -export function recentMessagesEndpoint(db: any, chain?: string, moduleHash?: string, limit?: number, distinct?: string, sortBy?: string) { - - let messages = sqlite.selectAllRecent(db, "recentMessages", "*", sortBy, limit); - - if (distinct) messages = fetchDistinct(distinct, messages, db, chain, sortBy, limit); - if (chain) messages = sqlite.selectAllRecent(db, "recentMessagesByChain", "*", sortBy, limit).filter((message: any) => message.value.includes(chain)); - if (moduleHash) messages = messages.filter((message: any) => message.value.includes(moduleHash)); - - return toJSON(messages); -} - -function fetchDistinct(distinct?: string, messages?: any, db?: any, chain?: string, sortBy?: string, limit?: number) { - let chainList = sqlite.selectAll(db, "chain"); - if (distinct === "true") { - - let distinctChain = []; - for (let i = 0; i < chainList.length; i++) { - let chainName = chainList[i].key; - - messages = sqlite.selectAllRecent(db, "recentMessagesByChain", "*", sortBy, limit) - messages = messages.filter((message: any) => message.value.includes(chainName)); - - distinctChain.push(messages[0]); - chainList.slice(i, 1); - } - let result = distinctChain.filter((notNull) => notNull !== undefined) - return result; - } - return; -} diff --git a/src/sqlite.ts b/src/sqlite.ts index 1c85494..beadcb8 100644 --- a/src/sqlite.ts +++ b/src/sqlite.ts @@ -52,7 +52,6 @@ export function replaceRecent(db: Database, table: string, key: string, value: s return db.prepare(`REPLACE INTO ${table} (key, value, payload) VALUES (?, ?, ?)`).run(key, value, payload); } - export function selectAll(db: Database, table: string) { return db.query(`SELECT * FROM ${table}`).all() as KV[]; }