Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch reading #103

Merged
merged 13 commits into from
Nov 21, 2024
17 changes: 12 additions & 5 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func CreateCrawlerCommand() *cobra.Command {

func CreateSynchronizerCommand() *cobra.Command {
var startBlock, endBlock, batchSize uint64
var timeout, threads, cycleTickerWaitTime int
var timeout, threads, cycleTickerWaitTime, minBlocksToSync int
var chain, baseDir, customerDbUriFlag string

synchronizerCmd := &cobra.Command{
Expand Down Expand Up @@ -337,14 +337,14 @@ func CreateSynchronizerCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
indexer.InitDBConnection()

newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads)
newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync)
if synchonizerErr != nil {
return synchonizerErr
}

latestBlockNumber, latestErr := newSynchronizer.Client.GetLatestBlockNumber()
if latestErr != nil {
return fmt.Errorf("Failed to get latest block number: %v", latestErr)
return fmt.Errorf("failed to get latest block number: %v", latestErr)
}

if startBlock > latestBlockNumber.Uint64() {
Expand All @@ -368,6 +368,7 @@ func CreateSynchronizerCommand() *cobra.Command {
synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API")
synchronizerCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent decoding")
synchronizerCmd.Flags().IntVar(&cycleTickerWaitTime, "cycle-ticker-wait-time", 10, "The wait time for the synchronizer in seconds before it try to start new cycle")
synchronizerCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 10, "The minimum number of blocks to sync before the synchronizer starts decoding")

return synchronizerCmd
}
Expand Down Expand Up @@ -1011,7 +1012,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
var chain, baseDir, customerDbUriFlag string
var addresses, customerIds []string
var startBlock, endBlock, batchSize uint64
var timeout, threads int
var timeout, threads, minBlocksToSync int
var auto bool

historicalSyncCmd := &cobra.Command{
Expand All @@ -1038,6 +1039,11 @@ func CreateHistoricalSyncCommand() *cobra.Command {
return syncErr
}

blockchainErr := seer_blockchain.CheckVariablesForBlockchains()
if blockchainErr != nil {
return blockchainErr
}

if chain == "" {
return fmt.Errorf("blockchain is required via --chain")
}
Expand All @@ -1047,7 +1053,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
indexer.InitDBConnection()

newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads)
newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout, threads, minBlocksToSync)
if synchonizerErr != nil {
return synchonizerErr
}
Expand All @@ -1073,6 +1079,7 @@ func CreateHistoricalSyncCommand() *cobra.Command {
historicalSyncCmd.Flags().StringSliceVar(&addresses, "addresses", []string{}, "The list of addresses to sync")
historicalSyncCmd.Flags().BoolVar(&auto, "auto", false, "Set this flag to sync all unfinished historical crawl from the database (default: false)")
historicalSyncCmd.Flags().IntVar(&threads, "threads", 5, "Number of go-routines for concurrent crawling (default: 5)")
historicalSyncCmd.Flags().IntVar(&minBlocksToSync, "min-blocks-to-sync", 50, "The minimum number of blocks to sync before the synchronizer starts decoding")

return historicalSyncCmd
}
Expand Down
107 changes: 89 additions & 18 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,14 +551,14 @@ func (p *PostgreSQLpgx) GetCustomersIDs(blockchain string) ([]string, error) {
return customerIds, nil
}

func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string) (uint64, uint64, string, []CustomerUpdates, error) {
func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, customerIds []string, minBlocksToSync int) (uint64, uint64, []string, []CustomerUpdates, error) {

pool := p.GetPool()

conn, err := pool.Acquire(context.Background())

if err != nil {
return 0, 0, "", nil, err
return 0, 0, make([]string, 0), nil, err
}

defer conn.Release()
Expand All @@ -567,20 +567,20 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome

query := fmt.Sprintf(`WITH path as (
SELECT
path
path,
block_number
from
%s
WHERE
block_number = $1
block_number >= $1 and block_number <= $1 + $3
),
latest_block_of_path as (
SELECT
block_number as block_number,
path as path
block_number as last_block_number
from
%s
WHERE
path = (SELECT path from path)
path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1)
order by block_number desc
limit 1
),
Expand Down Expand Up @@ -627,29 +627,29 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome
customer_id
)
SELECT
block_number,
path,
last_block_number,
Andrei-Dolgolev marked this conversation as resolved.
Show resolved Hide resolved
(SELECT array_agg(path) FROM path) as paths,
(SELECT json_agg(json_build_object(customer_id, abis)) FROM reformatted_jobs) as jobs
FROM
latest_block_of_path
`, blocksTableName, blocksTableName)

rows, err := conn.Query(context.Background(), query, fromBlock, blockchain)
rows, err := conn.Query(context.Background(), query, fromBlock, blockchain, minBlocksToSync)

if err != nil {
log.Println("Error querying abi jobs from database", err)
return 0, 0, "", nil, err
return 0, 0, make([]string, 0), nil, err
Andrei-Dolgolev marked this conversation as resolved.
Show resolved Hide resolved
}

var customers []map[string]map[string]map[string]*AbiEntry
var path string
var paths []string
var firstBlockNumber, lastBlockNumber uint64

for rows.Next() {
err = rows.Scan(&lastBlockNumber, &path, &customers)
err = rows.Scan(&lastBlockNumber, &paths, &customers)
if err != nil {
log.Println("Error scanning row:", err)
return 0, 0, "", nil, err
return 0, 0, make([]string, 0), nil, err
}
}

Expand All @@ -668,7 +668,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, custome

}

