From 0f70f2e15b32fcb8c5a5188a9e9f40e40577f595 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Fri, 4 Oct 2024 19:30:22 +0300 Subject: [PATCH] create new storage functions for handling reorgs --- internal/common/block.go | 6 + internal/storage/clickhouse.go | 117 +++++++++++++++++- internal/storage/connector.go | 7 ++ internal/storage/memory.go | 66 ++++++++++ internal/storage/redis.go | 20 +++ .../tools/clickhouse_create_cursors_table.sql | 7 ++ .../tools/clickhouse_create_staging_table.sql | 2 +- 7 files changed, 221 insertions(+), 4 deletions(-) create mode 100644 internal/tools/clickhouse_create_cursors_table.sql diff --git a/internal/common/block.go b/internal/common/block.go index 48a4570..4bcbe04 100644 --- a/internal/common/block.go +++ b/internal/common/block.go @@ -36,4 +36,10 @@ type BlockData struct { Traces []Trace } +type BlockHeader struct { + Number *big.Int `json:"number"` + Hash string `json:"hash"` + ParentHash string `json:"parent_hash"` +} + type RawBlock = map[string]interface{} diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index f831d65..c318cda 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -609,7 +609,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error { return batch.Send() } -func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) { +func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) { query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) @@ -625,6 +625,7 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]c } defer rows.Close() + blockDataList := make([]common.BlockData, 0) for rows.Next() { var blockDataJson string err := rows.Scan( @@ -639,9 +640,9 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]c if err != nil { return nil, err } - *blockDataList = append(*blockDataList, blockData) + blockDataList = append(blockDataList, blockData) } - return blockDataList, nil + return &blockDataList, nil } func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error { @@ -763,6 +764,116 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, return traces, nil } +func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) + if chainId.Sign() > 0 { + query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) + } + var blockNumberString string + err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString) + if err != nil { + return nil, err + } + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String()) + err := c.conn.Exec(context.Background(), query) + return err +} + +func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) { + query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String()) + query += getLimitClause(limit) + + rows, err := c.conn.Query(context.Background(), query) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var blockHeader common.BlockHeader + err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash) + if err != nil { + return nil, err + } + blockHeaders = append(blockHeaders, blockHeader) + } + return blockHeaders, nil +} + +func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error { + var saveErr error + var saveErrMutex sync.Mutex + var wg sync.WaitGroup + wg.Add(4) + + go func() { + defer wg.Done() + if err := c.deleteBatch(chainId, blockNumbers, "blocks", "number"); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting blocks: %v", err) + saveErrMutex.Unlock() + } + }() + + go func() { + defer wg.Done() + if err := c.deleteBatch(chainId, blockNumbers, "logs", "block_number"); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting logs: %v", err) + saveErrMutex.Unlock() + } + }() + + go func() { + defer wg.Done() + if err := c.deleteBatch(chainId, blockNumbers, "transactions", "block_number"); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting transactions: %v", err) + saveErrMutex.Unlock() + } + }() + + go func() { + defer wg.Done() + if err := c.deleteBatch(chainId, blockNumbers, "traces", "block_number"); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting traces: %v", err) + saveErrMutex.Unlock() + } + }() + + wg.Wait() + + if saveErr != nil { + return saveErr + } + return nil +} + +func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, blockNumberColumn string) error { + query := fmt.Sprintf("ALTER TABLE %s.%s DELETE WHERE chain_id = ? AND %s IN (?)", c.cfg.Database, table, blockNumberColumn) + + blockNumbersStr := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumbersStr[i] = bn.String() + } + + err := c.conn.Exec(context.Background(), query, chainId, blockNumbersStr) + if err != nil { + return fmt.Errorf("error deleting from %s: %w", table, err) + } + + return nil +} + // TODO make this atomic func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { blocks := make([]common.Block, 0, len(*data)) diff --git a/internal/storage/connector.go b/internal/storage/connector.go index be102d5..5afeaf8 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -38,6 +38,8 @@ type IOrchestratorStorage interface { GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) StoreBlockFailures(failures []common.BlockFailure) error DeleteBlockFailures(failures []common.BlockFailure) error + GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) + SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error } type IStagingStorage interface { @@ -55,6 +57,11 @@ type IMainStorage interface { GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error) GetTraces(qf QueryFilter) (traces []common.Trace, err error) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) + /** + * Get block headers ordered from latest to oldest. + */ + LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) + DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error } func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) { diff --git a/internal/storage/memory.go b/internal/storage/memory.go index b1c7855..19286a8 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -218,6 +218,15 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b return maxBlockNumber, nil } +func isKeyForSomeBlock(key string, prefixes []string, blocksFilter map[string]uint8) bool { + for _, prefix := range prefixes { + if isKeyForBlock(key, prefix, blocksFilter) { + return true + } + } + return false +} + func isKeyForBlock(key string, prefix string, blocksFilter map[string]uint8) bool { if !strings.HasPrefix(key, prefix) { return false @@ -332,6 +341,24 @@ func traceAddressToString(traceAddress []uint64) string { return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]") } +func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { + key := fmt.Sprintf("reorg_check:%s", chainId.String()) + value, ok := m.cache.Get(key) + if !ok { + return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String()) + } + blockNumber, ok := new(big.Int).SetString(value, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", value) + } + return blockNumber, nil +} + +func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String()) + return nil +} + func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error { blocks := make([]common.Block, 0, len(*data)) logs := make([]common.Log, 0) @@ -359,3 +386,42 @@ func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error { } return nil } + +func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error { + blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers}) + for _, key := range m.cache.Keys() { + prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())} + shouldDelete := isKeyForSomeBlock(key, prefixes, blockNumbersToCheck) + if shouldDelete { + m.cache.Remove(key) + } + } + return nil +} + +func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) { + blockHeaders := []common.BlockHeader{} + for _, key := range m.cache.Keys() { + if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) { + blockNumberStr := strings.Split(key, ":")[2] + blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr) + } + if blockNumber.Cmp(lookbackStart) <= 0 { + value, _ := m.cache.Get(key) + block := common.Block{} + err := json.Unmarshal([]byte(value), &block) + if err != nil { + return nil, err + } + blockHeaders = append(blockHeaders, common.BlockHeader{ + Number: blockNumber, + Hash: block.Hash, + ParentHash: block.ParentHash, + }) + } + } + } + return blockHeaders, nil +} diff --git a/internal/storage/redis.go b/internal/storage/redis.go index 49e6e0c..3e11236 100644 --- a/internal/storage/redis.go +++ b/internal/storage/redis.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/big" "github.com/go-redis/redis/v8" "github.com/rs/zerolog/log" @@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) err } return nil } + +func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { + ctx := context.Background() + blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result() + if err != nil { + return nil, err + } + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + ctx := context.Background() + r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0) + return nil +} diff --git a/internal/tools/clickhouse_create_cursors_table.sql b/internal/tools/clickhouse_create_cursors_table.sql new file mode 100644 index 0000000..a33a7d6 --- /dev/null +++ b/internal/tools/clickhouse_create_cursors_table.sql @@ -0,0 +1,7 @@ +CREATE TABLE cursors ( + `chain_id` UInt256, + `cursor_type` String, + `cursor_value` String, + `insert_timestamp` DateTime DEFAULT now(), +) ENGINE = ReplacingMergeTree(insert_timestamp) +ORDER BY (chain_id, cursor_type); diff --git a/internal/tools/clickhouse_create_staging_table.sql b/internal/tools/clickhouse_create_staging_table.sql index 1f606b6..2ab6ec2 100644 --- a/internal/tools/clickhouse_create_staging_table.sql +++ b/internal/tools/clickhouse_create_staging_table.sql @@ -6,5 +6,5 @@ CREATE TABLE block_data ( `is_deleted` UInt8 DEFAULT 0, INDEX idx_block_number block_number TYPE minmax GRANULARITY 1, ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted) -ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number) +ORDER BY (chain_id, block_number) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1; \ No newline at end of file