Merge pull request #91 from G7DAO/remove-rpc-mode
Remove RPC mode.
Andrei-Dolgolev authored Oct 10, 2024
2 parents bdf57f5 + 0fb1cbd commit fa016e6
Showing 4 changed files with 46 additions and 142 deletions.
2 changes: 1 addition & 1 deletion crawler/crawler.go
@@ -220,7 +220,7 @@ func (c *Crawler) Start(threads int) {

    // If Start block is not set, using last crawled block from indexes database
    if c.startBlock == 0 {
-       latestIndexedBlock, latestErr := indexer.DBConnection.GetLatestDBBlockNumber(c.blockchain)
+       latestIndexedBlock, latestErr := indexer.DBConnection.GetLatestDBBlockNumber(c.blockchain, false)

        // If there are no rows in result then set startBlock with shift
        if latestErr != nil {
10 changes: 8 additions & 2 deletions indexer/db.go
@@ -442,7 +442,7 @@ func (p *PostgreSQLpgx) GetEdgeDBBlock(ctx context.Context, blockchain, side str
    return blockIndex, nil
}

-func (p *PostgreSQLpgx) GetLatestDBBlockNumber(blockchain string) (uint64, error) {
+func (p *PostgreSQLpgx) GetLatestDBBlockNumber(blockchain string, reverse ...bool) (uint64, error) {

    pool := p.GetPool()

@@ -457,7 +457,13 @@ func (p *PostgreSQLpgx) GetLatestDBBlockNumber(blockchain string) (uint64, error) {
    var blockNumber uint64

    blocksTableName := BlocksTableName(blockchain)
-   query := fmt.Sprintf("SELECT block_number FROM %s ORDER BY block_number DESC LIMIT 1", blocksTableName)
+   // If reverse is not provided, default to false (DESC order)
+   orderDirection := "DESC"
+   if len(reverse) > 0 && reverse[0] {
+       orderDirection = "ASC"
+   }
+
+   query := fmt.Sprintf("SELECT block_number FROM %s ORDER BY block_number %s LIMIT 1", blocksTableName, orderDirection)

    err = conn.QueryRow(context.Background(), query).Scan(&blockNumber)
    if err != nil {
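For reference, a minimal sketch of the new variadic call convention (the caller and the "ethereum" chain name are hypothetical; only the GetLatestDBBlockNumber signature comes from the diff above). Omitting the flag or passing false keeps the old behavior (highest block, DESC); passing true returns the lowest indexed block (ASC):

    package main

    import (
        "log"

        "github.com/G7DAO/seer/indexer"
    )

    func main() {
        // Assumes indexer.DBConnection has been initialized elsewhere,
        // as the crawler and synchronizer do before calling it.
        latest, err := indexer.DBConnection.GetLatestDBBlockNumber("ethereum") // ORDER BY block_number DESC
        if err != nil {
            log.Fatalf("latest block: %v", err)
        }

        earliest, err := indexer.DBConnection.GetLatestDBBlockNumber("ethereum", true) // ORDER BY block_number ASC
        if err != nil {
            log.Fatalf("earliest block: %v", err)
        }

        log.Printf("indexed block range: %d-%d", earliest, latest)
    }

Existing call sites keep their behavior: the crawler and SyncCycle pass false explicitly, while HistoricalSyncRef below uses the true variant to find the earliest indexed block.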
174 changes: 36 additions & 138 deletions synchronizer/synchronizer.go
@@ -14,7 +14,6 @@ import (
    "time"

    seer_blockchain "github.com/G7DAO/seer/blockchain"
-   "github.com/G7DAO/seer/blockchain/common"
    "github.com/G7DAO/seer/crawler"
    "github.com/G7DAO/seer/indexer"
    "github.com/G7DAO/seer/storage"
@@ -317,7 +316,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
    }

    // Get the latest block from the indexer db
-   indexedLatestBlock, idxLatestErr := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain)
+   indexedLatestBlock, idxLatestErr := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain, false)
    if idxLatestErr != nil {
        return isEnd, idxLatestErr
    }
@@ -406,22 +405,27 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
}

func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []string, customerIds []string, batchSize uint64, autoJobs bool) error {
-   var useRPC bool
    var isCycleFinished bool
    var updateDeadline time.Time
    var initialStartBlock uint64

    // Initialize start block if 0
    if d.startBlock == 0 {
        // Get the latest block from the indexer db
-       indexedLatestBlock, err := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain)
+       indexedLatestBlock, err := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain, false)
        if err != nil {
            return fmt.Errorf("error getting latest block number: %w", err)
        }
        d.startBlock = indexedLatestBlock
        fmt.Printf("Start block is %d\n", d.startBlock)
    }

+   earlyIndexedBlock, err := indexer.DBConnection.GetLatestDBBlockNumber(d.blockchain, true)
+
+   if err != nil {
+       return fmt.Errorf("error getting early indexer block: %w", err)
+   }
+
    // Automatically update ABI jobs as active if auto mode is enabled
    if autoJobs {
        if err := indexer.DBConnection.UpdateAbiJobsStatus(d.blockchain); err != nil {
@@ -442,15 +446,27 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []string, customerIds []string, batchSize uint64, autoJobs bool) error {
    fmt.Printf("Found %d customer updates\n", len(customerUpdates))

    // Filter out blocks more
-   // TODO: Maybe autoJobs only
-   for address, abisInfo := range addressesAbisInfo {
-       log.Printf("Address %s has deployed block %d\n", address, abisInfo.DeployedBlockNumber)
-       if abisInfo.DeployedBlockNumber > d.startBlock {
-           log.Printf("Finished crawling for address %s at block %d\n", address, abisInfo.DeployedBlockNumber)
-           delete(addressesAbisInfo, address)
+   if autoJobs {
+       for address, abisInfo := range addressesAbisInfo {
+           log.Printf("Address %s has deployed block %d\n", address, abisInfo.DeployedBlockNumber)
+
+           if abisInfo.DeployedBlockNumber > d.startBlock {
+               log.Printf("Finished crawling for address %s at block %d\n", address, abisInfo.DeployedBlockNumber)
+               delete(addressesAbisInfo, address)
+           }
+
+           if abisInfo.DeployedBlockNumber < earlyIndexedBlock {
+               log.Printf("Address %s has deployed block %d less than early indexed block %d\n", address, abisInfo.DeployedBlockNumber, earlyIndexedBlock)
+               delete(addressesAbisInfo, address)
+           }
        }
    }

+   if len(addressesAbisInfo) == 0 {
+       log.Println("No addresses to crawl")
+       return nil
+   }
+
    // Get customer database connections
    customerIdsMap := make(map[string]bool)
    for _, update := range customerUpdates {
@@ -519,40 +535,27 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []string, customerIds []string, batchSize uint64, autoJobs bool) error {
    // Determine the processing strategy (RPC or storage)
    var path string
    var firstBlockOfChunk uint64
-   if !useRPC {
-
+   for {
        path, firstBlockOfChunk, _, err = indexer.DBConnection.FindBatchPath(d.blockchain, d.startBlock)
        if err != nil {
            return fmt.Errorf("error finding batch path: %w", err)
        }

-       if path == "" {
-           log.Printf("No batch path found for block %d\n", d.startBlock)
-           log.Println("Switching to RPC mode")
-           useRPC = true
-       } else {
+       if path != "" {
            d.endBlock = firstBlockOfChunk
+           break
        }
-   }

-   if useRPC {
-       // Calculate the end block
-       if d.startBlock > batchSize {
-           d.endBlock = d.startBlock - batchSize
-       } else {
-           d.endBlock = 1 // Prevent underflow by setting endBlock to 1 if startBlock < batchSize
-       }
+       log.Printf("No batch path found for block %d, retrying...\n", d.startBlock)
+       time.Sleep(30 * time.Second) // Wait 30 seconds before retrying (adjust the duration as needed)
    }
-
    fmt.Printf("Start block is %d, end block is %d\n", d.startBlock, d.endBlock)

    // Read raw data from storage or via RPC
    var rawData bytes.Buffer
-   if !useRPC {
-       rawData, err = d.StorageInstance.Read(path)
-       if err != nil {
-           return fmt.Errorf("error reading events from storage: %w", err)
-       }
+   rawData, err = d.StorageInstance.Read(path)
+   if err != nil {
+       return fmt.Errorf("error reading events from storage: %w", err)
    }

    log.Printf("Processing %d customer updates for block range %d-%d", len(customerUpdates), d.startBlock, d.endBlock)
@@ -564,11 +567,8 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []string, customerIds []string, batchSize uint64, autoJobs bool) error {

    for _, update := range customerUpdates {
        wg.Add(1)
-       if useRPC {
-           go d.processRPCCustomerUpdate(update, customerDBConnections, sem, errChan, &wg)
-       } else {
-           go d.processProtoCustomerUpdate(update, rawData, customerDBConnections, sem, errChan, &wg)
-       }
+       go d.processProtoCustomerUpdate(update, rawData, customerDBConnections, sem, errChan, &wg)
+
    }

    wg.Wait()
@@ -650,105 +650,3 @@ func (d *Synchronizer) processProtoCustomerUpdate(

    <-sem // Release semaphore
}
-
-func (d *Synchronizer) processRPCCustomerUpdate(
-   update indexer.CustomerUpdates,
-   customerDBConnections map[string]CustomerDBConnection,
-   sem chan struct{},
-   errChan chan error,
-   wg *sync.WaitGroup,
-) {
-   // Decode input raw proto data using ABIs
-   // Write decoded data to the user Database
-
-   defer wg.Done()
-   sem <- struct{}{} // Acquire semaphore
-
-   customer, exists := customerDBConnections[update.CustomerID]
-   if !exists {
-       errChan <- fmt.Errorf("no DB connection for customer %s", update.CustomerID)
-       <-sem // Release semaphore
-       return
-   }
-
-   conn, err := customer.Pgx.GetPool().Acquire(context.Background())
-   if err != nil {
-       errChan <- fmt.Errorf("error acquiring connection for customer %s: %w", update.CustomerID, err)
-       <-sem // Release semaphore
-       return
-   }
-   defer conn.Release()
-
-   // split abis by the type of the task
-
-   eventAbis := make(map[string]map[string]*indexer.AbiEntry)
-   transactionAbis := make(map[string]map[string]*indexer.AbiEntry)
-
-   for address, selectorMap := range update.Abis {
-
-       for selector, abiInfo := range selectorMap {
-
-           if abiInfo.AbiType == "event" {
-               if _, ok := eventAbis[address]; !ok {
-                   eventAbis[address] = make(map[string]*indexer.AbiEntry)
-               }
-               eventAbis[address][selector] = abiInfo
-           } else {
-               if _, ok := transactionAbis[address]; !ok {
-                   transactionAbis[address] = make(map[string]*indexer.AbiEntry)
-               }
-               transactionAbis[address][selector] = abiInfo
-           }
-       }
-
-   }
-
-   // Transactions
-
-   var transactions []indexer.TransactionLabel
-   var blocksCache map[uint64]common.BlockWithTransactions
-
-   if len(transactionAbis) != 0 {
-       transactions, blocksCache, err = d.Client.GetTransactionsLabels(d.endBlock, d.startBlock, transactionAbis, d.threads)
-
-       if err != nil {
-           log.Println("Error getting transactions for customer", update.CustomerID, ":", err)
-           errChan <- fmt.Errorf("error getting transactions for customer %s: %w", update.CustomerID, err)
-           <-sem // Release semaphore
-           return
-       }
-   } else {
-       transactions = make([]indexer.TransactionLabel, 0)
-   }
-
-   // Events
-
-   var events []indexer.EventLabel
-
-   if len(eventAbis) != 0 {
-
-       events, err = d.Client.GetEventsLabels(d.endBlock, d.startBlock, eventAbis, blocksCache)
-
-       if err != nil {
-           log.Println("Error getting events for customer", update.CustomerID, ":", err)
-           errChan <- fmt.Errorf("error getting events for customer %s: %w", update.CustomerID, err)
-           <-sem // Release semaphore
-           return
-       }
-   } else {
-       events = make([]indexer.EventLabel, 0)
-   }
-
-   err = customer.Pgx.WriteLabes(d.blockchain, transactions, events)
-
-   if err != nil {
-       errChan <- fmt.Errorf("error writing labels for customer %s: %w", update.CustomerID, err)
-       <-sem // Release semaphore
-       return
-   }
-
-   <-sem // Release semaphore
-}
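With processRPCCustomerUpdate removed, the only processing strategy left is reading prepared batches from storage, and HistoricalSyncRef now waits for a batch to exist instead of computing an end block from batchSize. A self-contained sketch of that pattern (waitForBatchPath, the find stub, the path string, and the poll interval are all hypothetical; only the retry-until-found behavior and FindBatchPath's role come from the diff above):

    package main

    import (
        "fmt"
        "log"
        "time"
    )

    // waitForBatchPath is a standalone rendering of the retry loop introduced
    // in HistoricalSyncRef: it polls a FindBatchPath-style lookup until a
    // storage batch containing startBlock exists, rather than falling back to RPC.
    func waitForBatchPath(
        find func(blockchain string, block uint64) (path string, firstBlockOfChunk uint64, err error),
        blockchain string,
        startBlock uint64,
        pollInterval time.Duration,
    ) (string, uint64, error) {
        for {
            path, firstBlockOfChunk, err := find(blockchain, startBlock)
            if err != nil {
                return "", 0, fmt.Errorf("error finding batch path: %w", err)
            }
            if path != "" {
                // The first block of the located chunk becomes the end of the sync range.
                return path, firstBlockOfChunk, nil
            }
            log.Printf("No batch path found for block %d, retrying...", startBlock)
            time.Sleep(pollInterval)
        }
    }

    func main() {
        // Stub lookup that "finds" a batch on the second attempt.
        attempts := 0
        find := func(blockchain string, block uint64) (string, uint64, error) {
            attempts++
            if attempts < 2 {
                return "", 0, nil
            }
            return "batches/ethereum/000123.bin", block - 100, nil
        }

        path, endBlock, err := waitForBatchPath(find, "ethereum", 1_000_000, 10*time.Millisecond)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("path=%s endBlock=%d\n", path, endBlock)
    }

The trade-off of this design is that a historical sync now stalls rather than degrades: if no batch covering startBlock is ever written, the loop spins indefinitely, presumably on the expectation that the crawler will backfill storage.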
2 changes: 1 addition & 1 deletion version/version.go
@@ -1,3 +1,3 @@
package version

-var SeerVersion string = "0.3.3"
+var SeerVersion string = "0.3.4"
