diff --git a/src/events.ts b/src/events.ts index 4754a216..b1615424 100644 --- a/src/events.ts +++ b/src/events.ts @@ -265,8 +265,12 @@ export type EventScraperProps = { deleteOptions?: DeleteOptions; tokenMetadataMap?: TokenMetadataMap; postProcess?: any; - filterFunction?: (events: Event[], transaction: Transaction) => Event[]; - filterFunctionGetContext?: (events: Event[], web3Source: Web3Source) => Promise; + filterFunction?: (events: Event[], transaction: Transaction, requiredTxnList?: Set) => Event[]; + filterFunctionGetContext?: ( + events: Event[], + web3Source: Web3Source, + requiredTxnList?: Set, + ) => Promise; }; export const eventScrperProps: EventScraperProps[] = [ diff --git a/src/filters/erc20_transfer_events.ts b/src/filters/erc20_transfer_events.ts index ed5e3c7a..47b09313 100644 --- a/src/filters/erc20_transfer_events.ts +++ b/src/filters/erc20_transfer_events.ts @@ -2,24 +2,45 @@ import { Web3Source } from '../data_sources/events/web3'; import { Event, Transaction } from '../entities'; import { getParseTxsAsync } from '../scripts/utils/web3_utils'; -export async function filterERC20TransferEventsGetContext(events: Event[], web3Source: Web3Source): Promise { +export async function filterERC20TransferEventsGetContext( + events: Event[], + web3Source: Web3Source, + requiredTxnList?: Set, +): Promise { if (events.length > 0) { const txHashes = events.map((log: Event) => log.transactionHash); - const txData = await getParseTxsAsync(web3Source, txHashes); - const filteredTxsHashes = txData.parsedTxs - .filter((tx: Transaction) => tx.quoteId) - .map((tx: Transaction) => tx.transactionHash); + let validTxHashSet: Set; + if (requiredTxnList && requiredTxnList.size > 0) { + validTxHashSet = requiredTxnList; + } else { + const txData = await getParseTxsAsync(web3Source, txHashes); + const filteredTxsHashes = txData.parsedTxs + .filter((tx: Transaction) => tx.quoteId) + .map((tx: Transaction) => tx.transactionHash); + + validTxHashSet = new Set(filteredTxsHashes); + } - const validTxHashSet = new Set(filteredTxsHashes); const filteredLogs = events.filter((log: Event) => validTxHashSet.has(log.transactionHash)); return filteredLogs.filter((e) => e !== null); } return []; } -export function filterERC20TransferEvents(events: Event[], transaction: Transaction): Event[] { +export function filterERC20TransferEvents( + events: Event[], + transaction: Transaction, + requiredTxnList?: Set, +): Event[] { + const filteredEvents = new Set(); + + if (requiredTxnList && requiredTxnList.size > 0 && requiredTxnList.has(transaction.transactionHash)) { + events.filter((e) => e !== null).forEach((e) => filteredEvents.add(e)); + } + if (transaction.quoteId) { - return events.filter((e) => e !== null); + events.filter((e) => e !== null).forEach((e) => filteredEvents.add(e)); } - return []; + + return Array.from(filteredEvents); } diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts index ea804c4d..af005c6a 100644 --- a/src/scripts/pull_and_save_block_events.ts +++ b/src/scripts/pull_and_save_block_events.ts @@ -73,13 +73,12 @@ const provider = web3Factory.getRpcProvider({ }); const web3Source = new Web3Source(provider, EVM_RPC_URL); -function parseBlockTransactionsEvents(fullBlock: FullBlock): ParsedFullBlock { +function parseBlockTransactionsEvents(fullBlock: FullBlock, requiredTxnList?: Set): ParsedFullBlock { const parsedBlock = parseBlock({ ...fullBlock, transactions: [''] }); const usefullTxs: ParsedTransaction[] = fullBlock.transactions .map((transaction: FullTransaction): ParsedTransaction | null => { - const parsedTransactionEvents = parseTransactionEvents(transaction); - + const parsedTransactionEvents = parseTransactionEvents(transaction, requiredTxnList); if (parsedTransactionEvents.parsedTransaction !== null) { return parsedTransactionEvents; } @@ -102,7 +101,7 @@ function parseBlockTransactionsEvents(fullBlock: FullBlock): ParsedFullBlock { }; } -function parseTransactionEvents(transaction: FullTransaction): ParsedTransaction { +function parseTransactionEvents(transaction: FullTransaction, requiredTxnList?: Set): ParsedTransaction { if (transaction.input === '0x') { return { parsedTransaction: null, @@ -122,7 +121,7 @@ function parseTransactionEvents(transaction: FullTransaction): ParsedTransaction const parsedLogs = baseFilteredLogs.map((log: LogEntry) => props.parser(log)); const filteredLogs = props.filterFunction - ? props.filterFunction(parsedLogs, parsedTransaction) + ? props.filterFunction(parsedLogs, parsedTransaction, requiredTxnList) : parsedLogs; const postProcessedLogs = props.postProcess ? props.postProcess(filteredLogs) : filteredLogs; @@ -270,6 +269,7 @@ async function getParseSaveBlocksTransactionsEvents( producer: Producer | null, newBlocks: EVMBlock[], allowPartialSuccess: boolean, + requiredTxnList?: Set, ): Promise { const blockNumbers = newBlocks.map((newBlock) => newBlock.number!); @@ -280,7 +280,7 @@ async function getParseSaveBlocksTransactionsEvents( const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); const filteredNewBlocksReceipts = newBlocksReceipts.filter( - (blockReciepts) => blockReciepts !== null && blockReciepts !== undefined, + (blockReceipts) => blockReceipts !== null && blockReceipts !== undefined, ); if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) { @@ -288,12 +288,12 @@ async function getParseSaveBlocksTransactionsEvents( return false; } const { nullOnlyAtEnd } = newBlocksReceipts.reduce( - (state, blockReciepts) => { - if (state.hasSeenNull && blockReciepts !== null) { + (state, blockReceipts) => { + if (state.hasSeenNull && blockReceipts !== null) { state.nullOnlyAtEnd = false; } - if (newBlocksReceipts === null) { + if (blockReceipts === null) { state.hasSeenNull = true; } return state; @@ -302,9 +302,9 @@ async function getParseSaveBlocksTransactionsEvents( ); if (nullOnlyAtEnd) { - logger.info('Last block(s) reciepts not found, retrying that block(s) on the next run'); + logger.info('Last block(s) receipts not found, retrying those block(s) on the next run'); } else { - logger.error("Missing intermideate block reciepts, can't continue. Retrying next run"); + logger.error("Missing intermediate block receipts, can't continue. Retrying next run"); logger.error(newBlocksReceipts); return false; } @@ -327,7 +327,9 @@ async function getParseSaveBlocksTransactionsEvents( return { ...newBlocks[blockIndex], transactions: transactionsWithLogs }; }); - const parsedFullBlocks = fullBlocks.map(parseBlockTransactionsEvents); + const parsedFullBlocks = fullBlocks.map((fullBlock) => + parseBlockTransactionsEvents(fullBlock, requiredTxnList), + ); const eventTables = eventScrperProps .filter((props) => props.enabled) @@ -371,9 +373,26 @@ export class BlockEventsScraper { const blockNumbers = oldestBlocksToBackfill.map( (backfillBlock: { block_number: number }) => backfillBlock.block_number, ); + const requiredTxnListQuery = await connection.query( + `SELECT DISTINCT transaction_hash + FROM ${SCHEMA}.tx_backfill + WHERE block_number IN (${blockNumbers.join(',')}) AND done = false`, + ); + + const requiredTxnList = new Set( + requiredTxnListQuery.map((row: { transaction_hash: string }) => row.transaction_hash), + ); const newBlocks = await web3Source.getBatchBlockInfoAsync(blockNumbers, true); - const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, false); + + const success = await getParseSaveBlocksTransactionsEvents( + connection, + producer, + newBlocks, + false, + requiredTxnList, + ); + if (success) { const newBlockNumbers = newBlocks.map((block) => block.number); const queryRunner = connection.createQueryRunner();