diff --git a/main.go b/main.go index 78908de49..25517ff68 100644 --- a/main.go +++ b/main.go @@ -143,6 +143,10 @@ func webRoutes(cfg *config.EdgeConfig) *chi.Mux { s.Route("/fdo", routes.MakeFDORouter) s.Route("/device-groups", routes.MakeDeviceGroupsRouter) s.Route("/storage", routes.MakeStorageRouter) + + // this is meant for testing the job queue + s.Post("/ops/jobs/noop", services.CreateNoopJob) + s.Post("/ops/jobs/fallback", services.CreateFallbackJob) }) return route } diff --git a/pkg/jobs/job.go b/pkg/jobs/job.go index 4c0d7b1d3..5105a9c7a 100644 --- a/pkg/jobs/job.go +++ b/pkg/jobs/job.go @@ -55,7 +55,7 @@ type JobEnqueuer interface { type JobWorker interface { JobEnqueuer - // RegisterHandler registers an event listener for a particular type with an associated handler. The first handler + // RegisterHandlers registers an event listener for a particular type with an associated handler. The first handler // is for business logic, the second handler is for error handling. The second handler is called when job is processing // for too long, on graceful shutdown, panic or SIGINT. RegisterHandlers(JobType, JobHandler, JobHandler) @@ -108,7 +108,7 @@ func initJobContext(origCtx context.Context, job *Job) (context.Context, logrus. id, err := identity.DecodeIdentity(job.Identity) if err != nil { - logrus.WithContext(ctx).WithError(err).Warn("Error decoding identity") + logrus.WithContext(ctx).WithError(err).Warnf("Error decoding identity: %s", err) id = identity.XRHID{} } diff --git a/pkg/jobs/memory.go b/pkg/jobs/memory.go index 66b7aa2ca..cb01ae71c 100644 --- a/pkg/jobs/memory.go +++ b/pkg/jobs/memory.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "errors" "fmt" "os" "os/signal" @@ -71,19 +72,18 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error { _, logger := initJobContext(ctx, job) - logger.WithField("job_args", job.Args).Infof("Enqueuing job %s of type %s", job.ID, job.Type) - if job.ID == uuid.Nil { job.ID = uuid.New() } + logger.WithField("job_args", job.Args).Infof("Enqueuing job %s of type %s", job.ID, job.Type) w.q <- job w.sen.Add(1) metrics.JobEnqueuedCount.WithLabelValues(string(job.Type)).Inc() return nil } -// Starts managed goroutines to process jobs from the queue. Additionally, start +// Start managed goroutines to process jobs from the queue. Additionally, start // goroutine to handle interrupt signal if provided. This method does not block. // Worker must be gracefully stopped via Stop(). func (w *MemoryWorker) Start(ctx context.Context) { @@ -120,7 +120,7 @@ func (w *MemoryWorker) Start(ctx context.Context) { } } -// Stops processing of all free goroutines, queue is discarded but all active jobs are left +// Stop processing of all free goroutines, queue is discarded but all active jobs are left // to finish. It blocks until all workers are done which may be terminated by kubernetes. func (w *MemoryWorker) Stop(ctx context.Context) { w.oc.Do(func() { @@ -210,7 +210,7 @@ func (w *MemoryWorker) processJob(ctx context.Context, job *Job, wid uuid.UUID) } else if ctx.Err() != nil { logger.Warningf("Job %s of type %s was cancelled: %s, calling interrupt handler", job.ID, job.Type, ctx.Err().Error()) call = true - if ctx.Err() == context.DeadlineExceeded { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { metrics.JobProcessedCount.WithLabelValues(string(job.Type), "timeouted").Inc() } else { metrics.JobProcessedCount.WithLabelValues(string(job.Type), "cancelled").Inc() @@ -230,7 +230,7 @@ func (w *MemoryWorker) processJob(ctx context.Context, job *Job, wid uuid.UUID) start := time.Now() h(ctx, job) elapsed := time.Since(start) - logger.Infof("Job %s of type %s completed in %s seconds", job.ID, job.Type, elapsed.Seconds()) + logger.Infof("Job %s of type %s completed in %.02f seconds", job.ID, job.Type, elapsed.Seconds()) metrics.JobProcessedCount.WithLabelValues(string(job.Type), "finished").Inc() metrics.BackgroundJobDuration.WithLabelValues(string(job.Type)).Observe(elapsed.Seconds()) } else { diff --git a/pkg/jobs/queue.go b/pkg/jobs/queue.go index 9356de2e1..99a09310f 100644 --- a/pkg/jobs/queue.go +++ b/pkg/jobs/queue.go @@ -22,7 +22,7 @@ func InitMemoryWorker() { registerHandlers() } -// InitMemoryWorker initializes the dummy (testing) worker queue with an in-memory worker. Call +// InitDummyWorker initializes the dummy (testing) worker queue with an in-memory worker. Call // RegisterHandlers() before calling this function to register job handlers. func InitDummyWorker() { Queue = NewDummyWorker() @@ -36,7 +36,7 @@ func registerHandlers() { } } -// Returns the default worker queue. +// Worker returns the default worker queue. func Worker() JobWorker { return Queue } diff --git a/pkg/services/jobs.go b/pkg/services/jobs.go new file mode 100644 index 000000000..bd54fa819 --- /dev/null +++ b/pkg/services/jobs.go @@ -0,0 +1,84 @@ +package services + +import ( + "context" + "net/http" + + "github.com/redhatinsights/edge-api/pkg/jobs" + feature "github.com/redhatinsights/edge-api/unleash/features" + "github.com/redhatinsights/platform-go-middlewares/v2/identity" + log "github.com/sirupsen/logrus" +) + +type NoopJob struct { +} + +func NoopHandler(ctx context.Context, _ *jobs.Job) { + log.WithContext(ctx).Info("NoopHandler called") +} + +func NoopFailureHandler(ctx context.Context, _ *jobs.Job) { + log.WithContext(ctx).Info("NoopFailureHandler called") +} + +func FallbackHandler(ctx context.Context, _ *jobs.Job) { + log.WithContext(ctx).Info("FallbackHandler called") + panic("failure") +} + +func FallbackFailureHandler(ctx context.Context, _ *jobs.Job) { + log.WithContext(ctx).Info("FallbackFailureHandler called") +} + +func init() { + jobs.RegisterHandlers("NoopJob", NoopHandler, NoopFailureHandler) + jobs.RegisterHandlers("FallbackJob", FallbackHandler, FallbackFailureHandler) +} + +func CreateNoopJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + if feature.JobQueue.IsEnabledCtx(ctx) { + orgID := identity.GetIdentity(ctx).Identity.OrgID + log.WithContext(ctx).Infof("Enqueuing NoopJob for org %s", orgID) + + job := jobs.Job{ + Type: "NoopJob", + Args: &NoopJob{}, + Identity: identity.GetRawIdentity(ctx), + } + + err := jobs.Enqueue(ctx, &job) + if err != nil { + log.WithContext(ctx).Errorf("Cannot enqueue job: %s", err) + } + w.WriteHeader(http.StatusOK) + } else { + log.WithContext(ctx).Info("Not enqueuing NoopJob - job queue not enabled") + w.WriteHeader(http.StatusBadRequest) + } +} + +func CreateFallbackJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + if feature.JobQueue.IsEnabledCtx(ctx) { + orgID := identity.GetIdentity(ctx).Identity.OrgID + log.WithContext(ctx).Infof("Enqueuing NoopJob for org %s", orgID) + + job := jobs.Job{ + Type: "FallbackJob", + Args: &NoopJob{}, + Identity: identity.GetRawIdentity(ctx), + } + + err := jobs.Enqueue(ctx, &job) + if err != nil { + log.WithContext(ctx).Errorf("Cannot enqueue job: %s", err) + } + w.WriteHeader(http.StatusOK) + } else { + log.WithContext(ctx).Info("Not enqueuing NoopJob - job queue not enabled") + w.WriteHeader(http.StatusBadRequest) + } +} diff --git a/unleash/features/feature.go b/unleash/features/feature.go index 77624038e..15987615b 100644 --- a/unleash/features/feature.go +++ b/unleash/features/feature.go @@ -189,7 +189,7 @@ func (ff *Flag) IsEnabled(options ...unleash.FeatureOption) bool { return false } -// IsEnabled checks both the feature flag service and env vars on demand. +// IsEnabledCtx checks both the feature flag service and env vars on demand. // Organization ID is passed from the context if present. func (ff *Flag) IsEnabledCtx(ctx context.Context, options ...unleash.FeatureOption) bool { if ff.Name != "" && CheckFeatureCtx(ctx, ff.Name, options...) {