Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data insertion logic #89

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
change main storage interface to handle inserts all at once
iuwqyir committed Oct 10, 2024

Verified

This commit was signed with the committer’s verified signature.
iuwqyir Toomas Oosalu
commit 0f12a69ea86380b7c79d089a564aa15f7a172536
111 changes: 23 additions & 88 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/rs/zerolog/log"
@@ -57,7 +56,7 @@ func (c *Committer) Start() {
log.Error().Err(err).Msg("Error getting block data to commit")
continue
}
if len(blockDataToCommit) == 0 {
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
log.Debug().Msg("No block data to commit")
continue
}
@@ -95,7 +94,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
return blockNumbers, nil
}

func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
@@ -104,129 +103,65 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
return nil, nil
}

blocksData, err := c.storage.StagingStorage.GetBlockData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
if len(blocksData) == 0 {
if blocksData == nil || len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
return nil, nil
}

// Sort blocks by block number
sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
sort.Slice(*blocksData, func(i, j int) bool {
return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0
})

if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
}

var sequentialBlockData []common.BlockData
sequentialBlockData = append(sequentialBlockData, blocksData[0])
expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
sequentialBlockData = append(sequentialBlockData, (*blocksData)[0])
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))

for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
for i := 1; i < len(*blocksData); i++ {
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
// Note: Gap detected, stop here
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
// increment the gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64()))
break
}
sequentialBlockData = append(sequentialBlockData, blocksData[i])
sequentialBlockData = append(sequentialBlockData, (*blocksData)[i])
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
}

return sequentialBlockData, nil
return &sequentialBlockData, nil
}

func (c *Committer) commit(blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
for i, block := range blockData {
func (c *Committer) commit(blockData *[]common.BlockData) error {
blockNumbers := make([]*big.Int, len(*blockData))
for i, block := range *blockData {
blockNumbers[i] = block.Block.Number
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
if err := c.saveDataToMainStorage(blockData); err != nil {
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
return fmt.Errorf("error saving data to main storage: %v", err)
}

if err := c.storage.StagingStorage.DeleteBlockData(blockData); err != nil {
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
return fmt.Errorf("error deleting data from staging storage: %v", err)
}

// Update metrics for successful commits
metrics.SuccessfulCommits.Add(float64(len(blockData)))
metrics.LastCommittedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))

return nil
}

func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
var commitWg sync.WaitGroup
commitWg.Add(4)

var commitErr error
var commitErrMutex sync.Mutex

blocks := make([]common.Block, 0, len(blockData))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, block := range blockData {
blocks = append(blocks, block.Block)
logs = append(logs, block.Logs...)
transactions = append(transactions, block.Transactions...)
traces = append(traces, block.Traces...)
}

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting blocks: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting logs: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting transactions: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting traces: %v", err)
commitErrMutex.Unlock()
}
}()

commitWg.Wait()

if commitErr != nil {
return commitErr
}
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64()))

return nil
}
2 changes: 1 addition & 1 deletion internal/orchestrator/failure_recoverer.go
Original file line number Diff line number Diff line change
@@ -114,7 +114,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
}
}
if err := fr.storage.StagingStorage.InsertBlockData(successfulResults); err != nil {
if err := fr.storage.StagingStorage.InsertStagingData(successfulResults); err != nil {
log.Error().Err(fmt.Errorf("error inserting block data in failure recoverer: %v", err))
return
}
2 changes: 1 addition & 1 deletion internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
@@ -192,7 +192,7 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
Traces: result.Data.Traces,
})
}
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
e := fmt.Errorf("error inserting block data: %v", err)
log.Error().Err(e)
for _, result := range successfulResults {
101 changes: 88 additions & 13 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
@@ -60,7 +61,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
return conn, nil
}

