Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelpapineau committed Nov 24, 2023
2 parents 3c52f75 + a31ea40 commit 3141631
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 129 deletions.
37 changes: 0 additions & 37 deletions src/banner.ts

This file was deleted.

21 changes: 3 additions & 18 deletions src/fetch/GET.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { checkHealth } from "../health.js";
import * as prometheus from "../prometheus.js";
import * as sqlite from "../sqlite.js";
import { db } from "../../index.js";
Expand All @@ -8,8 +7,8 @@ import openapi from "./openapi.js";
import swaggerHtml from "../../swagger/index.html"
import swaggerFavicon from "../../swagger/favicon.png"
import { toFile, toJSON, toText } from "./cors.js";
import { recentMessagesEndpoint } from "../recentMessages.js";
import { DEFAULT_RECENT_MESSAGES_LIMIT } from "../config.js";
import { handleMessages } from "./messages.js";
import { checkHealth } from "./health.js";

export default async function (req: Request, server: Server) {
const { pathname, searchParams} = new URL(req.url);
Expand All @@ -19,21 +18,7 @@ export default async function (req: Request, server: Server) {
const key = req.headers.get("sec-websocket-key")
const chain = searchParams.get("chain")
const moduleHash = searchParams.get("moduleHash");
const distinct = searchParams.get("distinct");
const success = server.upgrade(req, {data: {key, chain, moduleHash}});
let limit = Number(searchParams.get("limit"));
let sort = searchParams.get("sort");

//error handling

if (limit === null || limit === 0) limit = DEFAULT_RECENT_MESSAGES_LIMIT;
if (isNaN(Number(limit))) return toText("limit must be a number", 400 );
if (sort === null) sort = "desc";
console.log(distinct)
if (distinct !== "true" && distinct !== null) return toText("distinct must be set to true if declared", 400 );
if (distinct === "true" && chain) return toText("chain cannot be set if distinct is set to true", 400 );
if (sort !== "asc" && sort !== "desc") return toText("sort must be asc or desc", 400 );

if (success) {
logger.info('upgrade', {key, chain, moduleHash});
return;
Expand All @@ -48,7 +33,7 @@ export default async function (req: Request, server: Server) {
if ( pathname === "/traceId") return toJSON(sqlite.selectAll(db, "traceId"));
if ( pathname === "/chain") return toJSON(sqlite.selectAll(db, "chain"));
if ( pathname === "/openapi") return toJSON(openapi);
if ( pathname === "/recentMessages") return recentMessagesEndpoint(db, chain, moduleHash, limit, distinct, sort);
if ( pathname === "/messages") return handleMessages(req);

return toText("Not found", 400 );
}
4 changes: 2 additions & 2 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { verify } from "../verify.js";
import { PUBLIC_KEY } from "../config.js";
import { Server } from "bun";
import { toText } from "./cors.js";
import { recentMessages } from "../recentMessages.js";
import { insertMessages } from "./messages.js";

export default async function (req: Request, server: Server) {
// get headers and body from POST request
Expand Down Expand Up @@ -87,7 +87,7 @@ export default async function (req: Request, server: Server) {
sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp);
//Set timestamp as key to filter recent messages

recentMessages( db, traceId, JSON.stringify(json), chain );
insertMessages( db, traceId, JSON.stringify(json), chain );

return toText("OK");
}
2 changes: 1 addition & 1 deletion src/health.ts → src/fetch/health.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { toText } from "./fetch/cors.js";
import { toText } from "./cors.js";

// https://github.com/pinax-network/substreams-sink-websockets/issues/2#issuecomment-1746121519
export function checkHealth() {
Expand Down
90 changes: 90 additions & 0 deletions src/fetch/messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import * as sqlite from "../sqlite.js";
import { DEFAULT_RECENT_MESSAGES_LIMIT, RECENT_MESSAGES_LIMIT } from "../config.js";
import Database from "bun:sqlite";
import { db } from "../../index.js";
import { toJSON } from "./cors.js";

export function parseLimit(searchParams: URLSearchParams) {
const value = searchParams.get("limit");
if (!value) return DEFAULT_RECENT_MESSAGES_LIMIT;
const limit = Number(value);
if (isNaN(limit)) throw new Error("limit must be a number");
return limit;
}

export function parseSort(searchParams: URLSearchParams) {
const value = searchParams.get("sort");
if (!value) return "desc";
if (!["asc", "desc"].includes(value)) throw new Error("sort must be asc or desc");
return value;
}

export function handleMessages(req: Request) {
const { searchParams} = new URL(req.url);
// const distinct = searchParams.get("distinct");
const limit = parseLimit(searchParams);
const sort = parseSort(searchParams);
const chain = searchParams.get("chain")
const moduleHash = searchParams.get("moduleHash");

// // error handling
// if (distinct !== "true" && distinct !== null) return toText("distinct must be set to true if declared", 400 );
// if (distinct === "true" && chain) return toText("chain cannot be set if distinct is set to true", 400 );

selectMessages(db, limit, sort, chain, moduleHash);
}

export function insertMessages(db: Database, traceId: string, timestamp: string, chain?: string) {
const dbLength = sqlite.count(db, "messages");

if (dbLength >= RECENT_MESSAGES_LIMIT) {
let oldest = sqlite.selectAll(db, "messages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0];

// update messages
sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp);
sqlite.deleteRow(db, "messages", oldest.key);

// update messagesByChain
if (chain) {
oldest = sqlite.selectAll(db, "messagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0];
sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp );
sqlite.deleteRow(db, "messagesByChain", `${oldest.key}`);
}
return;
}
// add messages if tables not full
sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp);

if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp );
}

export function selectMessages(db: Database, limit: number, sortBy: string, chain?: string, moduleHash?: string,) {

let messages = sqlite.selectAllRecent(db, "messages", "*", sortBy, limit);

// if (distinct) messages = selectDistinct(distinct, messages, db, chain, sortBy, limit);
if (chain) messages = sqlite.selectAllRecent(db, "messagesByChain", "*", sortBy, limit).filter((message: any) => message.value.includes(chain));
if (moduleHash) messages = messages.filter((message: any) => message.value.includes(moduleHash));

return toJSON(messages);
}

// export function selectDistinct(distinct?: string, messages?: any, db?: any, chain?: string, sortBy?: string, limit?: number) {
// let chainList = sqlite.selectAll(db, "chain");
// if (distinct === "true") {

// let distinctChain = [];
// for (let i = 0; i < chainList.length; i++) {
// let chainName = chainList[i].key;

// messages = sqlite.selectAllRecent(db, "messagesByChain", "*", sortBy, limit)
// messages = messages.filter((message: any) => message.value.includes(chainName));

// distinctChain.push(messages[0]);
// chainList.slice(i, 1);
// }
// let result = distinctChain.filter((notNull) => notNull !== undefined)
// return result;
// }
// return;
// }
10 changes: 0 additions & 10 deletions src/http.ts

This file was deleted.

60 changes: 0 additions & 60 deletions src/recentMessages.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ export function replaceRecent(db: Database, table: string, key: string, value: s
return db.prepare(`REPLACE INTO ${table} (key, value, payload) VALUES (?, ?, ?)`).run(key, value, payload);
}


export function selectAll(db: Database, table: string) {
return db.query(`SELECT * FROM ${table}`).all() as KV[];
}
Expand Down

0 comments on commit 3141631

Please sign in to comment.