Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch reading #103

Merged
merged 13 commits into from
Nov 21, 2024
15 changes: 11 additions & 4 deletions blockchain/blockchain.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *Client) FetchBlocksInRange(from, to *big.Int, debug bool) ([]*seer_comm
func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxRequests int) ([]*seer_common.BlockJson, error) {
var (
blocks []*seer_common.BlockJson

collectedErrors []error
mu sync.Mutex
wg sync.WaitGroup
ctx = context.Background()
Expand All @@ -238,7 +238,7 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
}

sem := make(chan struct{}, maxRequests) // Semaphore to control concurrency
errChan := make(chan error, 1)
errChan := make(chan error, len(blockNumbersRange)) // Channel to collect errors from goroutines

for _, b := range blockNumbersRange {
wg.Add(1)
Expand Down Expand Up @@ -274,10 +274,17 @@ func (c *Client) FetchBlocksInRangeAsync(from, to *big.Int, debug bool, maxReque
close(sem)
close(errChan)

if err := <-errChan; err != nil {
return nil, err
for err := range errChan {
collectedErrors = append(collectedErrors, err)
}

if len(collectedErrors) > 0 {
var errStrings []string
for _, err := range collectedErrors {
errStrings = append(errStrings, err.Error())
}
return nil, fmt.Errorf("errors occurred during crawling: %s", strings.Join(errStrings, "; "))
}
return blocks, nil
}

Expand Down
17 changes: 12 additions & 5 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func CreateCrawlerCommand() *cobra.Command {

func CreateSynchronizerCommand() *cobra.Command {
var startBlock, endBlock, batchSize uint64
var timeout, threads, cycleTickerWaitTime int
var timeout, threads, cycleTickerWaitTime, minBlocksToSync int
var chain, baseDir, customerDbUriFlag string

synchronizerCmd := &cobra.Command{
Expand Down Expand Up @@ -337,14 +337,14 @@ func CreateSynchronizerCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
indexer.InitDBConnection()

newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads)
newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync)
if synchonizerErr != nil {
return synchonizerErr
}

latestBlockNumber, latestErr := newSynchronizer.Client.GetLatestBlockNumber()
if latestErr != nil {
return fmt.Errorf("Failed to get latest block number: %v", latestErr)
return fmt.Errorf("failed to get latest block number: %v", latestErr)
}

if startBlock > latestBlockNumber.Uint64() {
Expand All @@ -368,6 +368,7 @@ func CreateSynchronizerCommand() *cobra.Command {
synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API")
synchronizerCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent decoding")
synchronizerCmd.Flags().IntVar(&cycleTickerWaitTime, "cycle-ticker-wait-time", 10, "The wait time for the synchronizer in seconds before it try to start new cycle")
synchronizerCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 10, "The minimum number of blocks to sync before the synchronizer starts decoding")

return synchronizerCmd
}
Expand Down Expand Up @@ -1011,7 +1012,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
var chain, baseDir, customerDbUriFlag string
var addresses, customerIds []string
var startBlock, endBlock, batchSize uint64
var timeout, threads int
var timeout, threads, minBlocksToSync int
var auto bool

historicalSyncCmd := &cobra.Command{
Expand All @@ -1038,6 +1039,11 @@ func CreateHistoricalSyncCommand() *cobra.Command {
return syncErr
}

blockchainErr := seer_blockchain.CheckVariablesForBlockchains()
if blockchainErr != nil {
return blockchainErr
}

if chain == "" {
return fmt.Errorf("blockchain is required via --chain")
}
Expand All @@ -1047,7 +1053,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
indexer.InitDBConnection()

newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads)
newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync)
if synchonizerErr != nil {
return synchonizerErr
}
Expand All @@ -1073,6 +1079,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
historicalSyncCmd.Flags().StringSliceVar(&addresses, "addresses", []string{}, "The list of addresses to sync")
historicalSyncCmd.Flags().BoolVar(&auto, "auto", false, "Set this flag to sync all unfinished historical crawl from the database (default: false)")
historicalSyncCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent crawling (default: 5)")
historicalSyncCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 50, "The minimum number of blocks to sync before the synchronizer starts decoding")

return historicalSyncCmd
}
Expand Down
107 changes: 89 additions & 18 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,14 +551,16 @@ func (p *PostgreSQLpgx) GetCustomersIDs(blockchain string) ([]string, error) {
return customerIds, nil
}

func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string) (uint64, uint64, string, []CustomerUpdates, error) {
func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string, minBlocksToSync int) (uint64, uint64, []string, []CustomerUpdates, error) {

pool := p.GetPool()

conn, err := pool.Acquire(context.Background())

var paths []string

if err != nil {
return 0, 0, "", nil, err
return 0, 0, paths, nil, err
}

defer conn.Release()
Expand All @@ -567,20 +569,20 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome

query := fmt.Sprintf(`WITH path as (
SELECT
path
path,
block_number
from
%s
WHERE
block_number = $1
block_number >= $1 and block_number <= $1 + $3
),
latest_block_of_path as (
SELECT
block_number as block_number,
path as path
block_number as latest_block_number
from
%s
WHERE
path = (SELECT path from path)
path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1)
order by block_number desc
limit 1
),
Expand Down Expand Up @@ -627,29 +629,28 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
customer_id
)
SELECT
block_number,
path,
latest_block_number,
(SELECT array_agg(path) FROM path) as paths,
(SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs
FROM
latest_block_of_path
`, blocksTableName, blocksTableName)

rows, err := conn.Query(context.Background(), query, fromBlock, blockchain)
rows, err := conn.Query(context.Background(), query, fromBlock, blockchain, minBlocksToSync)

if err != nil {
log.Println("Error querying abi jobs from database", err)
return 0, 0, "", nil, err
return 0, 0, paths, nil, err
}

var customers []map[string]map[string]map[string]*AbiEntry
var path string
var firstBlockNumber, lastBlockNumber uint64

for rows.Next() {
err = rows.Scan(&lastBlockNumber, &path, &customers)
err = rows.Scan(&lastBlockNumber, &paths, &customers)
if err != nil {
log.Println("Error scanning row:", err)
return 0, 0, "", nil, err
return 0, 0, paths, nil, err
}
}

Expand All @@ -668,7 +669,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome

}

