Skip to content

Commit

Permalink
feat: separate storage method for aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
catalyst17 committed Oct 23, 2024
1 parent 2969e99 commit 8e4c390
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 110 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type QueryResponse struct {
// @Description Query result data
Data interface{} `json:"data,omitempty"`
// @Description Aggregation results
Aggregations map[string]string `json:"aggregations,omitempty"`
Aggregations []map[string]interface{} `json:"aggregations,omitempty"`
}

func writeError(w http.ResponseWriter, message string, code int) {
Expand Down
54 changes: 37 additions & 17 deletions internal/handlers/logs_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,39 +133,59 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) {
return
}

logs, err := mainStorage.GetLogs(storage.QueryFilter{
// Prepare the QueryFilter
qf := storage.QueryFilter{
FilterParams: queryParams.FilterParams,
GroupBy: queryParams.GroupBy,
ContractAddress: contractAddress,
Signature: signatureHash,
ChainId: chainId,
SortBy: queryParams.SortBy,
SortOrder: queryParams.SortOrder,
Page: queryParams.Page,
Limit: queryParams.Limit,
Aggregates: queryParams.Aggregates,
ContractAddress: contractAddress,
Signature: signatureHash,
ChainId: chainId,
})
if err != nil {
log.Error().Err(err).Msg("Error querying logs")
api.InternalErrorHandler(c)
return
}

response := api.QueryResponse{
// Initialize the QueryResult
queryResult := api.QueryResponse{
Meta: api.Meta{
ChainId: chainId.Uint64(),
ContractAddress: contractAddress,
Signature: signatureHash,
Page: queryParams.Page,
Limit: queryParams.Limit,
TotalItems: len(logs.Data),
TotalPages: 0, // TODO: Implement total pages count
TotalItems: 0,
TotalPages: 0, // Implement total pages count if needed
},
Data: logs.Data,
Aggregations: logs.Aggregates,
Data: nil,
Aggregations: nil,
}

// If aggregates are specified, retrieve them
if len(queryParams.Aggregates) > 0 {
qf.Aggregates = queryParams.Aggregates
qf.GroupBy = queryParams.GroupBy

aggregatesResult, err := mainStorage.GetAggregations("logs", qf)
if err != nil {
log.Error().Err(err).Msg("Error querying aggregates")
api.InternalErrorHandler(c)
return
}
queryResult.Aggregations = aggregatesResult.Aggregates
queryResult.Meta.TotalItems = len(aggregatesResult.Aggregates)
} else {
// Retrieve logs data
logsResult, err := mainStorage.GetLogs(qf)
if err != nil {
log.Error().Err(err).Msg("Error querying logs")
api.InternalErrorHandler(c)
return
}
queryResult.Data = logsResult.Data
queryResult.Meta.TotalItems = len(logsResult.Data)
}

sendJSONResponse(c, response)
sendJSONResponse(c, queryResult)
}

func getMainStorage() (storage.IMainStorage, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/transactions_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string
TotalPages: 0, // TODO: Implement total pages count
},
Data: result.Data,
Aggregations: result.Aggregates,
Aggregations: nil,
}

c.JSON(http.StatusOK, response)
Expand Down
141 changes: 61 additions & 80 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,66 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[commo
return executeQuery[common.Transaction](c, "transactions", columns, qf, scanTransaction)
}

func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[map[string]interface{}], error) {
var columns string
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) {
columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
return executeQuery[common.Log](c, "logs", columns, qf, scanLog)
}

func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) {
// Build the SELECT clause with aggregates
columns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ")
query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table)

// Apply filters
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
query = addFilterParams("chain_id", qf.ChainId.String(), query)
}
query = addContractAddress(table, query, qf.ContractAddress)

if qf.Signature != "" {
query += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature)
}

if len(qf.GroupBy) > 0 || len(qf.Aggregates) > 0 {
// Build columns for SELECT when grouping or aggregating
selectColumns := append(qf.GroupBy, qf.Aggregates...)
columns = strings.Join(selectColumns, ", ")
} else {
// Default columns when not grouping
columns = "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
for key, value := range qf.FilterParams {
query = addFilterParams(key, strings.ToLower(value), query)
}

return executeQuery[map[string]interface{}](c, "logs", columns, qf, scanRowToMap)
// Add GROUP BY clause if specified
if len(qf.GroupBy) > 0 {
groupByColumns := strings.Join(qf.GroupBy, ", ")
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
}

// Execute the query
rows, err := c.conn.Query(context.Background(), query)
if err != nil {
return QueryResult[interface{}]{}, err
}
defer rows.Close()

// Collect results
var aggregates []map[string]interface{}
for rows.Next() {
columns := rows.Columns()
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range columns {
valuePtrs[i] = &values[i]
}

if err := rows.Scan(valuePtrs...); err != nil {
return QueryResult[interface{}]{}, err
}

result := make(map[string]interface{})
for i, col := range columns {
result[col] = values[i]
}

aggregates = append(aggregates, result)
}

return QueryResult[interface{}]{Data: nil, Aggregates: aggregates}, nil
}

