Skip to content

Commit

Permalink
Merge branch 'main' into feature/substreams-specific-metrics
Browse files Browse the repository at this point in the history
# Conflicts:
#	bun.lockb
  • Loading branch information
JulienR1 committed Nov 23, 2023
2 parents 75e65a7 + 8e9f939 commit d95516b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 26 deletions.
14 changes: 2 additions & 12 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/usr/bin/env node

import { saveKnownEntityChanges } from "./src/clickhouse/handleSinkRequest.js";
import { ping } from "./src/clickhouse/ping.js";
import { config } from "./src/config.js";
import DELETE from "./src/fetch/DELETE.js";
import GET from "./src/fetch/GET.js";
Expand All @@ -10,18 +8,10 @@ import POST from "./src/fetch/POST.js";
import PUT from "./src/fetch/PUT.js";
import { NotFound } from "./src/fetch/cors.js";
import { logger } from "./src/logger.js";
import { resume } from "./src/resume.js";

if (config.verbose) logger.enable();

if (config.resume) {
const pingResult = await ping();
if (pingResult.success) {
logger.info("Writing unsinked data to ClickHouse");
await saveKnownEntityChanges();
} else {
logger.error("Resume failed | Error: " + pingResult.error.message);
}
}
if (config.resume) resume();

const app = Bun.serve({
hostname: config.hostname,
Expand Down
20 changes: 20 additions & 0 deletions src/resume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { saveKnownEntityChanges } from "./clickhouse/handleSinkRequest.js";
import { ping } from "./clickhouse/ping.js";
import { logger } from "./logger.js";

export async function resume() {
const pingResult = await ping();
if (!pingResult.success) {
logger.error("Resume failed | Error: " + pingResult.error.message);
return;
}

logger.info("Writing unsinked data to ClickHouse");
const saveResult = await saveKnownEntityChanges();
if (!saveResult.success) {
logger.error("Resume failed | Error: " + saveResult.error.message);
return;
}

logger.info("Resume completed.");
}
41 changes: 27 additions & 14 deletions src/sqlite/sqlite.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { file } from "bun";
import Database, { Statement } from "bun:sqlite";
import { config } from "../config.js";
import { Err, Ok, Result } from "../result.js";
import tableSQL from "./table.sql";

const selectSQL = {
Expand Down Expand Up @@ -68,24 +69,36 @@ class SQLite {
this.insertStatement.run(this.batchNumber, entityChanges, source, chain, blockId, blockNumber, isFinal ? 1 : 0, moduleHash, moduleName, type, timestamp, cursor);
}

public async commitBuffer(onData: (blocks: unknown[], cursors: unknown[], finalBlocks: unknown[], moduleHashes: unknown[], entityChanges: Record<string, unknown[]>) => Promise<void>) {
this.batchNumber++;

const blocks = this.selectBlocksStatement.all(this.batchNumber);
const cursors = this.selectCursorsStatement.all(this.batchNumber);
const finalBlocks = this.selectFinalBlocksStatement.all(this.batchNumber);
const moduleHashes = this.selectModuleHashesStatement.all(this.batchNumber);
const entityChanges: Record<string, Array<unknown>> = {};
public async commitBuffer(onData: (blocks: unknown[], cursors: unknown[], finalBlocks: unknown[], moduleHashes: unknown[], entityChanges: Record<string, unknown[]>) => Promise<void>): Promise<Result> {
try {
this.batchNumber++;

const blocks = this.selectBlocksStatement.all(this.batchNumber);
const cursors = this.selectCursorsStatement.all(this.batchNumber);
const finalBlocks = this.selectFinalBlocksStatement.all(this.batchNumber);
const moduleHashes = this.selectModuleHashesStatement.all(this.batchNumber);
const entityChanges: Record<string, Array<unknown>> = {};

const sources = this.selectSourcesStatement.all(this.batchNumber);
for (const { source } of sources) {
if (source.length > 0) {
entityChanges[source] = this.selecEntityChangesStatement.all(this.batchNumber, source).map((response) => JSON.parse(response.entity_changes));
}
}

const sources = this.selectSourcesStatement.all(this.batchNumber);
for (const { source } of sources) {
if (source.length > 0) {
entityChanges[source] = this.selecEntityChangesStatement.all(this.batchNumber, source).map((response) => JSON.parse(response.entity_changes));
await onData(blocks, cursors, finalBlocks, moduleHashes, entityChanges);
this.deleteStatement.run(this.batchNumber);
} catch (err) {
if (err instanceof Error) {
return Err(err);
} else if (typeof err === "string") {
return Err(new Error(err));
} else {
return Err(new Error(JSON.stringify(err)));
}
}

await onData(blocks, cursors, finalBlocks, moduleHashes, entityChanges);
this.deleteStatement.run(this.batchNumber);
return Ok();
}

private get initialBatchNumber() {
Expand Down

0 comments on commit d95516b

Please sign in to comment.