From 597a1aa276c95daee06e80ac19429681929918c2 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Fri, 29 Sep 2023 23:58:37 -0400 Subject: [PATCH] Update sqlite methods --- examples/{ => bun}/client.ts | 2 +- index.ts | 33 +++++++++++++++++---------------- src/banner.ts | 3 ++- src/health.ts | 12 ++++-------- src/http.ts | 12 ++++++++++++ src/prometheus.ts | 2 +- src/sqlite.ts | 22 +++++++++++++--------- 7 files changed, 50 insertions(+), 36 deletions(-) rename examples/{ => bun}/client.ts (90%) create mode 100644 src/http.ts diff --git a/examples/client.ts b/examples/bun/client.ts similarity index 90% rename from examples/client.ts rename to examples/bun/client.ts index e45be73..46dd304 100644 --- a/examples/client.ts +++ b/examples/bun/client.ts @@ -1,6 +1,6 @@ const socket = new WebSocket("ws://localhost:3000"); -socket.onopen = (ws) => { +socket.onopen = () => { console.log("Connected!"); socket.send("6aa24e6aa34db4a4faf55c69c6f612aeb06053c2") }; diff --git a/index.ts b/index.ts index 0223c6f..30d334b 100644 --- a/index.ts +++ b/index.ts @@ -3,17 +3,19 @@ import { Database } from "bun:sqlite"; import { PORT, PUBLIC_KEY } from "./src/config.js"; import { verify } from "./src/verify.js"; import { banner } from "./src/banner.js"; -import { insert, select } from "./src/sqlite.js"; +import * as sqlite from "./src/sqlite.js"; import * as prometheus from "./src/prometheus.js"; import { checkHealth } from "./src/health.js"; +import { toJSON } from "./src/http.js"; console.log(`Server listening on PORT http://localhost:${PORT}`); console.log("Verifying with PUBLIC_KEY", PUBLIC_KEY); -// internal memory of moduleHashes (used as WebSocket channels) +const moduleHashes = new Set(); // TO-DO: replace using SQLite DB -const db = new Database("./sqlite/moduleHashes.sqlite"); -const traces = new Database("./sqlite/sessionId.sqlite"); -const moduleHashes = new Set(); +// Create SQLite DB +const db = new Database("./db.sqlite", {create: true}); // TO-DO as .env variable +sqlite.create(db, "moduleHash"); +sqlite.create(db, "traceId"); Bun.serve<{key: string}>({ port: PORT, @@ -31,9 +33,10 @@ Bun.serve<{key: string}>({ if ( req.method == "GET" ) { const { pathname } = new URL(req.url); if ( pathname === "/") return new Response(banner()) - if ( pathname === "/health") return new Response(JSON.stringify((await checkHealth()).data), Object(await checkHealth()).status); + if ( pathname === "/health") return toJSON(await checkHealth(db)); if ( pathname === "/metrics") return new Response(await prometheus.registry.metrics()); - if ( pathname === "/subscribe") return new Response(JSON.stringify(select(db, "moduleHash")), { status: 200 }); + if ( pathname === "/moduleHash") return toJSON(sqlite.findAll(db, "moduleHash")); + if ( pathname === "/traceId") return toJSON(sqlite.findAll(db, "traceId")); return new Response("Not found", { status: 400 }); } @@ -65,24 +68,22 @@ Bun.serve<{key: string}>({ console.log('server.publish', {response, message}); return new Response("OK"); } - const { clock, manifest, traceId } = JSON.parse(body); + // Get data from Substreams metadata + const { clock, manifest } = JSON.parse(body); const { moduleHash } = manifest; const bytes = Buffer.byteLength(body + moduleHash, 'utf8') const response = server.publish(moduleHash, body); - const name: any = `webhook_hash_${moduleHash}`; - const help = `Individual webhook session`; + // Prometheus Metrics prometheus.bytesPublished.inc(bytes); prometheus.publishedMessages.inc(1); prometheus.customMetric(moduleHash) moduleHashes.add(moduleHash); - //Insert moduleHash into SQLite DB - - const temp = "654b2e1fd43e8468863595baaad68627" - - insert(db, moduleHash, "moduleHash") - insert (traces, temp, "sessionId")//when ready, replace temp with traceId.sessionId + // Insert moduleHash into SQLite DB + const traceId = "654b2e1fd43e8468863595baaad68627"; // TO-DO: get traceId from Substreams metadata + sqlite.insert(db, "moduleHash", moduleHash, timestamp); + sqlite.insert(db, "traceId", traceId, timestamp); console.log('server.publish', {response, block: clock.number, timestamp: clock.timestamp, moduleHash}); diff --git a/src/banner.ts b/src/banner.ts index bf0cd73..e8d52d6 100644 --- a/src/banner.ts +++ b/src/banner.ts @@ -19,7 +19,8 @@ export function banner() { HTTP GET /health /metrics - /subscribe + /moduleHash + /traceId HTTP POST (Ed25519 signature) / {timestamp, signature, body} diff --git a/src/health.ts b/src/health.ts index 3f9871c..dfc4615 100644 --- a/src/health.ts +++ b/src/health.ts @@ -1,13 +1,9 @@ -import * as prometheus from "./prometheus.js"; import { Database } from "bun:sqlite"; -import { select } from "./sqlite.js"; +import * as sqlite from "./sqlite.js"; -export async function checkHealth(){ - const traces = new Database("./sqlite/sessionId.sqlite"); - const session = select(traces, "sessionId").length - const messages = await prometheus.getSingleMetric(`total_webhooks_sessions`) - - if (messages || session) return {data: "OK", status: { status: 200 }}; +export async function checkHealth(db: Database) { + const total_trace_id = sqlite.findAll(db, "traceId").length; + if (total_trace_id) return {data: "OK", status: { status: 200 }}; return {data: "Error: No connected webhooks", status: { status: 400 }}; }; \ No newline at end of file diff --git a/src/http.ts b/src/http.ts new file mode 100644 index 0000000..759a76d --- /dev/null +++ b/src/http.ts @@ -0,0 +1,12 @@ +import type { IncomingMessage, ServerResponse } from "http"; + +export function toText(data: any, status = 200) { + const headers = { 'Content-Type': 'text/plain; charset=utf-8' }; + return new Response(data, { status, headers }); +} + +export function toJSON(data: any, status = 200) { + const body = JSON.stringify(data, null, 2); + const headers = { 'Content-Type': 'application/json' }; + return new Response(body, { status, headers }); +} \ No newline at end of file diff --git a/src/prometheus.ts b/src/prometheus.ts index ccad38d..f654392 100644 --- a/src/prometheus.ts +++ b/src/prometheus.ts @@ -23,7 +23,7 @@ function registerGauge(name: string, help: string) { } export async function customMetric(moduleHash: string) { - const name: any = `webhook_hash_${moduleHash}`; + const name = `webhook_hash_${moduleHash}`; const help = `Individual webhook session`; try { const gauge = new client.Gauge({ name, help }); diff --git a/src/sqlite.ts b/src/sqlite.ts index 7e4ff77..c1a8914 100644 --- a/src/sqlite.ts +++ b/src/sqlite.ts @@ -1,13 +1,17 @@ -function create(db: any, column: string){ - db.query(`create table if not exists data (${column} text primary key)`).run(); +import Database from "bun:sqlite"; + +export function create(db: Database, table: string) { + return db.query(`CREATE TABLE IF NOT EXISTS ${table} (key TEXT PRIMARY KEY, value TEXT)`).run(); +} + +export function insert(db: Database, table: string, key: string, value: string|number) { + return db.query(`INSERT OR REPLACE INTO ${table} (key, value) values (?, ?)`).all(key, value); } -export function insert(db: any, data: string, column: string){ - create(db, column) - return db.query(`insert or ignore into data (${column}) values (?)`).all(data); +export function findAll(db: Database, table: string) { + return db.query(`SELECT * from ${table}`).all(); } -export function select(db: any, column: string){ - create(db, column) - return db.query(`select ${column} from data`).all(); -} \ No newline at end of file +export function find(db: Database, table: string, key: string) { + return db.query(`SELECT * from ${table} WHERE key=${key}`).all(); +}