Skip to content

Commit

Permalink
Implement committer logic for block processing (#22)
Browse files Browse the repository at this point in the history
### TL;DR

Implemented the commit functionality in the Commiter struct, enabling the transfer of data from staging to main storage.

### What changed?

- Added methods to fetch sequential blocks from staging storage
- Implemented commit logic to move data from staging to main storage
- Added concurrent processing for fetching and saving data
- Introduced error handling and logging for commit operations
- Updated storage interfaces to include delete operations
- Modified MemoryConnector to support delete operations
  • Loading branch information
iuwqyir authored Sep 20, 2024
1 parent dee9843 commit ea44e73
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 27 deletions.
201 changes: 197 additions & 4 deletions internal/orchestrator/commiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"fmt"
"log"
"os"
"sort"
"strconv"
"sync"
"time"

"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/storage"
)

Expand Down Expand Up @@ -42,13 +45,203 @@ func (c *Commiter) Start() {
go func() {
for t := range ticker.C {
fmt.Println("Commiter running at", t)
// TODO: fetch max block number from main table
// TODO: fetch sequential block numbers from staging table
// TODO: save to main table
// TODO: delete from staging table
blocksToCommit, err := c.getSequentialBlocksToCommit()
if err != nil {
log.Printf("Error getting blocks to commit: %v", err)
continue
}
if len(blocksToCommit) == 0 {
log.Println("No blocks to commit")
continue
}
if err := c.commit(blocksToCommit); err != nil {
log.Printf("Error committing blocks: %v", err)
}
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

func (c *Commiter) getBlockNumbersToCommit() ([]uint64, error) {
maxBlockNumber, err := c.storage.DBMainStorage.GetMaxBlockNumber()
if err != nil {
return nil, err
}
startBlock := maxBlockNumber + 1
endBlock := maxBlockNumber + uint64(c.blocksPerCommit)
var blockNumbers []uint64
for i := startBlock; i <= endBlock; i++ {
blockNumbers = append(blockNumbers, i)
}
return blockNumbers, nil
}

func (c *Commiter) getSequentialBlocksToCommit() ([]common.Block, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
}
blocks, err := c.storage.DBStagingStorage.GetBlocks(storage.QueryFilter{BlockNumbers: blocksToCommit})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
if len(blocks) == 0 {
return nil, nil
}

// Sort blocks by block number
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Number < blocks[j].Number
})

var sequentialBlocks []common.Block
expectedBlockNumber := blocks[0].Number

for _, block := range blocks {
if block.Number != expectedBlockNumber {
// Gap detected, stop here
break
}
sequentialBlocks = append(sequentialBlocks, block)
expectedBlockNumber++
}

return sequentialBlocks, nil
}

func (c *Commiter) commit(blocks []common.Block) error {
blockNumbers := make([]uint64, len(blocks))
for i, block := range blocks {
blockNumbers[i] = block.Number
}

logs, transactions, err := c.getStagingDataForBlocks(blockNumbers)
if err != nil {
return fmt.Errorf("error fetching staging data: %v", err)
}

if err := c.saveDataToMainStorage(blocks, logs, transactions); err != nil {
return fmt.Errorf("error saving data to main storage: %v", err)
}

if err := c.deleteDataFromStagingStorage(blocks, logs, transactions); err != nil {
return fmt.Errorf("error deleting data from staging storage: %v", err)
}

return nil
}

func (c *Commiter) getStagingDataForBlocks(blockNumbers []uint64) (logs []common.Log, transactions []common.Transaction, err error) {
var wg sync.WaitGroup
wg.Add(2)

var logErr, txErr error

go func() {
defer wg.Done()
logs, logErr = c.storage.DBStagingStorage.GetEvents(storage.QueryFilter{BlockNumbers: blockNumbers})
}()

go func() {
defer wg.Done()
transactions, txErr = c.storage.DBStagingStorage.GetTransactions(storage.QueryFilter{BlockNumbers: blockNumbers})
}()

wg.Wait()

if logErr != nil {
return nil, nil, fmt.Errorf("error fetching logs: %v", logErr)
}
if txErr != nil {
return nil, nil, fmt.Errorf("error fetching transactions: %v", txErr)
}

return logs, transactions, nil
}

func (c *Commiter) saveDataToMainStorage(blocks []common.Block, logs []common.Log, transactions []common.Transaction) error {
var commitWg sync.WaitGroup
commitWg.Add(3)

var commitErr error
var commitErrMutex sync.Mutex

go func() {
defer commitWg.Done()
if err := c.storage.DBMainStorage.InsertBlocks(blocks); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting blocks: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.DBMainStorage.InsertEvents(logs); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting logs: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.DBMainStorage.InsertTransactions(transactions); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting transactions: %v", err)
commitErrMutex.Unlock()
}
}()