return firstBlockNumber, lastBlockNumber, path, customerUpdates, nil
return firstBlockNumber, lastBlockNumber, paths, customerUpdates, nil

}

Expand Down Expand Up @@ -791,6 +791,7 @@ func (p *PostgreSQLpgx) WriteLabes(
conn, err := pool.Acquire(context.Background())

if err != nil {
fmt.Println("Error acquiring connection:", err)
Andrei-Dolgolev marked this conversation as resolved.
Show resolved Hide resolved
return err
}

Expand All @@ -799,35 +800,41 @@ func (p *PostgreSQLpgx) WriteLabes(
tx, err := conn.Begin(context.Background())

if err != nil {
fmt.Println("Error beginning transaction:", err)
return err
}

defer func() {
if err := recover(); err != nil {
if pErr := recover(); pErr != nil {
tx.Rollback(context.Background())
panic(err)
panic(pErr)
} else if err != nil {
tx.Rollback(context.Background())
} else {
err = tx.Commit(context.Background())
if err != nil {
fmt.Println("Error committing transaction:", err)
}
}
}()

if len(transactions) > 0 {
err := p.WriteTransactions(tx, blockchain, transactions)
if err != nil {
fmt.Println("Error writing transactions:", err)
return err
}
}

if len(events) > 0 {
err := p.WriteEvents(tx, blockchain, events)
if err != nil {
fmt.Println("Error writing events:", err)
return err
}
}

return nil
return err
}

func (p *PostgreSQLpgx) WriteEvents(tx pgx.Tx, blockchain string, events []EventLabel) error {
Expand Down Expand Up @@ -1418,6 +1425,70 @@ func (p *PostgreSQLpgx) FindBatchPath(blockchain string, blockNumber uint64) (st

}

// RetrievePathsAndBlockBounds returns the storage paths that cover the block
// range [blockNumber-minBlocksToSync, blockNumber] for the given blockchain,
// together with the first block number of the earliest matching path and the
// last block number of the latest matching path.
//
// Returns (nil, 0, 0, nil) when no blocks in the range have been indexed yet
// (pgx.ErrNoRows). Any other query error is returned as-is.
func (p *PostgreSQLpgx) RetrievePathsAndBlockBounds(blockchain string, blockNumber uint64, minBlocksToSync int) ([]string, uint64, uint64, error) {
	pool := p.GetPool()

	conn, err := pool.Acquire(context.Background())
	if err != nil {
		return nil, 0, 0, err
	}

	defer conn.Release()

	var paths []string
	var minBlockNumber uint64
	var maxBlockNumber uint64

	// Clamp the lower bound to 0. Without this, blockNumber < minBlocksToSync
	// (or a negative minBlocksToSync cast to uint64) wraps around and produces
	// an enormous start block, so the range query silently matches nothing.
	var fromBlock uint64
	if minBlocksToSync > 0 && blockNumber > uint64(minBlocksToSync) {
		fromBlock = blockNumber - uint64(minBlocksToSync)
	}

	query := fmt.Sprintf(`WITH path as (
		SELECT
			path,
			block_number
		from
			%s
		WHERE
			block_number >= $2 and block_number <= $1
	), latest_block_of_path as (
		SELECT
			block_number as last_block_number
		from
			%s
		WHERE
			path = (SELECT path FROM path ORDER BY block_number DESC LIMIT 1)
		order by block_number desc
		limit 1
	), earliest_block_of_path as (
		SELECT
			block_number as first_block_number
		from
			%s
		WHERE
			path = (SELECT path FROM path ORDER BY block_number ASC LIMIT 1)
		order by block_number asc
		limit 1
	)
	select ARRAY_AGG(path) as paths, (SELECT first_block_number FROM earliest_block_of_path) as min_block_number, (SELECT last_block_number FROM latest_block_of_path) as max_block_number from path
	`, BlocksTableName(blockchain), BlocksTableName(blockchain), BlocksTableName(blockchain))

	// NOTE(review): when the range matches no rows, the aggregate query still
	// yields one row of NULLs; scanning NULL into uint64 then fails with a scan
	// error rather than pgx.ErrNoRows — confirm desired behavior with callers.
	err = conn.QueryRow(context.Background(), query, blockNumber, fromBlock).Scan(&paths, &minBlockNumber, &maxBlockNumber)

	if err != nil {
		if err == pgx.ErrNoRows {
			// Blocks not indexed yet
			return nil, 0, 0, nil
		}
		return nil, 0, 0, err
	}

	return paths, minBlockNumber, maxBlockNumber, nil

}

func (p *PostgreSQLpgx) GetAbiJobsWithoutDeployBlocks(blockchain string) (map[string]map[string][]string, error) {
pool := p.GetPool()

Expand Down
10 changes: 10 additions & 0 deletions storage/aws_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,13 @@ func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) {
// Implement the ReadBatch method
return nil, nil
}

// ReadFiles is a placeholder for batch reads from S3; it is not implemented
// yet and always returns an empty slice with no error.
func (s *S3) ReadFiles(keys []string) ([]bytes.Buffer, error) {
	// TODO: implement the ReadFiles method for S3-backed storage.
	buffers := make([]bytes.Buffer, 0)
	return buffers, nil
}

// ReadFilesAsync is a placeholder for concurrent batch reads from S3; it is
// not implemented yet and always returns an empty slice with no error.
// The threads argument is currently ignored.
func (s *S3) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) {
	// TODO: implement the ReadFilesAsync method for S3-backed storage.
	buffers := make([]bytes.Buffer, 0)
	return buffers, nil
}
56 changes: 56 additions & 0 deletions storage/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
)

type FileStorage struct {
Expand Down Expand Up @@ -144,3 +145,58 @@ func (fs *FileStorage) Delete(key string) error {
// Implement the Delete method
return nil
}

// ReadFiles reads each key sequentially via fs.Read and returns the buffers
// in the same order as keys. The first read error aborts the whole batch and
// is returned with a nil slice. An empty keys slice yields a nil slice.
func (fs *FileStorage) ReadFiles(keys []string) ([]bytes.Buffer, error) {
	var buffers []bytes.Buffer
	for i := range keys {
		buf, readErr := fs.Read(keys[i])
		if readErr != nil {
			return nil, readErr
		}
		buffers = append(buffers, buf)
	}
	return buffers, nil
}

// ReadFilesAsync reads all keys concurrently, using at most `threads`
// goroutines at a time, and returns the buffers in the same order as keys —
// matching the ordering guarantee of the sequential ReadFiles. If any read
// fails, the first observed error is returned and partial results discarded.
func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) {
	// Pre-size the result and have each goroutine write to its own index.
	// Appending under a mutex (the previous approach) made the output order
	// scheduling-dependent, and also needed no lock once slots are disjoint.
	data := make([]bytes.Buffer, len(keys))
	var wg sync.WaitGroup
	errChan := make(chan error, len(keys))

	// Guard against a non-positive thread count: a zero-capacity semaphore
	// channel would block the first send forever and deadlock.
	if threads < 1 {
		threads = 1
	}

	// Semaphore to limit the number of concurrent reads
	sem := make(chan struct{}, threads)

	for i, key := range keys {
		wg.Add(1)
		sem <- struct{}{}
		go func(idx int, k string) {
			defer func() {
				<-sem
				wg.Done()
			}()
			bf, err := fs.Read(k)
			if err != nil {
				errChan <- fmt.Errorf("failed to read file %s: %v", k, err)
				return
			}
			data[idx] = bf
		}(i, key)
	}

	wg.Wait()
	close(errChan)

	// Receiving from the closed, empty channel yields nil when no read failed.
	if err := <-errChan; err != nil {
		return nil, fmt.Errorf("failed to read files: %v", err)
	}

	return data, nil
}
Loading
Loading