Skip to content

Commit

Permalink
feat: HTTP server for dead job management (#46)
Browse files Browse the repository at this point in the history
- Add dead job management handler with endpoints for dead job
  management:
  - /dead-jobs: JSON/HTML response with content negotiation. Response is
    paginated.
  - /resurrect-jobs: Move specified dead jobs to jobs_queue table.
  - /clear-jobs: Remove specified dead jobs from dead_jobs table.
- Start a HTTP server on specified job manager port when worker is run.
  • Loading branch information
sudo-suhas authored Jul 27, 2023
1 parent 5b95374 commit b70fd31
Show file tree
Hide file tree
Showing 13 changed files with 1,114 additions and 36 deletions.
4 changes: 2 additions & 2 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func workerCmd(cfg *Config) *cobra.Command {
func workerStartCommand(cfg *Config) *cobra.Command {
c := &cobra.Command{
Use: "start",
Short: "Start server on default port 8080",
Example: "compass server start",
Short: "Start worker to start processing jobs and a web server for dead job management on default port 8085",
Example: "compass worker start",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if err := runWorker(cmd.Context(), cfg); err != nil {
Expand Down
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ worker:
name: compass
username: compass
password: compass_password
job_manager_port: 8085

client:
host: localhost:8081
Expand Down
8 changes: 8 additions & 0 deletions internal/testutils/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package testutils

import (
"context"
"encoding/json"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
Expand Down Expand Up @@ -35,3 +37,9 @@ func Marshal(t *testing.T, v interface{}) []byte {

return data
}

type ArgMatcher interface{ Matches(interface{}) bool }

func OfTypeContext() ArgMatcher {
return mock.MatchedBy(func(ctx context.Context) bool { return ctx != nil })
}
42 changes: 31 additions & 11 deletions internal/workermanager/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package workermanager

import (
"context"
"errors"
"fmt"
"net/http"
"sync/atomic"
"time"

Expand All @@ -12,10 +14,12 @@ import (
)

type Manager struct {
processor *pgq.Processor
registered atomic.Bool
worker Worker
discoveryRepo DiscoveryRepository
processor *pgq.Processor
registered atomic.Bool
worker Worker
jobManagerPort int
discoveryRepo DiscoveryRepository
logger log.Logger
}

//go:generate mockery --name=Worker -r --case underscore --with-expecter --structname Worker --filename worker_mock.go --output=./mocks
Expand All @@ -27,10 +31,11 @@ type Worker interface {
}

type Config struct {
Enabled bool `mapstructure:"enabled"`
WorkerCount int `mapstructure:"worker_count" default:"3"`
PollInterval time.Duration `mapstructure:"poll_interval" default:"500ms"`
PGQ pgq.Config `mapstructure:"pgq"`
Enabled bool `mapstructure:"enabled"`
WorkerCount int `mapstructure:"worker_count" default:"3"`
PollInterval time.Duration `mapstructure:"poll_interval" default:"500ms"`
PGQ pgq.Config `mapstructure:"pgq"`
JobManagerPort int `mapstructure:"job_manager_port"`
}

type Deps struct {
Expand All @@ -56,9 +61,11 @@ func New(ctx context.Context, deps Deps) (*Manager, error) {
}

return &Manager{
processor: processor,
worker: w,
discoveryRepo: deps.DiscoveryRepo,
processor: processor,
worker: w,
jobManagerPort: cfg.JobManagerPort,
discoveryRepo: deps.DiscoveryRepo,
logger: deps.Logger,
}, nil
}

Expand All @@ -74,6 +81,19 @@ func (m *Manager) Run(ctx context.Context) error {
return fmt.Errorf("run async worker: %w", err)
}

go func() {
srv := http.Server{
Addr: fmt.Sprintf(":%d", m.jobManagerPort),
Handler: worker.DeadJobManagementHandler(m.processor),
ReadTimeout: 3 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
m.logger.Error("Worker job manager - listen and serve", "err", err)
}
}()

return m.worker.Run(ctx)
}

Expand Down
132 changes: 132 additions & 0 deletions pkg/worker/dead_jobs_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package worker

import (
"context"
_ "embed"
"encoding/json"
"html/template"
"net/http"
"net/url"
"strconv"
"strings"
)

//go:embed dead_jobs_page.html
var deadJobsPageTplSrc string

//go:generate mockery --name=DeadJobManager -r --case underscore --with-expecter --structname DeadJobManager --filename dead_job_manager_mock.go --output=./mocks

type DeadJobManager interface {
DeadJobs(ctx context.Context, size, offset int) ([]Job, error)
Resurrect(ctx context.Context, jobIDs []string) error
ClearDeadJobs(ctx context.Context, jobIDs []string) error
}

const (
listDeadJobsPath = "/dead-jobs"
resurrectJobsPath = "/resurrect-jobs"
clearJobsPath = "/clear-jobs"
)

// DeadJobManagementHandler returns a http handler with endpoints for dead job
// management:
// - /dead-jobs: JSON/HTML response with content negotiation. Response is
// paginated.
// - /resurrect-jobs: Move specified dead jobs to jobs_queue table.
// - /clear-jobs: Remove specified dead jobs from dead_jobs table.
func DeadJobManagementHandler(mgr DeadJobManager) http.Handler {
mux := http.NewServeMux()
mux.HandleFunc(listDeadJobsPath, deadJobsHandler(mgr))
mux.HandleFunc(resurrectJobsPath, jobsTransformerHandler(func(ctx context.Context, jobIDs []string) error {
return mgr.Resurrect(ctx, jobIDs)
}))
mux.HandleFunc(clearJobsPath, jobsTransformerHandler(func(ctx context.Context, jobIDs []string) error {
return mgr.ClearDeadJobs(ctx, jobIDs)
}))
return mux
}

func deadJobsHandler(mgr DeadJobManager) http.HandlerFunc {
deadJobsPageTpl := template.Must(template.New("").Parse(deadJobsPageTplSrc))

return func(w http.ResponseWriter, r *http.Request) {
qry := r.URL.Query()
size, err := strconv.Atoi(qry.Get("size"))
if err != nil || size <= 0 {
size = 20
}

offset, _ := strconv.Atoi(qry.Get("offset"))
if offset <= 0 {
offset = 0
}

jobs, err := mgr.DeadJobs(r.Context(), size, offset)
if err != nil {
writeJSONResponse(w, http.StatusInternalServerError, err)
return
}

if strings.Contains(r.Header.Get("Accept"), "application/json") {
writeJSONResponse(w, http.StatusOK, jobs)
return
}

tplData := map[string]any{
"jobs": jobs,
"page_size": size,
"next_page": offset + size,
"prev_page": offset - size,
"err_msg": strings.TrimSpace(qry.Get("error")),
}
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
if err := deadJobsPageTpl.Execute(w, tplData); err != nil {
sendTo := listDeadJobsPath + "?error=" + url.QueryEscape(err.Error())
http.Redirect(w, r, sendTo, http.StatusSeeOther)
return
}
}
}

func jobsTransformerHandler(fn func(context.Context, []string) error) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
sendTo := listDeadJobsPath + "?error=" + url.QueryEscape(err.Error())
http.Redirect(w, r, sendTo, http.StatusSeeOther)
return
}

jobIDs := r.Form["job_ids"]
if len(jobIDs) == 0 {
sendTo := listDeadJobsPath + "?error=" + url.QueryEscape("no job IDs specified")
http.Redirect(w, r, sendTo, http.StatusSeeOther)
return
}

if err := fn(r.Context(), jobIDs); err != nil {
sendTo := listDeadJobsPath + "?error=" + url.QueryEscape(err.Error())
http.Redirect(w, r, sendTo, http.StatusSeeOther)
return
}

http.Redirect(w, r, listDeadJobsPath, http.StatusSeeOther)
}
}

func writeJSONResponse(w http.ResponseWriter, status int, v any) {
if err, ok := v.(error); ok {
v = map[string]interface{}{"error": err.Error()}
}

data, err := json.Marshal(v)
if err != nil {
http.Error(w, "encode response failed", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_, _ = w.Write(data)
}
Loading

0 comments on commit b70fd31

Please sign in to comment.