Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(BEDS-875) Add rewards chart history based on premium perks #1220

Open
wants to merge 6 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/pkg/api/data_access/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (d *DummyService) GetValidatorDashboardGroupRewards(ctx context.Context, da
return getDummyStruct[t.VDBGroupRewardsData](ctx)
}

func (d *DummyService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, protocolModes t.VDBProtocolModes) (*t.ChartData[int, decimal.Decimal], error) {
func (d *DummyService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, groupIds []int64, protocolModes t.VDBProtocolModes, aggregation enums.ChartAggregation, afterTs uint64, beforeTs uint64) (*t.ChartData[int, decimal.Decimal], error) {
return getDummyStruct[t.ChartData[int, decimal.Decimal]](ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/api/data_access/vdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type ValidatorDashboardRepository interface {

GetValidatorDashboardRewards(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBRewardsColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBRewardsTableRow, *t.Paging, error)
GetValidatorDashboardGroupRewards(ctx context.Context, dashboardId t.VDBId, groupId int64, epoch uint64, protocolModes t.VDBProtocolModes) (*t.VDBGroupRewardsData, error)
GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, protocolModes t.VDBProtocolModes) (*t.ChartData[int, decimal.Decimal], error)
GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, groupIds []int64, protocolModes t.VDBProtocolModes, aggregation enums.ChartAggregation, afterTs uint64, beforeTs uint64) (*t.ChartData[int, decimal.Decimal], error)

GetValidatorDashboardDuties(ctx context.Context, dashboardId t.VDBId, epoch uint64, groupId int64, cursor string, colSort t.Sort[enums.VDBDutiesColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBEpochDutiesTableRow, *t.Paging, error)

Expand Down
238 changes: 148 additions & 90 deletions backend/pkg/api/data_access/vdb_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
"github.com/doug-martin/goqu/v9/exp"
"github.com/gobitfly/beaconchain/pkg/api/enums"
t "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
Expand Down Expand Up @@ -705,37 +706,88 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex
return ret, nil
}

func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, protocolModes t.VDBProtocolModes) (*t.ChartData[int, decimal.Decimal], error) {
func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, groupIds []int64, protocolModes t.VDBProtocolModes, aggregation enums.ChartAggregation, afterTs uint64, beforeTs uint64) (*t.ChartData[int, decimal.Decimal], error) {
// @DATA-ACCESS incorporate protocolModes
// bar chart for the CL and EL rewards for each group for each epoch.
// NO series for all groups combined except if AggregateGroups is true.
// series id is group id, series property is 'cl' or 'el'
ret := &t.ChartData[int, decimal.Decimal]{}

wg := errgroup.Group{}
if len(groupIds) == 0 {
return ret, nil
}

latestFinalizedEpoch := cache.LatestFinalizedEpoch.Get()
const epochLookBack = 224
startEpoch := uint64(0)
if latestFinalizedEpoch > epochLookBack {
startEpoch = latestFinalizedEpoch - epochLookBack
var err error

var dataTable exp.LiteralExpression
dataColumn := goqu.C("t")
switch aggregation {
case enums.IntervalEpoch:
dataTable = goqu.L("validator_dashboard_data_epoch AS e")
dataColumn = goqu.C("epoch_timestamp")
case enums.IntervalHourly:
dataTable = goqu.L("validator_dashboard_data_hourly AS e FINAL")
case enums.IntervalDaily:
dataTable = goqu.L("validator_dashboard_data_daily AS e FINAL")
case enums.IntervalWeekly:
dataTable = goqu.L("validator_dashboard_data_weekly AS e FINAL")
default:
return nil, fmt.Errorf("unexpected aggregation type: %v", aggregation)
}

requestedAllGroups := dashboardId.AggregateGroups
for _, groupId := range groupIds {
if groupId == t.AllGroups {
// note: requesting all groups is only convenience on api level, this will NOT result in a "total" series as it wouldn't make sense for this endpoint
requestedAllGroups = true
break
}
}

// ------------------------------------------------------------------------------------------------------------------
// Build the query that serves as base for both the main and EL rewards queries
// CL
rewardsDs := goqu.Dialect("postgres").
Select(
goqu.L("e.epoch"),
goqu.L(`SUM(COALESCE(e.attestations_reward, 0) + COALESCE(e.blocks_cl_reward, 0) + COALESCE(e.sync_reward, 0)) AS cl_rewards`)).
From(goqu.L("validator_dashboard_data_epoch e")).
With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)).
Where(goqu.L("e.epoch_timestamp >= fromUnixTimestamp(?)", utils.EpochToTime(startEpoch).Unix()))
Select(goqu.L(`SUM(COALESCE(e.attestations_reward, 0) + COALESCE(e.blocks_cl_reward, 0) + COALESCE(e.sync_reward, 0)) AS cl_rewards`)).
From(dataTable).
With("validators", goqu.Dialect("postgres").
From(goqu.T("users_val_dashboards_validators")).
Select(
goqu.I("validator_index"),
goqu.I("group_id"),
).
Where(
goqu.I("dashboard_id").Eq(dashboardId.Id),
goqu.Or(
goqu.I("group_id").In(groupIds),
goqu.V(requestedAllGroups),
),
),
).
Where(
dataColumn.Between(goqu.Range(
goqu.L("fromUnixTimestamp(?)", afterTs),
goqu.L("fromUnixTimestamp(?)", beforeTs))),
)

if aggregation == enums.IntervalEpoch {
rewardsDs = rewardsDs.
SelectAppend(goqu.L("e.epoch").As("epoch_start")).
SelectAppend(goqu.L("e.epoch").As("epoch_end"))
} else {
rewardsDs = rewardsDs.
SelectAppend(goqu.L("epoch_start")).
SelectAppend(goqu.L("epoch_end"))
}

// EL
elDs := goqu.Dialect("postgres").
Select(
goqu.L("b.epoch"),
goqu.L("epoch_start"),
// goqu.L("epoch_end"), not needed
goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")).
From(goqu.L("users_val_dashboards_validators v")).
LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("v.validator_index = b.proposer AND b.status = '1'"))).
RightJoin(goqu.L("blocks b"), goqu.On(goqu.L("v.validator_index = b.proposer AND b.status = '1'"))).
LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
Expand All @@ -746,8 +798,16 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
).
Where(goqu.L("b.epoch >= ?", startEpoch))
)