func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) {
Expand All @@ -316,8 +363,7 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
defer rows.Close()

queryResult := QueryResult[T]{
Data: []T{},
Aggregates: map[string]string{},
Data: []T{},
}

for rows.Next() {
Expand All @@ -328,21 +374,13 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
queryResult.Data = append(queryResult.Data, item)
}

if len(qf.Aggregates) > 0 {
aggregates, err := c.executeAggregateQuery(table, qf)
if err != nil {
return queryResult, err
}
queryResult.Aggregates = aggregates
}

return queryResult, nil
}

func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string {
query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table)

if qf.ChainId.Sign() > 0 {
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
query = addFilterParams("chain_id", qf.ChainId.String(), query)
}
query = addContractAddress(table, query, qf.ContractAddress)
Expand All @@ -356,12 +394,6 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
query = addFilterParams(key, strings.ToLower(value), query)
}

// Add GROUP BY clause if specified
if len(qf.GroupBy) > 0 {
groupByColumns := strings.Join(qf.GroupBy, ", ")
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
}

// Add ORDER BY clause
if qf.SortBy != "" {
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
Expand All @@ -371,9 +403,8 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
if qf.Page > 0 && qf.Limit > 0 {
offset := (qf.Page - 1) * qf.Limit
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
} else {
// Add limit clause
query += getLimitClause(int(qf.Limit))
} else if qf.Limit > 0 {
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
}

return query
Expand Down Expand Up @@ -433,35 +464,6 @@ func getTopicValueFormat(topic string) string {
return result
}

func (c *ClickHouseConnector) executeAggregateQuery(table string, qf QueryFilter) (map[string]string, error) {
aggregateQuery := "SELECT " + strings.Join(qf.Aggregates, ", ") +
fmt.Sprintf(" FROM %s.%s WHERE is_deleted = 0", c.cfg.Database, table)

if qf.ContractAddress != "" {
aggregateQuery += fmt.Sprintf(" AND address = '%s'", qf.ContractAddress)
}
if qf.Signature != "" {
aggregateQuery += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature)
}
for key, value := range qf.FilterParams {
aggregateQuery += fmt.Sprintf(" AND %s = '%s'", key, value)
}

row := c.conn.QueryRow(context.Background(), aggregateQuery)
aggregateResultsJSON, err := json.Marshal(row)
if err != nil {
return nil, fmt.Errorf("error marshaling aggregate results to JSON: %w", err)
}

var aggregateResultsMap map[string]string
err = json.Unmarshal(aggregateResultsJSON, &aggregateResultsMap)
if err != nil {
return nil, fmt.Errorf("error unmarshaling aggregate results JSON to map: %w", err)
}

return aggregateResultsMap, nil
}

func scanTransaction(rows driver.Rows) (common.Transaction, error) {
var tx common.Transaction
err := rows.Scan(
Expand Down Expand Up @@ -521,27 +523,6 @@ func scanLog(rows driver.Rows) (common.Log, error) {
return log, nil
}

func scanRowToMap(rows driver.Rows) (map[string]interface{}, error) {
columns := rows.Columns()
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))

for i := range columns {
valuePtrs[i] = &values[i]
}

if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}

result := make(map[string]interface{})
for i, col := range columns {
result[col] = values[i]
}

return result, nil
}

func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
Expand Down
10 changes: 6 additions & 4 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ type QueryFilter struct {
Page int
Limit int
Offset int
Aggregates []string
Aggregates []string // e.g., ["COUNT(*) AS count", "SUM(amount) AS total_amount"]
FromAddress string
ContractAddress string
Signature string
}
type QueryResult[T any] struct {
// TODO: findout how to only allow Log/transaction arrays or split the result
Data []T `json:"data"`
Aggregates map[string]string `json:"aggregates"`
Data []T `json:"data"`
Aggregates []map[string]interface{} `json:"aggregates"`
}

type IStorage struct {
OrchestratorStorage IOrchestratorStorage
MainStorage IMainStorage
Expand All @@ -54,7 +55,8 @@ type IMainStorage interface {

GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
GetLogs(qf QueryFilter) (logs QueryResult[map[string]interface{}], err error)
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
/**
Expand Down
Loading

0 comments on commit 8e4c390

Please sign in to comment.