Buffered insertions #61

Merged 17 commits on Nov 2, 2023
8 changes: 4 additions & 4 deletions README.md
@@ -92,8 +92,8 @@ Options:
   --password <string>          Password associated with the specified username (default: "", env: PASSWORD)
   --async-insert <number>      https://clickhouse.com/docs/en/operations/settings/settings#async-insert (choices: "0", "1", default: 1, env: ASYNC_INSERT)
   --wait-for-insert <boolean>  https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert (choices: "0", "1", default: 0, env: WAIT_FOR_INSERT)
-  --queue-limit <number>       Insert delay to each response when the pqueue exceeds this value (default: 10, env: QUEUE_LIMIT)
-  --queue-concurrency <number> https://github.com/sindresorhus/p-queue#concurrency (default: 10, env: QUEUE_CONCURRENCY)
+  --max-buffer-size <number>   Maximum insertion batch size (default: 10_000, env: MAX_BUFFER_SIZE)
+  --insertion-delay <number>   Delay between batch insertions (in ms) (default: 2000, env: INSERTION_DELAY)
   -h, --help                   display help for command
 ```

@@ -119,8 +119,8 @@ USERNAME=default
 PASSWORD=

 # Sink
-QUEUE_LIMIT=10
-QUEUE_CONCURRENCY=10
+MAX_BUFFER_SIZE=10000
+INSERTION_DELAY=2000
 VERBOSE=true
 ```
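In short: the sink now buffers rows in memory and writes them in batches. `INSERTION_DELAY` opens a new insertion window every 2 seconds by default, and `MAX_BUFFER_SIZE` applies backpressure: once a buffer reaches that many rows, incoming requests wait until the in-flight batch has been inserted.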

Binary file modified: bun.lockb (not shown)
2 changes: 1 addition & 1 deletion package.json
@@ -31,7 +31,7 @@
     "dotenv": "latest",
     "graphql": "^16.8.1",
     "openapi3-ts": "latest",
-    "p-queue": "latest",
+    "p-queue": "^7.4.1",
     "prom-client": "latest",
     "tslog": "latest",
     "tweetnacl": "latest",
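Pinning `p-queue` to a caret range instead of `latest` means installs only pick up non-breaking 7.x updates, shielding the sink from future major releases of the dependency.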
176 changes: 114 additions & 62 deletions src/clickhouse/handleSinkRequest.ts
@@ -6,117 +6,164 @@ import { logger } from "../logger.js";
 import * as prometheus from "../prometheus.js";
 import { Clock, Manifest, PayloadBody } from "../schemas.js";
 import client from "./createClient.js";
-const { setTimeout } = require("timers/promises");
+
+let timeLimitReached = true;
+const queue = new PQueue({ concurrency: 2 });

 // TO-DO: moves these to a separate file `src/clickhouse/stores.ts`
 const knownModuleHashes = new Set<string>();
 const knownBlockId = new Set<string>();
 const knownBlockIdFinal = new Set<string>();
 const knownTables = new Map<string, boolean>();
-const queue = new PQueue({ concurrency: config.queueConcurrency });
+
+let insertions: Record<
+  "moduleHashes" | "finalBlocks" | "blocks" | "cursors",
+  Array<Record<string, unknown>>
+> & { entityChanges: Record<string, unknown[]> } = {
+  entityChanges: {},
+  moduleHashes: [],
+  finalBlocks: [],
+  cursors: [],
+  blocks: [],
+};

 export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
   prometheus.sink_requests?.inc();
   const { manifest, clock, cursor } = metadata;
   // Indexes
-  handleModuleHashes(queue, manifest);
-  handleBlocks(queue, manifest, clock);
-  handleFinalBlocks(queue, manifest, clock);
-  handleCursors(queue, manifest, clock, cursor);
+  handleModuleHashes(manifest);
+  handleBlocks(manifest, clock);
+  handleFinalBlocks(manifest, clock);
+  handleCursors(manifest, clock, cursor);

   // EntityChanges
   for (const change of data.entityChanges) {
-    handleEntityChange(queue, change, metadata);
+    handleEntityChange(change, metadata);
   }

-  // Prevent queue from growing too large
-  prometheus.queue_size.set(queue.size);
-  if (queue.size > config.queueLimit) await setTimeout(1000);
+  if (batchSizeLimitReached()) {
+    // Wait for the next insertion window
+    await queue.onIdle();
+  }
+
+  if (timeLimitReached) {
+    // If the previous batch is not fully inserted, wait for it to be.
+    await queue.onIdle();
+
+    const { moduleHashes, finalBlocks, blocks, cursors, entityChanges } = insertions;
+    insertions = {
+      entityChanges: {},
+      moduleHashes: [],
+      finalBlocks: [],
+      cursors: [],
+      blocks: [],
+    };
+
+    // Plan the next insertion in `config.insertionDelay` ms
+    timeLimitReached = false;
+    queue
+      .add(() => new Promise((resolve) => setTimeout(resolve, config.insertionDelay)))
+      .then(() => (timeLimitReached = true));
+
+    // Start an async job to insert every record stored in the current batch.
+    // This job will be awaited before starting the next batch.
+    queue.add(async () => {
+      if (moduleHashes.length > 0) {
+        await client.insert({
+          values: moduleHashes,
+          table: "module_hashes",
+          format: "JSONEachRow",
+        });
+      }
+
+      if (finalBlocks.length > 0) {
+        await client.insert({
+          values: finalBlocks,
+          table: "final_blocks",
+          format: "JSONEachRow",
+        });
+      }
+
+      if (blocks.length > 0) {
+        await client.insert({ values: blocks, table: "blocks", format: "JSONEachRow" });
+      }
+
+      if (cursors.length > 0) {
+        await client.insert({
+          values: cursors,
+          table: "cursors",
+          format: "JSONEachRow",
+        });
+      }
+
+      if (Object.keys(entityChanges).length > 0) {
+        for (const [table, values] of Object.entries(entityChanges)) {
+          if (values.length > 0) {
+            await client.insert({ table, values, format: "JSONStringsEachRow" });
+          }
+        }
+      }
+    });
+  }

-  logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length},queue.size=${queue.size}`);
+  logger.info(`handleSinkRequest | entityChanges=${data.entityChanges.length}`);
   return new Response("OK");
 }

+function batchSizeLimitReached() {
+  return (
+    insertions.moduleHashes.length >= config.maxBufferSize ||
+    insertions.finalBlocks.length >= config.maxBufferSize ||
+    insertions.blocks.length >= config.maxBufferSize
+  );
+}
+
 // Module Hashes index
-function handleModuleHashes(queue: PQueue, manifest: Manifest) {
+function handleModuleHashes(manifest: Manifest) {
   const { moduleHash, type, moduleName, chain } = manifest;
   const moduleHashKey = `${moduleHash}-${chain}`;

   if (!knownModuleHashes.has(moduleHashKey)) {
-    queue.add(() =>
-      client.insert({
-        values: {
-          module_hash: moduleHash,
-          chain,
-          type,
-          module_name: moduleName,
-        },
-        table: "module_hashes",
-        format: "JSONEachRow",
-      })
-    );
+    insertions.moduleHashes.push({ module_hash: moduleHash, chain, type, module_name: moduleName });
     knownModuleHashes.add(moduleHashKey);
   }
 }

 // Final Block Index
-function handleFinalBlocks(queue: PQueue, manifest: Manifest, clock: Clock) {
+function handleFinalBlocks(manifest: Manifest, clock: Clock) {
   const block_id = clock.id;
   const finalBlockOnly = manifest.finalBlockOnly === "true";
   if (!finalBlockOnly) return; // Only insert final blocks

   if (!knownBlockIdFinal.has(block_id)) {
-    queue.add(() =>
-      client.insert({
-        values: { block_id },
-        table: "final_blocks",
-        format: "JSONEachRow",
-      })
-    );
+    insertions.finalBlocks.push({ block_id });
     knownBlockIdFinal.add(block_id);
   }
 }

 // Block Index
-function handleBlocks(queue: PQueue, manifest: Manifest, clock: Clock) {
+function handleBlocks(manifest: Manifest, clock: Clock) {
   const block_id = clock.id;
   const block_number = clock.number;
   const timestamp = Number(new Date(clock.timestamp));
   const chain = manifest.chain;

   if (!knownBlockId.has(block_id)) {
-    queue.add(() =>
-      client.insert({
-        values: {
-          block_id,
-          block_number,
-          chain,
-          timestamp,
-        },
-        table: "blocks",
-        format: "JSONEachRow",
-      })
-    );
+    insertions.blocks.push({ block_id, block_number, chain, timestamp });
     knownBlockId.add(block_id);
   }
 }

-function handleCursors(queue: PQueue, manifest: Manifest, clock: Clock, cursor: string) {
-  queue.add(() =>
-    client.insert({
-      values: {
-        cursor,
-        block_id: clock.id,
-        chain: manifest.chain,
-        module_hash: manifest.moduleHash,
-      },
-      table: "cursors",
-      format: "JSONEachRow",
-    })
-  );
+function handleCursors(manifest: Manifest, clock: Clock, cursor: string) {
+  insertions.cursors.push({
+    cursor,
+    block_id: clock.id,
+    chain: manifest.chain,
+    module_hash: manifest.moduleHash,
+  });
 }

 async function handleEntityChange(
-  queue: PQueue,
   change: EntityChange,
   metadata: { clock: Clock; manifest: Manifest }
 ) {
@@ -133,7 +180,7 @@ async function handleEntityChange(

   switch (change.operation) {
     case "OPERATION_CREATE":
-      return insertEntityChange(queue, table, values, { ...metadata, id: change.id });
+      return insertEntityChange(table, values, { ...metadata, id: change.id });

     case "OPERATION_UPDATE":
       return updateEntityChange();
@@ -148,15 +195,20 @@ async function handleEntityChange(
   }
 }

-function insertEntityChange(queue: PQueue, table: string, values: Record<string, unknown>, metadata: { id: string; clock: Clock; manifest: Manifest }) {
+function insertEntityChange(
+  table: string,
+  values: Record<string, unknown>,
+  metadata: { id: string; clock: Clock; manifest: Manifest }
+) {
   // EntityChange
   values["id"] = metadata.id; // Entity ID
   values["block_id"] = metadata.clock.id; // Block Index
   values["module_hash"] = metadata.manifest.moduleHash; // ModuleHash Index
   values["chain"] = metadata.manifest.chain; // Chain Index

   prometheus.entity_changes_inserted.inc();
-  return queue.add(() => client.insert({ values, table, format: "JSONStringsEachRow" }));
+  insertions.entityChanges[table] ??= [];
+  insertions.entityChanges[table].push(values);
 }

 // TODO: implement function
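The heart of this PR is the switch from per-record `client.insert` calls queued through p-queue to a time- and size-bounded buffer: rows accumulate in memory, `insertionDelay` paces flushes, and `maxBufferSize` applies backpressure when a buffer fills up early. A minimal standalone sketch of the pattern, with an `insert` stub standing in for the ClickHouse client (all names here are illustrative, not the sink's actual API):

```ts
type Row = Record<string, unknown>;

const MAX_BUFFER_SIZE = 10_000; // size threshold per table
const INSERTION_DELAY = 2_000; // ms between insertion windows

const buffers = new Map<string, Row[]>(); // table name -> pending rows
let timeLimitReached = true; // true once the current window has elapsed
let pendingFlush: Promise<void> = Promise.resolve();

// Stand-in for a real ClickHouse client call such as client.insert().
async function insert(table: string, rows: Row[]): Promise<void> {
  console.log(`inserting ${rows.length} rows into ${table}`);
}

function bufferRow(table: string, row: Row): void {
  const rows = buffers.get(table) ?? [];
  rows.push(row);
  buffers.set(table, rows);
}

function sizeLimitReached(): boolean {
  return [...buffers.values()].some((rows) => rows.length >= MAX_BUFFER_SIZE);
}

// Called once per incoming payload, mirroring handleSinkRequest.
async function onPayload(table: string, row: Row): Promise<void> {
  bufferRow(table, row);

  // Backpressure: a full buffer forces callers to wait out the in-flight batch.
  if (sizeLimitReached()) await pendingFlush;

  if (timeLimitReached) {
    await pendingFlush; // never interleave two flushes

    const batch = new Map(buffers); // snapshot, then reset the buffers
    buffers.clear();

    timeLimitReached = false; // arm the next insertion window
    setTimeout(() => (timeLimitReached = true), INSERTION_DELAY);

    pendingFlush = (async () => {
      for (const [name, rows] of batch) {
        if (rows.length > 0) await insert(name, rows);
      }
    })();
  }
}
```

Snapshotting and clearing the buffers before the asynchronous flush is what lets new rows keep accumulating while the previous batch is still being written, which is also how the PR's `handleSinkRequest` behaves.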
14 changes: 6 additions & 8 deletions src/config.ts
@@ -7,16 +7,15 @@ import { ConfigSchema } from "./schemas.js";
 // Defaults
 export const DEFAULT_PORT = "3000";
 export const DEFAULT_VERBOSE = "true";
-export const DEFAULT_HOSTNAME = "0.0.0.0"
+export const DEFAULT_HOSTNAME = "0.0.0.0";
 export const DEFAULT_HOST = "http://localhost:8123";
 export const DEFAULT_DATABASE = "default";
 export const DEFAULT_USERNAME = "default";
 export const DEFAULT_PASSWORD = "";
 export const DEFAULT_ASYNC_INSERT = 1;
 export const DEFAULT_WAIT_FOR_ASYNC_INSERT = 0;
-export const DEFAULT_QUEUE_LIMIT = 10;
-export const DEFAULT_QUEUE_CONCURRENCY = 10;
 export const DEFAULT_SCHEMA_URL = "./schema.sql";
+export const DEFAULT_MAX_BUFFER_SIZE = 10_000;
+export const DEFAULT_INSERTION_DELAY = 2000;
 export const APP_NAME = name;

 export const opts = program
@@ -35,11 +34,10 @@ export const opts = program
   .addOption(new Option("--database <string>", "The database to use inside ClickHouse").env("DATABASE").default(DEFAULT_DATABASE))
   .addOption(new Option("--async-insert <number>", "https://clickhouse.com/docs/en/operations/settings/settings#async-insert").choices(["0", "1"]).env("ASYNC_INSERT").default(DEFAULT_ASYNC_INSERT))
   .addOption(new Option("--wait-for-async-insert <boolean>", "https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert").choices(["0", "1"]).env("WAIT_FOR_INSERT").default(DEFAULT_WAIT_FOR_ASYNC_INSERT))
-  .addOption(new Option("--queue-limit <number>", "Insert delay to each response when the pqueue exceeds this value").env("QUEUE_LIMIT").default(DEFAULT_QUEUE_LIMIT))
-  .addOption(new Option("--queue-concurrency <number>", "https://github.com/sindresorhus/p-queue#concurrency").env("QUEUE_CONCURRENCY").default(DEFAULT_QUEUE_CONCURRENCY))
+  .addOption(new Option("--max-buffer-size <number>", "Maximum insertion batch size").env("MAX_BUFFER_SIZE").default(DEFAULT_MAX_BUFFER_SIZE))
+  .addOption(new Option("--insertion-delay <number>", "Delay between batch insertions (in ms)").env("INSERTION_DELAY").default(DEFAULT_INSERTION_DELAY))
   .parse()
   .opts();


 // Validate Commander argument & .env options
-export const config = ConfigSchema.parse(opts);
+export const config = ConfigSchema.parse(opts);
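One detail worth noting: Commander hands over CLI flags and environment variables as strings, so the schema behind `ConfigSchema.parse` has to coerce them into numbers (the spec change further down asserts exactly that, parsing `maxBufferSize: "10000"` into a number). A minimal sketch of such coercion, assuming a Zod-style schema, which is what the `.parse()` call suggests:

```ts
import { z } from "zod";

// Illustrative subset of a config schema for the two new options.
// CLI flags and env vars arrive as strings and are coerced to numbers.
const ConfigSchema = z.object({
  maxBufferSize: z.coerce.number().int().positive().default(10_000),
  insertionDelay: z.coerce.number().int().positive().default(2_000),
});

const config = ConfigSchema.parse({ maxBufferSize: "10000", insertionDelay: "2000" });
console.log(typeof config.maxBufferSize); // "number"
```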
6 changes: 3 additions & 3 deletions src/fetch/POST.ts
@@ -1,6 +1,6 @@
 import { handleSinkRequest } from "../clickhouse/handleSinkRequest.js";
 import { logger } from "../logger.js";
-import { sink_request_errors, sink_requests } from "../prometheus.js";
+import * as prometheus from "../prometheus.js";
 import { BodySchema } from "../schemas.js";
 import signatureEd25519 from "../webhook/signatureEd25519.js";
 import { toText } from "./cors.js";
@@ -20,7 +20,7 @@ export default async function (req: Request) {

   // parse POST body payload
   try {
-    sink_requests?.inc();
+    prometheus.requests.inc();
     const body = BodySchema.parse(JSON.parse(text));

     if ("message" in body) {
@@ -31,7 +31,7 @@
     return handleSinkRequest(body);
   } catch (err) {
     logger.error(err);
-    sink_request_errors?.inc();
+    prometheus.request_errors?.inc();
     return toText("invalid request", 400);
   }
 }
7 changes: 3 additions & 4 deletions src/prometheus.ts
@@ -22,10 +22,9 @@ export function registerGauge(name: string, help: string) {
   } catch (error) {}
 }

-// TO-DO: Add Prometheus metrics
-// https://github.com/pinax-network/substreams-sink-clickhouse/issues/26
-export const sink_requests = registerCounter("sink_requests", "Total requests")!;
-export const sink_request_errors = registerCounter("sink_request_errors", "Total failed requests")!;
+export const requests = registerCounter("requests", "Total requests")!
+export const request_errors = registerCounter("request_errors", "Total failed requests")!;
+export const sink_requests = registerCounter("sink_requests", "Total sink requests")!;
 export const queue_size = registerGauge("queue_size", "Amount of promises being processed")!;

 export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes")!;
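Net effect of the metrics split: `requests` and `request_errors` count every POST received by the webhook (incremented in `POST.ts` before the body is parsed), while `sink_requests` only counts payloads that reach `handleSinkRequest`.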
11 changes: 6 additions & 5 deletions src/schemas.spec.ts
@@ -15,22 +15,23 @@ const config = ConfigSchema.parse({
   database: "default",
   username: "default",
   password: "",
-  queueLimit: "10",
-  queueConcurrency: "10",
   verbose: "true",
   waitForAsyncInsert: "0",
   asyncInsert: "1",
   createDatabase: "false",
+  maxBufferSize: "10000",
+  insertionDelay: "2000",
 });

 describe("ConfigSchema", () => {
+  test("verbose", () => expect(config.verbose).toBe(true));
   test("port", () => expect(config.port).toBe(3000));
-  test("queueLimit", () => expect(config.queueLimit).toBe(10));
-  test("verbose", () => expect(config.verbose).toBe(true));
   test("database", () => expect(config.database).toBe("default"));
   test("username", () => expect(config.username).toBe("default"));
-  test("publicKey", () => expect(config.publicKey).toBe("a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"));
+  test("publicKey", () =>
+    expect(config.publicKey).toBe(
+      "a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9"
+    ));
   test("waitForAsyncInsert", () => expect(config.waitForAsyncInsert).toBe(0));
   test("asyncInsert", () => expect(config.asyncInsert).toBe(1));
 });