Skip to content

Commit

Permalink
Merge pull request #109 from pinax-network/improve-queries
Browse files Browse the repository at this point in the history
Add new health queries
  • Loading branch information
DenisCarriere authored Feb 17, 2024
2 parents 711b647 + 776116e commit f53f5fd
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/clickhouse/table-initialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { logger } from "../logger.js";
import { Err, Ok, Result } from "../result.js";
import client from "./createClient.js";
import { augmentCreateTableStatement, getTableName, isCreateTableStatement } from "./table-utils.js";
import tables from "./tables/index.js";
import tables from "./tables/tables.js";

export async function initializeDefaultTables(): Promise<Result> {
const promiseResults = await Promise.allSettled(
Expand Down
3 changes: 2 additions & 1 deletion src/clickhouse/tables/blocks.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- blocks --
CREATE TABLE IF NOT EXISTS blocks (
block_id FixedString(64),
block_number UInt32(),
Expand All @@ -6,4 +7,4 @@ CREATE TABLE IF NOT EXISTS blocks (
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (block_id)
ORDER BY (block_id, chain, block_number, timestamp);
ORDER BY (block_id, chain, block_number, timestamp);
22 changes: 22 additions & 0 deletions src/clickhouse/tables/blocks_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- view --
CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_mv
ENGINE = MergeTree
ORDER BY (chain, timestamp, block_number)
AS SELECT
block_id,
block_number,
chain,
timestamp
FROM blocks;

-- DROP TABLE IF EXISTS blocks_mv;

-- OPTIMIZE TABLE blocks_mv FINAL;

-- -- insert --
-- INSERT INTO blocks_mv SELECT
-- block_id,
-- block_number,
-- chain,
-- timestamp
-- FROM blocks;
19 changes: 0 additions & 19 deletions src/clickhouse/tables/index.ts

This file was deleted.

14 changes: 14 additions & 0 deletions src/clickhouse/tables/tables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { join } from "path";

function file(path: string) {
return Bun.file(join(import.meta.dirname, path)).text();
}

const glob = new Bun.Glob("**/*.sql");
const tables: [string, string][] = [];

for (const path of glob.scanSync(import.meta.dirname)) {
tables.push([path.replace(".sql", ""), await file(path)])
}

export default tables;
28 changes: 20 additions & 8 deletions src/fetch/GET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,33 @@ import swaggerFavicon from "../../swagger/favicon.png";
import swaggerHtml from "../../swagger/index.html";
import { metrics } from "../prometheus.js";
import { blocks } from "./blocks.js";
import { NotFound, toFile, toJSON } from "./cors.js";
import { NotFound, toFile, toJSON, toText } from "./cors.js";
import { findLatestCursor } from "./cursor.js";
import health from "./health.js";
import { openapi } from "./openapi.js";
import { cluster } from "./cluster.js";

export default async function (req: Request) {
const { pathname } = new URL(req.url);

if (pathname === "/") return toFile(file(swaggerHtml));
if (pathname === "/favicon.png") return toFile(file(swaggerFavicon));
if (pathname === "/health") return health();
if (pathname === "/metrics") return metrics();
if (pathname === "/openapi") return toJSON(await openapi());
if (pathname === "/blocks") return blocks();
if (pathname === "/cursor/latest") return findLatestCursor(req);
try {
if (pathname === "/") return toFile(file(swaggerHtml));
if (pathname === "/favicon.png") return toFile(file(swaggerFavicon));

// queries
if (pathname === "/cursor/latest") return findLatestCursor(req);

// health
if (pathname === "/blocks") return toJSON(await blocks(req));
if (pathname === "/cluster") return toJSON(await cluster());
if (pathname === "/health") return health();
if (pathname === "/metrics") return metrics();

// docs
if (pathname === "/openapi") return toJSON(await openapi());
} catch (e) {
return toText(String(e), 500);
}

return NotFound;
}
1 change: 1 addition & 0 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { query } from "./query.js";
export default async function (req: Request) {
const { pathname } = new URL(req.url);

// queries
if (pathname === "/query") return query(req);
if (pathname === "/hash") return hash(req);

Expand Down
10 changes: 10 additions & 0 deletions src/fetch/blocks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
SELECT
chain,
COUNT() AS count,
COUNTDISTINCT(block_number) AS count_distinct,
MAX(block_number) AS max,
MIN(block_number) AS min,
max - min + 1 AS delta,
delta - count_distinct AS missing
FROM blocks
GROUP BY chain
51 changes: 16 additions & 35 deletions src/fetch/blocks.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
import { z } from "zod";
import client from "../clickhouse/createClient.js";
import { logger } from "../logger.js";
import { InternalServerError, toJSON } from "./cors.js";

type BlockViewType = Array<{
count: string;
count_distinct: string;
max: number;
min: number;
}>;

export const BlockResponseSchema = z.object({
chain: z.string(),
count: z.number(),
distinctCount: z.number(),
min: z.number(),
Expand All @@ -20,32 +12,21 @@ export const BlockResponseSchema = z.object({
});
export type BlockResponseSchema = z.infer<typeof BlockResponseSchema>;

export async function blocks(): Promise<Response> {
const query = `
SELECT
COUNT() AS count,
COUNTDISTINCT(block_number) AS count_distinct,
MAX(block_number) AS max,
MIN(block_number) AS min
FROM blocks
`;

try {
const response = await client.query({ query, format: "JSONEachRow" });
const data = await response.json<BlockViewType>();

const distinctCount = parseInt(data[0].count_distinct);
const max = data[0].max;
const min = data[0].min;
const delta = max - min;
const missing = max - min - distinctCount;
const count = parseInt(data[0].count);
export function getChain(req: Request, required = true) {
const url = new URL(req.url);
const chain = url.searchParams.get("chain");

const dto: BlockResponseSchema = { max, min, distinctCount, delta, missing, count };

return toJSON(dto);
} catch (err) {
logger.error('[blocks]', err);
return InternalServerError;
if (required && !chain) {
throw Error("Missing parameter: chain");
}
return chain;
}

export async function blocks(req: Request) {
const query = await Bun.file(import.meta.dirname + "/blocks.sql").text()
const chain = getChain(req, false);
const response = await client.query({ query, format: "JSONEachRow" });
let data = await response.json() as BlockResponseSchema[];
if ( chain ) data = data.filter((row) => row.chain === chain);
return data;
}
14 changes: 14 additions & 0 deletions src/fetch/cluster.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
SELECT
table,
sum(rows) AS rows,
max(modification_time) AS latest_modification,
formatReadableSize(sum(bytes)) AS data_size,
formatReadableSize(sum(primary_key_bytes_in_memory)) AS primary_keys_size,
any(engine) AS engine,
sum(bytes) AS bytes_size
FROM clusterAllReplicas(default, system.parts)
WHERE active
GROUP BY
database,
table
ORDER BY bytes_size DESC
18 changes: 18 additions & 0 deletions src/fetch/cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { z } from "zod";
import client from "../clickhouse/createClient.js";

export const ClusterSchema = z.object({
count: z.number(),
distinctCount: z.number(),
min: z.number(),
max: z.number(),
delta: z.number(),
missing: z.number(),
});
export type ClusterSchema = z.infer<typeof ClusterSchema>;

export async function cluster() {
const query = await Bun.file(import.meta.dirname + "/cluster.sql").text()
const response = await client.query({ query, format: "JSONEachRow" });
return response.json();
}
54 changes: 44 additions & 10 deletions src/fetch/openapi.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import pkg from "../../package.json" assert { type: "json" };

import { LicenseObject } from "openapi3-ts/oas30";
import { OpenApiBuilder, ResponsesObject, SchemaObject } from "openapi3-ts/oas31";
import { OpenApiBuilder, ParameterObject, ResponsesObject, SchemaObject } from "openapi3-ts/oas31";
import { z } from "zod";
import * as ztjs from "zod-to-json-schema";
import { store } from "../clickhouse/stores.js";
import { BodySchema } from "../schemas.js";
import { BlockResponseSchema } from "./blocks.js";
import { ClusterSchema } from "./cluster.js";

const zodToJsonSchema = (...params: Parameters<(typeof ztjs)["zodToJsonSchema"]>) =>
ztjs.zodToJsonSchema(...params) as SchemaObject;

const TAGS = {
QUERIES: "Queries",
USAGE: "Usage",
MAINTENANCE: "Maintenance",
QUERIES: "Queries",
HEALTH: "Health",
DOCS: "Documentation",
} as const;
Expand All @@ -36,6 +37,24 @@ const PUT_RESPONSES: ResponsesObject = {

const ExecuteSchemaResponse = z.object({ success: z.literal("OK"), schema: z.string() });

async function paramChain(required = true): Promise<ParameterObject> {
return {
name: "chain",
in: "query",
required,
schema: { enum: await store.chains },
};
}

async function paramModuleHash(required = true): Promise<ParameterObject> {
return {
name: "module_hash",
in: "query",
required: true,
schema: { enum: await store.moduleHashes },
};
}

export async function openapi() {
return new OpenApiBuilder()
.addInfo({
Expand Down Expand Up @@ -205,13 +224,8 @@ export async function openapi() {
tags: [TAGS.QUERIES],
summary: "Finds the latest cursor for a given chain and table",
parameters: [
{ name: "chain", in: "query", required: true, schema: { enum: await store.chains } },
{
name: "module_hash",
in: "query",
required: true,
schema: { enum: await store.moduleHashes },
},
await paramChain(true),
await paramModuleHash(true)
],
responses: {
200: {
Expand All @@ -233,8 +247,11 @@ export async function openapi() {
})
.addPath("/blocks", {
get: {
tags: [TAGS.QUERIES],
tags: [TAGS.HEALTH],
summary: "Gives a summary of known blocks, including min, max and unique block numbers",
parameters: [
await paramChain(false)
],
responses: {
200: {
description: "Block number summary",
Expand All @@ -248,6 +265,23 @@ export async function openapi() {
},
},
})
.addPath("/cluster", {
get: {
tags: [TAGS.HEALTH],
summary: "Global overview of your cluster",
responses: {
200: {
description: "Global overview of your cluster",
content: {
"application/json": {
schema: zodToJsonSchema(ClusterSchema),
},
},
},
500: { description: "Internal server errror" },
},
},
})
.addPath("/query", {
post: {
tags: [TAGS.QUERIES],
Expand Down

0 comments on commit f53f5fd

Please sign in to comment.