Skip to content

Commit

Permalink
fix: fixing queue processing that gets stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Dec 13, 2024
1 parent fb02941 commit f26edff
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ var runCmd = &cobra.Command{

rcq := rewardsCalculatorQueue.NewRewardsCalculatorQueue(rc, l)

go rcq.Process()

p := pipeline.NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, l)

// Create new sidecar instance
Expand Down
5 changes: 4 additions & 1 deletion pkg/rewardsCalculatorQueue/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage
default:
response.Error = fmt.Errorf("unknown calculation type %s", msg.Data.CalculationType)
}

if msg.ResponseChan == nil {
rcq.logger.Sugar().Errorw("No response channel for rewards calculation message", "data", msg.Data)
return
}
msg.ResponseChan <- response
}
7 changes: 4 additions & 3 deletions pkg/rewardsCalculatorQueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ func NewRewardsCalculatorQueue(rc *rewards.RewardsCalculator, logger *zap.Logger
queue := &RewardsCalculatorQueue{
logger: logger,
rewardsCalculator: rc,
queue: make(chan *RewardsCalculationMessage),
// allow the queue to buffer up to 100 messages
queue: make(chan *RewardsCalculationMessage, 100),
}
go queue.Process()

return queue
}

Expand All @@ -38,8 +37,10 @@ func (rcq *RewardsCalculatorQueue) EnqueueAndWait(ctx context.Context, data Rewa

select {
case response := <-responseChan:
rcq.logger.Sugar().Infow("Received rewards calculation response")
return response.Data, response.Error
case <-ctx.Done():
rcq.logger.Sugar().Infow("Received context.Done()")
return nil, ctx.Err()
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/rpcServer/rewardsHandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.Ge
return nil, status.Error(codes.InvalidArgument, "snapshot date is required")
}

rpc.Logger.Sugar().Infow("Requesting rewards generation for snapshot date",
zap.String("cutoffDate", cutoffDate),
)
_, err := rpc.rewardsQueue.EnqueueAndWait(context.Background(), rewardsCalculatorQueue.RewardsCalculationData{
CalculationType: rewardsCalculatorQueue.RewardsCalculationType_CalculateRewards,
CutoffDate: cutoffDate,
Expand All @@ -63,6 +66,9 @@ func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.Ge
return nil, status.Error(codes.Internal, err.Error())
}

rpc.Logger.Sugar().Infow("Getting max snapshot for cutoff date",
zap.String("cutoffDate", cutoffDate),
)
rewardsCalcEndDate, err := rpc.rewardsCalculator.GetMaxSnapshotDateForCutoffDate(cutoffDate)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand Down

0 comments on commit f26edff

Please sign in to comment.