From 6c6514f31936cc1b7bbc2cd6d2113db857dd194e Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Fri, 11 Oct 2024 18:01:18 +0300 Subject: [PATCH] schema changes --- internal/common/transaction.go | 1 + internal/rpc/serializer.go | 11 +++++++++++ internal/storage/clickhouse.go | 16 +++++++++++++--- .../tools/clickhouse_create_blocks_table.sql | 4 ++-- internal/tools/clickhouse_create_logs_table.sql | 7 +++++-- .../tools/clickhouse_create_traces_table.sql | 4 ++-- .../clickhouse_create_transactions_table.sql | 6 ++++-- 7 files changed, 38 insertions(+), 11 deletions(-) diff --git a/internal/common/transaction.go b/internal/common/transaction.go index c7b4407..2464c11 100644 --- a/internal/common/transaction.go +++ b/internal/common/transaction.go @@ -18,6 +18,7 @@ type Transaction struct { Gas uint64 `json:"gas"` GasPrice *big.Int `json:"gas_price"` Data string `json:"data"` + FunctionSelector string `json:"function_selector"` MaxFeePerGas *big.Int `json:"max_fee_per_gas"` MaxPriorityFeePerGas *big.Int `json:"max_priority_fee_per_gas"` TransactionType uint8 `json:"transaction_type"` diff --git a/internal/rpc/serializer.go b/internal/rpc/serializer.go index 69e2562..36a488c 100644 --- a/internal/rpc/serializer.go +++ b/internal/rpc/serializer.go @@ -159,6 +159,7 @@ func serializeTransaction(chainId *big.Int, rawTx interface{}, blockTimestamp ui Gas: hexToUint64(tx["gas"]), GasPrice: hexToBigInt(tx["gasPrice"]), Data: interfaceToString(tx["input"]), + FunctionSelector: extractFunctionSelector(interfaceToString(tx["input"])), MaxFeePerGas: hexToBigInt(tx["maxFeePerGas"]), MaxPriorityFeePerGas: hexToBigInt(tx["maxPriorityFeePerGas"]), TransactionType: uint8(hexToUint64(tx["type"])), @@ -169,6 +170,16 @@ func serializeTransaction(chainId *big.Int, rawTx interface{}, blockTimestamp ui } } +/** + * Extracts the function selector (first 4 bytes) from a transaction input. + */ +func extractFunctionSelector(s string) string { + if len(s) < 10 { + return "" + } + return s[0:10] +} + func serializeLogs(chainId *big.Int, rawLogs []map[string]interface{}, block common.Block) []common.Log { serializedLogs := make([]common.Log, len(rawLogs)) for i, rawLog := range rawLogs { diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 443f6c6..8ecdb5a 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -3,6 +3,7 @@ package storage import ( "context" "crypto/tls" + "database/sql" "encoding/json" "fmt" "math/big" @@ -110,7 +111,7 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro query := ` INSERT INTO ` + c.cfg.Database + `.transactions ( chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index, - from_address, to_address, value, gas, gas_price, data, max_fee_per_gas, max_priority_fee_per_gas, + from_address, to_address, value, gas, gas_price, data, function_selector, max_fee_per_gas, max_priority_fee_per_gas, transaction_type, r, s, v, access_list ) ` @@ -133,6 +134,7 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro tx.Gas, tx.GasPrice, tx.Data, + tx.FunctionSelector, tx.MaxFeePerGas, tx.MaxPriorityFeePerGas, tx.TransactionType, @@ -490,12 +492,16 @@ func scanLog(rows driver.Rows) (common.Log, error) { } func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { - query := fmt.Sprintf("SELECT max(number) FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database) + query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) } + query += " ORDER BY number DESC LIMIT 1" err = c.conn.QueryRow(context.Background(), query).Scan(&maxBlockNumber) if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } return nil, err } zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String()) @@ -503,15 +509,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe } func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) { - query := fmt.Sprintf("SELECT max(block_number) FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database) + query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) } if rangeEnd.Sign() > 0 { query += fmt.Sprintf(" AND block_number <= %s", rangeEnd.String()) } + query += " ORDER BY block_number DESC LIMIT 1" err = c.conn.QueryRow(context.Background(), query).Scan(&maxBlockNumber) if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } return nil, err } return maxBlockNumber, nil diff --git a/internal/tools/clickhouse_create_blocks_table.sql b/internal/tools/clickhouse_create_blocks_table.sql index fc5e550..efcc731 100644 --- a/internal/tools/clickhouse_create_blocks_table.sql +++ b/internal/tools/clickhouse_create_blocks_table.sql @@ -24,7 +24,7 @@ CREATE TABLE blocks ( `insert_timestamp` DateTime DEFAULT now(), `is_deleted` UInt8 DEFAULT 0, INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 1, - INDEX idx_number number TYPE minmax GRANULARITY 1, + INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1, ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted) -ORDER BY (chain_id, hash) +ORDER BY (chain_id, number) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1; \ No newline at end of file diff --git a/internal/tools/clickhouse_create_logs_table.sql b/internal/tools/clickhouse_create_logs_table.sql index 1b05ef9..4ece815 100644 --- a/internal/tools/clickhouse_create_logs_table.sql +++ b/internal/tools/clickhouse_create_logs_table.sql @@ -15,10 +15,13 @@ CREATE TABLE logs ( `insert_timestamp` DateTime DEFAULT now(), `is_deleted` UInt8 DEFAULT 0, INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1, - INDEX idx_block_number block_number TYPE minmax GRANULARITY 1, + INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1, INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1, INDEX idx_address address TYPE bloom_filter GRANULARITY 1, INDEX idx_topic0 topic_0 TYPE bloom_filter GRANULARITY 1, + INDEX idx_topic1 topic_1 TYPE bloom_filter GRANULARITY 1, + INDEX idx_topic2 topic_2 TYPE bloom_filter GRANULARITY 1, + INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1, ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted) -ORDER BY (chain_id, transaction_hash, log_index, block_hash) +ORDER BY (chain_id, block_number, transaction_hash, log_index) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1; \ No newline at end of file diff --git a/internal/tools/clickhouse_create_traces_table.sql b/internal/tools/clickhouse_create_traces_table.sql index a284b6a..1d59696 100644 --- a/internal/tools/clickhouse_create_traces_table.sql +++ b/internal/tools/clickhouse_create_traces_table.sql @@ -23,11 +23,11 @@ CREATE TABLE traces ( `is_deleted` UInt8 DEFAULT 0, `insert_timestamp` DateTime DEFAULT now(), INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1, - INDEX idx_block_number block_number TYPE minmax GRANULARITY 1, INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1, + INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1, INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1, INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1, INDEX idx_type type TYPE bloom_filter GRANULARITY 1, ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted) -ORDER BY (chain_id, transaction_hash, trace_address, block_hash) +ORDER BY (chain_id, block_number, transaction_hash, trace_address) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1; \ No newline at end of file diff --git a/internal/tools/clickhouse_create_transactions_table.sql b/internal/tools/clickhouse_create_transactions_table.sql index baeb22f..f43c72e 100644 --- a/internal/tools/clickhouse_create_transactions_table.sql +++ b/internal/tools/clickhouse_create_transactions_table.sql @@ -12,6 +12,7 @@ CREATE TABLE transactions ( `gas` UInt64, `gas_price` UInt256, `data` String, + `function_selector` FixedString(10), `max_fee_per_gas` UInt128, `max_priority_fee_per_gas` UInt128, `transaction_type` UInt8, @@ -22,9 +23,10 @@ CREATE TABLE transactions ( `is_deleted` UInt8 DEFAULT 0, `insert_timestamp` DateTime DEFAULT now(), INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1, - INDEX idx_block_number block_number TYPE minmax GRANULARITY 1, INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1, + INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1, INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1, INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1, + INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1, ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted) -ORDER BY (chain_id, hash) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1; \ No newline at end of file +ORDER BY (chain_id, block_number, hash) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1; \ No newline at end of file