// grouping, ordering
rewardsDs = rewardsDs.
GroupBy(goqu.L("epoch_start, epoch_end")).
Order(goqu.L("epoch_start").Asc())

elDs = elDs.
GroupBy(goqu.L("epoch_start, epoch_end")).
Order(goqu.L("epoch_start").Asc())

if dashboardId.Validators == nil {
rewardsDs = rewardsDs.
Expand All @@ -758,149 +818,147 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex

if dashboardId.AggregateGroups {
rewardsDs = rewardsDs.
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
GroupBy(goqu.L("e.epoch")).
Order(goqu.L("e.epoch").Asc())
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId))
elDs = elDs.
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
GroupBy(goqu.L("b.epoch")).
Order(goqu.L("b.epoch").Asc())
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId))
} else {
rewardsDs = rewardsDs.
SelectAppend(goqu.L("v.group_id AS result_group_id")).
GroupBy(goqu.L("e.epoch"), goqu.L("result_group_id")).
Order(goqu.L("e.epoch").Asc(), goqu.L("result_group_id").Asc())
GroupByAppend(goqu.L("result_group_id")).
OrderAppend(goqu.L("result_group_id").Asc())
elDs = elDs.
SelectAppend(goqu.L("v.group_id AS result_group_id")).
GroupBy(goqu.L("b.epoch"), goqu.L("result_group_id")).
Order(goqu.L("b.epoch").Asc(), goqu.L("result_group_id").Asc())
GroupByAppend(goqu.L("result_group_id")).
OrderAppend(goqu.L("result_group_id").Asc())
}
} else {
// In case a list of validators is provided set the group to the default id
rewardsDs = rewardsDs.
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
Where(goqu.L("e.validator_index IN ?", dashboardId.Validators)).
GroupBy(goqu.L("e.epoch")).
Order(goqu.L("e.epoch").Asc())
Where(goqu.L("e.validator_index IN ?", dashboardId.Validators))
elDs = elDs.
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))).
GroupBy(goqu.L("b.epoch")).
Order(goqu.L("b.epoch").Asc())
Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators)))
}

