Skip to content

Commit

Permalink
create new storage functions for handling reorgs
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 7, 2024
1 parent dd391a7 commit ddcfcbb
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 1 deletion.
6 changes: 6 additions & 0 deletions internal/common/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ type BlockData struct {
Traces []Trace
}

type BlockHeader struct {
Number *big.Int `json:"number"`
Hash string `json:"hash"`
ParentHash string `json:"parent_hash"`
}

type RawBlock = map[string]interface{}
116 changes: 116 additions & 0 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,122 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
return traces, nil
}

func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
var blockNumberString string
err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
if err != nil {
return nil, err
}
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
}
return blockNumber, nil
}

func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String())
err := c.conn.Exec(context.Background(), query)
return err
}

func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) {
query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String())
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
query += getLimitClause(limit)

rows, err := c.conn.Query(context.Background(), query)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
var blockHeader common.BlockHeader
err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash)
if err != nil {
return nil, err
}
blockHeaders = append(blockHeaders, blockHeader)
}
return blockHeaders, nil
}

func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
var saveErr error
var saveErrMutex sync.Mutex
var wg sync.WaitGroup
wg.Add(4)

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "blocks", "chain_id, number, is_deleted"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErrMutex.Unlock()
}
}()

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "logs", "chain_id, block_number, is_deleted"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErrMutex.Unlock()
}
}()

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "transactions", "chain_id, block_number, is_deleted"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErrMutex.Unlock()
}
}()

go func() {
defer wg.Done()
if err := c.deleteBatch(chainId, blockNumbers, "traces", "chain_id, block_number, is_deleted"); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErrMutex.Unlock()
}
}()

wg.Wait()

if saveErr != nil {
return saveErr
}
return nil
}

func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, columns string) error {
query := fmt.Sprintf("INSERT INTO %s.%s (%s)", c.cfg.Database, table, columns)
batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return err
}
for _, blockNumber := range blockNumbers {
err = batch.Append(
chainId,
blockNumber,
1,
)
if err != nil {
return err
}
}
return batch.Send()
}

// TODO make this atomic
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
blocks := make([]common.Block, 0, len(*data))
Expand Down
7 changes: 7 additions & 0 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type IOrchestratorStorage interface {
GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
StoreBlockFailures(failures []common.BlockFailure) error
DeleteBlockFailures(failures []common.BlockFailure) error
GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
}

type IStagingStorage interface {
Expand All @@ -55,6 +57,11 @@ type IMainStorage interface {
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
/**
* Get block headers ordered from latest to oldest.
*/
LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error)
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
}

func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
Expand Down
66 changes: 66 additions & 0 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b
return maxBlockNumber, nil
}

func isKeyForSomeBlock(key string, prefixes []string, blocksFilter map[string]uint8) bool {
for _, prefix := range prefixes {
if isKeyForBlock(key, prefix, blocksFilter) {
return true
}
}
return false
}

func isKeyForBlock(key string, prefix string, blocksFilter map[string]uint8) bool {
if !strings.HasPrefix(key, prefix) {
return false
Expand Down Expand Up @@ -332,6 +341,24 @@ func traceAddressToString(traceAddress []uint64) string {
return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
}

func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
key := fmt.Sprintf("reorg_check:%s", chainId.String())
value, ok := m.cache.Get(key)
if !ok {
return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String())
}
blockNumber, ok := new(big.Int).SetString(value, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", value)
}
return blockNumber, nil
}

func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String())
return nil
}

func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
blocks := make([]common.Block, 0, len(*data))
logs := make([]common.Log, 0)
Expand Down Expand Up @@ -359,3 +386,42 @@ func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
}
return nil
}

func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
for _, key := range m.cache.Keys() {
prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())}
shouldDelete := isKeyForSomeBlock(key, prefixes, blockNumbersToCheck)
if shouldDelete {
m.cache.Remove(key)
}
}
return nil
}

func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) {
blockHeaders := []common.BlockHeader{}
for _, key := range m.cache.Keys() {
if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) {
blockNumberStr := strings.Split(key, ":")[2]
blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}
if blockNumber.Cmp(lookbackStart) <= 0 {
value, _ := m.cache.Get(key)
block := common.Block{}
err := json.Unmarshal([]byte(value), &block)
if err != nil {
return nil, err
}
blockHeaders = append(blockHeaders, common.BlockHeader{
Number: blockNumber,
Hash: block.Hash,
ParentHash: block.ParentHash,
})
}
}
}
return blockHeaders, nil
}
20 changes: 20 additions & 0 deletions internal/storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/big"

"github.com/go-redis/redis/v8"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) err
}
return nil
}

func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
ctx := context.Background()
blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result()
if err != nil {
return nil, err
}
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
}
return blockNumber, nil
}

func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
ctx := context.Background()
r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0)
return nil
}
7 changes: 7 additions & 0 deletions internal/tools/clickhouse_create_cursors_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE cursors (
`chain_id` UInt256,
`cursor_type` String,
`cursor_value` String,
`insert_timestamp` DateTime DEFAULT now(),
) ENGINE = ReplacingMergeTree(insert_timestamp)
ORDER BY (chain_id, cursor_type);
2 changes: 1 addition & 1 deletion internal/tools/clickhouse_create_staging_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ CREATE TABLE block_data (
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number)
ORDER BY (chain_id, block_number)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

0 comments on commit ddcfcbb

Please sign in to comment.