Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data insertion logic #89

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 23 additions & 88 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -57,7 +56,7 @@ func (c *Committer) Start() {
log.Error().Err(err).Msg("Error getting block data to commit")
continue
}
if len(blockDataToCommit) == 0 {
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
log.Debug().Msg("No block data to commit")
continue
}
Expand Down Expand Up @@ -95,7 +94,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
return blockNumbers, nil
}

func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
Expand All @@ -104,129 +103,65 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
return nil, nil
}

blocksData, err := c.storage.StagingStorage.GetBlockData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
if len(blocksData) == 0 {
if blocksData == nil || len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
return nil, nil
}

// Sort blocks by block number
sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
sort.Slice(*blocksData, func(i, j int) bool {
return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0
})

if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
}

var sequentialBlockData []common.BlockData
sequentialBlockData = append(sequentialBlockData, blocksData[0])
expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
sequentialBlockData = append(sequentialBlockData, (*blocksData)[0])
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))

for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
for i := 1; i < len(*blocksData); i++ {
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
// Note: Gap detected, stop here
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
// increment the a gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64()))
break
}
sequentialBlockData = append(sequentialBlockData, blocksData[i])
sequentialBlockData = append(sequentialBlockData, (*blocksData)[i])
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
}

return sequentialBlockData, nil
return &sequentialBlockData, nil
}

func (c *Committer) commit(blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
for i, block := range blockData {
func (c *Committer) commit(blockData *[]common.BlockData) error {
blockNumbers := make([]*big.Int, len(*blockData))
for i, block := range *blockData {
blockNumbers[i] = block.Block.Number
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
if err := c.saveDataToMainStorage(blockData); err != nil {
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
return fmt.Errorf("error saving data to main storage: %v", err)
}

if err := c.storage.StagingStorage.DeleteBlockData(blockData); err != nil {
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
return fmt.Errorf("error deleting data from staging storage: %v", err)
}

// Update metrics for successful commits
metrics.SuccessfulCommits.Add(float64(len(blockData)))
metrics.LastCommittedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))

return nil
}

func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
var commitWg sync.WaitGroup
commitWg.Add(4)

var commitErr error
var commitErrMutex sync.Mutex

blocks := make([]common.Block, 0, len(blockData))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, block := range blockData {
blocks = append(blocks, block.Block)
logs = append(logs, block.Logs...)
transactions = append(transactions, block.Transactions...)
traces = append(traces, block.Traces...)
}

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting blocks: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting logs: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting transactions: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting traces: %v", err)
commitErrMutex.Unlock()
}
}()

commitWg.Wait()

if commitErr != nil {
return commitErr
}
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64()))

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/failure_recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
}
}
if err := fr.storage.StagingStorage.InsertBlockData(successfulResults); err != nil {
if err := fr.storage.StagingStorage.InsertStagingData(successfulResults); err != nil {
log.Error().Err(fmt.Errorf("error inserting block data in failure recoverer: %v", err))
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
Traces: result.Data.Traces,
})
}
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
e := fmt.Errorf("error inserting block data: %v", err)
log.Error().Err(e)
for _, result := range successfulResults {
Expand Down
101 changes: 88 additions & 13 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -60,7 +61,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
return conn, nil
}

func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
query := `
INSERT INTO ` + c.cfg.Database + `.blocks (
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
Expand All @@ -73,7 +74,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
if err != nil {
return err
}
for _, block := range blocks {
for _, block := range *blocks {
err := batch.Append(
block.ChainId,
block.Number,
Expand Down Expand Up @@ -105,7 +106,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
query := `
INSERT INTO ` + c.cfg.Database + `.transactions (
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
Expand All @@ -117,7 +118,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
if err != nil {
return err
}
for _, tx := range txs {
for _, tx := range *txs {
err := batch.Append(
tx.ChainId,
tx.Hash,
Expand Down Expand Up @@ -147,7 +148,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
return batch.Send()
}

func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
query := `
INSERT INTO ` + c.cfg.Database + `.logs (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
Expand All @@ -158,7 +159,7 @@ func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
if err != nil {
return err
}
for _, log := range logs {
for _, log := range *logs {
err := batch.Append(
log.ChainId,
log.BlockNumber,
Expand Down Expand Up @@ -585,7 +586,7 @@ func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
return blockNumbersString
}

func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
query := `INSERT INTO ` + c.cfg.Database + `.block_data (chain_id, block_number, data)`
batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
Expand All @@ -608,7 +609,7 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []common.BlockData, err error) {
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))

Expand Down Expand Up @@ -638,12 +639,12 @@ func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []comm
if err != nil {
return nil, err
}
blockDataList = append(blockDataList, blockData)
*blockDataList = append(*blockDataList, blockData)
}
return blockDataList, nil
}

func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error {
query := fmt.Sprintf(`
INSERT INTO %s.block_data (
chain_id, block_number, is_deleted
Expand All @@ -655,7 +656,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return err
}

for _, blockData := range data {
for _, blockData := range *data {
err := batch.Append(
blockData.Block.ChainId,
blockData.Block.Number,
Expand All @@ -668,7 +669,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
query := `
INSERT INTO ` + c.cfg.Database + `.traces (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
Expand All @@ -680,7 +681,7 @@ func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
if err != nil {
return err
}
for _, trace := range traces {
for _, trace := range *traces {
err = batch.Append(
trace.ChainID,
trace.BlockNumber,
Expand Down Expand Up @@ -761,3 +762,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
}
return traces, nil
}

// TODO make this atomic
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
blocks := make([]common.Block, 0, len(*data))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, blockData := range *data {
blocks = append(blocks, blockData.Block)
logs = append(logs, blockData.Logs...)
transactions = append(transactions, blockData.Transactions...)
traces = append(traces, blockData.Traces...)
AmineAfia marked this conversation as resolved.
Show resolved Hide resolved
}

var saveErr error
var saveErrMutex sync.Mutex
var wg sync.WaitGroup

if len(blocks) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertBlocks(&blocks); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(logs) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertLogs(&logs); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(transactions) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTransactions(&transactions); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(traces) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTraces(&traces); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErrMutex.Unlock()
}
}()
}

wg.Wait()

if saveErr != nil {
return saveErr
}
return nil
}
Loading
Loading