From 69bcc0882b6cb20c2e5e952b0dbab4d71b59689a Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 6 Jun 2024 17:11:13 +0300 Subject: [PATCH] Fix Read big files from GCS Fix selector cutting for events. Fix join. Fix address formating. --- blockchain/arbitrum_one/arbitrum_one.go | 15 +- .../arbitrum_sepolia/arbitrum_sepolia.go | 15 +- blockchain/blockchain.go.tmpl | 15 +- blockchain/ethereum/ethereum.go | 15 +- .../game7_orbit_arbitrum_sepolia.go | 15 +- blockchain/mantle/mantle.go | 15 +- blockchain/mantle_sepolia/mantle_sepolia.go | 13 +- blockchain/polygon/polygon.go | 17 +- blockchain/xai/xai.go | 15 +- blockchain/xai_sepolia/xai_sepolia.go | 15 +- indexer/db.go | 408 +++++++++--------- indexer/types.go | 2 +- storage/gcp_storage.go | 26 +- synchronizer/synchronizer.go | 27 +- 14 files changed, 303 insertions(+), 310 deletions(-) diff --git a/blockchain/arbitrum_one/arbitrum_one.go b/blockchain/arbitrum_one/arbitrum_one.go index ac17b10..42f5222 100644 --- a/blockchain/arbitrum_one/arbitrum_one.go +++ b/blockchain/arbitrum_one/arbitrum_one.go @@ -495,15 +495,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -529,7 +527,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -561,10 +559,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -597,7 +592,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go index 7e9213b..d903c12 100644 --- a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go +++ b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go @@ -495,15 +495,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -529,7 +527,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -561,10 +559,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -597,7 +592,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/blockchain.go.tmpl b/blockchain/blockchain.go.tmpl index 15f3aaf..83acab4 100644 --- a/blockchain/blockchain.go.tmpl +++ b/blockchain/blockchain.go.tmpl @@ -495,15 +495,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -529,7 +527,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -561,10 +559,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -597,7 +592,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/ethereum/ethereum.go b/blockchain/ethereum/ethereum.go index 23b6cbf..1c44f58 100644 --- a/blockchain/ethereum/ethereum.go +++ b/blockchain/ethereum/ethereum.go @@ -490,15 +490,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -524,7 +522,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -556,10 +554,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -592,7 +587,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go index c29b862..df12551 100644 --- a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go +++ b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go @@ -495,15 +495,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -529,7 +527,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -561,10 +559,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -597,7 +592,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/mantle/mantle.go b/blockchain/mantle/mantle.go index 5d7bb57..0aee2f8 100644 --- a/blockchain/mantle/mantle.go +++ b/blockchain/mantle/mantle.go @@ -490,15 +490,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -524,7 +522,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -556,10 +554,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -592,7 +587,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/mantle_sepolia/mantle_sepolia.go b/blockchain/mantle_sepolia/mantle_sepolia.go index 0c5024e..24dea18 100644 --- a/blockchain/mantle_sepolia/mantle_sepolia.go +++ b/blockchain/mantle_sepolia/mantle_sepolia.go @@ -490,15 +490,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -524,7 +522,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -557,9 +555,8 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -592,7 +589,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index 1ad4104..9a20821 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go @@ -480,6 +480,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint decodedEvents, err := c.DecodeProtoEventLogs(events) if err != nil { + fmt.Println("Error decoding events: ", err) return nil, err } @@ -490,15 +491,14 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) + if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -524,7 +524,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -556,10 +556,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -592,7 +589,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/xai/xai.go b/blockchain/xai/xai.go index 822bd7e..99ee254 100644 --- a/blockchain/xai/xai.go +++ b/blockchain/xai/xai.go @@ -495,15 +495,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -529,7 +527,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -561,10 +559,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -597,7 +592,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/blockchain/xai_sepolia/xai_sepolia.go b/blockchain/xai_sepolia/xai_sepolia.go index 0b690c8..303207b 100644 --- a/blockchain/xai_sepolia/xai_sepolia.go +++ b/blockchain/xai_sepolia/xai_sepolia.go @@ -495,15 +495,13 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint var topicSelector string if len(event.Topics) > 0 { - topicSelector = event.Topics[0][:10] + topicSelector = event.Topics[0] } else { continue } - checksumAddress := common.HexToAddress(event.Address).Hex() - // Get the ABI string - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][topicSelector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[event.Address][topicSelector]["abi"])) if err != nil { fmt.Println("Error initializing contract ABI: ", err) return nil, err @@ -529,7 +527,7 @@ func (c *Client) DecodeProtoEventsToLabels(events []string, blocksCache map[uint // Convert event to label eventLabel := indexer.EventLabel{ Label: indexer.SeerCrawlerLabel, - LabelName: abiMap[checksumAddress][topicSelector]["abi_name"], + LabelName: abiMap[event.Address][topicSelector]["abi_name"], LabelType: "event", BlockNumber: event.BlockNumber, BlockHash: event.BlockHash, @@ -561,10 +559,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa selector := transaction.Input[:10] - // To checksum address - checksumAddress := common.HexToAddress(transaction.ToAddress).Hex() - - contractAbi, err := abi.JSON(strings.NewReader(abiMap[checksumAddress][selector]["abi"])) + contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) if err != nil { return nil, err @@ -597,7 +592,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa BlockNumber: transaction.BlockNumber, BlockHash: transaction.BlockHash, CallerAddress: transaction.FromAddress, - LabelName: abiMap[checksumAddress][selector]["abi_name"], + LabelName: abiMap[transaction.ToAddress][selector]["abi_name"], LabelType: "tx_call", OriginAddress: transaction.FromAddress, Label: indexer.SeerCrawlerLabel, diff --git a/indexer/db.go b/indexer/db.go index cec6cea..88f8ab4 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -13,7 +13,6 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/moonstream-to/seer/storage" ) // DB is a global variable to hold the GORM database connection. @@ -542,7 +541,7 @@ func (p *PostgreSQLpgx) ReadABIJobs(blockchain string) ([]AbiJob, error) { defer conn.Release() - rows, err := conn.Query(context.Background(), "SELECT id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, abi, created_at, updated_at FROM abi_jobs where chain=$1 ", storage.Blockchains[blockchain]) + rows, err := conn.Query(context.Background(), "SELECT id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, abi, created_at, updated_at FROM abi_jobs where chain=$1 ", blockchain) if err != nil { return nil, err @@ -618,201 +617,216 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock blocksTableName := BlocksTableName(blockchain) - query := fmt.Sprintf(` - WITH blocks as ( - SELECT - block_number, - block_hash, - block_timestamp - from - %s - WHERE - block_number >= $1 - and block_number <= $2 - ), - transactions AS ( - SELECT - bk.block_number, - bk.block_hash, - bk.block_timestamp, - tx.hash AS transaction_hash, - tx.to_address AS transaction_address, - tx.selector AS transaction_selector, - tx.row_id AS transaction_row_id, - tx.path AS transaction_path - FROM - blocks bk - LEFT JOIN %s tx ON tx.block_hash = bk.block_hash - ), - events AS ( - SELECT - bk.block_number, - bk.block_hash, - bk.block_timestamp, - logs.transaction_hash AS transaction_hash, - logs.address AS event_address, - LEFT(logs.selector, 10) AS event_selector, - logs.row_id AS event_row_id, - logs.path AS event_path - FROM - blocks bk - LEFT JOIN %s logs ON logs.block_hash = bk.block_hash - ), - jobs AS ( - SELECT - decode(REPLACE(address, '0x', ''), 'hex') as address, - address as address_str, - customer_id, - abi_selector, - abi_name, - abi - FROM - abi_jobs - WHERE - chain = $3 - ), - address_abis AS ( - SELECT - address_str, - customer_id, - json_object_agg( - abi_selector, - json_build_object( - 'abi', '[' || abi || ']', - 'abi_name', abi_name - ) - ) AS abis_per_address - FROM - jobs - GROUP BY - address_str, customer_id - ), - reformatted_jobs AS ( - SELECT - customer_id, - json_object_agg( - address_str, - abis_per_address - ) AS abis - FROM - address_abis - GROUP BY - customer_id - ), - abi_transactions AS ( - SELECT - transactions.block_number, - transactions.block_timestamp, - jobs.customer_id, - jobs.abi_name, - jobs.address_str, - transactions.transaction_hash, - transactions.transaction_address, - transactions.transaction_selector, - transactions.transaction_row_id, - transactions.transaction_path - FROM - transactions - inner JOIN jobs ON transactions.transaction_address = jobs.address - AND transactions.transaction_selector = jobs.abi_selector - ), - abi_events AS ( - SELECT - events.block_number, - events.block_timestamp, - jobs.customer_id, - jobs.abi_name, - jobs.address_str, - events.transaction_hash, - events.block_hash, - events.event_address, - events.event_selector, - events.event_row_id, - events.event_path - FROM - events - inner JOIN jobs ON events.event_address = jobs.address - AND events.event_selector = jobs.abi_selector - ), - combined AS ( - SELECT - block_number, - block_timestamp, - customer_id, - 'transaction' AS type, - abi_name, - address_str, - transaction_hash AS hash, - transaction_address AS address, - transaction_selector AS selector, - transaction_row_id AS row_id, - transaction_path AS path - FROM - abi_transactions - where - transaction_hash is not null - UNION - ALL - SELECT - block_number, - block_timestamp, - customer_id, - 'event' AS type, - abi_name, - address_str, - transaction_hash AS hash, - event_address AS address, - event_selector AS selector, - event_row_id AS row_id, - event_path AS path - FROM - abi_events - where - transaction_hash is not null - ) - SELECT - customer_id, - ( - SELECT abis from reformatted_jobs where reformatted_jobs.customer_id = combined.customer_id - ) as abis, - json_object_agg(block_number, block_timestamp) AS blocks_cache, - json_build_object( - 'transactions', - COALESCE( - json_agg( - json_build_object( - 'hash', hash, - 'address', address_str, - 'selector', selector, - 'row_id', row_id, - 'path', path - ) - ) FILTER (WHERE type = 'transaction'), '[]'), - 'events', - COALESCE( - json_agg( - CASE - WHEN type = 'event' THEN json_build_object( - 'hash', - hash, - 'address', - address_str, - 'selector', - selector, - 'row_id', - row_id, - 'path', - path - ) - END - ) FILTER (WHERE type = 'event'), '[]') - ) AS data - FROM - combined - GROUP BY - customer_id`, blocksTableName, transactionsTableName, logsTableName) - - rows, err := conn.Query(context.Background(), query, fromBlock, toBlock, storage.Blockchains[blockchain]) + query := fmt.Sprintf(`WITH blocks as ( + SELECT + block_number, + block_hash, + block_timestamp + from + %s + WHERE + block_number >= $1 + and block_number <= $2 + ), + transactions AS ( + SELECT + bk.block_number, + bk.block_hash, + bk.block_timestamp, + tx.hash AS transaction_hash, + tx.to_address AS transaction_address, + tx.selector AS transaction_selector, + tx.row_id AS transaction_row_id, + tx.path AS transaction_path + FROM + blocks bk + LEFT JOIN %s tx ON tx.block_hash = bk.block_hash + ), + events AS ( + SELECT + bk.block_number, + bk.block_hash, + bk.block_timestamp, + logs.transaction_hash AS transaction_hash, + logs.address AS event_address, + logs.selector AS event_selector, + logs.row_id AS event_row_id, + logs.path AS event_path + FROM + blocks bk + LEFT JOIN %s logs ON logs.block_hash = bk.block_hash + ), + jobs AS ( + SELECT + address as address, + '0x' || encode(address, 'hex') as address_str, + customer_id, + abi_selector, + abi_name, + abi + FROM + abi_jobs + WHERE + chain = $3 + ), + address_abis AS ( + SELECT + address_str, + customer_id, + json_object_agg( + abi_selector, + json_build_object( + 'abi', + '[' || abi || ']', + 'abi_name', + abi_name + ) + ) AS abis_per_address + FROM + jobs + GROUP BY + address_str, + customer_id + ), + reformatted_jobs AS ( + SELECT + customer_id, + json_object_agg(address_str, abis_per_address) AS abis + FROM + address_abis + GROUP BY + customer_id + ), + abi_transactions AS ( + SELECT + transactions.block_number, + transactions.block_timestamp, + jobs.customer_id, + jobs.abi_name, + jobs.address_str, + transactions.transaction_hash, + transactions.transaction_address, + transactions.transaction_selector, + transactions.transaction_row_id, + transactions.transaction_path + FROM + transactions + inner JOIN jobs ON transactions.transaction_address = jobs.address + AND transactions.transaction_selector = jobs.abi_selector + ), + abi_events AS ( + SELECT + events.block_number, + events.block_timestamp, + jobs.customer_id, + jobs.abi_name, + jobs.address_str, + events.transaction_hash, + events.block_hash, + events.event_address, + events.event_selector, + events.event_row_id, + events.event_path + FROM + events + inner JOIN jobs ON events.event_address = jobs.address + AND events.event_selector = jobs.abi_selector + ), + combined AS ( + SELECT + block_number, + block_timestamp, + customer_id, + 'transaction' AS type, + abi_name, + address_str, + transaction_hash AS hash, + transaction_address AS address, + transaction_selector AS selector, + transaction_row_id AS row_id, + transaction_path AS path + FROM + abi_transactions + UNION + ALL + SELECT + block_number, + block_timestamp, + customer_id, + 'event' AS type, + abi_name, + address_str, + transaction_hash AS hash, + event_address AS address, + event_selector AS selector, + event_row_id AS row_id, + event_path AS path + FROM + abi_events + ) + SELECT + customer_id, + ( + SELECT + abis + from + reformatted_jobs + where + reformatted_jobs.customer_id = combined.customer_id + ) as abis, + json_object_agg(block_number, block_timestamp) AS blocks_cache, + json_build_object( + 'transactions', + COALESCE( + json_agg( + json_build_object( + 'hash', + hash, + 'address', + address_str, + 'selector', + selector, + 'row_id', + row_id, + 'path', + path + ) + ) FILTER ( + WHERE + type = 'transaction' + ), + '[]' + ), + 'events', + COALESCE( + json_agg( + CASE + WHEN type = 'event' THEN json_build_object( + 'hash', + hash, + 'address', + address_str, + 'selector', + selector, + 'row_id', + row_id, + 'path', + path + ) + END + ) FILTER ( + WHERE + type = 'event' + ), + '[]' + ) + ) AS data + FROM + combined + GROUP BY + customer_id`, blocksTableName, transactionsTableName, logsTableName) + + rows, err := conn.Query(context.Background(), query, fromBlock, toBlock, blockchain) if err != nil { log.Println("Error querying abi jobs from database", err) diff --git a/indexer/types.go b/indexer/types.go index bdb071d..c6e6613 100644 --- a/indexer/types.go +++ b/indexer/types.go @@ -120,7 +120,7 @@ type BlockCache struct { type AbiJob struct { ID string - Address string + Address []byte UserID string CustomerID string AbiSelector string diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go index ec3b678..b1cd0a8 100644 --- a/storage/gcp_storage.go +++ b/storage/gcp_storage.go @@ -4,6 +4,8 @@ import ( "bufio" "context" "fmt" + "io" + "strings" "cloud.google.com/go/storage" ) @@ -106,7 +108,6 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { result := make(map[string][]string) for _, item := range readItems { - obj := bucket.Object(item.Key) r, err := obj.NewReader(ctx) @@ -115,7 +116,7 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { } defer r.Close() - scanner := bufio.NewScanner(r) + reader := bufio.NewReader(r) rowMap := make(map[uint64]bool) for _, id := range item.RowIds { @@ -124,19 +125,26 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { var currentRow uint64 = 0 - for scanner.Scan() { - if rowMap[currentRow+1] { + for { + line, err := reader.ReadString('\n') + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("failed to read object from bucket: %v", err) + } + + // Remove the newline character from the end of the line if it exists + line = strings.TrimSuffix(line, "\n") + + if rowMap[currentRow] { if result[item.Key] == nil { result[item.Key] = make([]string, 0) } - result[item.Key] = append(result[item.Key], scanner.Text()) + result[item.Key] = append(result[item.Key], line) } currentRow++ } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("failed to read object from bucket: %v", err) - } } return result, nil diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index b1b285b..b8a7908 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -12,6 +12,7 @@ import ( "time" "github.com/moonstream-to/seer/blockchain" + "github.com/moonstream-to/seer/crawler" "github.com/moonstream-to/seer/indexer" "github.com/moonstream-to/seer/storage" ) @@ -88,6 +89,7 @@ func GetDBConnection(uuid string) (string, error) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.Println("Failed to get connection string for id", uuid, ":", MOONSTREAM_DB_V3_CONTROLLER_API+"/customers/"+uuid+"/instances/1/creds/seer/url") return "", fmt.Errorf("failed to get connection string for id %s: %s", uuid, resp.Status) } @@ -153,8 +155,19 @@ func (d *Synchronizer) syncCycle() error { var wg sync.WaitGroup errChan := make(chan error, 1) // Buffered channel for error handling + if d.blockchain == "" { + return fmt.Errorf("blockchain is not set") + } + if d.providerURI == "" { + url, ok := crawler.BlockchainURLs[d.blockchain] + if !ok { + return fmt.Errorf("blockchain URL is not set") + } + d.providerURI = url + } + // Read client - client, err := blockchain.NewClient(d.blockchain, "") + client, err := blockchain.NewClient(d.blockchain, d.providerURI) if err != nil { log.Println("Error initializing blockchain client:", err) log.Fatal(err) @@ -312,11 +325,11 @@ func (d *Synchronizer) syncCycle() error { RowIds: rowIds, }) } - // Read all rowIds for each path encodedEvents, err := storageInstance.ReadBatch(eventsReadMap) if err != nil { + fmt.Println("Error reading events: ", err) errChan <- fmt.Errorf("error reading events for customer %s: %w", update.CustomerID, err) return } @@ -329,16 +342,17 @@ func (d *Synchronizer) syncCycle() error { all_events = append(all_events, data...) } + // write abis to file + // Decode the events using ABIs decodedEvents, err := client.DecodeProtoEventsToLabels(all_events, update.BlocksCache, update.Abis) if err != nil { + fmt.Println("Error decoding events: ", err) errChan <- fmt.Errorf("error decoding events for customer %s: %w", update.CustomerID, err) return } - log.Println("Decoded events amount: ", len(decodedEvents)) - // try to write to user RDS pgx.WriteEvents( @@ -366,7 +380,6 @@ func (d *Synchronizer) syncCycle() error { RowIds: rowIds, }) } - encodedTransactions, err := storageInstance.ReadBatch(transactionsReadMap) if err != nil { @@ -407,6 +420,9 @@ func (d *Synchronizer) syncCycle() error { }(update) } + + wg.Wait() + d.startBlock = latestBlock } @@ -418,6 +434,7 @@ func (d *Synchronizer) syncCycle() error { // Check for errors from goroutines for err := range errChan { + fmt.Println("Error during synchronization cycle:", err) if err != nil { return err // Return the first error encountered }