From 7404190b95cafd6f2a10face39480c0dfe2af8a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20P=C3=A9rez?= Date: Fri, 19 Jan 2024 15:50:02 +0100 Subject: [PATCH] Metrics --- src/scripts/pull_and_save_block_events.ts | 37 ++++++++++++++++++----- src/utils/metrics.ts | 14 ++++++++- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts index bdeae790..9b2d8135 100644 --- a/src/scripts/pull_and_save_block_events.ts +++ b/src/scripts/pull_and_save_block_events.ts @@ -17,7 +17,7 @@ import { Block, Transaction, TransactionReceipt } from '../entities'; import { eventScrperProps, EventScraperProps } from '../events'; import { parseBlock, parseTransaction, parseTransactionReceipt } from '../parsers/web3/parse_web3_objects'; import { chunk, logger } from '../utils'; -import { CURRENT_BLOCK, SCRIPT_RUN_DURATION } from '../utils/metrics'; +import { LATEST_SCRAPED_BLOCK, CURRENT_BLOCK, SCRIPT_RUN_DURATION, SAVED_RESULTS } from '../utils/metrics'; import { contractTopicFilter } from './utils/block_utils'; import { getParseSaveTokensAsync } from './utils/web3_utils'; import { web3Factory } from '@0x/dev-utils'; @@ -53,6 +53,15 @@ interface ParsedTransaction { parsedEvents: TypedEvents[] | null; } +class BlockHashMismatchError extends Error { + constructor(m: string) { + super(m); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, BlockHashMismatchError.prototype); + } +} + const provider = web3Factory.getRpcProvider({ rpcUrl: EVM_RPC_URL, }); @@ -201,22 +210,26 @@ async function saveFullBlocks(connection: Connection, eventTables: string[], par const promises: Promise[] = []; /// Blocks + SAVED_RESULTS.labels({ type: 'block' }).inc(parsedBlocks.length); for (const chunkItems of chunk(parsedBlocks, 300)) { promises.push(queryRunner.manager.insert(Block, chunkItems)); } /// Transactions + SAVED_RESULTS.labels({ type: 'transactions' }).inc(parsedTransactions.length); for (const chunkItems of chunk(parsedTransactions, 300)) { promises.push(queryRunner.manager.insert(Transaction, chunkItems)); } /// TransactionReceipts + SAVED_RESULTS.labels({ type: 'transactionReceipts' }).inc(parsedTransactionReceipts.length); for (const chunkItems of chunk(parsedTransactionReceipts, 300)) { promises.push(queryRunner.manager.insert(TransactionReceipt, chunkItems)); } /// Events parsedEventsByType.forEach(async (typedEvents: TypedEvents) => { + SAVED_RESULTS.labels({ type: 'event', event: typedEvents.eventName }).inc(typedEvents.events.length); for (const chunkItems of chunk(typedEvents.events, 300)) { promises.push(queryRunner.manager.insert(typedEvents.eventType, chunkItems as any[])); } @@ -286,7 +299,7 @@ async function getParseSaveBlocksTransactionsEvents( const transactionsWithLogs = newBlockReceipts.map( (txReceipt: EVMTransactionReceipt, txIndex: number): FullTransaction => { if (txReceipt.blockHash !== newBlocks[blockIndex].hash) { - throw Error('Wrong Block hash'); + throw new BlockHashMismatchError('Wrong Block hash'); } return { ...newBlocks[blockIndex].transactions[txIndex], @@ -423,14 +436,22 @@ export class BlockEventsScraper { throw Error(`Big reorg detected, of more than ${lookback}, manual intervention needed`); } - const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, true); + try { + const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, true); - if (success) { - const endTime = new Date().getTime(); - const scriptDurationSeconds = (endTime - startTime) / 1000; - SCRIPT_RUN_DURATION.set({ script: 'events-by-block' }, scriptDurationSeconds); + if (success) { + const endTime = new Date().getTime(); + const scriptDurationSeconds = (endTime - startTime) / 1000; + SCRIPT_RUN_DURATION.set({ script: 'events-by-block' }, scriptDurationSeconds); + LATEST_SCRAPED_BLOCK.labels({ chain: CHAIN_NAME }).set(blockRangeEnd); - logger.info(`Finished pulling events block by in ${scriptDurationSeconds}`); + logger.info(`Finished pulling events block by in ${scriptDurationSeconds}`); + } + } catch (err) { + if (err instanceof BlockHashMismatchError) { + logger.error(err); + return; + } else throw err; } } } diff --git a/src/utils/metrics.ts b/src/utils/metrics.ts index 9fb7c09d..d9747b9e 100644 --- a/src/utils/metrics.ts +++ b/src/utils/metrics.ts @@ -1,7 +1,7 @@ import { CHAIN_NAME, METRICS_PATH, PROMETHEUS_PORT } from '../config'; import { logger } from './logger'; import express from 'express'; -import { Gauge, register } from 'prom-client'; +import { Counter, Gauge, register } from 'prom-client'; export const CURRENT_BLOCK = new Gauge({ name: 'event_scraper_current_block', @@ -9,6 +9,12 @@ export const CURRENT_BLOCK = new Gauge({ labelNames: ['chain'], }); +export const LATEST_SCRAPED_BLOCK = new Gauge({ + name: 'event_scraper_latest_scraped_block', + help: 'The latest scraped block', + labelNames: ['chain'], +}); + export const SCRIPT_RUN_DURATION = new Gauge({ name: 'event_scraper_script_run_duration', help: 'The time a script took to run', @@ -33,6 +39,12 @@ export const SCAN_RESULTS = new Gauge({ labelNames: ['type', 'event', 'includeBridgeTrades'], }); +export const SAVED_RESULTS = new Counter({ + name: 'event_scraper_saved_results', + help: 'The count of how many results are going to be saved to DB', + labelNames: ['type', 'event'], +}); + export const RPC_LOGS_ERROR = new Gauge({ name: 'event_scraper_rpc_error', help: 'Counter for RPC errors',