Skip to content

Commit

Permalink
feat: introduce exp. backoff for polling (#52)
Browse files Browse the repository at this point in the history
- Introduce active poll percent configuration to determine the
  percentage of workers that should actively poll as per the specified
  poll interval irrespective of whether a job was found or not.
- For a non-active poll worker, when no job is found, use exponential
  backoff to determine the poll delay for the next attempt.
- Fix a bug in the exponential backoff calculation where it would return 0
  as the backoff duration for high attempt numbers such as 70.
- Add instance name to otelsql instrumentation and use the nrpgx driver for
  the default PostgreSQL client (this was missed earlier).
  • Loading branch information
sudo-suhas authored Aug 10, 2023
1 parent c6f8c7c commit 9b7f898
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 23 deletions.
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ worker:
enabled: true
worker_count: 1
poll_interval: 1s
active_poll_percent: 20
pgq:
host: localhost
port: 5432
Expand Down
4 changes: 2 additions & 2 deletions internal/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/jmoiron/sqlx"
_ "github.com/newrelic/go-agent/v3/integrations/nrpgx" // register instrumented DB driver
"go.nhat.io/otelsql"
Expand Down Expand Up @@ -96,11 +95,12 @@ func (c *Client) Close() error {
// NewClient initializes database connection
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
driverName, err := otelsql.Register(
"pgx",
"nrpgx",
otelsql.TraceQueryWithoutArgs(),
otelsql.TraceRowsClose(),
otelsql.TraceRowsAffected(),
otelsql.WithSystem(semconv.DBSystemPostgreSQL),
otelsql.WithInstanceName("default"),
)
if err != nil {
return nil, fmt.Errorf("register otelsql: %w", err)
Expand Down
12 changes: 7 additions & 5 deletions internal/workermanager/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ type Worker interface {
}

// Config holds the worker manager settings. Fields carry mapstructure tags,
// and some declare a fallback through the `default` tag.
// NOTE(review): this span is a rendered diff hunk; the first five field lines
// are the pre-change declarations and the six that follow are their
// post-change replacements.
type Config struct {
Enabled bool `mapstructure:"enabled"`
WorkerCount int `mapstructure:"worker_count" default:"3"`
PollInterval time.Duration `mapstructure:"poll_interval" default:"500ms"`
PGQ pgq.Config `mapstructure:"pgq"`
JobManagerPort int `mapstructure:"job_manager_port"`
Enabled bool `mapstructure:"enabled"`
WorkerCount int `mapstructure:"worker_count" default:"3"`
PollInterval time.Duration `mapstructure:"poll_interval" default:"500ms"`
// ActivePollPercent is the percentage of workers that keep polling at a
// constant PollInterval; it is passed to worker.WithActivePollPercent.
ActivePollPercent float64 `mapstructure:"active_poll_percent" default:"20"`
PGQ pgq.Config `mapstructure:"pgq"`
JobManagerPort int `mapstructure:"job_manager_port"`
}

type Deps struct {
Expand All @@ -58,6 +59,7 @@ func New(ctx context.Context, deps Deps) (*Manager, error) {
w, err := worker.New(
workermw.WithJobProcessorInstrumentation()(processor),
worker.WithRunConfig(cfg.WorkerCount, cfg.PollInterval),
worker.WithActivePollPercent(cfg.ActivePollPercent),
worker.WithLogger(deps.Logger),
)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/worker/backoff_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ type ExponentialBackoff struct {
}

func (b *ExponentialBackoff) Backoff(attempt int) time.Duration {
duration := b.InitialDelay * time.Duration(math.Pow(b.Multiplier, float64(attempt-1)))
limit := (float64)(math.MaxInt64) / (float64)(b.InitialDelay)
multiplier := math.Pow(b.Multiplier, (float64)(attempt-1))
if multiplier > limit {
multiplier = limit
}
duration := b.InitialDelay * time.Duration(multiplier)

if b.MaxDelay > 0 && duration > b.MaxDelay {
duration = b.MaxDelay
Expand Down
10 changes: 10 additions & 0 deletions pkg/worker/backoff_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ func TestExponentialBackoff(t *testing.T) {
attempt: 11,
expected: time.Second * 15,
},
7: {
b: &ExponentialBackoff{
Multiplier: 4,
InitialDelay: time.Second * 1,
MaxDelay: time.Second * 10,
Jitter: 1,
},
attempt: 111,
expected: time.Second * 15,
},
}
for i, tc := range cases {
assert.Equal(t, tc.expected, tc.b.Backoff(tc.attempt), "test[%d]", i)
Expand Down
3 changes: 2 additions & 1 deletion pkg/worker/pgq/pgq_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewProcessor(ctx context.Context, cfg Config) (*Processor, error) {
otelsql.TraceRowsClose(),
otelsql.TraceRowsAffected(),
otelsql.WithSystem(semconv.DBSystemPostgreSQL),
otelsql.WithInstanceName("pgq"),
)
if err != nil {
return nil, fmt.Errorf("new pgq processor: %w", err)
Expand Down Expand Up @@ -98,7 +99,7 @@ func (p *Processor) Process(ctx context.Context, types []string, fn worker.JobEx
job, err := p.pickupJob(ctx, tx, types)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
return fmt.Errorf("pickup job: %w", worker.ErrNoJob)
}
return fmt.Errorf("pickup job: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/worker/pgq/pgq_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *ProcessorTestSuite) TestProcess() {
s.Fail("unexpected job invocation")
return job
})
s.NoError(err)
s.ErrorIs(err, worker.ErrNoJob)
})

s.Run("ProcessOnlyGivenTypes", func() {
Expand All @@ -168,7 +168,7 @@ func (s *ProcessorTestSuite) TestProcess() {
s.Fail("unexpected job invocation")
return job
})
s.NoError(err)
s.ErrorIs(err, worker.ErrNoJob)
})

s.Run("ProcessOnlyReadyJobs", func() {
Expand All @@ -184,7 +184,7 @@ func (s *ProcessorTestSuite) TestProcess() {
s.Fail("unexpected job invocation")
return job
})
s.NoError(err)
s.ErrorIs(err, worker.ErrNoJob)
})

s.Run("JobProcessedSuccessfully", func() {
Expand Down Expand Up @@ -296,7 +296,7 @@ func (s *ProcessorTestSuite) TestProcess() {
s.Fail("unexpected job invocation")
return job
})
s.NoError(err)
s.ErrorIs(err, worker.ErrNoJob)
return job
})
s.NoError(err)
Expand Down
45 changes: 35 additions & 10 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

Expand All @@ -14,12 +15,14 @@ var (
ErrTypeExists = errors.New("handler for given job type exists")
ErrUnknownType = errors.New("job type is invalid")
ErrJobExists = errors.New("job with id exists")
ErrNoJob = errors.New("no job found")
)

// Worker provides asynchronous job processing using a job processor.
type Worker struct {
workers int
pollInterval time.Duration
workers int
pollInterval time.Duration
activePollPercent float64

processor JobProcessor
logger log.Logger
Expand Down Expand Up @@ -87,13 +90,15 @@ func (w *Worker) Run(baseCtx context.Context) error {
ctx, cancel := context.WithCancel(baseCtx)
defer cancel()

activePollWorkers := (int)(math.Ceil((float64)(w.workers) * w.activePollPercent / 100))

var wg sync.WaitGroup
wg.Add(w.workers)
for i := 0; i < w.workers; i++ {
go func(id int) {
defer wg.Done()

w.runWorker(ctx)
w.runWorker(ctx, id < activePollWorkers)
w.logger.Info("worker exited", "worker_id", id)
}(i)
}
Expand All @@ -103,26 +108,46 @@ func (w *Worker) Run(baseCtx context.Context) error {
return cleanupCtxErr(ctx.Err())
}

func (w *Worker) runWorker(ctx context.Context) {
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
func (w *Worker) runWorker(ctx context.Context, activePoll bool) {
timer := time.NewTimer(w.pollInterval)
defer timer.Stop()

var backoff BackoffStrategy = ConstBackoff{Delay: w.pollInterval}
if !activePoll {
backoff = &ExponentialBackoff{
Multiplier: 1.6,
InitialDelay: w.pollInterval,
MaxDelay: 5 * time.Second,
Jitter: 0.5,
}
}

pollAttempt := 1
for {
select {
case <-ctx.Done():
return

case <-ticker.C:
case <-timer.C:
types := w.getTypes()
if len(types) == 0 {
w.logger.Warn("no job-handler registered, skipping processing")
continue
}

w.logger.Debug("looking for a job", "types", types)
if err := w.processor.Process(ctx, types, w.processJob); err != nil {
w.logger.Error("process job failed", "error", err)
w.logger.Debug("looking for a job", "types", types, "active_poll", activePoll)
switch err := w.processor.Process(ctx, types, w.processJob); {
case err != nil && errors.Is(err, ErrNoJob):
pollAttempt++

case err != nil:
w.logger.Error("process job failed", "err", err)
pollAttempt = 1

default:
pollAttempt = 1
}
timer.Reset(backoff.Backoff(pollAttempt))
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/worker/worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,17 @@ func WithRunConfig(workers int, pollInterval time.Duration) Option {
}
}

// WithActivePollPercent returns an Option that records pct as the worker's
// active poll percent. The returned Option never fails.
func WithActivePollPercent(pct float64) Option {
	var apply Option = func(target *Worker) error {
		target.activePollPercent = pct
		return nil
	}
	return apply
}

// withDefaults returns the default options followed by opts, so that any
// caller-supplied option is applied after the default it overrides.
func withDefaults(opts []Option) []Option {
	defaults := []Option{
		WithLogger(nil),
		WithRunConfig(1, 1*time.Second),
		WithActivePollPercent(20),
	}
	return append(defaults, opts...)
}

0 comments on commit 9b7f898

Please sign in to comment.