Skip to content

Commit

Permalink
jobs: pass identity into retry job
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Zapletal <[email protected]>
  • Loading branch information
lzap authored and ezr-ondrej committed May 15, 2024
1 parent ebd6b9f commit 397a8a0
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 50 deletions.
59 changes: 59 additions & 0 deletions logger/ctx_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package logger

import (
"github.com/redhatinsights/edge-api/pkg/jobs"
"github.com/redhatinsights/platform-go-middlewares/v2/identity"
"github.com/redhatinsights/platform-go-middlewares/v2/request_id"
"github.com/sirupsen/logrus"
)

// Use request id from the standard context and add it to the message as a field.
type ctxHook struct {
}

func (h *ctxHook) Levels() []logrus.Level {
return []logrus.Level{
logrus.DebugLevel,
logrus.InfoLevel,
logrus.WarnLevel,
logrus.ErrorLevel,
logrus.FatalLevel,
logrus.PanicLevel,
}
}

func (h *ctxHook) Fire(e *logrus.Entry) error {
if e.Context == nil {
return nil
}

id := identity.GetIdentity(e.Context)
if id.Identity.OrgID != "" {
e.Data["org_id"] = id.Identity.OrgID
}

if id.Identity.AccountNumber != "" {
e.Data["account_number"] = id.Identity.AccountNumber
}

if id.Identity.User != nil && id.Identity.User.UserID != "" {
e.Data["user_id"] = id.Identity.User.UserID
}

jid := jobs.JobID(e.Context)
if jid != "" {
e.Data["job_id"] = jid
}

cid := jobs.CorrID(e.Context)
if cid != "" {
e.Data["correlation_id"] = cid
}

rid := request_id.GetReqID(e.Context)
if rid != "" {
e.Data["request_id"] = rid
}

return nil
}
37 changes: 1 addition & 36 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@
package logger

import (
"bytes"
"fmt"
"io"
"path"
"runtime"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -17,7 +12,6 @@ import (
log "github.com/sirupsen/logrus"

"github.com/redhatinsights/edge-api/config"
feature "github.com/redhatinsights/edge-api/unleash/features"
)

// Log is an instance of the global logrus.Logger
Expand All @@ -26,33 +20,6 @@ var logLevel log.Level
// hook is an instance of cloudwatch.hook
var hook *lc.LogrusHook

func prettyfier(f *runtime.Frame) (string, string) {
s := strings.Split(f.Function, ".")
funcName := s[len(s)-1]
return funcName, fmt.Sprintf("%s:%d", path.Base(f.File), f.Line)
}

type plainFormatter struct{}

// Format formats the log entry for humans (just the message or SQL if present)
func (f *plainFormatter) Format(e *log.Entry) ([]byte, error) {
var b bytes.Buffer
if f, ok := e.Data["sql"]; ok {
b.WriteString(f.(string))
} else {
b.WriteString(e.Message)
}
b.WriteRune('\n')

if f, ok := e.Data["error"]; ok {
b.WriteString("ERROR: ")
b.WriteString(fmt.Sprintf("%v", f))
b.WriteRune('\n')
}

return b.Bytes(), nil
}

// InitLogger initializes the API logger
func InitLogger(writer io.Writer) {

Expand Down Expand Up @@ -82,12 +49,10 @@ func InitLogger(writer io.Writer) {
FieldMap: log.FieldMap{
log.FieldKeyTime: "@timestamp",
},
CallerPrettyfier: prettyfier,
})
} else if !feature.LogVerboseFields.IsEnabledLocal() {
log.SetFormatter(&plainFormatter{})
}

log.AddHook(&ctxHook{})
log.SetOutput(writer)
log.SetLevel(logLevel)
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ type Stats struct {
type jobKeyID int

const (
jobIDCtxKey jobKeyID = iota
jobIDCtxKey jobKeyID = iota
corrIDCtxKey jobKeyID = iota
)

// JobID returns job id or an empty string when not set.
Expand All @@ -103,14 +104,30 @@ func WithJobID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, jobIDCtxKey, id)
}