return firstBlockNumber, lastBlockNumber, path, customerUpdates, nil
return firstBlockNumber, lastBlockNumber, paths, customerUpdates, nil

}

Expand Down Expand Up @@ -803,31 +804,37 @@ func (p *PostgreSQLpgx) WriteLabes(
}

defer func() {
if err := recover(); err != nil {
if pErr := recover(); pErr != nil {
tx.Rollback(context.Background())
panic(err)
panic(pErr)
} else if err != nil {
tx.Rollback(context.Background())
} else {
err = tx.Commit(context.Background())
if err != nil {
log.Println("Error committing transaction:", err)
panic(err)
}
}
}()

if len(transactions) > 0 {
err := p.WriteTransactions(tx, blockchain, transactions)
if err != nil {
log.Println("Error writing transactions:", err)
return err
}
}

if len(events) > 0 {
err := p.WriteEvents(tx, blockchain, events)
if err != nil {
log.Println("Error writing events:", err)
return err
}
}

return nil
return err
}

func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error {
Expand Down Expand Up @@ -1418,6 +1425,70 @@ func (p *PostgreSQLpgx) FindBatchPath(blockchain string, blockNumber uint64) (st

}

// RetrievePathsAndBlockBounds looks up the batch paths stored in the blocks
// table of the given blockchain for the window of blocks ending at blockNumber
// and reaching back minBlocksToSync blocks.
//
// It returns:
//   - the paths of every row in the window (one entry per row, as aggregated
//     by ARRAY_AGG);
//   - the lowest block number stored under the path of the window's earliest
//     block;
//   - the highest block number stored under the path of the window's latest
//     block.
//
// When the window contains no rows (blocks not indexed yet) it returns
// (nil, 0, 0, nil). Any connection or query failure is returned as the error.
func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumber uint64, minBlocksToSync int) ([]string, uint64, uint64, error) {
	pool := p.GetPool()

	conn, err := pool.Acquire(context.Background())
	if err != nil {
		return nil, 0, 0, err
	}
	defer conn.Release()

	// Lower bound of the window. The subtraction is done on unsigned values,
	// so it must be clamped: without the guard a blockNumber smaller than
	// minBlocksToSync (or a negative minBlocksToSync) wraps around to a huge
	// value instead of stopping at block 0.
	lowerBound := blockNumber
	if minBlocksToSync > 0 {
		if window := uint64(minBlocksToSync); blockNumber > window {
			lowerBound = blockNumber - window
		} else {
			lowerBound = 0
		}
	}

	blocksTable := BlocksTableName(blockchain)

	query := fmt.Sprintf(`WITH path as (
		SELECT
			path,
			block_number
		from
			%s
		WHERE
			block_number >= $2 and block_number <= $1
	), latest_block_of_path as (
		SELECT
			block_number as latest_block_number
		from
			%s
		WHERE
			path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1)
		order by block_number desc
		limit 1
	), earliest_block_of_path as (
		SELECT
			block_number as first_block_number
		from
			%s
		WHERE
			path = (SELECT path FROM path ORDER BY block_number ASC LIMIT 1)
		order by block_number asc
		limit 1
	)
	select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT latest_block_number FROM latest_block_of_path) as max_block_number from path
	`, blocksTable, blocksTable, blocksTable)

	// The outer SELECT is an aggregate without GROUP BY, so it yields exactly
	// one row even when the window is empty; in that case every column is
	// NULL. The bounds are therefore scanned into pointers so that NULL can be
	// told apart from a real block number instead of failing the scan.
	var (
		paths          []string
		minBlockNumber *uint64
		maxBlockNumber *uint64
	)

	err = conn.QueryRow(context.Background(), query, blockNumber, lowerBound).Scan(&paths, &minBlockNumber, &maxBlockNumber)
	if err != nil {
		if err == pgx.ErrNoRows {
			// Blocks not indexed yet
			return nil, 0, 0, nil
		}
		return nil, 0, 0, err
	}

	if minBlockNumber == nil || maxBlockNumber == nil {
		// Empty window: blocks not indexed yet.
		return nil, 0, 0, nil
	}

	return paths, *minBlockNumber, *maxBlockNumber, nil
}

