Skip to content

Commit

Permalink
add /missing endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 21, 2024
1 parent ef2c6ee commit 39f828b
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 1,852 deletions.
1,983 changes: 141 additions & 1,842 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink-clickhouse",
"version": "0.3.2",
"version": "0.3.3",
"description": "Substreams Clickhouse Sink",
"type": "module",
"homepage": "https://github.com/pinax-network/substreams-sink-clickhouse",
Expand Down Expand Up @@ -36,7 +36,7 @@
"openapi3-ts": "latest",
"p-queue": "latest",
"prom-client": "latest",
"substreams-sink-webhook": "^0.8.5",
"substreams-sink-webhook": "^0.9.2",
"tslog": "latest",
"zod": "latest",
"zod-to-json-schema": "latest"
Expand Down
2 changes: 0 additions & 2 deletions sql/blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,3 @@ SELECT
delta - count_distinct AS missing,
count - count_distinct AS optimize
FROM blocks
WHERE chain = {chain: String}
GROUP BY (chain, module_hash)
13 changes: 8 additions & 5 deletions sql/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ export function getModuleHash(req: Request, required = true) {

export async function blocks(req: Request) {
let query = await Bun.file(import.meta.dirname + "/blocks.sql").text()
const chain = getChain(req, true);
const chain = getChain(req, false);
const module_hash = getModuleHash(req, false);
const response = await readOnlyClient.query({ query_params: {chain}, query, format: "JSONEachRow" });
const WHERE = [];
if ( chain ) WHERE.push(`chain = ${chain}`);
if ( module_hash ) WHERE.push(`module_hash = ${module_hash}`);
if ( WHERE.length ) query += " WHERE " + WHERE.join(" AND ");
query += "GROUP BY (chain, module_hash)";

const response = await readOnlyClient.query({ query_params: {chain, module_hash}, query, format: "JSONEachRow" });
let data = await response.json() as BlockResponseSchema[];

// optional filter by param
// if ( chain ) data = data.filter((row) => row.chain === chain);
if ( module_hash ) data = data.filter((row) => row.module_hash === module_hash);
return data;
}
15 changes: 15 additions & 0 deletions sql/missing.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
SELECT
stop_block_number - 10000 as start_block_number,
t * 10000 as stop_block_number,
10000 - c as missing
FROM (
SELECT
floor(block_number / 10000) AS t,
count() AS c,
max(block_number) AS m
FROM blocks
WHERE chain = {chain: String} and module_hash = {module_hash: String}
GROUP BY t
ORDER BY t ASC WITH FILL
)
WHERE c < 10000
19 changes: 19 additions & 0 deletions sql/missing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { z } from "zod";
import { readOnlyClient } from "../src/clickhouse/createClient.js";
import { getChain, getModuleHash } from "./blocks.js";

export const MissingResponseSchema = z.object({
start_block: z.number(),
stop_block: z.number(),
missing: z.number(),
});

export type MissingResponseSchema = z.infer<typeof MissingResponseSchema>;

export async function missing(req: Request) {
let query = await Bun.file(import.meta.dirname + "/missing.sql").text()
const chain = getChain(req, true);
const module_hash = getModuleHash(req, true);
const response = await readOnlyClient.query({ query_params: {chain, module_hash}, query, format: "JSONEachRow" });
return await response.json() as MissingResponseSchema[];
}
2 changes: 2 additions & 0 deletions src/fetch/GET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { findLatestCursor } from "../../sql/cursor.js";
import health from "./health.js";
import { openapi } from "./openapi.js";
import { cluster } from "../../sql/cluster.js";
import { missing } from "../../sql/missing.js";

export default async function (req: Request): Promise<Response> {
const { pathname } = new URL(req.url);
Expand All @@ -21,6 +22,7 @@ export default async function (req: Request): Promise<Response> {

// health
if (pathname === "/blocks") return toJSON(await blocks(req));
if (pathname === "/missing") return toJSON(await missing(req));
if (pathname === "/cluster") return toJSON(await cluster());
if (pathname === "/health") return health();
if (pathname === "/metrics") return metrics();
Expand Down
24 changes: 23 additions & 1 deletion src/fetch/openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as store from "../clickhouse/stores.js";
import { BodySchema } from "../schemas.js";
import { BlockResponseSchema } from "../../sql/blocks.js";
import { ClusterSchema } from "../../sql/cluster.js";
import { MissingResponseSchema } from "../../sql/missing.js";

const zodToJsonSchema = (...params: Parameters<(typeof ztjs)["zodToJsonSchema"]>) =>
ztjs.zodToJsonSchema(...params) as SchemaObject;
Expand Down Expand Up @@ -233,7 +234,7 @@ export async function openapi() {
tags: [TAGS.HEALTH],
summary: "Gives a summary of known blocks for particular module hashes",
parameters: [
await paramChain(true),
await paramChain(false),
await paramModuleHash(false),
],
responses: {
Expand All @@ -249,6 +250,27 @@ export async function openapi() {
},
},
})
.addPath("/missing", {
get: {
tags: [TAGS.HEALTH],
summary: "Gives a summary of missing blocks ranges for a particular module hash",
parameters: [
await paramChain(true),
await paramModuleHash(true),
],
responses: {
200: {
description: "Module hash missing blocks summary",
content: {
"application/json": {
schema: zodToJsonSchema(MissingResponseSchema),
},
},
},
500: { description: "Internal server errror" },
},
},
})
.addPath("/cluster", {
get: {
tags: [TAGS.HEALTH],
Expand Down

0 comments on commit 39f828b

Please sign in to comment.