Skip to content

Commit

Permalink
fix: send status
Browse files Browse the repository at this point in the history
  • Loading branch information
artaasadi committed Dec 14, 2024
1 parent dbcde38 commit 093e3cb
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package worker
import (
"context"
"encoding/json"
"errors"
fmt "fmt"
"github.com/nats-io/nats.go/jetstream"
"github.com/opengovern/og-util/pkg/jq"
Expand Down Expand Up @@ -65,11 +64,18 @@ func (w *Worker) Run(ctx context.Context) error {
jetstream.PullMaxMessages(1),
}, func(msg jetstream.Msg) {
w.logger.Info("received a new job")

defer msg.Ack()

ctx, cancel := context.WithTimeoutCause(ctx, time.Minute*25, errors.New("describe worker timed out"))
defer cancel()
w.logger.Info("committing")
if err := msg.InProgress(); err != nil {
w.logger.Error("failed to send the initial in progress message", zap.Error(err), zap.Any("msg", msg))
}
ticker := time.NewTicker(15 * time.Second)
go func() {
for range ticker.C {
if err := msg.InProgress(); err != nil {
w.logger.Error("failed to send an in progress message", zap.Error(err), zap.Any("msg", msg))
}
}
}()

if err := w.ProcessMessage(ctx, msg); err != nil {
w.logger.Error("failed to process message", zap.Error(err))
Expand All @@ -94,13 +100,46 @@ func (w *Worker) Run(ctx context.Context) error {
return nil
}

func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) error {
func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) (err error) {
var request scheduler.TaskRequest
if err := json.Unmarshal(msg.Data(), &request); err != nil {
w.logger.Error("Failed to unmarshal ComplianceReportJob results", zap.Error(err))
return err
}

var response scheduler.TaskResponse

defer func() {
if err != nil {
//response.FailureMessage = err.Error()
response.Status = models.TaskRunStatusFailed
} else {
response.Status = models.TaskRunStatusFailed
}

responseJson, err := json.Marshal(response)
if err != nil {
w.logger.Error("failed to create job result json", zap.Error(err))
return
}

if _, err := w.jq.Produce(ctx, ResultTopicName, responseJson, fmt.Sprintf("task-run-result-%d", request.RunID)); err != nil {
w.logger.Error("failed to publish job result", zap.String("jobResult", string(responseJson)), zap.Error(err))
}
}()

response.RunID = request.RunID
response.Status = models.TaskRunStatusInProgress
responseJson, err := json.Marshal(response)
if err != nil {
w.logger.Error("failed to create response json", zap.Error(err))
return err
}

if _, err = w.jq.Produce(ctx, ResultTopicName, responseJson, fmt.Sprintf("task-%d", request.RunID)); err != nil {
w.logger.Error("failed to publish job in progress", zap.String("response", string(responseJson)), zap.Error(err))
}

image := "nginx:latest"

// Run the Grype command
Expand All @@ -111,11 +150,10 @@ func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) error {
return err
}

var response scheduler.TaskResponse
response.Result = output
response.RunID = request.RunID
response.Status = models.TaskRunStatusFinished
responseJson, err := json.Marshal(response)
responseJson, err = json.Marshal(response)
if err != nil {
w.logger.Error("failed to create response json", zap.Error(err))
return err
Expand Down

0 comments on commit 093e3cb

Please sign in to comment.