From 8e4c3907d90a46ba0f2f47c0174b613ba30ef427 Mon Sep 17 00:00:00 2001 From: catalyst17 <37663786+catalyst17@users.noreply.github.com> Date: Tue, 22 Oct 2024 19:52:12 +0200 Subject: [PATCH] feat: separate storage method for aggregates --- api/api.go | 2 +- internal/handlers/logs_handlers.go | 54 +++++--- internal/handlers/transactions_handlers.go | 2 +- internal/storage/clickhouse.go | 141 +++++++++------------ internal/storage/connector.go | 10 +- test/mocks/MockIMainStorage.go | 71 ++++++++++- 6 files changed, 170 insertions(+), 110 deletions(-) diff --git a/api/api.go b/api/api.go index f4538be..0fd16d7 100644 --- a/api/api.go +++ b/api/api.go @@ -70,7 +70,7 @@ type QueryResponse struct { // @Description Query result data Data interface{} `json:"data,omitempty"` // @Description Aggregation results - Aggregations map[string]string `json:"aggregations,omitempty"` + Aggregations []map[string]interface{} `json:"aggregations,omitempty"` } func writeError(w http.ResponseWriter, message string, code int) { diff --git a/internal/handlers/logs_handlers.go b/internal/handlers/logs_handlers.go index a05a8e0..0f40c3d 100644 --- a/internal/handlers/logs_handlers.go +++ b/internal/handlers/logs_handlers.go @@ -133,39 +133,59 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) { return } - logs, err := mainStorage.GetLogs(storage.QueryFilter{ + // Prepare the QueryFilter + qf := storage.QueryFilter{ FilterParams: queryParams.FilterParams, - GroupBy: queryParams.GroupBy, + ContractAddress: contractAddress, + Signature: signatureHash, + ChainId: chainId, SortBy: queryParams.SortBy, SortOrder: queryParams.SortOrder, Page: queryParams.Page, Limit: queryParams.Limit, - Aggregates: queryParams.Aggregates, - ContractAddress: contractAddress, - Signature: signatureHash, - ChainId: chainId, - }) - if err != nil { - log.Error().Err(err).Msg("Error querying logs") - api.InternalErrorHandler(c) - return } - response := api.QueryResponse{ + // Initialize the QueryResult + queryResult := api.QueryResponse{ Meta: api.Meta{ ChainId: chainId.Uint64(), ContractAddress: contractAddress, Signature: signatureHash, Page: queryParams.Page, Limit: queryParams.Limit, - TotalItems: len(logs.Data), - TotalPages: 0, // TODO: Implement total pages count + TotalItems: 0, + TotalPages: 0, // Implement total pages count if needed }, - Data: logs.Data, - Aggregations: logs.Aggregates, + Data: nil, + Aggregations: nil, + } + + // If aggregates are specified, retrieve them + if len(queryParams.Aggregates) > 0 { + qf.Aggregates = queryParams.Aggregates + qf.GroupBy = queryParams.GroupBy + + aggregatesResult, err := mainStorage.GetAggregations("logs", qf) + if err != nil { + log.Error().Err(err).Msg("Error querying aggregates") + api.InternalErrorHandler(c) + return + } + queryResult.Aggregations = aggregatesResult.Aggregates + queryResult.Meta.TotalItems = len(aggregatesResult.Aggregates) + } else { + // Retrieve logs data + logsResult, err := mainStorage.GetLogs(qf) + if err != nil { + log.Error().Err(err).Msg("Error querying logs") + api.InternalErrorHandler(c) + return + } + queryResult.Data = logsResult.Data + queryResult.Meta.TotalItems = len(logsResult.Data) } - sendJSONResponse(c, response) + sendJSONResponse(c, queryResult) } func getMainStorage() (storage.IMainStorage, error) { diff --git a/internal/handlers/transactions_handlers.go b/internal/handlers/transactions_handlers.go index 0cd842a..f99f54a 100644 --- a/internal/handlers/transactions_handlers.go +++ b/internal/handlers/transactions_handlers.go @@ -163,7 +163,7 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string TotalPages: 0, // TODO: Implement total pages count }, Data: result.Data, - Aggregations: result.Aggregates, + Aggregations: nil, } c.JSON(http.StatusOK, response) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 97bfda9..c0a1d4c 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -291,19 +291,66 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[commo return executeQuery[common.Transaction](c, "transactions", columns, qf, scanTransaction) } -func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[map[string]interface{}], error) { - var columns string +func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) { + columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3" + return executeQuery[common.Log](c, "logs", columns, qf, scanLog) +} + +func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) { + // Build the SELECT clause with aggregates + columns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ") + query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table) + + // Apply filters + if qf.ChainId != nil && qf.ChainId.Sign() > 0 { + query = addFilterParams("chain_id", qf.ChainId.String(), query) + } + query = addContractAddress(table, query, qf.ContractAddress) + + if qf.Signature != "" { + query += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature) + } - if len(qf.GroupBy) > 0 || len(qf.Aggregates) > 0 { - // Build columns for SELECT when grouping or aggregating - selectColumns := append(qf.GroupBy, qf.Aggregates...) - columns = strings.Join(selectColumns, ", ") - } else { - // Default columns when not grouping - columns = "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3" + for key, value := range qf.FilterParams { + query = addFilterParams(key, strings.ToLower(value), query) } - return executeQuery[map[string]interface{}](c, "logs", columns, qf, scanRowToMap) + // Add GROUP BY clause if specified + if len(qf.GroupBy) > 0 { + groupByColumns := strings.Join(qf.GroupBy, ", ") + query += fmt.Sprintf(" GROUP BY %s", groupByColumns) + } + + // Execute the query + rows, err := c.conn.Query(context.Background(), query) + if err != nil { + return QueryResult[interface{}]{}, err + } + defer rows.Close() + + // Collect results + var aggregates []map[string]interface{} + for rows.Next() { + columns := rows.Columns() + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + return QueryResult[interface{}]{}, err + } + + result := make(map[string]interface{}) + for i, col := range columns { + result[col] = values[i] + } + + aggregates = append(aggregates, result) + } + + return QueryResult[interface{}]{Data: nil, Aggregates: aggregates}, nil } func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) { @@ -316,8 +363,7 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query defer rows.Close() queryResult := QueryResult[T]{ - Data: []T{}, - Aggregates: map[string]string{}, + Data: []T{}, } for rows.Next() { @@ -328,21 +374,13 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query queryResult.Data = append(queryResult.Data, item) } - if len(qf.Aggregates) > 0 { - aggregates, err := c.executeAggregateQuery(table, qf) - if err != nil { - return queryResult, err - } - queryResult.Aggregates = aggregates - } - return queryResult, nil } func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string { query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table) - if qf.ChainId.Sign() > 0 { + if qf.ChainId != nil && qf.ChainId.Sign() > 0 { query = addFilterParams("chain_id", qf.ChainId.String(), query) } query = addContractAddress(table, query, qf.ContractAddress) @@ -356,12 +394,6 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) query = addFilterParams(key, strings.ToLower(value), query) } - // Add GROUP BY clause if specified - if len(qf.GroupBy) > 0 { - groupByColumns := strings.Join(qf.GroupBy, ", ") - query += fmt.Sprintf(" GROUP BY %s", groupByColumns) - } - // Add ORDER BY clause if qf.SortBy != "" { query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) @@ -371,9 +403,8 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) if qf.Page > 0 && qf.Limit > 0 { offset := (qf.Page - 1) * qf.Limit query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset) - } else { - // Add limit clause - query += getLimitClause(int(qf.Limit)) + } else if qf.Limit > 0 { + query += fmt.Sprintf(" LIMIT %d", qf.Limit) } return query @@ -433,35 +464,6 @@ func getTopicValueFormat(topic string) string { return result } -func (c *ClickHouseConnector) executeAggregateQuery(table string, qf QueryFilter) (map[string]string, error) { - aggregateQuery := "SELECT " + strings.Join(qf.Aggregates, ", ") + - fmt.Sprintf(" FROM %s.%s WHERE is_deleted = 0", c.cfg.Database, table) - - if qf.ContractAddress != "" { - aggregateQuery += fmt.Sprintf(" AND address = '%s'", qf.ContractAddress) - } - if qf.Signature != "" { - aggregateQuery += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature) - } - for key, value := range qf.FilterParams { - aggregateQuery += fmt.Sprintf(" AND %s = '%s'", key, value) - } - - row := c.conn.QueryRow(context.Background(), aggregateQuery) - aggregateResultsJSON, err := json.Marshal(row) - if err != nil { - return nil, fmt.Errorf("error marshaling aggregate results to JSON: %w", err) - } - - var aggregateResultsMap map[string]string - err = json.Unmarshal(aggregateResultsJSON, &aggregateResultsMap) - if err != nil { - return nil, fmt.Errorf("error unmarshaling aggregate results JSON to map: %w", err) - } - - return aggregateResultsMap, nil -} - func scanTransaction(rows driver.Rows) (common.Transaction, error) { var tx common.Transaction err := rows.Scan( @@ -521,27 +523,6 @@ func scanLog(rows driver.Rows) (common.Log, error) { return log, nil } -func scanRowToMap(rows driver.Rows) (map[string]interface{}, error) { - columns := rows.Columns() - values := make([]interface{}, len(columns)) - valuePtrs := make([]interface{}, len(columns)) - - for i := range columns { - valuePtrs[i] = &values[i] - } - - if err := rows.Scan(valuePtrs...); err != nil { - return nil, err - } - - result := make(map[string]interface{}) - for i, col := range columns { - result[col] = values[i] - } - - return result, nil -} - func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index ac75069..86cddbf 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -18,16 +18,17 @@ type QueryFilter struct { Page int Limit int Offset int - Aggregates []string + Aggregates []string // e.g., ["COUNT(*) AS count", "SUM(amount) AS total_amount"] FromAddress string ContractAddress string Signature string } type QueryResult[T any] struct { // TODO: findout how to only allow Log/transaction arrays or split the result - Data []T `json:"data"` - Aggregates map[string]string `json:"aggregates"` + Data []T `json:"data"` + Aggregates []map[string]interface{} `json:"aggregates"` } + type IStorage struct { OrchestratorStorage IOrchestratorStorage MainStorage IMainStorage @@ -54,7 +55,8 @@ type IMainStorage interface { GetBlocks(qf QueryFilter) (blocks []common.Block, err error) GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error) - GetLogs(qf QueryFilter) (logs QueryResult[map[string]interface{}], err error) + GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error) + GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) GetTraces(qf QueryFilter) (traces []common.Trace, err error) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) /** diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go index edc1088..5423841 100644 --- a/test/mocks/MockIMainStorage.go +++ b/test/mocks/MockIMainStorage.go @@ -73,6 +73,63 @@ func (_c *MockIMainStorage_DeleteBlockData_Call) RunAndReturn(run func(*big.Int, return _c } +// GetAggregations provides a mock function with given fields: table, qf +func (_m *MockIMainStorage) GetAggregations(table string, qf storage.QueryFilter) (storage.QueryResult[interface{}], error) { + ret := _m.Called(table, qf) + + if len(ret) == 0 { + panic("no return value specified for GetAggregations") + } + + var r0 storage.QueryResult[interface{}] + var r1 error + if rf, ok := ret.Get(0).(func(string, storage.QueryFilter) (storage.QueryResult[interface{}], error)); ok { + return rf(table, qf) + } + if rf, ok := ret.Get(0).(func(string, storage.QueryFilter) storage.QueryResult[interface{}]); ok { + r0 = rf(table, qf) + } else { + r0 = ret.Get(0).(storage.QueryResult[interface{}]) + } + + if rf, ok := ret.Get(1).(func(string, storage.QueryFilter) error); ok { + r1 = rf(table, qf) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIMainStorage_GetAggregations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAggregations' +type MockIMainStorage_GetAggregations_Call struct { + *mock.Call +} + +// GetAggregations is a helper method to define mock.On call +// - table string +// - qf storage.QueryFilter +func (_e *MockIMainStorage_Expecter) GetAggregations(table interface{}, qf interface{}) *MockIMainStorage_GetAggregations_Call { + return &MockIMainStorage_GetAggregations_Call{Call: _e.mock.On("GetAggregations", table, qf)} +} + +func (_c *MockIMainStorage_GetAggregations_Call) Run(run func(table string, qf storage.QueryFilter)) *MockIMainStorage_GetAggregations_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(storage.QueryFilter)) + }) + return _c +} + +func (_c *MockIMainStorage_GetAggregations_Call) Return(_a0 storage.QueryResult[interface{}], _a1 error) *MockIMainStorage_GetAggregations_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockIMainStorage_GetAggregations_Call) RunAndReturn(run func(string, storage.QueryFilter) (storage.QueryResult[interface{}], error)) *MockIMainStorage_GetAggregations_Call { + _c.Call.Return(run) + return _c +} + // GetBlocks provides a mock function with given fields: qf func (_m *MockIMainStorage) GetBlocks(qf storage.QueryFilter) ([]common.Block, error) { ret := _m.Called(qf) @@ -132,22 +189,22 @@ func (_c *MockIMainStorage_GetBlocks_Call) RunAndReturn(run func(storage.QueryFi } // GetLogs provides a mock function with given fields: qf -func (_m *MockIMainStorage) GetLogs(qf storage.QueryFilter) (storage.QueryResult[map[string]interface{}], error) { +func (_m *MockIMainStorage) GetLogs(qf storage.QueryFilter) (storage.QueryResult[common.Log], error) { ret := _m.Called(qf) if len(ret) == 0 { panic("no return value specified for GetLogs") } - var r0 storage.QueryResult[map[string]interface{}] + var r0 storage.QueryResult[common.Log] var r1 error - if rf, ok := ret.Get(0).(func(storage.QueryFilter) (storage.QueryResult[map[string]interface{}], error)); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) (storage.QueryResult[common.Log], error)); ok { return rf(qf) } - if rf, ok := ret.Get(0).(func(storage.QueryFilter) storage.QueryResult[map[string]interface{}]); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) storage.QueryResult[common.Log]); ok { r0 = rf(qf) } else { - r0 = ret.Get(0).(storage.QueryResult[map[string]interface{}]) + r0 = ret.Get(0).(storage.QueryResult[common.Log]) } if rf, ok := ret.Get(1).(func(storage.QueryFilter) error); ok { @@ -177,12 +234,12 @@ func (_c *MockIMainStorage_GetLogs_Call) Run(run func(qf storage.QueryFilter)) * return _c } -func (_c *MockIMainStorage_GetLogs_Call) Return(logs storage.QueryResult[map[string]interface{}], err error) *MockIMainStorage_GetLogs_Call { +func (_c *MockIMainStorage_GetLogs_Call) Return(logs storage.QueryResult[common.Log], err error) *MockIMainStorage_GetLogs_Call { _c.Call.Return(logs, err) return _c } -func (_c *MockIMainStorage_GetLogs_Call) RunAndReturn(run func(storage.QueryFilter) (storage.QueryResult[map[string]interface{}], error)) *MockIMainStorage_GetLogs_Call { +func (_c *MockIMainStorage_GetLogs_Call) RunAndReturn(run func(storage.QueryFilter) (storage.QueryResult[common.Log], error)) *MockIMainStorage_GetLogs_Call { _c.Call.Return(run) return _c }