From ea44e7342f8ad5d5f21f10c123b62f1c2bfa3482 Mon Sep 17 00:00:00 2001 From: Toomas Oosalu Date: Fri, 20 Sep 2024 13:06:17 +0300 Subject: [PATCH] Implement committer logic for block processing (#22) ### TL;DR Implemented the commit functionality in the Commiter struct, enabling the transfer of data from staging to main storage. ### What changed? - Added methods to fetch sequential blocks from staging storage - Implemented commit logic to move data from staging to main storage - Added concurrent processing for fetching and saving data - Introduced error handling and logging for commit operations - Updated storage interfaces to include delete operations - Modified MemoryConnector to support delete operations --- internal/orchestrator/commiter.go | 201 +++++++++++++++++++++++++- internal/orchestrator/orchestrator.go | 21 +-- internal/storage/clickhouse.go | 12 ++ internal/storage/connector.go | 4 + internal/storage/memory.go | 26 +++- 5 files changed, 237 insertions(+), 27 deletions(-) diff --git a/internal/orchestrator/commiter.go b/internal/orchestrator/commiter.go index 8997ee9..6bf4aab 100644 --- a/internal/orchestrator/commiter.go +++ b/internal/orchestrator/commiter.go @@ -4,9 +4,12 @@ import ( "fmt" "log" "os" + "sort" "strconv" + "sync" "time" + "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/storage" ) @@ -42,13 +45,203 @@ func (c *Commiter) Start() { go func() { for t := range ticker.C { fmt.Println("Commiter running at", t) - // TODO: fetch max block number from main table - // TODO: fetch sequential block numbers from staging table - // TODO: save to main table - // TODO: delete from staging table + blocksToCommit, err := c.getSequentialBlocksToCommit() + if err != nil { + log.Printf("Error getting blocks to commit: %v", err) + continue + } + if len(blocksToCommit) == 0 { + log.Println("No blocks to commit") + continue + } + if err := c.commit(blocksToCommit); err != nil { + log.Printf("Error committing blocks: %v", err) + } } }() // Keep the program running (otherwise it will exit) select {} } + +func (c *Commiter) getBlockNumbersToCommit() ([]uint64, error) { + maxBlockNumber, err := c.storage.DBMainStorage.GetMaxBlockNumber() + if err != nil { + return nil, err + } + startBlock := maxBlockNumber + 1 + endBlock := maxBlockNumber + uint64(c.blocksPerCommit) + var blockNumbers []uint64 + for i := startBlock; i <= endBlock; i++ { + blockNumbers = append(blockNumbers, i) + } + return blockNumbers, nil +} + +func (c *Commiter) getSequentialBlocksToCommit() ([]common.Block, error) { + blocksToCommit, err := c.getBlockNumbersToCommit() + if err != nil { + return nil, fmt.Errorf("error determining blocks to commit: %v", err) + } + blocks, err := c.storage.DBStagingStorage.GetBlocks(storage.QueryFilter{BlockNumbers: blocksToCommit}) + if err != nil { + return nil, fmt.Errorf("error fetching blocks to commit: %v", err) + } + if len(blocks) == 0 { + return nil, nil + } + + // Sort blocks by block number + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Number < blocks[j].Number + }) + + var sequentialBlocks []common.Block + expectedBlockNumber := blocks[0].Number + + for _, block := range blocks { + if block.Number != expectedBlockNumber { + // Gap detected, stop here + break + } + sequentialBlocks = append(sequentialBlocks, block) + expectedBlockNumber++ + } + + return sequentialBlocks, nil +} + +func (c *Commiter) commit(blocks []common.Block) error { + blockNumbers := make([]uint64, len(blocks)) + for i, block := range blocks { + blockNumbers[i] = block.Number + } + + logs, transactions, err := c.getStagingDataForBlocks(blockNumbers) + if err != nil { + return fmt.Errorf("error fetching staging data: %v", err) + } + + if err := c.saveDataToMainStorage(blocks, logs, transactions); err != nil { + return fmt.Errorf("error saving data to main storage: %v", err) + } + + if err := c.deleteDataFromStagingStorage(blocks, logs, transactions); err != nil { + return fmt.Errorf("error deleting data from staging storage: %v", err) + } + + return nil +} + +func (c *Commiter) getStagingDataForBlocks(blockNumbers []uint64) (logs []common.Log, transactions []common.Transaction, err error) { + var wg sync.WaitGroup + wg.Add(2) + + var logErr, txErr error + + go func() { + defer wg.Done() + logs, logErr = c.storage.DBStagingStorage.GetEvents(storage.QueryFilter{BlockNumbers: blockNumbers}) + }() + + go func() { + defer wg.Done() + transactions, txErr = c.storage.DBStagingStorage.GetTransactions(storage.QueryFilter{BlockNumbers: blockNumbers}) + }() + + wg.Wait() + + if logErr != nil { + return nil, nil, fmt.Errorf("error fetching logs: %v", logErr) + } + if txErr != nil { + return nil, nil, fmt.Errorf("error fetching transactions: %v", txErr) + } + + return logs, transactions, nil +} + +func (c *Commiter) saveDataToMainStorage(blocks []common.Block, logs []common.Log, transactions []common.Transaction) error { + var commitWg sync.WaitGroup + commitWg.Add(3) + + var commitErr error + var commitErrMutex sync.Mutex + + go func() { + defer commitWg.Done() + if err := c.storage.DBMainStorage.InsertBlocks(blocks); err != nil { + commitErrMutex.Lock() + commitErr = fmt.Errorf("error inserting blocks: %v", err) + commitErrMutex.Unlock() + } + }() + + go func() { + defer commitWg.Done() + if err := c.storage.DBMainStorage.InsertEvents(logs); err != nil { + commitErrMutex.Lock() + commitErr = fmt.Errorf("error inserting logs: %v", err) + commitErrMutex.Unlock() + } + }() + + go func() { + defer commitWg.Done() + if err := c.storage.DBMainStorage.InsertTransactions(transactions); err != nil { + commitErrMutex.Lock() + commitErr = fmt.Errorf("error inserting transactions: %v", err) + commitErrMutex.Unlock() + } + }() + + commitWg.Wait() + + if commitErr != nil { + return commitErr + } + + return nil +} + +func (c *Commiter) deleteDataFromStagingStorage(blocks []common.Block, logs []common.Log, transactions []common.Transaction) error { + var deleteWg sync.WaitGroup + deleteWg.Add(3) + + var deleteErr error + var deleteErrMutex sync.Mutex + + go func() { + defer deleteWg.Done() + if err := c.storage.DBStagingStorage.DeleteBlocks(blocks); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting blocks from staging: %v", err) + deleteErrMutex.Unlock() + } + }() + + go func() { + defer deleteWg.Done() + if err := c.storage.DBStagingStorage.DeleteTransactions(transactions); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting transactions from staging: %v", err) + deleteErrMutex.Unlock() + } + }() + + go func() { + defer deleteWg.Done() + if err := c.storage.DBStagingStorage.DeleteEvents(logs); err != nil { + deleteErrMutex.Lock() + deleteErr = fmt.Errorf("error deleting logs from staging: %v", err) + deleteErrMutex.Unlock() + } + }() + + deleteWg.Wait() + + if deleteErr != nil { + return deleteErr + } + return nil +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 0a10ae4..d99bd09 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -18,24 +18,9 @@ type Orchestrator struct { func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) { storage, err := storage.NewStorageConnector(&storage.StorageConfig{ - Orchestrator: storage.ConnectorConfig{ - Driver: "memory", - Memory: &storage.MemoryConnectorConfig{ - Prefix: "orchestrator", - }, - }, - Main: storage.ConnectorConfig{ - Driver: "memory", - Memory: &storage.MemoryConnectorConfig{ - Prefix: "main", - }, - }, - Staging: storage.ConnectorConfig{ - Driver: "memory", - Memory: &storage.MemoryConnectorConfig{ - Prefix: "staging", - }, - }, + Orchestrator: storage.ConnectorConfig{Driver: "memory"}, + Main: storage.ConnectorConfig{Driver: "memory"}, + Staging: storage.ConnectorConfig{Driver: "memory"}, }) if err != nil { return nil, err diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 6e44ba7..e595515 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -138,3 +138,15 @@ func (c *ClickHouseConnector) StoreLatestPolledBlockNumber(blockNumber uint64) e func (c *ClickHouseConnector) GetLatestPolledBlockNumber() (blockNumber uint64, err error) { return 0, nil } + +func (c *ClickHouseConnector) DeleteBlocks(blocks []common.Block) error { + return nil +} + +func (c *ClickHouseConnector) DeleteTransactions(txs []common.Transaction) error { + return nil +} + +func (c *ClickHouseConnector) DeleteEvents(events []common.Log) error { + return nil +} diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 01e0559..3f5b297 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -48,6 +48,10 @@ type IDBStorage interface { GetTransactions(qf QueryFilter) (events []common.Transaction, err error) GetEvents(qf QueryFilter) (events []common.Log, err error) GetMaxBlockNumber() (maxBlockNumber uint64, err error) + + DeleteBlocks(blocks []common.Block) error + DeleteTransactions(txs []common.Transaction) error + DeleteEvents(events []common.Log) error } func NewStorageConnector(cfg *StorageConfig) (IStorage, error) { diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 2eb4d4a..825bd61 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -13,7 +13,6 @@ import ( type MemoryConnectorConfig struct { MaxItems int - Prefix string } type MemoryConnector struct { @@ -21,10 +20,6 @@ type MemoryConnector struct { } func NewMemoryConnector(cfg *MemoryConnectorConfig) (*MemoryConnector, error) { - if cfg != nil && cfg.MaxItems <= 0 { - return nil, fmt.Errorf("maxItems must be greater than 0") - } - maxItems := 1000 if cfg != nil && cfg.MaxItems > 0 { maxItems = cfg.MaxItems @@ -239,3 +234,24 @@ func getBlockNumbersToCheck(qf QueryFilter) map[uint64]uint8 { } return blockNumbersToCheck } + +func (m *MemoryConnector) DeleteBlocks(blocks []common.Block) error { + for _, block := range blocks { + m.cache.Remove(fmt.Sprintf("block:%d", block.Number)) + } + return nil +} + +func (m *MemoryConnector) DeleteTransactions(txs []common.Transaction) error { + for _, tx := range txs { + m.cache.Remove(fmt.Sprintf("transaction:%s", tx.Hash)) + } + return nil +} + +func (m *MemoryConnector) DeleteEvents(events []common.Log) error { + for _, event := range events { + m.cache.Remove(fmt.Sprintf("event:%s-%d", event.TransactionHash, event.Index)) + } + return nil +}