diff --git a/cmd/run.go b/cmd/run.go index 2ad8f543..c640c9e1 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -111,6 +111,8 @@ var runCmd = &cobra.Command{ rcq := rewardsCalculatorQueue.NewRewardsCalculatorQueue(rc, l) + go rcq.Process() + p := pipeline.NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, l) // Create new sidecar instance diff --git a/pkg/rewardsCalculatorQueue/handler.go b/pkg/rewardsCalculatorQueue/handler.go index ff15a8f7..05ab9350 100644 --- a/pkg/rewardsCalculatorQueue/handler.go +++ b/pkg/rewardsCalculatorQueue/handler.go @@ -42,6 +42,9 @@ func (rcq *RewardsCalculatorQueue) processMessage(msg *RewardsCalculationMessage default: response.Error = fmt.Errorf("unknown calculation type %s", msg.Data.CalculationType) } - + if msg.ResponseChan == nil { + rcq.logger.Sugar().Errorw("No response channel for rewards calculation message", "data", msg.Data) + return + } msg.ResponseChan <- response } diff --git a/pkg/rewardsCalculatorQueue/queue.go b/pkg/rewardsCalculatorQueue/queue.go index 36abb793..533cc5b1 100644 --- a/pkg/rewardsCalculatorQueue/queue.go +++ b/pkg/rewardsCalculatorQueue/queue.go @@ -11,10 +11,9 @@ func NewRewardsCalculatorQueue(rc *rewards.RewardsCalculator, logger *zap.Logger queue := &RewardsCalculatorQueue{ logger: logger, rewardsCalculator: rc, - queue: make(chan *RewardsCalculationMessage), + // allow the queue to buffer up to 100 messages + queue: make(chan *RewardsCalculationMessage, 100), } - go queue.Process() - return queue } @@ -38,8 +37,10 @@ func (rcq *RewardsCalculatorQueue) EnqueueAndWait(ctx context.Context, data Rewa select { case response := <-responseChan: + rcq.logger.Sugar().Infow("Received rewards calculation response") return response.Data, response.Error case <-ctx.Done(): + rcq.logger.Sugar().Infow("Received context.Done()") return nil, ctx.Err() } } diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index 0cd87ec5..499a1b4a 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -55,6 +55,9 @@ func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.Ge return nil, status.Error(codes.InvalidArgument, "snapshot date is required") } + rpc.Logger.Sugar().Infow("Requesting rewards generation for snapshot date", + zap.String("cutoffDate", cutoffDate), + ) _, err := rpc.rewardsQueue.EnqueueAndWait(context.Background(), rewardsCalculatorQueue.RewardsCalculationData{ CalculationType: rewardsCalculatorQueue.RewardsCalculationType_CalculateRewards, CutoffDate: cutoffDate, @@ -63,6 +66,9 @@ func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.Ge return nil, status.Error(codes.Internal, err.Error()) } + rpc.Logger.Sugar().Infow("Getting max snapshot for cutoff date", + zap.String("cutoffDate", cutoffDate), + ) rewardsCalcEndDate, err := rpc.rewardsCalculator.GetMaxSnapshotDateForCutoffDate(cutoffDate) if err != nil { return nil, status.Error(codes.Internal, err.Error())