// CorrID returns job id or an empty string when not set.
func CorrID(ctx context.Context) string {
value := ctx.Value(corrIDCtxKey)
if value == nil {
return ""
}
return value.(string)
}

// WithCorrID returns context copy with trace id value.
func WithCorrID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, corrIDCtxKey, id)
}

func initJobContext(origCtx context.Context, job *Job) (context.Context, logrus.FieldLogger) {
ctx := WithJobID(origCtx, job.ID.String())
ctx = WithCorrID(ctx, job.CorrelationID)

id, err := identity.DecodeIdentity(job.Identity)
if err != nil {
logrus.WithContext(ctx).WithError(err).Warnf("Error decoding identity: %s", err)
id = identity.XRHID{}
}
ctx = identity.WithIdentity(ctx, id)

return ctx, logrus.WithContext(ctx).WithFields(
logrus.Fields{
Expand Down
3 changes: 1 addition & 2 deletions pkg/jobs/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,11 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error {
return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound)
}

_, logger := initJobContext(ctx, job)

if job.ID == uuid.Nil {
job.ID = uuid.New()
}

_, logger := initJobContext(ctx, job)
logger.WithField("job_args", job.Args).Infof("Enqueuing job %s of type %s", job.ID, job.Type)
w.q <- job
w.sen.Add(1)
Expand Down
21 changes: 13 additions & 8 deletions pkg/services/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,28 +1332,33 @@ func init() {

// RetryCreateImage retries the whole post process of the image creation
func (s *ImageService) RetryCreateImage(ctx context.Context, image *models.Image) error {
s.log = s.log.WithFields(log.Fields{"imageID": image.ID, "commitID": image.Commit.ID})
logger := log.WithContext(ctx).WithFields(log.Fields{"imageID": image.ID, "commitID": image.Commit.ID})

// recompose commit
image, err := s.ImageBuilder.ComposeCommit(image)
if err != nil {
s.log.WithField("error", err.Error()).Error("Failed recomposing commit")
logger.WithField("error", err.Error()).Error("Failed recomposing commit")
return err
}
err = s.SetBuildingStatusOnImageToRetryBuild(image)
if err != nil {
s.log.WithField("error", err.Error()).Error("Failed setting image status")
logger.WithField("error", err.Error()).Error("Failed setting image status")
return nil
}
if feature.JobQueue.IsEnabledCtx(ctx) {
orgID := identity.GetIdentity(ctx).Identity.OrgID
s.log.Infof("Enqueuing RetryCreateImageJob for org %s", orgID)
logger.Infof("Enqueuing RetryCreateImageJob for org %s", orgID)
job := jobs.Job{
Type: "RetryCreateImageJob",
Args: &RetryCreateImageJob{ImageID: image.ID},
Type: "RetryCreateImageJob",
Args: &RetryCreateImageJob{ImageID: image.ID},
Identity: identity.GetRawIdentity(ctx),
}
err := jobs.Enqueue(ctx, &job)
if err != nil {
logger.WithField("error", err.Error()).Error("Failed enqueueing job")
}
jobs.Enqueue(ctx, &job)
} else {
s.log.Info("Calling RetryCreateImageJob")
logger.Info("Calling RetryCreateImageJob")
go s.processImage(ctx, image.ID, DefaultLoopDelay, true)
}
return nil
Expand Down
3 changes: 0 additions & 3 deletions unleash/features/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ type Flag struct {

// LONG_TERM OPERATIONAL AND DEV FLAGS

// LogVerboseFields adds fields to the log output. Primarily for local dev and not an Unleash flag.
var LogVerboseFields = &Flag{Name: "edge-management.log_verbose_fields", EnvVar: "FEATURE_LOG_VERBOSE_FIELDS"}

// GLITCHTIP LOGGING FLAGS

// GlitchtipLogging is the feature flag for reporting errors to GlitchTip
Expand Down

0 comments on commit 397a8a0

Please sign in to comment.