commitWg.Wait()

if commitErr != nil {
return commitErr
}

return nil
}

func (c *Commiter) deleteDataFromStagingStorage(blocks []common.Block, logs []common.Log, transactions []common.Transaction) error {
var deleteWg sync.WaitGroup
deleteWg.Add(3)

var deleteErr error
var deleteErrMutex sync.Mutex

go func() {
defer deleteWg.Done()
if err := c.storage.DBStagingStorage.DeleteBlocks(blocks); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting blocks from staging: %v", err)
deleteErrMutex.Unlock()
}
}()

go func() {
defer deleteWg.Done()
if err := c.storage.DBStagingStorage.DeleteTransactions(transactions); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting transactions from staging: %v", err)
deleteErrMutex.Unlock()
}
}()

go func() {
defer deleteWg.Done()
if err := c.storage.DBStagingStorage.DeleteEvents(logs); err != nil {
deleteErrMutex.Lock()
deleteErr = fmt.Errorf("error deleting logs from staging: %v", err)
deleteErrMutex.Unlock()
}
}()

deleteWg.Wait()

if deleteErr != nil {
return deleteErr
}
return nil
}
21 changes: 3 additions & 18 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,9 @@ type Orchestrator struct {

func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
storage, err := storage.NewStorageConnector(&storage.StorageConfig{
Orchestrator: storage.ConnectorConfig{
Driver: "memory",
Memory: &storage.MemoryConnectorConfig{
Prefix: "orchestrator",
},
},
Main: storage.ConnectorConfig{
Driver: "memory",
Memory: &storage.MemoryConnectorConfig{
Prefix: "main",
},
},
Staging: storage.ConnectorConfig{
Driver: "memory",
Memory: &storage.MemoryConnectorConfig{
Prefix: "staging",
},
},
Orchestrator: storage.ConnectorConfig{Driver: "memory"},
Main: storage.ConnectorConfig{Driver: "memory"},
Staging: storage.ConnectorConfig{Driver: "memory"},
})
if err != nil {
return nil, err
Expand Down
12 changes: 12 additions & 0 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,15 @@ func (c *ClickHouseConnector) StoreLatestPolledBlockNumber(blockNumber uint64) e
func (c *ClickHouseConnector) GetLatestPolledBlockNumber() (blockNumber uint64, err error) {
return 0, nil
}

func (c *ClickHouseConnector) DeleteBlocks(blocks []common.Block) error {
return nil
}

func (c *ClickHouseConnector) DeleteTransactions(txs []common.Transaction) error {
return nil
}

func (c *ClickHouseConnector) DeleteEvents(events []common.Log) error {
return nil
}
4 changes: 4 additions & 0 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type IDBStorage interface {
GetTransactions(qf QueryFilter) (events []common.Transaction, err error)
GetEvents(qf QueryFilter) (events []common.Log, err error)
GetMaxBlockNumber() (maxBlockNumber uint64, err error)

DeleteBlocks(blocks []common.Block) error
DeleteTransactions(txs []common.Transaction) error
DeleteEvents(events []common.Log) error
}

func NewStorageConnector(cfg *StorageConfig) (IStorage, error) {
Expand Down
26 changes: 21 additions & 5 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,13 @@ import (

type MemoryConnectorConfig struct {
MaxItems int
Prefix string
}

type MemoryConnector struct {
cache *lru.Cache[string, string]
}

func NewMemoryConnector(cfg *MemoryConnectorConfig) (*MemoryConnector, error) {
if cfg != nil && cfg.MaxItems <= 0 {
return nil, fmt.Errorf("maxItems must be greater than 0")
}

maxItems := 1000
if cfg != nil && cfg.MaxItems > 0 {
maxItems = cfg.MaxItems
Expand Down Expand Up @@ -239,3 +234,24 @@ func getBlockNumbersToCheck(qf QueryFilter) map[uint64]uint8 {
}
return blockNumbersToCheck
}

func (m *MemoryConnector) DeleteBlocks(blocks []common.Block) error {
for _, block := range blocks {
m.cache.Remove(fmt.Sprintf("block:%d", block.Number))
}
return nil
}

func (m *MemoryConnector) DeleteTransactions(txs []common.Transaction) error {
for _, tx := range txs {
m.cache.Remove(fmt.Sprintf("transaction:%s", tx.Hash))
}
return nil
}

func (m *MemoryConnector) DeleteEvents(events []common.Log) error {
for _, event := range events {
m.cache.Remove(fmt.Sprintf("event:%s-%d", event.TransactionHash, event.Index))
}
return nil
}

0 comments on commit ea44e73

Please sign in to comment.