func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
query := `
INSERT INTO ` + c.cfg.Database + `.blocks (
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
@@ -73,7 +74,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
if err != nil {
return err
}
for _, block := range blocks {
for _, block := range *blocks {
err := batch.Append(
block.ChainId,
block.Number,
@@ -105,7 +106,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
query := `
INSERT INTO ` + c.cfg.Database + `.transactions (
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
@@ -117,7 +118,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
if err != nil {
return err
}
for _, tx := range txs {
for _, tx := range *txs {
err := batch.Append(
tx.ChainId,
tx.Hash,
@@ -147,7 +148,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
return batch.Send()
}

func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
query := `
INSERT INTO ` + c.cfg.Database + `.logs (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -158,7 +159,7 @@ func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
if err != nil {
return err
}
for _, log := range logs {
for _, log := range *logs {
err := batch.Append(
log.ChainId,
log.BlockNumber,
@@ -585,7 +586,7 @@ func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
return blockNumbersString
}

func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
query := `INSERT INTO ` + c.cfg.Database + `.block_data (chain_id, block_number, data)`
batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
@@ -608,7 +609,7 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []common.BlockData, err error) {
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))

@@ -638,12 +639,12 @@ func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []comm
if err != nil {
return nil, err
}
blockDataList = append(blockDataList, blockData)
*blockDataList = append(*blockDataList, blockData)
}
return blockDataList, nil
}

func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error {
query := fmt.Sprintf(`
INSERT INTO %s.block_data (
chain_id, block_number, is_deleted
@@ -655,7 +656,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return err
}

for _, blockData := range data {
for _, blockData := range *data {
err := batch.Append(
blockData.Block.ChainId,
blockData.Block.Number,
@@ -668,7 +669,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
query := `
INSERT INTO ` + c.cfg.Database + `.traces (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -680,7 +681,7 @@ func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
if err != nil {
return err
}
for _, trace := range traces {
for _, trace := range *traces {
err = batch.Append(
trace.ChainID,
trace.BlockNumber,
@@ -761,3 +762,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
}
return traces, nil
}

// TODO make this atomic
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
blocks := make([]common.Block, 0, len(*data))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, blockData := range *data {
blocks = append(blocks, blockData.Block)
logs = append(logs, blockData.Logs...)
transactions = append(transactions, blockData.Transactions...)
traces = append(traces, blockData.Traces...)
AmineAfia marked this conversation as resolved.
Show resolved Hide resolved
}

var saveErr error
var saveErrMutex sync.Mutex
var wg sync.WaitGroup

if len(blocks) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertBlocks(&blocks); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(logs) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertLogs(&logs); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(transactions) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTransactions(&transactions); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(traces) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTraces(&traces); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErrMutex.Unlock()
}
}()
}

wg.Wait()

if saveErr != nil {
return saveErr
}
return nil
}
13 changes: 5 additions & 8 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
@@ -41,19 +41,16 @@ type IOrchestratorStorage interface {
}

type IStagingStorage interface {
InsertBlockData(data []common.BlockData) error
GetBlockData(qf QueryFilter) (data []common.BlockData, err error)
DeleteBlockData(data []common.BlockData) error
InsertStagingData(data []common.BlockData) error
GetStagingData(qf QueryFilter) (data *[]common.BlockData, err error)
DeleteStagingData(data *[]common.BlockData) error
GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
}

type IMainStorage interface {
InsertBlocks(blocks []common.Block) error
InsertTransactions(txs []common.Transaction) error
InsertLogs(logs []common.Log) error
InsertTraces(traces []common.Trace) error
InsertBlockData(data *[]common.BlockData) error

GetBlocks(qf QueryFilter) (logs []common.Block, err error)
GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
54 changes: 41 additions & 13 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
@@ -73,8 +73,8 @@ func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) er
return nil
}

func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error {
for _, block := range blocks {
func (m *MemoryConnector) insertBlocks(blocks *[]common.Block) error {
for _, block := range *blocks {
blockJson, err := json.Marshal(block)
if err != nil {
return err
@@ -109,8 +109,8 @@ func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) {
return blocks, nil
}

func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error {
for _, tx := range txs {
func (m *MemoryConnector) insertTransactions(txs *[]common.Transaction) error {
for _, tx := range *txs {
txJson, err := json.Marshal(tx)
if err != nil {
return err
@@ -143,8 +143,8 @@ func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction,
return txs, nil
}

func (m *MemoryConnector) InsertLogs(logs []common.Log) error {
for _, log := range logs {
func (m *MemoryConnector) insertLogs(logs *[]common.Log) error {
for _, log := range *logs {
logJson, err := json.Marshal(log)
if err != nil {
return err
@@ -251,7 +251,7 @@ func getBlockNumbersToCheck(qf QueryFilter) map[string]uint8 {
return blockNumbersToCheck
}

func (m *MemoryConnector) InsertBlockData(data []common.BlockData) error {
func (m *MemoryConnector) InsertStagingData(data []common.BlockData) error {
for _, blockData := range data {
dataJson, err := json.Marshal(blockData)
if err != nil {
@@ -262,7 +262,7 @@ func (m *MemoryConnector) InsertBlockData(data []common.BlockData) error {
return nil
}

func (m *MemoryConnector) GetBlockData(qf QueryFilter) ([]common.BlockData, error) {
func (m *MemoryConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) {
blockData := []common.BlockData{}
limit := getLimit(qf)
blockNumbersToCheck := getBlockNumbersToCheck(qf)
@@ -283,19 +283,19 @@ func (m *MemoryConnector) GetBlockData(qf QueryFilter) ([]common.BlockData, erro
}
}
}
return blockData, nil
return &blockData, nil
}

func (m *MemoryConnector) DeleteBlockData(data []common.BlockData) error {
for _, blockData := range data {
func (m *MemoryConnector) DeleteStagingData(data *[]common.BlockData) error {
for _, blockData := range *data {
key := fmt.Sprintf("blockData:%s:%s", blockData.Block.ChainId.String(), blockData.Block.Number.String())
m.cache.Remove(key)
}
return nil
}

func (m *MemoryConnector) InsertTraces(traces []common.Trace) error {
for _, trace := range traces {
func (m *MemoryConnector) insertTraces(traces *[]common.Trace) error {
for _, trace := range *traces {
traceJson, err := json.Marshal(trace)
if err != nil {
return err
@@ -331,3 +331,31 @@ func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) {
func traceAddressToString(traceAddress []uint64) string {
return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
}

// InsertBlockData stores the given block data in the in-memory main storage.
//
// Every entry is split into its block, logs, transactions and traces; each
// kind is collected into its own slice and handed to the matching insert
// helper. The helpers run one after another in the order blocks, logs,
// transactions, traces, and the first error encountered is returned as-is
// without running the remaining helpers.
func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
	entries := *data

	var (
		blocks       = make([]common.Block, 0, len(entries))
		logs         = make([]common.Log, 0)
		transactions = make([]common.Transaction, 0)
		traces       = make([]common.Trace, 0)
	)
	for i := range entries {
		entry := entries[i]
		blocks = append(blocks, entry.Block)
		logs = append(logs, entry.Logs...)
		transactions = append(transactions, entry.Transactions...)
		traces = append(traces, entry.Traces...)
	}

	// Ordered list of insert steps; execution stops at the first failure.
	steps := []func() error{
		func() error { return m.insertBlocks(&blocks) },
		func() error { return m.insertLogs(&logs) },
		func() error { return m.insertTransactions(&transactions) },
		func() error { return m.insertTraces(&traces) },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}