From b9e2c16566de03f526b4f38a18e84b1ef6f85221 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 19 Jun 2024 09:14:57 +0000 Subject: [PATCH 1/8] Removed empty command --- blockchain/polygon/polygon.go | 4 ++- cmd.go | 51 +++++++---------------------------- 2 files changed, 13 insertions(+), 42 deletions(-) diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index d156416..0d38ed5 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go @@ -627,7 +627,9 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa var labels []indexer.TransactionLabel for _, transaction := range decodedTransactions { - + if len(transaction.Input) < 11 { + continue + } selector := transaction.Input[:10] contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) diff --git a/cmd.go b/cmd.go index 520a914..ee3621f 100644 --- a/cmd.go +++ b/cmd.go @@ -41,11 +41,10 @@ func CreateRootCommand() *cobra.Command { blockchainCmd := CreateBlockchainCommand() starknetCmd := CreateStarknetCommand() crawlerCmd := CreateCrawlerCommand() - indexCmd := CreateIndexCommand() inspectorCmd := CreateInspectorCommand() evmCmd := CreateEVMCommand() synchronizerCmd := CreateSynchronizerCommand() - rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, indexCmd, inspectorCmd, synchronizerCmd) + rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, inspectorCmd, synchronizerCmd) // By default, cobra Command objects write to stderr. We have to forcibly set them to output to // stdout. 
@@ -357,9 +356,9 @@ func CreateInspectorCommand() *cobra.Command { var chain, baseDir, delim, returnFunc, batch, target string var timeout, row int - decodeCommand := &cobra.Command{ - Use: "decode", - Short: "Decode proto data from storage", + readCommand := &cobra.Command{ + Use: "read", + Short: "Read and decode indexed proto data from storage", PreRunE: func(cmd *cobra.Command, args []string) error { storageErr := storage.CheckVariablesForStorage() if storageErr != nil { @@ -414,11 +413,11 @@ func CreateInspectorCommand() *cobra.Command { }, } - decodeCommand.Flags().StringVar(&chain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)") - decodeCommand.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')") - decodeCommand.Flags().StringVar(&batch, "batch", "", "What batch to read") - decodeCommand.Flags().StringVar(&target, "target", "", "What to read: blocks, logs or transactions") - decodeCommand.Flags().IntVar(&row, "row", 0, "Row to read (default: 0)") + readCommand.Flags().StringVar(&chain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)") + readCommand.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')") + readCommand.Flags().StringVar(&batch, "batch", "", "What batch to read") + readCommand.Flags().StringVar(&target, "target", "", "What to read: blocks, logs or transactions") + readCommand.Flags().IntVar(&row, "row", 0, "Row to read (default: 0)") var storageVerify bool @@ -599,41 +598,11 @@ func CreateInspectorCommand() *cobra.Command { storageCommand.Flags().StringVar(&returnFunc, "return-func", "", "Which function use for return") storageCommand.Flags().IntVar(&timeout, "timeout", 180, "List timeout (default: 180)") - inspectorCmd.AddCommand(storageCommand, decodeCommand, dbCommand) + inspectorCmd.AddCommand(storageCommand, readCommand, dbCommand) return inspectorCmd } -func CreateIndexCommand() *cobra.Command { - - 
indexCommand := &cobra.Command{ - Use: "index", - Short: "Index storage of moonstream blockstore", - } - - // subcommands - - initializeCommand := &cobra.Command{ - Use: "initialize", - Short: "Initialize the index storage", - Run: func(cmd *cobra.Command, args []string) { - fmt.Println("Initializing index storage from go") - }, - } - - readCommand := &cobra.Command{ - Use: "read", - Short: "Read the index storage", - Run: func(cmd *cobra.Command, args []string) { - // index.Read() - }, - } - - indexCommand.AddCommand(initializeCommand, readCommand) - - return indexCommand -} - func CreateStarknetParseCommand() *cobra.Command { var infile string var rawABI []byte From eae3aecb52654931420c16533350e2952f8e566a Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 19 Jun 2024 20:43:04 +0000 Subject: [PATCH 2/8] Synchronizer changes to work with start and end, fixes lost batches --- cmd.go | 25 +++-- crawler/crawler.go | 9 +- indexer/db.go | 6 +- sample.env | 3 +- synchronizer/synchronizer.go | 174 ++++++++++++++++++++++------------- 5 files changed, 137 insertions(+), 80 deletions(-) diff --git a/cmd.go b/cmd.go index ee3621f..0616b08 100644 --- a/cmd.go +++ b/cmd.go @@ -283,9 +283,10 @@ func CreateCrawlerCommand() *cobra.Command { } func CreateSynchronizerCommand() *cobra.Command { - var startBlock, endBlock uint64 + var startBlock, endBlock, batchSize uint64 var timeout int - var chain, baseDir, output, abi_source string + var chain, baseDir, customerDbUriFlag string + var force bool synchronizerCmd := &cobra.Command{ Use: "synchronizer", @@ -320,12 +321,23 @@ func CreateSynchronizerCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { indexer.InitDBConnection() - newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, timeout) + newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, force, timeout) if synchonizerErr != nil { return 
synchonizerErr } - newSynchronizer.SyncCustomers() + latestBlockNumber, latestErr := newSynchronizer.Client.GetLatestBlockNumber() + if latestErr != nil { + return fmt.Errorf("Failed to get latest block number: %v", latestErr) + } + + if startBlock > latestBlockNumber.Uint64() { + log.Fatalf("Start block could not be greater then latest block number at blockchain") + } + + crawler.CurrentBlockchainState.SetLatestBlockNumber(latestBlockNumber) + + newSynchronizer.Start(customerDbUriFlag) return nil }, @@ -336,8 +348,9 @@ func CreateSynchronizerCommand() *cobra.Command { synchronizerCmd.Flags().Uint64Var(&endBlock, "end-block", 0, "The block number to end decoding at (default: latest block)") synchronizerCmd.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')") synchronizerCmd.Flags().IntVar(&timeout, "timeout", 30, "The timeout for the crawler in seconds (default: 30)") - synchronizerCmd.Flags().StringVar(&output, "output", "output", "The output directory to store the decoded data (default: output)") - synchronizerCmd.Flags().StringVar(&abi_source, "abi-source", "abi", "The source of the ABI (default: abi)") + synchronizerCmd.Flags().Uint64Var(&batchSize, "batch-size", 100, "The number of blocks to crawl in each batch (default: 100)") + synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. 
This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API") + synchronizerCmd.Flags().BoolVar(&force, "force", false, "Set this flag to force the crawler start from the specified block, otherwise it checks database latest indexed block number (default: false)") return synchronizerCmd } diff --git a/crawler/crawler.go b/crawler/crawler.go index fb6a790..6e80f3e 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -44,7 +44,7 @@ type Crawler struct { blockchain string startBlock int64 endBlock int64 - blocksBatch int64 + batchSize int64 confirmations int64 force bool baseDir string @@ -75,7 +75,7 @@ func NewCrawler(blockchain string, startBlock, endBlock, batchSize, confirmation blockchain: blockchain, startBlock: startBlock, endBlock: endBlock, - blocksBatch: batchSize, + batchSize: batchSize, confirmations: confirmations, force: force, baseDir: baseDir, @@ -129,7 +129,6 @@ func SetDefaultStartBlock(confirmations int64, latestBlockNumber *big.Int) int64 func (c *Crawler) Start(threads int) { latestBlockNumber := CurrentBlockchainState.GetLatestBlockNumber() if c.force { - // Start form specified startBlock if it is set and not 0 if c.startBlock == 0 { c.startBlock = SetDefaultStartBlock(c.confirmations, latestBlockNumber) } @@ -153,7 +152,7 @@ func (c *Crawler) Start(threads int) { } } - tempEndBlock := c.startBlock + c.blocksBatch + tempEndBlock := c.startBlock + c.batchSize var safeBlock int64 retryWaitTime := 10 * time.Second @@ -181,7 +180,7 @@ func (c *Crawler) Start(threads int) { safeBlock = latestBlockNumber.Int64() - c.confirmations - tempEndBlock = c.startBlock + c.blocksBatch + tempEndBlock = c.startBlock + c.batchSize if c.endBlock != 0 { if c.endBlock <= tempEndBlock { tempEndBlock = c.endBlock diff --git a/indexer/db.go b/indexer/db.go index 64f7474..10d946d 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -602,7 +602,7 @@ func (p *PostgreSQLpgx) ReadABIJobs(blockchain string) ([]AbiJob, 
error) { return nil, nil // or return an appropriate error if this is considered an error state } - log.Println("Parsed abiJobs:", len(abiJobs), " for blockchain:", blockchain) + log.Println("Parsed abiJobs:", len(abiJobs), "for blockchain:", blockchain) // If you need to process or log the first ABI job separately, do it here return abiJobs, nil @@ -1001,7 +1001,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro } } - log.Printf("Records %d events inserted into %s", len(events), tableName) + log.Printf("Pushed %d events into %s", len(events), tableName) return nil } @@ -1073,6 +1073,6 @@ func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []Tran } } - log.Printf("Records %d transactions inserted into %s", len(transactions), tableName) + log.Printf("Pushed %d transactions into %s", len(transactions), tableName) return nil } diff --git a/sample.env b/sample.env index 3e3506f..4a27ad9 100644 --- a/sample.env +++ b/sample.env @@ -18,5 +18,4 @@ export SEER_CRAWLER_STORAGE_PREFIX="" # Environment variables for local development export MOONSTREAM_STORAGE_GCP_SERVICE_ACCOUNT_CREDS_PATH="" - -export SEER_CRAWLER_DEBUG=false \ No newline at end of file +export SEER_CRAWLER_DEBUG=false diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index e57b9b8..41bab06 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -16,6 +16,7 @@ import ( "github.com/moonstream-to/seer/crawler" "github.com/moonstream-to/seer/indexer" "github.com/moonstream-to/seer/storage" + "golang.org/x/exp/slices" ) type Synchronizer struct { @@ -25,12 +26,14 @@ type Synchronizer struct { blockchain string startBlock uint64 endBlock uint64 + batchSize uint64 + force bool baseDir string basePath string } // NewSynchronizer creates a new synchronizer instance with the given blockchain handler. 
-func NewSynchronizer(blockchain, baseDir string, startBlock uint64, endBlock uint64, timeout int) (*Synchronizer, error) { +func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, force bool, timeout int) (*Synchronizer, error) { var synchronizer Synchronizer basePath := filepath.Join(baseDir, crawler.SeerCrawlerStoragePrefix, "data", blockchain) @@ -55,6 +58,7 @@ func NewSynchronizer(blockchain, baseDir string, startBlock uint64, endBlock uin blockchain: blockchain, startBlock: startBlock, endBlock: endBlock, + batchSize: batchSize, baseDir: baseDir, basePath: basePath, } @@ -163,31 +167,15 @@ func ensurePortInConnectionString(connStr string) (string, error) { return parsedURL.String(), nil } -func (d *Synchronizer) SyncCustomers() error { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - log.Println("Run synchronization cycle...") - err := d.syncCycle() - if err != nil { - fmt.Println("Error during synchronization cycle:", err) - } - } - } -} - -func (d *Synchronizer) syncCycle() error { - // Initialize a wait group to synchronize goroutines - var wg sync.WaitGroup - errChan := make(chan error, 1) // Buffered channel for error handling +// getCustomers fetch ABI jobs, customer IDs and database URLs +func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]string, []string, error) { + rdsConnections := make(map[string]string) + var customerIds []string // Read ABI jobs from database abiJobs, err := d.ReadAbiJobsFromDatabase(d.blockchain) if err != nil { - return err + return nil, customerIds, err } // Create a set of customer IDs from ABI jobs to remove duplicates @@ -197,93 +185,151 @@ func (d *Synchronizer) syncCycle() error { } // Convert set to slice - var customerIds []string for id := range customerIdsSet { customerIds = append(customerIds, id) } log.Println("Customer IDs to sync:", customerIds) - // Get RDS connections for customer IDs - 
rdsConnections, err := GetDBConnections(customerIds) - if err != nil { - return err + if customerDbUriFlag == "" { + // Get RDS connections for customer IDs + rdsConnections, err = GetDBConnections(customerIds) + if err != nil { + return nil, customerIds, err + } + } else { + customersLen := 0 + for _, id := range customerIds { + rdsConnections[id] = customerDbUriFlag + customersLen++ + } + log.Printf("For %d customers set one specified db URI with flag", customersLen) } - if d.startBlock == 0 { + return rdsConnections, customerIds, nil +} + +func (d *Synchronizer) Start(customerDbUriFlag string) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + isEnd, err := d.SyncCycle(customerDbUriFlag) + if err != nil { + fmt.Println("Error during synchronization cycle:", err) + } + if isEnd { + return + } + } + } +} + +func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { + var isEnd bool + var wg sync.WaitGroup + errChan := make(chan error, 1) // Buffered channel for error handling + + rdsConnections, customerIds, customersErr := d.getCustomers(customerDbUriFlag) + if customersErr != nil { + return isEnd, customersErr + } + if !d.force { var latestCustomerBlocks []uint64 for _, id := range customerIds { uri := rdsConnections[id] + // TODO(kompotkot): Rewrite to not initialize each time new psql conneciton pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(uri) if err != nil { log.Println("Error creating RDS connection: ", err) - return err // Error creating RDS connection + return isEnd, err } pool := pgx.GetPool() - fmt.Println("Acquiring pool connection...") conn, err := pool.Acquire(context.Background()) if err != nil { log.Println("Error acquiring pool connection: ", err) - return err // Error acquiring pool connection + return isEnd, err } defer conn.Release() latestBlock, err := pgx.ReadLastLabel(d.blockchain) if err != nil { log.Println("Error reading latest block: ", err) - return err // 
Error reading the latest block + return isEnd, err } latestCustomerBlocks = append(latestCustomerBlocks, latestBlock) - log.Printf("Latest block for customer %s: %d\n", id, latestBlock) + log.Printf("Latest block for customer %s is: %d\n", id, latestBlock) } // Determine the start block as the maximum of the latest blocks of all customers - for _, block := range latestCustomerBlocks { - if block > d.startBlock { - d.startBlock = block - 100 - } + maxCustomerLatestBlock := slices.Max(latestCustomerBlocks) + if maxCustomerLatestBlock != 0 { + d.startBlock = maxCustomerLatestBlock } } - // In case start block is still 0, get the latest block from the blockchain + // In case start block is still 0, get the latest block from the blockchain minus shift if d.startBlock == 0 { - log.Println("Start block is 0, RDS not contain any blocks yet. Sync indexers then.") - latestBlock, err := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain) - if err != nil { - return err + latestBlockNumber, latestErr := d.Client.GetLatestBlockNumber() + if latestErr != nil { + return isEnd, fmt.Errorf("failed to get latest block number: %v", latestErr) } - d.startBlock = latestBlock - 100 - d.endBlock = latestBlock + d.startBlock = uint64(crawler.SetDefaultStartBlock(0, latestBlockNumber)) } - // Get the latest block from indexer - latestBlock, err := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain) + // Get the latest block from indexes database + indexedLatestBlock, idxLatestErr := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain) + if idxLatestErr != nil { + return isEnd, idxLatestErr + } - if err != nil { - return err + if d.endBlock != 0 && indexedLatestBlock > d.endBlock { + indexedLatestBlock = d.endBlock + } + + if d.startBlock >= indexedLatestBlock { + log.Printf("Value in startBlock %d greater or equal indexedLatestBlock %d, waiting next iteration..", d.startBlock, indexedLatestBlock) + return isEnd, nil } - d.endBlock = latestBlock // Main loop Steps: // 1. 
Read updates from the indexer db // 2. For each update, read the original event data from storage // 3. Decode input data using ABIs // 4. Write updates to the user RDS + tempEndBlock := d.startBlock + d.batchSize + var isCycleFinished bool + for { + tempEndBlock = d.startBlock + d.batchSize + if d.endBlock != 0 { + if tempEndBlock >= d.endBlock { + tempEndBlock = d.endBlock + isEnd = true + isCycleFinished = true + log.Printf("End block %d almost reached", tempEndBlock) + } + } + if tempEndBlock >= indexedLatestBlock { + tempEndBlock = indexedLatestBlock + isCycleFinished = true + } - for i := d.startBlock; i < d.endBlock; i += 100 { - endBlock := i + 100 + if crawler.SEER_CRAWLER_DEBUG { + log.Printf("Syncing %d blocks from %d to %d\n", tempEndBlock-d.startBlock, d.startBlock, tempEndBlock) + } // Read updates from the indexer db // This function will return a list of customer updates 1 update is 1 customer - updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, i, endBlock, customerIds) + updates, err := indexer.DBConnection.ReadUpdates(d.blockchain, d.startBlock, tempEndBlock, customerIds) if err != nil { - return fmt.Errorf("error reading updates: %w", err) + return isEnd, fmt.Errorf("error reading updates: %w", err) } log.Printf("Read %d users updates from the indexer db\n", len(updates)) - log.Printf("Syncing blocks from %d to %d\n", i, endBlock) for _, update := range updates { wg.Add(1) @@ -294,6 +340,7 @@ func (d *Synchronizer) syncCycle() error { uri := rdsConnections[update.CustomerID] // Create a connection to the user RDS + // TODO(kompotkot): Rewrite to not initialize each time new psql conneciton pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(uri) if err != nil { errChan <- fmt.Errorf("error creating connection to RDS for customer %s: %w", update.CustomerID, err) @@ -353,8 +400,7 @@ func (d *Synchronizer) syncCycle() error { return } - // try to write to user RDS - + // Write events to user RDS pgx.WriteEvents( d.blockchain, 
decodedEvents, @@ -402,22 +448,22 @@ func (d *Synchronizer) syncCycle() error { return } + // Write transactions to user RDS pgx.WriteTransactions( d.blockchain, decodedTransactions, ) - if err != nil { - errChan <- fmt.Errorf("error reading transactions for customer %s: %w", update.CustomerID, err) - return - } - }(update) } wg.Wait() - d.startBlock = latestBlock + if isCycleFinished { + break + } + + d.startBlock = tempEndBlock + 1 } // Wait for all goroutines to finish @@ -430,9 +476,9 @@ func (d *Synchronizer) syncCycle() error { for err := range errChan { fmt.Println("Error during synchronization cycle:", err) if err != nil { - return err // Return the first error encountered + return isEnd, err } } - return nil // Return nil to indicate success if no errors occurred + return isEnd, nil } From c9f0ac7fef04744aeb0d026095e844479a804f21 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jun 2024 09:09:16 +0000 Subject: [PATCH 3/8] Removed debug code --- blockchain/polygon/polygon.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index 0d38ed5..48d03a9 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go @@ -627,9 +627,6 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa var labels []indexer.TransactionLabel for _, transaction := range decodedTransactions { - if len(transaction.Input) < 11 { - continue - } selector := transaction.Input[:10] contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) From d7fc1c16ac1f693f6d1dbaea1473fb9b226f326e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jun 2024 09:10:15 +0000 Subject: [PATCH 4/8] Fix pol changes --- blockchain/polygon/polygon.go | 1 + 1 file changed, 1 insertion(+) diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index 48d03a9..d156416 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go 
@@ -627,6 +627,7 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa var labels []indexer.TransactionLabel for _, transaction := range decodedTransactions { + selector := transaction.Input[:10] contractAbi, err := abi.JSON(strings.NewReader(abiMap[transaction.ToAddress][selector]["abi"])) From 07cc1e555024aed97fb4ced1f22f40a315491e53 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jun 2024 09:55:05 +0000 Subject: [PATCH 5/8] Updated start end logic for synchronizer --- cmd.go | 4 +--- synchronizer/synchronizer.go | 35 +++++++++++++++++++++-------------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/cmd.go b/cmd.go index 0616b08..7ddb64c 100644 --- a/cmd.go +++ b/cmd.go @@ -286,7 +286,6 @@ func CreateSynchronizerCommand() *cobra.Command { var startBlock, endBlock, batchSize uint64 var timeout int var chain, baseDir, customerDbUriFlag string - var force bool synchronizerCmd := &cobra.Command{ Use: "synchronizer", @@ -321,7 +320,7 @@ func CreateSynchronizerCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { indexer.InitDBConnection() - newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, force, timeout) + newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout) if synchonizerErr != nil { return synchonizerErr } @@ -350,7 +349,6 @@ func CreateSynchronizerCommand() *cobra.Command { synchronizerCmd.Flags().IntVar(&timeout, "timeout", 30, "The timeout for the crawler in seconds (default: 30)") synchronizerCmd.Flags().Uint64Var(&batchSize, "batch-size", 100, "The number of blocks to crawl in each batch (default: 100)") synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. 
This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API") - synchronizerCmd.Flags().BoolVar(&force, "force", false, "Set this flag to force the crawler start from the specified block, otherwise it checks database latest indexed block number (default: false)") return synchronizerCmd } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 41bab06..e031679 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -27,13 +27,12 @@ type Synchronizer struct { startBlock uint64 endBlock uint64 batchSize uint64 - force bool baseDir string basePath string } // NewSynchronizer creates a new synchronizer instance with the given blockchain handler. -func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, force bool, timeout int) (*Synchronizer, error) { +func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize uint64, timeout int) (*Synchronizer, error) { var synchronizer Synchronizer basePath := filepath.Join(baseDir, crawler.SeerCrawlerStoragePrefix, "data", blockchain) @@ -209,9 +208,19 @@ func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]string } func (d *Synchronizer) Start(customerDbUriFlag string) { + var isEnd bool + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() + isEnd, err := d.SyncCycle(customerDbUriFlag) + if err != nil { + fmt.Println("Error during first synchronization cycle:", err) + } + if isEnd { + return + } + for { select { case <-ticker.C: @@ -236,7 +245,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { return isEnd, customersErr } - if !d.force { + if d.startBlock == 0 { var latestCustomerBlocks []uint64 for _, id := range customerIds { uri := rdsConnections[id] @@ -269,18 +278,16 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { maxCustomerLatestBlock := slices.Max(latestCustomerBlocks) if 
maxCustomerLatestBlock != 0 { d.startBlock = maxCustomerLatestBlock + } else { + // In case start block is still 0, get the latest block from the blockchain minus shift + latestBlockNumber, latestErr := d.Client.GetLatestBlockNumber() + if latestErr != nil { + return isEnd, fmt.Errorf("failed to get latest block number: %v", latestErr) + } + d.startBlock = uint64(crawler.SetDefaultStartBlock(0, latestBlockNumber)) } } - // In case start block is still 0, get the latest block from the blockchain minus shift - if d.startBlock == 0 { - latestBlockNumber, latestErr := d.Client.GetLatestBlockNumber() - if latestErr != nil { - return isEnd, fmt.Errorf("failed to get latest block number: %v", latestErr) - } - d.startBlock = uint64(crawler.SetDefaultStartBlock(0, latestBlockNumber)) - } - // Get the latest block from indexes database indexedLatestBlock, idxLatestErr := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain) if idxLatestErr != nil { @@ -459,11 +466,11 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { wg.Wait() + d.startBlock = tempEndBlock + 1 + if isCycleFinished { break } - - d.startBlock = tempEndBlock + 1 } // Wait for all goroutines to finish From f73a2dc64ceb8700089cf0ecc6b59c1796f2ccb2 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jun 2024 10:19:15 +0000 Subject: [PATCH 6/8] Return shift for start block and semaphore logic to not verload --- synchronizer/synchronizer.go | 37 +++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index e031679..37017c4 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -237,8 +237,6 @@ func (d *Synchronizer) Start(customerDbUriFlag string) { func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { var isEnd bool - var wg sync.WaitGroup - errChan := make(chan error, 1) // Buffered channel for error handling rdsConnections, 
customerIds, customersErr := d.getCustomers(customerDbUriFlag) if customersErr != nil { @@ -277,7 +275,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { // Determine the start block as the maximum of the latest blocks of all customers maxCustomerLatestBlock := slices.Max(latestCustomerBlocks) if maxCustomerLatestBlock != 0 { - d.startBlock = maxCustomerLatestBlock + d.startBlock = maxCustomerLatestBlock - 100 } else { // In case start block is still 0, get the latest block from the blockchain minus shift latestBlockNumber, latestErr := d.Client.GetLatestBlockNumber() @@ -338,11 +336,18 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { log.Printf("Read %d users updates from the indexer db\n", len(updates)) + var wg sync.WaitGroup + + sem := make(chan struct{}, 5) // Semaphore to control concurrency + errChan := make(chan error, 1) // Buffered channel for error handling + for _, update := range updates { wg.Add(1) go func(update indexer.CustomerUpdates) { defer wg.Done() + sem <- struct{}{} // Acquire semaphore + // Get the RDS connection for the customer uri := rdsConnections[update.CustomerID] @@ -461,29 +466,27 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { decodedTransactions, ) + <-sem }(update) } wg.Wait() - d.startBlock = tempEndBlock + 1 + close(sem) + close(errChan) // Close the channel to signal that all goroutines have finished - if isCycleFinished { - break + // Check for errors from goroutines + for err := range errChan { + fmt.Println("Error during synchronization cycle:", err) + if err != nil { + return isEnd, err + } } - } - // Wait for all goroutines to finish - go func() { - wg.Wait() - close(errChan) // Close the channel to signal that all goroutines have finished - }() + d.startBlock = tempEndBlock + 1 - // Check for errors from goroutines - for err := range errChan { - fmt.Println("Error during synchronization cycle:", err) - if err != nil { - return isEnd, 
err + if isCycleFinished { + break } } From 759164824471baf1e64cc861533155cbd2c05721 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jun 2024 12:51:40 +0000 Subject: [PATCH 7/8] Reuse pgx conn --- synchronizer/synchronizer.go | 119 ++++++++++++++++------------------- 1 file changed, 54 insertions(+), 65 deletions(-) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 37017c4..4ff81cf 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -86,22 +86,6 @@ func NewSynchronizer(blockchain, baseDir string, startBlock, endBlock, batchSize // ------------------------------------------------------------------------------------------------------------------------------- -func GetDBConnections(uuids []string) (map[string]string, error) { - connections := make(map[string]string) // Initialize the map - - for _, id := range uuids { - - connectionString, err := GetDBConnection(id) - - if err != nil { - return nil, err - } - - connections[id] = connectionString - } - return connections, nil -} - func GetDBConnection(uuid string) (string, error) { // Create the request @@ -142,8 +126,6 @@ func GetDBConnection(uuid string) (string, error) { } func (d *Synchronizer) ReadAbiJobsFromDatabase(blockchain string) ([]indexer.AbiJob, error) { - // Simulate reading ABI jobs from the database for a given blockchain. - // This function will need to interact with a real database or an internal API in the future. 
abiJobs, err := indexer.DBConnection.ReadABIJobs(blockchain) if err != nil { return nil, err @@ -166,15 +148,20 @@ func ensurePortInConnectionString(connStr string) (string, error) { return parsedURL.String(), nil } +type CustomerDBConnection struct { + Uri string + + Pgx *indexer.PostgreSQLpgx +} + // getCustomers fetch ABI jobs, customer IDs and database URLs -func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]string, []string, error) { - rdsConnections := make(map[string]string) - var customerIds []string +func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]CustomerDBConnection, error) { + customerDBConnections := make(map[string]CustomerDBConnection) // Read ABI jobs from database abiJobs, err := d.ReadAbiJobsFromDatabase(d.blockchain) if err != nil { - return nil, customerIds, err + return customerDBConnections, err } // Create a set of customer IDs from ABI jobs to remove duplicates @@ -183,28 +170,30 @@ func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]string customerIdsSet[job.CustomerID] = struct{}{} } - // Convert set to slice - for id := range customerIdsSet { - customerIds = append(customerIds, id) - } - log.Println("Customer IDs to sync:", customerIds) + var customerIds []string - if customerDbUriFlag == "" { - // Get RDS connections for customer IDs - rdsConnections, err = GetDBConnections(customerIds) - if err != nil { - return nil, customerIds, err + for id := range customerIdsSet { + var connectionString string + var dbConnErr error + if customerDbUriFlag == "" { + connectionString, dbConnErr = GetDBConnection(id) + if dbConnErr != nil { + log.Printf("Unable to get connection database URI for %s customer, err: %v", id, dbConnErr) + continue + } + } else { + connectionString = customerDbUriFlag } - } else { - customersLen := 0 - for _, id := range customerIds { - rdsConnections[id] = customerDbUriFlag - customersLen++ + + customerDBConnections[id] = CustomerDBConnection{ + Uri: 
connectionString, } - log.Printf("For %d customers set one specified db URI with flag", customersLen) + customerIds = append(customerIds, id) + } + log.Println("Customer IDs to sync:", customerIds) - return rdsConnections, customerIds, nil + return customerDBConnections, nil } func (d *Synchronizer) Start(customerDbUriFlag string) { @@ -238,24 +227,31 @@ func (d *Synchronizer) Start(customerDbUriFlag string) { func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { var isEnd bool - rdsConnections, customerIds, customersErr := d.getCustomers(customerDbUriFlag) + customerDBConnections, customersErr := d.getCustomers(customerDbUriFlag) if customersErr != nil { return isEnd, customersErr } + var customerIds []string + for id, customer := range customerDBConnections { + pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(customer.Uri) + if err != nil { + log.Println("Error creating RDS connection: ", err) + return isEnd, err + } - if d.startBlock == 0 { - var latestCustomerBlocks []uint64 - for _, id := range customerIds { - uri := rdsConnections[id] + updatedCustomer := CustomerDBConnection{ + Uri: customer.Uri, + Pgx: pgx, + } + customerDBConnections[id] = updatedCustomer - // TODO(kompotkot): Rewrite to not initialize each time new psql conneciton - pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(uri) - if err != nil { - log.Println("Error creating RDS connection: ", err) - return isEnd, err - } - pool := pgx.GetPool() + customerIds = append(customerIds, id) + } + if d.startBlock == 0 { + var latestCustomerBlocks []uint64 + for id, customer := range customerDBConnections { + pool := customer.Pgx.GetPool() conn, err := pool.Acquire(context.Background()) if err != nil { log.Println("Error acquiring pool connection: ", err) @@ -263,13 +259,13 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } defer conn.Release() - latestBlock, err := pgx.ReadLastLabel(d.blockchain) + latestLabelBlock, err := 
customer.Pgx.ReadLastLabel(d.blockchain) if err != nil { log.Println("Error reading latest block: ", err) return isEnd, err } - latestCustomerBlocks = append(latestCustomerBlocks, latestBlock) - log.Printf("Latest block for customer %s is: %d\n", id, latestBlock) + latestCustomerBlocks = append(latestCustomerBlocks, latestLabelBlock) + log.Printf("Latest block for customer %s is: %d\n", id, latestLabelBlock) } // Determine the start block as the maximum of the latest blocks of all customers @@ -349,17 +345,10 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { sem <- struct{}{} // Acquire semaphore // Get the RDS connection for the customer - uri := rdsConnections[update.CustomerID] + customer := customerDBConnections[update.CustomerID] // Create a connection to the user RDS - // TODO(kompotkot): Rewrite to not initialize each time new psql conneciton - pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(uri) - if err != nil { - errChan <- fmt.Errorf("error creating connection to RDS for customer %s: %w", update.CustomerID, err) - return - } - - pool := pgx.GetPool() + pool := customer.Pgx.GetPool() conn, err := pool.Acquire(context.Background()) if err != nil { errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err) @@ -413,7 +402,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } // Write events to user RDS - pgx.WriteEvents( + customer.Pgx.WriteEvents( d.blockchain, decodedEvents, ) @@ -461,7 +450,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { } // Write transactions to user RDS - pgx.WriteTransactions( + customer.Pgx.WriteTransactions( d.blockchain, decodedTransactions, ) From 7a36e78f8e81dd43baa10493bf618572e0bfd7db Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 20 Jun 2024 12:55:02 +0000 Subject: [PATCH 8/8] Organized customer pgx set --- synchronizer/synchronizer.go | 34 ++++++++++++---------------------- 1 file changed, 12 
insertions(+), 22 deletions(-) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 4ff81cf..113cb01 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -155,13 +155,14 @@ type CustomerDBConnection struct { } // getCustomers fetch ABI jobs, customer IDs and database URLs -func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]CustomerDBConnection, error) { +func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]CustomerDBConnection, []string, error) { customerDBConnections := make(map[string]CustomerDBConnection) + var customerIds []string // Read ABI jobs from database abiJobs, err := d.ReadAbiJobsFromDatabase(d.blockchain) if err != nil { - return customerDBConnections, err + return customerDBConnections, customerIds, err } // Create a set of customer IDs from ABI jobs to remove duplicates @@ -170,8 +171,6 @@ func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]Custom customerIdsSet[job.CustomerID] = struct{}{} } - var customerIds []string - for id := range customerIdsSet { var connectionString string var dbConnErr error @@ -185,15 +184,22 @@ func (d *Synchronizer) getCustomers(customerDbUriFlag string) (map[string]Custom connectionString = customerDbUriFlag } + pgx, pgxErr := indexer.NewPostgreSQLpgxWithCustomURI(connectionString) + if pgxErr != nil { + log.Printf("Error creating RDS connection for %s customer, err: %v", id, pgxErr) + continue + } + customerDBConnections[id] = CustomerDBConnection{ Uri: connectionString, + Pgx: pgx, } customerIds = append(customerIds, id) } log.Println("Customer IDs to sync:", customerIds) - return customerDBConnections, nil + return customerDBConnections, customerIds, nil } func (d *Synchronizer) Start(customerDbUriFlag string) { @@ -227,26 +233,10 @@ func (d *Synchronizer) Start(customerDbUriFlag string) { func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { var isEnd bool - 
customerDBConnections, customersErr := d.getCustomers(customerDbUriFlag) + customerDBConnections, customerIds, customersErr := d.getCustomers(customerDbUriFlag) if customersErr != nil { return isEnd, customersErr } - var customerIds []string - for id, customer := range customerDBConnections { - pgx, err := indexer.NewPostgreSQLpgxWithCustomURI(customer.Uri) - if err != nil { - log.Println("Error creating RDS connection: ", err) - return isEnd, err - } - - updatedCustomer := CustomerDBConnection{ - Uri: customer.Uri, - Pgx: pgx, - } - customerDBConnections[id] = updatedCustomer - - customerIds = append(customerIds, id) - } if d.startBlock == 0 { var latestCustomerBlocks []uint64