// ------------------------------------------------------------------------------------------------------------------
// Build the main query and get the data
queryResult := []struct {
Epoch uint64 `db:"epoch"`
GroupId uint64 `db:"result_group_id"`
ClRewards int64 `db:"cl_rewards"`
EpochStart uint64 `db:"epoch_start"`
EpochEnd uint64 `db:"epoch_end"`
GroupId uint64 `db:"result_group_id"`
ClRewards int64 `db:"cl_rewards"`
}{}

wg.Go(func() error {
query, args, err := rewardsDs.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}
query, args, err := rewardsDs.Prepared(true).ToSQL()
if err != nil {
return nil, fmt.Errorf("error preparing query: %w", err)
}

err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rewards chart data: %w", err)
}
return nil
})
err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return nil, fmt.Errorf("error retrieving rewards chart data: %w", err)
}

if len(queryResult) == 0 {
return ret, nil
}

var epochStarts, epochEnds []uint64
for _, res := range queryResult {
epochStarts = append(epochStarts, res.EpochStart)
epochEnds = append(epochEnds, res.EpochEnd)
}
elDs = elDs.
With("epoch_ranges(epoch_start, epoch_end)", goqu.L("(SELECT * FROM unnest(?::int[], ?::int[]))", pq.Array(epochStarts), pq.Array(epochEnds))).
InnerJoin(goqu.L("epoch_ranges"), goqu.On(goqu.L("b.epoch BETWEEN epoch_ranges.epoch_start AND epoch_ranges.epoch_end"))).
Where(goqu.L("b.epoch BETWEEN ? AND ?", epochStarts[0], epochEnds[len(epochEnds)-1]))

// ------------------------------------------------------------------------------------------------------------------
// Get the EL rewards
elRewards := make(map[uint64]map[uint64]decimal.Decimal)
wg.Go(func() error {
elQueryResult := []struct {
Epoch uint64 `db:"epoch"`
GroupId uint64 `db:"result_group_id"`
ElRewards decimal.Decimal `db:"el_rewards"`
}{}

query, args, err := elDs.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}
elQueryResult := []struct {
EpochStart uint64 `db:"epoch_start"`
GroupId uint64 `db:"result_group_id"`
ElRewards decimal.Decimal `db:"el_rewards"`
}{}

err = d.readerDb.SelectContext(ctx, &elQueryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving el rewards data for rewards chart: %w", err)
}
query, args, err = elDs.Prepared(true).ToSQL()
if err != nil {
return nil, fmt.Errorf("error preparing query: %w", err)
}

for _, entry := range elQueryResult {
if _, ok := elRewards[entry.Epoch]; !ok {
elRewards[entry.Epoch] = make(map[uint64]decimal.Decimal)
}
elRewards[entry.Epoch][entry.GroupId] = entry.ElRewards
err = d.readerDb.SelectContext(ctx, &elQueryResult, query, args...)
if err != nil {
return nil, fmt.Errorf("error retrieving el rewards data for rewards chart: %w", err)
}

for _, entry := range elQueryResult {
if _, ok := elRewards[entry.EpochStart]; !ok {
elRewards[entry.EpochStart] = make(map[uint64]decimal.Decimal)
}
return nil
})
elRewards[entry.EpochStart][entry.GroupId] = entry.ElRewards
}

