Create storage functions for handling reorgs (#90)
### TL;DR

Added support for chain reorganization detection and handling in the storage layer.

### What changed?

- Introduced a new `BlockHeader` struct in `block.go`
- Added methods to `ClickHouseConnector` for managing reorg checks and block headers
- Implemented `GetLastReorgCheckedBlockNumber` and `SetLastReorgCheckedBlockNumber` in storage connectors
- Added `LookbackBlockHeaders` and `DeleteBlockData` methods to the `IMainStorage` interface
- Updated `MemoryConnector` to support new reorg-related operations
- Added reorg check functionality to `RedisConnector`
- Created a new SQL script for a `cursors` table in ClickHouse
- Modified the staging table creation script to remove the primary key
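Review note: to see how these pieces fit together, here is a minimal, hypothetical sketch of a reorg-check pass, written as if it lived inside `internal/storage` so the interfaces are in scope. The function name, the lookback window of 10, and the omitted re-fetch step are illustrative assumptions, not part of this commit; `findForkedBlocks` is sketched further down, next to `LookbackBlockHeaders`.

// Hypothetical orchestration of the new storage methods; not part of this commit.
func runReorgCheck(orch IOrchestratorStorage, store IMainStorage, chainId *big.Int) error {
    lastChecked, err := orch.GetLastReorgCheckedBlockNumber(chainId)
    if err != nil {
        return err // e.g. no cursor row yet; a real caller would seed one first
    }
    // Walk headers backwards from the cursor; they arrive latest to oldest.
    headers, err := store.LookbackBlockHeaders(chainId, 10, lastChecked)
    if err != nil {
        return err
    }
    // findForkedBlocks is sketched below, next to LookbackBlockHeaders.
    if forked := findForkedBlocks(headers); len(forked) > 0 {
        // Drop data for the forked blocks; re-fetching canonical blocks would follow.
        if err := store.DeleteBlockData(chainId, forked); err != nil {
            return err
        }
    }
    // Cursor advancement policy (e.g. moving to the new chain head) is a
    // separate concern; re-arming at the same block keeps the sketch small.
    return orch.SetLastReorgCheckedBlockNumber(chainId, lastChecked)
}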
iuwqyir authored Oct 10, 2024
2 parents 5f3c0f7 + 0f70f2e commit 80fcbb5
Showing 7 changed files with 221 additions and 4 deletions.
6 changes: 6 additions & 0 deletions internal/common/block.go
@@ -36,4 +36,10 @@ type BlockData struct {
 	Traces []Trace
 }
 
+type BlockHeader struct {
+	Number     *big.Int `json:"number"`
+	Hash       string   `json:"hash"`
+	ParentHash string   `json:"parent_hash"`
+}
+
 type RawBlock = map[string]interface{}
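Review note: `ParentHash` is what makes reorg detection possible from headers alone — each header can be checked against its predecessor without refetching full blocks. The JSON tags pin the wire shape to `{"number": ..., "hash": ..., "parent_hash": ...}`, so headers can be cached or logged without leaking Go field names.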
117 changes: 114 additions & 3 deletions internal/storage/clickhouse.go
@@ -609,7 +609,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
 	return batch.Send()
 }
 
-func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
+func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) {
 	query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
 		c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
 
@@ -625,6 +625,7 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
 	}
 	defer rows.Close()
 
+	blockDataList := make([]common.BlockData, 0)
 	for rows.Next() {
 		var blockDataJson string
 		err := rows.Scan(
@@ -639,9 +640,9 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
 		if err != nil {
 			return nil, err
 		}
-		*blockDataList = append(*blockDataList, blockData)
+		blockDataList = append(blockDataList, blockData)
 	}
-	return blockDataList, nil
+	return &blockDataList, nil
 }
 
 func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error {
@@ -763,6 +764,116 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, err error) {
 	return traces, nil
 }
 
+func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+	query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
+	if chainId.Sign() > 0 {
+		query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
+	}
+	var blockNumberString string
+	err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
+	if err != nil {
+		return nil, err
+	}
+	blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
+	}
+	return blockNumber, nil
+}
+
+func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+	query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String())
+	err := c.conn.Exec(context.Background(), query)
+	return err
+}
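Review note: a plain INSERT acts as an upsert here because the new `cursors` table (see the SQL script below) is a ReplacingMergeTree ordered by `(chain_id, cursor_type)`: the row with the newest `insert_timestamp` survives merges, and the `FINAL` in the read above collapses duplicates before that happens.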
+
+func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) {
+	query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String())
+	query += getLimitClause(limit)
+
+	rows, err := c.conn.Query(context.Background(), query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var blockHeader common.BlockHeader
+		err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash)
+		if err != nil {
+			return nil, err
+		}
+		blockHeaders = append(blockHeaders, blockHeader)
+	}
+	return blockHeaders, nil
+}
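Review note: a minimal sketch of how a caller might consume the lookback result. Since headers come back newest-first, a reorg shows up as a parent-hash mismatch between adjacent entries; `findForkedBlocks` is hypothetical, not part of this commit.

// Hypothetical helper: collect block numbers whose parent link is broken.
func findForkedBlocks(headers []common.BlockHeader) []*big.Int {
    forked := []*big.Int{}
    for i := 0; i < len(headers)-1; i++ {
        // headers[i] is newer than headers[i+1]; on a consistent chain its
        // parent hash equals the older header's hash.
        if headers[i].ParentHash != headers[i+1].Hash {
            forked = append(forked, headers[i].Number)
        }
    }
    return forked
}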
+
+func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
+	var saveErr error
+	var saveErrMutex sync.Mutex
+	var wg sync.WaitGroup
+	wg.Add(4)
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "blocks", "number"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting blocks: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "logs", "block_number"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting logs: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "transactions", "block_number"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting transactions: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "traces", "block_number"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting traces: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	wg.Wait()
+
+	if saveErr != nil {
+		return saveErr
+	}
+	return nil
+}
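Review note: the mutex-guarded fan-out works, but concurrent failures overwrite each other, so only the last error is reported. An equivalent sketch using `golang.org/x/sync/errgroup` (an alternative, not what the commit uses) returns the first error and avoids the mutex:

// Hypothetical alternative to DeleteBlockData's goroutine/mutex pattern;
// requires the golang.org/x/sync/errgroup import.
func (c *ClickHouseConnector) deleteBlockDataAlt(chainId *big.Int, blockNumbers []*big.Int) error {
    var g errgroup.Group
    for _, target := range []struct{ table, column string }{
        {"blocks", "number"},
        {"logs", "block_number"},
        {"transactions", "block_number"},
        {"traces", "block_number"},
    } {
        target := target // capture loop variable (needed before Go 1.22)
        g.Go(func() error {
            return c.deleteBatch(chainId, blockNumbers, target.table, target.column)
        })
    }
    return g.Wait() // first non-nil error from the four deletes, if any
}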
+
+func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, blockNumberColumn string) error {
+	query := fmt.Sprintf("ALTER TABLE %s.%s DELETE WHERE chain_id = ? AND %s IN (?)", c.cfg.Database, table, blockNumberColumn)
+
+	blockNumbersStr := make([]string, len(blockNumbers))
+	for i, bn := range blockNumbers {
+		blockNumbersStr[i] = bn.String()
+	}
+
+	err := c.conn.Exec(context.Background(), query, chainId, blockNumbersStr)
+	if err != nil {
+		return fmt.Errorf("error deleting from %s: %w", table, err)
+	}
+
+	return nil
+}
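Review note: `ALTER TABLE ... DELETE` is a mutation in ClickHouse and runs asynchronously by default, so deleted rows can remain visible until the mutation completes. Callers that must never serve reorged data would need to wait for the mutation (e.g. via the `mutations_sync` setting) or rely on the subsequent re-insert to shadow the stale rows.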
+
 // TODO make this atomic
 func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
 	blocks := make([]common.Block, 0, len(*data))
7 changes: 7 additions & 0 deletions internal/storage/connector.go
@@ -38,6 +38,8 @@ type IOrchestratorStorage interface {
 	GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
 	StoreBlockFailures(failures []common.BlockFailure) error
 	DeleteBlockFailures(failures []common.BlockFailure) error
+	GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
+	SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
 }
 
 type IStagingStorage interface {
@@ -55,6 +57,11 @@ type IMainStorage interface {
 	GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
 	GetTraces(qf QueryFilter) (traces []common.Trace, err error)
 	GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
+	/**
+	 * Get block headers ordered from latest to oldest.
+	 */
+	LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error)
+	DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
 }
 
 func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
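Review note: the reorg flow spans both interfaces — the cursor lives on `IOrchestratorStorage`, while header lookback and deletion live on `IMainStorage` — so a reorg handler needs both connectors, as in the sketch under the commit description above.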
66 changes: 66 additions & 0 deletions internal/storage/memory.go
@@ -218,6 +218,15 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int
 	return maxBlockNumber, nil
 }
 
+func isKeyForSomeBlock(key string, prefixes []string, blocksFilter map[string]uint8) bool {
+	for _, prefix := range prefixes {
+		if isKeyForBlock(key, prefix, blocksFilter) {
+			return true
+		}
+	}
+	return false
+}
+
 func isKeyForBlock(key string, prefix string, blocksFilter map[string]uint8) bool {
 	if !strings.HasPrefix(key, prefix) {
 		return false
@@ -332,6 +341,24 @@ func traceAddressToString(traceAddress []uint64) string {
 	return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
 }
 
+func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+	key := fmt.Sprintf("reorg_check:%s", chainId.String())
+	value, ok := m.cache.Get(key)
+	if !ok {
+		return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String())
+	}
+	blockNumber, ok := new(big.Int).SetString(value, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", value)
+	}
+	return blockNumber, nil
+}
+
+func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+	m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String())
+	return nil
+}
+
 func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
 	blocks := make([]common.Block, 0, len(*data))
 	logs := make([]common.Log, 0)
@@ -359,3 +386,50 @@ func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
 	}
 	return nil
 }
+
+func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
+	blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
+	// Compute the key prefixes once; they do not depend on the key being inspected.
+	prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())}
+	for _, key := range m.cache.Keys() {
+		if isKeyForSomeBlock(key, prefixes, blockNumbersToCheck) {
+			m.cache.Remove(key)
+		}
+	}
+	return nil
+}
+
+func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) {
+	blockHeaders := []common.BlockHeader{}
+	for _, key := range m.cache.Keys() {
+		if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) {
+			blockNumberStr := strings.Split(key, ":")[2]
+			blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10)
+			if !ok {
+				return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
+			}
+			if blockNumber.Cmp(lookbackStart) <= 0 {
+				value, _ := m.cache.Get(key)
+				block := common.Block{}
+				err := json.Unmarshal([]byte(value), &block)
+				if err != nil {
+					return nil, err
+				}
+				blockHeaders = append(blockHeaders, common.BlockHeader{
+					Number:     blockNumber,
+					Hash:       block.Hash,
+					ParentHash: block.ParentHash,
+				})
+			}
+		}
+	}
+	// Cache keys are not ordered, so sort latest-first and apply the limit to
+	// honor the interface contract (requires the stdlib "sort" import).
+	sort.Slice(blockHeaders, func(i, j int) bool {
+		return blockHeaders[i].Number.Cmp(blockHeaders[j].Number) > 0
+	})
+	if limit > 0 && len(blockHeaders) > limit {
+		blockHeaders = blockHeaders[:limit]
+	}
+	return blockHeaders, nil
+}
20 changes: 20 additions & 0 deletions internal/storage/redis.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math/big"
 
 	"github.com/go-redis/redis/v8"
 	"github.com/rs/zerolog/log"
@@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) error {
 	}
 	return nil
 }
+
+func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+	ctx := context.Background()
+	blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result()
+	if err != nil {
+		return nil, err
+	}
+	blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
+	}
+	return blockNumber, nil
+}
+
+func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+	ctx := context.Background()
+	// Expiration 0 means the key never expires; surface any write error.
+	return r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0).Err()
+}
7 changes: 7 additions & 0 deletions internal/tools/clickhouse_create_cursors_table.sql
@@ -0,0 +1,7 @@
+CREATE TABLE cursors (
+    `chain_id` UInt256,
+    `cursor_type` String,
+    `cursor_value` String,
+    `insert_timestamp` DateTime DEFAULT now(),
+) ENGINE = ReplacingMergeTree(insert_timestamp)
+ORDER BY (chain_id, cursor_type);
2 changes: 1 addition & 1 deletion internal/tools/clickhouse_create_staging_table.sql
@@ -6,5 +6,5 @@ CREATE TABLE block_data (
     `is_deleted` UInt8 DEFAULT 0,
     INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
-ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number)
+ORDER BY (chain_id, block_number)
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
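Review note: dropping the explicit `PRIMARY KEY` clause is behavior-preserving here: when it is omitted, ClickHouse defaults the primary key to the `ORDER BY` key, which is the same `(chain_id, block_number)` tuple that was previously spelled out.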
