Skip to content

Commit

Permalink
jobs: job testing endpoints
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Zapletal <[email protected]>
  • Loading branch information
lzap authored and ezr-ondrej committed May 15, 2024
1 parent 33e0b5d commit ebd6b9f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 11 deletions.
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func webRoutes(cfg *config.EdgeConfig) *chi.Mux {
s.Route("/fdo", routes.MakeFDORouter)
s.Route("/device-groups", routes.MakeDeviceGroupsRouter)
s.Route("/storage", routes.MakeStorageRouter)

// this is meant for testing the job queue
s.Post("/ops/jobs/noop", services.CreateNoopJob)
s.Post("/ops/jobs/fallback", services.CreateFallbackJob)
})
return route
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type JobEnqueuer interface {
type JobWorker interface {
JobEnqueuer

// RegisterHandler registers an event listener for a particular type with an associated handler. The first handler
// RegisterHandlers registers an event listener for a particular type with an associated handler. The first handler
// is for business logic, the second handler is for error handling. The second handler is called when job is processing
// for too long, on graceful shutdown, panic or SIGINT.
RegisterHandlers(JobType, JobHandler, JobHandler)
Expand Down Expand Up @@ -108,7 +108,7 @@ func initJobContext(origCtx context.Context, job *Job) (context.Context, logrus.

id, err := identity.DecodeIdentity(job.Identity)
if err != nil {
logrus.WithContext(ctx).WithError(err).Warn("Error decoding identity")
logrus.WithContext(ctx).WithError(err).Warnf("Error decoding identity: %s", err)
id = identity.XRHID{}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/jobs/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jobs

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -71,19 +72,18 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error {

_, logger := initJobContext(ctx, job)

logger.WithField("job_args", job.Args).Infof("Enqueuing job %s of type %s", job.ID, job.Type)

if job.ID == uuid.Nil {
job.ID = uuid.New()
}

logger.WithField("job_args", job.Args).Infof("Enqueuing job %s of type %s", job.ID, job.Type)
w.q <- job
w.sen.Add(1)
metrics.JobEnqueuedCount.WithLabelValues(string(job.Type)).Inc()
return nil
}

// Starts managed goroutines to process jobs from the queue. Additionally, start
// Start managed goroutines to process jobs from the queue. Additionally, start
// goroutine to handle interrupt signal if provided. This method does not block.
// Worker must be gracefully stopped via Stop().
func (w *MemoryWorker) Start(ctx context.Context) {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (w *MemoryWorker) Start(ctx context.Context) {
}
}

// Stops processing of all free goroutines, queue is discarded but all active jobs are left
// Stop processing of all free goroutines, queue is discarded but all active jobs are left
// to finish. It blocks until all workers are done which may be terminated by kubernetes.
func (w *MemoryWorker) Stop(ctx context.Context) {
w.oc.Do(func() {
Expand Down Expand Up @@ -210,7 +210,7 @@ func (w *MemoryWorker) processJob(ctx context.Context, job *Job, wid uuid.UUID)
} else if ctx.Err() != nil {
logger.Warningf("Job %s of type %s was cancelled: %s, calling interrupt handler", job.ID, job.Type, ctx.Err().Error())
call = true
if ctx.Err() == context.DeadlineExceeded {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
metrics.JobProcessedCount.WithLabelValues(string(job.Type), "timeouted").Inc()
} else {
metrics.JobProcessedCount.WithLabelValues(string(job.Type), "cancelled").Inc()
Expand All @@ -230,7 +230,7 @@ func (w *MemoryWorker) processJob(ctx context.Context, job *Job, wid uuid.UUID)
start := time.Now()
h(ctx, job)
elapsed := time.Since(start)
logger.Infof("Job %s of type %s completed in %s seconds", job.ID, job.Type, elapsed.Seconds())
logger.Infof("Job %s of type %s completed in %.02f seconds", job.ID, job.Type, elapsed.Seconds())
metrics.JobProcessedCount.WithLabelValues(string(job.Type), "finished").Inc()
metrics.BackgroundJobDuration.WithLabelValues(string(job.Type)).Observe(elapsed.Seconds())
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func InitMemoryWorker() {
registerHandlers()
}

// InitMemoryWorker initializes the dummy (testing) worker queue with an in-memory worker. Call
// InitDummyWorker initializes the dummy (testing) worker queue with an in-memory worker. Call
// RegisterHandlers() before calling this function to register job handlers.
func InitDummyWorker() {
Queue = NewDummyWorker()
Expand All @@ -36,7 +36,7 @@ func registerHandlers() {
}
}

// Returns the default worker queue.
// Worker returns the default worker queue.
func Worker() JobWorker {
return Queue
}
Expand Down
84 changes: 84 additions & 0 deletions pkg/services/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package services

import (
"context"
"net/http"

"github.com/redhatinsights/edge-api/pkg/jobs"
feature "github.com/redhatinsights/edge-api/unleash/features"
"github.com/redhatinsights/platform-go-middlewares/v2/identity"
log "github.com/sirupsen/logrus"
)

type NoopJob struct {
}

func NoopHandler(ctx context.Context, _ *jobs.Job) {
log.WithContext(ctx).Info("NoopHandler called")
}

func NoopFailureHandler(ctx context.Context, _ *jobs.Job) {
log.WithContext(ctx).Info("NoopFailureHandler called")
}

func FallbackHandler(ctx context.Context, _ *jobs.Job) {
log.WithContext(ctx).Info("FallbackHandler called")
panic("failure")
}

func FallbackFailureHandler(ctx context.Context, _ *jobs.Job) {
log.WithContext(ctx).Info("FallbackFailureHandler called")
}

func init() {
jobs.RegisterHandlers("NoopJob", NoopHandler, NoopFailureHandler)
jobs.RegisterHandlers("FallbackJob", FallbackHandler, FallbackFailureHandler)
}

func CreateNoopJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

if feature.JobQueue.IsEnabledCtx(ctx) {
orgID := identity.GetIdentity(ctx).Identity.OrgID
log.WithContext(ctx).Infof("Enqueuing NoopJob for org %s", orgID)

job := jobs.Job{
Type: "NoopJob",
Args: &NoopJob{},
Identity: identity.GetRawIdentity(ctx),
}

err := jobs.Enqueue(ctx, &job)
if err != nil {
log.WithContext(ctx).Errorf("Cannot enqueue job: %s", err)
}
w.WriteHeader(http.StatusOK)
} else {
log.WithContext(ctx).Info("Not enqueuing NoopJob - job queue not enabled")
w.WriteHeader(http.StatusBadRequest)
}
}

func CreateFallbackJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

if feature.JobQueue.IsEnabledCtx(ctx) {
orgID := identity.GetIdentity(ctx).Identity.OrgID
log.WithContext(ctx).Infof("Enqueuing NoopJob for org %s", orgID)

job := jobs.Job{
Type: "FallbackJob",
Args: &NoopJob{},
Identity: identity.GetRawIdentity(ctx),
}

err := jobs.Enqueue(ctx, &job)
if err != nil {
log.WithContext(ctx).Errorf("Cannot enqueue job: %s", err)
}
w.WriteHeader(http.StatusOK)
} else {
log.WithContext(ctx).Info("Not enqueuing NoopJob - job queue not enabled")
w.WriteHeader(http.StatusBadRequest)
}
}
2 changes: 1 addition & 1 deletion unleash/features/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (ff *Flag) IsEnabled(options ...unleash.FeatureOption) bool {
return false
}

// IsEnabled checks both the feature flag service and env vars on demand.
// IsEnabledCtx checks both the feature flag service and env vars on demand.
// Organization ID is passed from the context if present.
func (ff *Flag) IsEnabledCtx(ctx context.Context, options ...unleash.FeatureOption) bool {
if ff.Name != "" && CheckFeatureCtx(ctx, ff.Name, options...) {
Expand Down

0 comments on commit ebd6b9f

Please sign in to comment.