From 2969e992ea3a60216e719012184e7e394389feb0 Mon Sep 17 00:00:00 2001 From: catalyst17 <37663786+catalyst17@users.noreply.github.com> Date: Fri, 18 Oct 2024 20:24:26 +0200 Subject: [PATCH] feat: enable aggregates and group_by --- api/api.go | 2 +- internal/handlers/logs_handlers.go | 2 +- internal/handlers/transactions_handlers.go | 2 +- internal/storage/clickhouse.go | 45 ++++++++++++++++++++-- internal/storage/connector.go | 2 +- test/mocks/MockIMainStorage.go | 16 ++++---- test/mocks/MockIOrchestratorStorage.go | 2 +- test/mocks/MockIRPCClient.go | 2 +- test/mocks/MockIStagingStorage.go | 2 +- 9 files changed, 56 insertions(+), 19 deletions(-) diff --git a/api/api.go b/api/api.go index 45e86ca..f4538be 100644 --- a/api/api.go +++ b/api/api.go @@ -30,7 +30,7 @@ type QueryParams struct { // @Description Map of filter parameters FilterParams map[string]string `schema:"-"` // @Description Field to group results by - GroupBy string `schema:"group_by"` + GroupBy []string `schema:"group_by"` // @Description Field to sort results by SortBy string `schema:"sort_by"` // @Description Sort order (asc or desc) diff --git a/internal/handlers/logs_handlers.go b/internal/handlers/logs_handlers.go index dbbc3b5..a05a8e0 100644 --- a/internal/handlers/logs_handlers.go +++ b/internal/handlers/logs_handlers.go @@ -135,7 +135,7 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) { logs, err := mainStorage.GetLogs(storage.QueryFilter{ FilterParams: queryParams.FilterParams, - GroupBy: []string{queryParams.GroupBy}, + GroupBy: queryParams.GroupBy, SortBy: queryParams.SortBy, SortOrder: queryParams.SortOrder, Page: queryParams.Page, diff --git a/internal/handlers/transactions_handlers.go b/internal/handlers/transactions_handlers.go index 04fd937..0cd842a 100644 --- a/internal/handlers/transactions_handlers.go +++ b/internal/handlers/transactions_handlers.go @@ -136,7 +136,7 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string result, err := mainStorage.GetTransactions(storage.QueryFilter{ FilterParams: queryParams.FilterParams, - GroupBy: []string{queryParams.GroupBy}, + GroupBy: queryParams.GroupBy, SortBy: queryParams.SortBy, SortOrder: queryParams.SortOrder, Page: queryParams.Page, diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 0abf208..97bfda9 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -291,9 +291,19 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[commo return executeQuery[common.Transaction](c, "transactions", columns, qf, scanTransaction) } -func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) { - columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3" - return executeQuery[common.Log](c, "logs", columns, qf, scanLog) +func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[map[string]interface{}], error) { + var columns string + + if len(qf.GroupBy) > 0 || len(qf.Aggregates) > 0 { + // Build columns for SELECT when grouping or aggregating + selectColumns := append(qf.GroupBy, qf.Aggregates...) + columns = strings.Join(selectColumns, ", ") + } else { + // Default columns when not grouping + columns = "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3" + } + + return executeQuery[map[string]interface{}](c, "logs", columns, qf, scanRowToMap) } func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) { @@ -346,7 +356,13 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) query = addFilterParams(key, strings.ToLower(value), query) } - // Add sort by clause + // Add GROUP BY clause if specified + if len(qf.GroupBy) > 0 { + groupByColumns := strings.Join(qf.GroupBy, ", ") + query += fmt.Sprintf(" GROUP BY %s", groupByColumns) + } + + // Add ORDER BY clause if qf.SortBy != "" { query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder) } @@ -505,6 +521,27 @@ func scanLog(rows driver.Rows) (common.Log, error) { return log, nil } +func scanRowToMap(rows driver.Rows) (map[string]interface{}, error) { + columns := rows.Columns() + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + + result := make(map[string]interface{}) + for i, col := range columns { + result[col] = values[i] + } + + return result, nil +} + func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index baf9fb2..ac75069 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -54,7 +54,7 @@ type IMainStorage interface { GetBlocks(qf QueryFilter) (blocks []common.Block, err error) GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error) - GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error) + GetLogs(qf QueryFilter) (logs QueryResult[map[string]interface{}], err error) GetTraces(qf QueryFilter) (traces []common.Trace, err error) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) /** diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go index 8800917..edc1088 100644 --- a/test/mocks/MockIMainStorage.go +++ b/test/mocks/MockIMainStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.2. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. //go:build !production @@ -132,22 +132,22 @@ func (_c *MockIMainStorage_GetBlocks_Call) RunAndReturn(run func(storage.QueryFi } // GetLogs provides a mock function with given fields: qf -func (_m *MockIMainStorage) GetLogs(qf storage.QueryFilter) (storage.QueryResult[common.Log], error) { +func (_m *MockIMainStorage) GetLogs(qf storage.QueryFilter) (storage.QueryResult[map[string]interface{}], error) { ret := _m.Called(qf) if len(ret) == 0 { panic("no return value specified for GetLogs") } - var r0 storage.QueryResult[common.Log] + var r0 storage.QueryResult[map[string]interface{}] var r1 error - if rf, ok := ret.Get(0).(func(storage.QueryFilter) (storage.QueryResult[common.Log], error)); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) (storage.QueryResult[map[string]interface{}], error)); ok { return rf(qf) } - if rf, ok := ret.Get(0).(func(storage.QueryFilter) storage.QueryResult[common.Log]); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) storage.QueryResult[map[string]interface{}]); ok { r0 = rf(qf) } else { - r0 = ret.Get(0).(storage.QueryResult[common.Log]) + r0 = ret.Get(0).(storage.QueryResult[map[string]interface{}]) } if rf, ok := ret.Get(1).(func(storage.QueryFilter) error); ok { @@ -177,12 +177,12 @@ func (_c *MockIMainStorage_GetLogs_Call) Run(run func(qf storage.QueryFilter)) * return _c } -func (_c *MockIMainStorage_GetLogs_Call) Return(logs storage.QueryResult[common.Log], err error) *MockIMainStorage_GetLogs_Call { +func (_c *MockIMainStorage_GetLogs_Call) Return(logs storage.QueryResult[map[string]interface{}], err error) *MockIMainStorage_GetLogs_Call { _c.Call.Return(logs, err) return _c } -func (_c *MockIMainStorage_GetLogs_Call) RunAndReturn(run func(storage.QueryFilter) (storage.QueryResult[common.Log], error)) *MockIMainStorage_GetLogs_Call { +func (_c *MockIMainStorage_GetLogs_Call) RunAndReturn(run func(storage.QueryFilter) (storage.QueryResult[map[string]interface{}], error)) *MockIMainStorage_GetLogs_Call { _c.Call.Return(run) return _c } diff --git a/test/mocks/MockIOrchestratorStorage.go b/test/mocks/MockIOrchestratorStorage.go index 284134a..84b9003 100644 --- a/test/mocks/MockIOrchestratorStorage.go +++ b/test/mocks/MockIOrchestratorStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.2. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. //go:build !production diff --git a/test/mocks/MockIRPCClient.go b/test/mocks/MockIRPCClient.go index 5a078ed..4b062a7 100644 --- a/test/mocks/MockIRPCClient.go +++ b/test/mocks/MockIRPCClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.2. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. //go:build !production diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 969967a..dc0a55b 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.2. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. //go:build !production