Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add RPC to generate staker-operators data for previously calculated rewards #130

Merged
merged 7 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1
github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241122223729-1734c60ac737
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1.0.20241204030420-83d31161930e
github.com/ethereum/go-ethereum v1.14.9
github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1
github.com/google/uuid v1.6.0
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 h1:Blb4AE+jC/vddV71w4/MQA
github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13/go.mod h1:PD/HoyzZjxDw1tAcZw3yD0yGddo+yhmwQAi+lk298r4=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241122223729-1734c60ac737 h1:I/0YAw2ue150YuLNavErIQ4t7yoTDuH3nqZkOS7RTjg=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241122223729-1734c60ac737/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241203213718-bda8083a30b9 h1:tvBtFPGqMw1Hwp++cGP/otFTC2OQrJMtVjkH/vNbttE=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241203213718-bda8083a30b9/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241203213920-e35bed7723dc h1:P7S3ijgAQ4Xdpdocfl7rGgaTsPAurHsSCYHfhWm9au4=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241203213920-e35bed7723dc/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241203214053-44b580f4ea84 h1:BLF8GHMmXSC1YtjlqBRC7pVBDVK0QwEJpwXYjZO7t1w=
github.com/Layr-Labs/protocol-apis v0.1.0-beta.3.0.20241203214053-44b580f4ea84/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1 h1:N3OAsdZ5V/QVAjsJbJa1kruocoi50jfLquyVk4bL9HQ=
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1.0.20241203225729-619c724a75e3 h1:kc3jPZzgTsXWTPDzEi9iB9DAc2+soPHsGd41KTVhC04=
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1.0.20241203225729-619c724a75e3/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1.0.20241204030420-83d31161930e h1:h6ptdsDTKiTldAyel+XXfbusrarpF2+arhPlaFUeq7M=
github.com/Layr-Labs/protocol-apis v1.0.0-rc.1.0.20241204030420-83d31161930e/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
Expand Down
98 changes: 94 additions & 4 deletions pkg/rewards/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func (rc *RewardsCalculator) releaseGenerationLock() {
rc.isGenerating.Store(false)
}

type RewardsCalculationInProgressError struct{}
type ErrRewardsCalculationInProgress struct{}