err := wg.Wait()
if err != nil {
return nil, fmt.Errorf("error retrieving validator dashboard rewards chart data: %w", err)
}

// ------------------------------------------------------------------------------------------------------------------
// Create a map structure to store the data
epochData := make(map[uint64]map[uint64]t.ClElValue[decimal.Decimal])
epochList := make([]uint64, 0)
epochStartData := make(map[uint64]map[int]t.ClElValue[decimal.Decimal])
epochStartList := make([]uint64, 0)

for _, res := range queryResult {
if _, ok := epochData[res.Epoch]; !ok {
epochData[res.Epoch] = make(map[uint64]t.ClElValue[decimal.Decimal])
epochList = append(epochList, res.Epoch)
if _, ok := epochStartData[res.EpochStart]; !ok {
epochStartData[res.EpochStart] = make(map[int]t.ClElValue[decimal.Decimal])
epochStartList = append(epochStartList, res.EpochStart)
}

epochData[res.Epoch][res.GroupId] = t.ClElValue[decimal.Decimal]{
El: elRewards[res.Epoch][res.GroupId],
epochStartData[res.EpochStart][int(res.GroupId)] = t.ClElValue[decimal.Decimal]{
El: elRewards[res.EpochStart][res.GroupId],
Cl: utils.GWeiToWei(big.NewInt(res.ClRewards)),
}
}

// Get the list of groups
// It should be identical for all epochs
var groupList []uint64
for _, groupData := range epochData {
// It should be identical for all epochs (in most cases)
var groupList []int
for _, groupData := range epochStartData {
for groupId := range groupData {
groupList = append(groupList, groupId)
}
break
}
slices.Sort(groupList)

// Create the result
var result t.ChartData[int, decimal.Decimal]

// Create the series structure
propertyNames := []string{"el", "cl"}
for _, groupId := range groupList {
for _, propertyName := range propertyNames {
result.Series = append(result.Series, t.ChartSeries[int, decimal.Decimal]{
Id: int(groupId),
ret.Series = append(ret.Series, t.ChartSeries[int, decimal.Decimal]{
Id: groupId,
Property: propertyName,
})
}
}

// Fill the epoch data
for _, epoch := range epochList {
result.Categories = append(result.Categories, epoch)
for idx, series := range result.Series {
for _, epoch := range epochStartList {
ret.Categories = append(ret.Categories, uint64(utils.EpochToTime(epoch).Unix()))
for idx, series := range ret.Series {
if series.Property == "el" {
result.Series[idx].Data = append(result.Series[idx].Data, epochData[epoch][uint64(series.Id)].El)
ret.Series[idx].Data = append(ret.Series[idx].Data, epochStartData[epoch][series.Id].El)
} else if series.Property == "cl" {
result.Series[idx].Data = append(result.Series[idx].Data, epochData[epoch][uint64(series.Id)].Cl)
ret.Series[idx].Data = append(ret.Series[idx].Data, epochStartData[epoch][series.Id].Cl)
} else {
return nil, fmt.Errorf("unknown series property: %s", series.Property)
}
}
}

return &result, nil
return ret, nil
}

func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, dashboardId t.VDBId, epoch uint64, groupId int64, cursor string, colSort t.Sort[enums.VDBDutiesColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBEpochDutiesTableRow, *t.Paging, error) {
Expand Down
4 changes: 0 additions & 4 deletions backend/pkg/api/data_access/vdb_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,13 +1026,9 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex

var queryResults []*t.VDBValidatorSummaryChartRow

containsGroups := false
requestedGroupsMap := make(map[int64]bool)
for _, groupId := range groupIds {
requestedGroupsMap[groupId] = true
if !containsGroups && groupId >= 0 {
containsGroups = true
}
}

totalLineRequested := requestedGroupsMap[t.AllGroups]
Expand Down
Loading
Loading