From 093e3cb11f1187308ad427f7ba7dfd1c097cb9a6 Mon Sep 17 00:00:00 2001 From: artaasadi Date: Sat, 14 Dec 2024 18:17:52 +0100 Subject: [PATCH] fix: send status --- worker/worker.go | 56 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index f0f4370..4431145 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -3,7 +3,6 @@ package worker import ( "context" "encoding/json" - "errors" fmt "fmt" "github.com/nats-io/nats.go/jetstream" "github.com/opengovern/og-util/pkg/jq" @@ -65,11 +64,18 @@ func (w *Worker) Run(ctx context.Context) error { jetstream.PullMaxMessages(1), }, func(msg jetstream.Msg) { w.logger.Info("received a new job") - - defer msg.Ack() - - ctx, cancel := context.WithTimeoutCause(ctx, time.Minute*25, errors.New("describe worker timed out")) - defer cancel() + w.logger.Info("committing") + if err := msg.InProgress(); err != nil { + w.logger.Error("failed to send the initial in progress message", zap.Error(err), zap.Any("msg", msg)) + } + ticker := time.NewTicker(15 * time.Second) + go func() { + for range ticker.C { + if err := msg.InProgress(); err != nil { + w.logger.Error("failed to send an in progress message", zap.Error(err), zap.Any("msg", msg)) + } + } + }() if err := w.ProcessMessage(ctx, msg); err != nil { w.logger.Error("failed to process message", zap.Error(err)) @@ -94,13 +100,46 @@ func (w *Worker) Run(ctx context.Context) error { return nil } -func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) error { +func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) (err error) { var request scheduler.TaskRequest if err := json.Unmarshal(msg.Data(), &request); err != nil { w.logger.Error("Failed to unmarshal ComplianceReportJob results", zap.Error(err)) return err } + var response scheduler.TaskResponse + + defer func() { + if err != nil { + //response.FailureMessage = err.Error() + response.Status = models.TaskRunStatusFailed + } else { + response.Status = models.TaskRunStatusFailed + } + + responseJson, err := json.Marshal(response) + if err != nil { + w.logger.Error("failed to create job result json", zap.Error(err)) + return + } + + if _, err := w.jq.Produce(ctx, ResultTopicName, responseJson, fmt.Sprintf("task-run-result-%d", request.RunID)); err != nil { + w.logger.Error("failed to publish job result", zap.String("jobResult", string(responseJson)), zap.Error(err)) + } + }() + + response.RunID = request.RunID + response.Status = models.TaskRunStatusInProgress + responseJson, err := json.Marshal(response) + if err != nil { + w.logger.Error("failed to create response json", zap.Error(err)) + return err + } + + if _, err = w.jq.Produce(ctx, ResultTopicName, responseJson, fmt.Sprintf("task-%d", request.RunID)); err != nil { + w.logger.Error("failed to publish job in progress", zap.String("response", string(responseJson)), zap.Error(err)) + } + image := "nginx:latest" // Run the Grype command @@ -111,11 +150,10 @@ func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) error { return err } - var response scheduler.TaskResponse response.Result = output response.RunID = request.RunID response.Status = models.TaskRunStatusFinished - responseJson, err := json.Marshal(response) + responseJson, err = json.Marshal(response) if err != nil { w.logger.Error("failed to create response json", zap.Error(err)) return err