Skip to content

Commit

Permalink
Enable aggregates and group_by (#109)
Browse files Browse the repository at this point in the history
### TL;DR

Enhanced the query functionality to support multiple grouping fields and aggregations, with improved type handling for ClickHouse data types.

### What changed?

- Modified `GroupBy` field in `QueryParams` to accept an array of strings instead of a single string
- Added `GetAggregations` method to handle grouped and aggregated queries separately
- Updated `Aggregations` field in `QueryResponse` to use `[]map[string]interface{}` for flexible result handling
- Implemented comprehensive ClickHouse type mapping to Go types with support for nullable, array, and low cardinality types
- Separated data retrieval logic in handlers to handle aggregations distinctly from regular queries
- Added unit tests for ClickHouse type mapping functionality

### How to test?

1. Query logs/transactions endpoints with multiple group_by parameters
2. Test various aggregation functions (COUNT, SUM, etc.) with grouping
3. Verify response structure for both regular and aggregated queries
4. Test handling of different ClickHouse data types and their nullable variants
5. Run new unit tests for type mapping functionality

### Why make this change?

This enhancement provides more robust support for data analysis by properly handling grouped queries and aggregations. The improved type mapping system ensures accurate data representation when working with ClickHouse's various data types, while maintaining clean separation between regular queries and aggregations.
  • Loading branch information
catalyst17 authored Oct 25, 2024
2 parents 2f29801 + 3e4e848 commit 18234d0
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 86 deletions.
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type QueryParams struct {
// @Description Map of filter parameters
FilterParams map[string]string `schema:"-"`
// @Description Field to group results by
GroupBy string `schema:"group_by"`
GroupBy []string `schema:"group_by"`
// @Description Field to sort results by
SortBy string `schema:"sort_by"`
// @Description Sort order (asc or desc)
Expand Down Expand Up @@ -70,7 +70,7 @@ type QueryResponse struct {
// @Description Query result data
Data interface{} `json:"data,omitempty"`
// @Description Aggregation results
Aggregations map[string]string `json:"aggregations,omitempty"`
Aggregations []map[string]interface{} `json:"aggregations,omitempty"`
}

func writeError(w http.ResponseWriter, message string, code int) {
Expand Down
52 changes: 36 additions & 16 deletions internal/handlers/logs_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,39 +133,59 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) {
return
}

logs, err := mainStorage.GetLogs(storage.QueryFilter{
// Prepare the QueryFilter
qf := storage.QueryFilter{
FilterParams: queryParams.FilterParams,
GroupBy: []string{queryParams.GroupBy},
ContractAddress: contractAddress,
Signature: signatureHash,
ChainId: chainId,
SortBy: queryParams.SortBy,
SortOrder: queryParams.SortOrder,
Page: queryParams.Page,
Limit: queryParams.Limit,
Aggregates: queryParams.Aggregates,
ContractAddress: contractAddress,
Signature: signatureHash,
ChainId: chainId,
})
if err != nil {
log.Error().Err(err).Msg("Error querying logs")
api.InternalErrorHandler(c)
return
}

response := api.QueryResponse{
// Initialize the QueryResult
queryResult := api.QueryResponse{
Meta: api.Meta{
ChainId: chainId.Uint64(),
ContractAddress: contractAddress,
Signature: signatureHash,
Page: queryParams.Page,
Limit: queryParams.Limit,
TotalItems: len(logs.Data),
TotalItems: 0,
TotalPages: 0, // TODO: Implement total pages count
},
Data: logs.Data,
Aggregations: logs.Aggregates,
Data: nil,
Aggregations: nil,
}

// If aggregates or groupings are specified, retrieve them
if len(queryParams.Aggregates) > 0 || len(queryParams.GroupBy) > 0 {
qf.Aggregates = queryParams.Aggregates
qf.GroupBy = queryParams.GroupBy

aggregatesResult, err := mainStorage.GetAggregations("logs", qf)
if err != nil {
log.Error().Err(err).Msg("Error querying aggregates")
api.InternalErrorHandler(c)
return
}
queryResult.Aggregations = aggregatesResult.Aggregates
queryResult.Meta.TotalItems = len(aggregatesResult.Aggregates)
} else {
// Retrieve logs data
logsResult, err := mainStorage.GetLogs(qf)
if err != nil {
log.Error().Err(err).Msg("Error querying logs")
api.InternalErrorHandler(c)
return
}
queryResult.Data = logsResult.Data
queryResult.Meta.TotalItems = len(logsResult.Data)
}

sendJSONResponse(c, response)
sendJSONResponse(c, queryResult)
}

func getMainStorage() (storage.IMainStorage, error) {
Expand Down
54 changes: 37 additions & 17 deletions internal/handlers/transactions_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,37 +134,57 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string
return
}

result, err := mainStorage.GetTransactions(storage.QueryFilter{
// Prepare the QueryFilter
qf := storage.QueryFilter{
FilterParams: queryParams.FilterParams,
GroupBy: []string{queryParams.GroupBy},
ContractAddress: contractAddress,
Signature: signatureHash,
ChainId: chainId,
SortBy: queryParams.SortBy,
SortOrder: queryParams.SortOrder,
Page: queryParams.Page,
Limit: queryParams.Limit,
Aggregates: queryParams.Aggregates,
ContractAddress: contractAddress,
Signature: signatureHash,
ChainId: chainId,
})
if err != nil {
log.Error().Err(err).Msg("Error querying transactions")
api.InternalErrorHandler(c)
return
}

response := api.QueryResponse{
// Initialize the QueryResult
queryResult := api.QueryResponse{
Meta: api.Meta{
ChainId: chainId.Uint64(),
ContractAddress: contractAddress,
Signature: signature,
Signature: signatureHash,
Page: queryParams.Page,
Limit: queryParams.Limit,
TotalItems: 0, // TODO: Implement total items count
TotalItems: 0,
TotalPages: 0, // TODO: Implement total pages count
},
Data: result.Data,
Aggregations: result.Aggregates,
Data: nil,
Aggregations: nil,
}

// If aggregates or groupings are specified, retrieve them
if len(queryParams.Aggregates) > 0 || len(queryParams.GroupBy) > 0 {
qf.Aggregates = queryParams.Aggregates
qf.GroupBy = queryParams.GroupBy

aggregatesResult, err := mainStorage.GetAggregations("transactions", qf)
if err != nil {
log.Error().Err(err).Msg("Error querying aggregates")
api.InternalErrorHandler(c)
return
}
queryResult.Aggregations = aggregatesResult.Aggregates
queryResult.Meta.TotalItems = len(aggregatesResult.Aggregates)
} else {
// Retrieve logs data
transactionsResult, err := mainStorage.GetTransactions(qf)
if err != nil {
log.Error().Err(err).Msg("Error querying tran")
api.InternalErrorHandler(c)
return
}
queryResult.Data = transactionsResult.Data
queryResult.Meta.TotalItems = len(transactionsResult.Data)
}

c.JSON(http.StatusOK, response)
c.JSON(http.StatusOK, queryResult)
}
Loading

0 comments on commit 18234d0

Please sign in to comment.