func (p *PostgreSQLpgx) GetAbiJobsWithoutDeployBlocks(blockchain string) (map[string]map[string][]string, error) {
pool := p.GetPool()

Expand Down
65 changes: 65 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package storage
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
)

type ListReturnFunc func(any) string
Expand All @@ -19,3 +22,65 @@ type ReadItem struct {
Key string
RowIds []uint64
}

// ReadFiles reads every key from storageInstance one after another and
// returns the resulting buffers in the same order as keys.
//
// Reading stops at the first failure: the returned slice is nil and the error
// names the offending key and wraps the underlying cause, so callers can
// inspect it with errors.Is / errors.As. An empty keys slice yields (nil, nil).
func ReadFiles(keys []string, storageInstance Storer) ([]bytes.Buffer, error) {
	if len(keys) == 0 {
		return nil, nil
	}

	// The final length is known up front, so allocate the backing array once.
	result := make([]bytes.Buffer, 0, len(keys))

	for _, key := range keys {
		buf, err := storageInstance.Read(key)
		if err != nil {
			return nil, fmt.Errorf("failed to read object from bucket %s: %w", key, err)
		}

		result = append(result, buf)
	}

	return result, nil
}

// ReadFilesAsync reads every key from storageInstance using at most threads
// reads in flight at any time.
//
// On success the returned buffers are in the same order as keys, regardless
// of the order in which the individual reads complete. A threads value below
// 1 is treated as 1. An empty keys slice yields (nil, nil).
//
// All reads are attempted even if some fail. If at least one read failed, the
// buffers that were read successfully are returned (still in key order)
// together with a single error whose message lists every failure, one per
// line, in key order.
func ReadFilesAsync(keys []string, threads int, storageInstance Storer) ([]bytes.Buffer, error) {
	if len(keys) == 0 {
		return nil, nil
	}

	// A zero limit would make the semaphore unbuffered, so the first acquire
	// below would block forever; a negative one makes make() panic.
	if threads < 1 {
		threads = 1
	}

	var (
		wg sync.WaitGroup
		// Each goroutine writes only to its own index, so these slices need
		// no lock and keep results aligned with keys.
		buffers = make([]bytes.Buffer, len(keys))
		errs    = make([]error, len(keys))
		// Semaphore to limit the number of concurrent reads.
		sem = make(chan struct{}, threads)
	)

	for i, key := range keys {
		wg.Add(1)
		// Acquire before spawning so the number of live goroutines is bounded
		// by threads as well.
		sem <- struct{}{}
		go func(i int, k string) {
			defer func() {
				<-sem
				wg.Done()
			}()

			buf, err := storageInstance.Read(k)
			if err != nil {
				errs[i] = fmt.Errorf("failed to read object from bucket %s: %w", k, err)
				return
			}

			buffers[i] = buf
		}(i, key)
	}

	// Wait for all goroutines to finish; after this point the slices are
	// no longer written to.
	wg.Wait()

	var (
		errMsgs   []string
		succeeded []bytes.Buffer
	)
	for i, err := range errs {
		if err != nil {
			errMsgs = append(errMsgs, err.Error())
			continue
		}
		succeeded = append(succeeded, buffers[i])
	}

	if len(errMsgs) > 0 {
		return succeeded, fmt.Errorf("errors occurred during file reads:\n%s",
			strings.Join(errMsgs, "\n"))
	}

	return buffers, nil
}
Loading
Loading