func (e *RewardsCalculationInProgressError) Error() string {
func (e *ErrRewardsCalculationInProgress) Error() string {
return "rewards calculation already in progress"
}

Expand All @@ -71,7 +71,7 @@ func (e *RewardsCalculationInProgressError) Error() string {
// If there is no previous DistributionRoot, the rewards are calculated from EigenLayer Genesis.
func (rc *RewardsCalculator) calculateRewardsForSnapshotDate(snapshotDate string) error {
if rc.GetIsGenerating() {
err := &RewardsCalculationInProgressError{}
err := &ErrRewardsCalculationInProgress{}
rc.logger.Sugar().Infow(err.Error())
return err
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func (rc *RewardsCalculator) CalculateRewardsForSnapshotDate(snapshotDate string
go func() {
for {
err := rc.calculateRewardsForSnapshotDate(snapshotDate)
if errors.Is(err, &RewardsCalculationInProgressError{}) {
if errors.Is(err, &ErrRewardsCalculationInProgress{}) {
rc.logger.Sugar().Infow("Rewards calculation already in progress, sleeping", zap.String("snapshotDate", snapshotDate))
time.Sleep(1 * time.Minute)
} else {
Expand Down Expand Up @@ -239,6 +239,96 @@ func (rc *RewardsCalculator) GetMaxSnapshotDateForCutoffDate(cutoffDate string)
return maxSnapshotStr, nil
}

func (rc *RewardsCalculator) BackfillAllStakerOperators() error {
var generatedSnapshots []storage.GeneratedRewardsSnapshots
query := `select * from generated_rewards_snapshots order by snapshot_date asc`
res := rc.grm.Raw(query).Scan(&generatedSnapshots)
if res.Error != nil {
rc.logger.Sugar().Errorw("Failed to get generated snapshots", "error", res.Error)
return res.Error
}

// First acquire a lock. If we cant, return and let the caller retry.
rc.logger.Sugar().Infow("Acquiring rewards generation lock for staker operator backfill")
if rc.GetIsGenerating() {
err := &ErrRewardsCalculationInProgress{}
rc.logger.Sugar().Infow(err.Error())
return err
}
rc.acquireGenerationLock()
defer rc.releaseGenerationLock()

// take the largest snapshot date and generate the snapshot tables, which will be all-inclusive
latestSnapshotDate := generatedSnapshots[len(generatedSnapshots)-1].SnapshotDate

rc.logger.Sugar().Infow("Generating snapshot data for backfill", "snapshotDate", latestSnapshotDate)
if err := rc.generateSnapshotData(latestSnapshotDate); err != nil {
rc.logger.Sugar().Errorw("Failed to generate snapshot data", "error", err)
return err
}

// iterate over each snapshot and generate the staker operators table data for each
for _, snapshot := range generatedSnapshots {
rc.logger.Sugar().Infow("Generating staker operators table for snapshot", "snapshotDate", snapshot.SnapshotDate)
if err := rc.sog.GenerateStakerOperatorsTable(snapshot.SnapshotDate); err != nil {
rc.logger.Sugar().Errorw("Failed to generate staker operators table", "error", err)
return err
}
}
return nil
}

// GenerateStakerOperatorsTableForPastSnapshot generates the staker operators table for a past snapshot date, OR
// generates the rewards and the related staker-operator table data if the snapshot is greater than the latest snapshot.
func (rc *RewardsCalculator) GenerateStakerOperatorsTableForPastSnapshot(cutoffDate string) error {
// find the first snapshot that is >= to the provided cutoff date
var generatedSnapshot storage.GeneratedRewardsSnapshots
query := `select * from generated_rewards_snapshots where snapshot_date >= ? order by snapshot_date asc limit 1`
res := rc.grm.Raw(query, cutoffDate).Scan(&generatedSnapshot)
if res.Error != nil && errors.Is(res.Error, gorm.ErrRecordNotFound) {
rc.logger.Sugar().Errorw("Failed to get generated snapshot", "error", res.Error)
return res.Error
}
if res.RowsAffected == 0 || errors.Is(res.Error, gorm.ErrRecordNotFound) {
rc.logger.Sugar().Infow("No snapshot found for cutoff date, rewards need to be calculated", "cutoffDate", cutoffDate)
return rc.CalculateRewardsForSnapshotDate(cutoffDate)
}

// since rewards are already calculated and the corresponding tables are tied to the snapshot date,
// we need to use the snapshot date from the generated snapshot to generate the staker operators table.
//
// Since this date is larger, and the insert into the staker-operators table discards duplicates,
// this should be safe to do.
cutoffDate = generatedSnapshot.SnapshotDate

// Since this was a previous calculation, we have the date-suffixed gold tables, but not necessarily the snapshot tables.
// In order for our calculations to work, we need to generate the snapshot tables for the cutoff date.
//
// First check to see if there is already a rewards generation in progress. If there is, return an error and let the caller try again.
if rc.GetIsGenerating() {
err := &ErrRewardsCalculationInProgress{}
rc.logger.Sugar().Infow(err.Error())
return err
}

// Acquire the generation lock and proceed with generating snapshot tables and then the staker operators table.
rc.acquireGenerationLock()
defer rc.releaseGenerationLock()

rc.logger.Sugar().Infow("Acquired rewards generation lock", "cutoffDate", cutoffDate)

if err := rc.generateSnapshotData(cutoffDate); err != nil {
rc.logger.Sugar().Errorw("Failed to generate snapshot data", "error", err)
return err
}

if err := rc.sog.GenerateStakerOperatorsTable(cutoffDate); err != nil {
rc.logger.Sugar().Errorw("Failed to generate staker operators table", "error", err)
return err
}
return nil
}

type Reward struct {
Earner string
Token string
Expand Down
2 changes: 1 addition & 1 deletion pkg/rewards/rewards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func Test_RewardsCalculatorLock(t *testing.T) {
time.Sleep(1 * time.Second)
t.Logf("Attempting to calculate second rewards for snapshot date: 2024-08-02")
err = rc.calculateRewardsForSnapshotDate("2024-08-02")
assert.True(t, errors.Is(err, &RewardsCalculationInProgressError{}))
assert.True(t, errors.Is(err, &ErrRewardsCalculationInProgress{}))

t.Cleanup(func() {
postgres.TeardownTestDatabase(dbFileName, cfg, grm, l)
Expand Down
1 change: 1 addition & 0 deletions pkg/rewards/stakerOperators/6_stakerOperatorStaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (sog *StakerOperatorsGenerator) GenerateAndInsert6StakerOperatorStaging(cut
zap.String("cutoffDate", cutoffDate),
zap.Error(res.Error),
)
return res.Error
}

return nil
Expand Down
30 changes: 30 additions & 0 deletions pkg/rpcServer/rewardsHandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package rpcServer

import (
"context"
"errors"
sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1"
"github.com/Layr-Labs/sidecar/pkg/rewards"
"github.com/Layr-Labs/sidecar/pkg/utils"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -78,6 +80,34 @@ func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.Ge
}, nil
}

func (rpc *RpcServer) GenerateStakerOperators(ctx context.Context, req *sidecarV1.GenerateStakerOperatorsRequest) (*sidecarV1.GenerateStakerOperatorsResponse, error) {
cutoffDate := req.GetCutoffDate()

if cutoffDate == "" {
return nil, status.Error(codes.InvalidArgument, "snapshot date is required")
}

err := rpc.rewardsCalculator.GenerateStakerOperatorsTableForPastSnapshot(cutoffDate)
if err != nil {
if errors.Is(err, &rewards.ErrRewardsCalculationInProgress{}) {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
return &sidecarV1.GenerateStakerOperatorsResponse{}, nil
}

func (rpc *RpcServer) BackfillStakerOperators(ctx context.Context, req *sidecarV1.BackfillStakerOperatorsRequest) (*sidecarV1.BackfillStakerOperatorsResponse, error) {
err := rpc.rewardsCalculator.BackfillAllStakerOperators()
if err != nil {
if errors.Is(err, &rewards.ErrRewardsCalculationInProgress{}) {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
return &sidecarV1.BackfillStakerOperatorsResponse{}, nil
}

func (rpc *RpcServer) GetRewardsForSnapshot(ctx context.Context, req *sidecarV1.GetRewardsForSnapshotRequest) (*sidecarV1.GetRewardsForSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "method GetRewardsForSnapshot not implemented")
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/sidecar/blockIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Sidecar) ProcessNewBlocks(ctx context.Context) error {

// If the latest tip is behind what we have indexed, sleep for a bit
if latestTip < uint64(latestIndexedBlock) {
s.Logger.Sugar().Infow("Latest tip is behind latest indexed block, sleeping for a bit")
s.Logger.Sugar().Debugw("Latest tip is behind latest indexed block, sleeping for a bit")
time.Sleep(BLOCK_POLL_INTERVAL)
continue
}
Expand Down Expand Up @@ -155,6 +155,10 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
currentTip := atomic.Uint64{}
currentTip.Store(blockNumber)

indexComplete := atomic.Bool{}
indexComplete.Store(false)
defer indexComplete.Store(true)

// Every 10 seconds, check to see if the current tip has changed while the backfill/sync
// process is still running. If it has changed, update the value which will extend the loop
// to include the newly discovered blocks.
Expand All @@ -165,6 +169,10 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
s.Logger.Sugar().Infow("Shutting down block listener...")
return
}
if indexComplete.Load() {
s.Logger.Sugar().Infow("Indexing complete, shutting down tip listener")
return
}
latestTip, err := s.EthereumClient.GetBlockNumberUint64(ctx)
if err != nil {
s.Logger.Sugar().Errorw("Failed to get latest tip", zap.Error(err))
Expand Down
Loading