Skip to content

Commit

Permalink
Fix unique scheduled jobs (#6)
Browse files Browse the repository at this point in the history
* add more comments

* add test for EnqueueUniqueInByKey

* fix removing from inprocess queue

* update comment
  • Loading branch information
Darkemon authored Dec 1, 2023
1 parent e68231b commit de7d308
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 1 deletion.
1 change: 1 addition & 0 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string)
}
}

// findDeadPools returns a map of dead worker pool IDs to the job types that were running in them.
func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) {
conn := r.pool.Get()
defer conn.Close()
Expand Down
56 changes: 55 additions & 1 deletion enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestOrderEnqueueUniqueByKey(t *testing.T) {
require.NoError(t, err)
}

func EnqueueUniqueInByKey(t *testing.T) {
func TestEnqueueUniqueInByKey(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
Expand All @@ -446,6 +446,8 @@ func EnqueueUniqueInByKey(t *testing.T) {
assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt)
}

assert.True(t, exists(pool, job.UniqueKey), "unique keys exists")

job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
assert.Nil(t, job)
Expand All @@ -465,3 +467,55 @@ func EnqueueUniqueInByKey(t *testing.T) {
assert.NoError(t, j.ArgError())
assert.True(t, j.Unique)
}

func TestRunEnqueueUniqueInByKey(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)

// Enqueue two unique jobs -- ensure one job sticks.
job, err := enqueuer.EnqueueUniqueInByKey("wat", 1, Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
assert.NotNil(t, job)

doneCh := make(chan struct{})
var argA float64
var argB string

wp := NewWorkerPool(TestContext{}, 3, ns, pool)
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
argA = job.Args["a"].(float64)
argB = job.Args["b"].(string)

close(doneCh)

return nil
})

wp.Start()

select {
case <-time.After(5 * time.Second):
require.FailNow(t, "timed out")
case <-doneCh:
wp.Drain()
wp.Stop()
}

// Make sure the job has run.
require.EqualValues(t, 1.0, argA)
require.Equal(t, "cool", argB)

// Nothing in retries or dead.
assert.EqualValues(t, 0, zsetSize(pool, redisKeyRetry(ns)), "retry queue must be empty")
assert.EqualValues(t, 0, zsetSize(pool, redisKeyDead(ns)), "dead queue must be empty")

// Nothing in the queues or in-progress queues.
assert.EqualValues(t, 0, listSize(pool, redisKeyScheduled(ns)), "scheduled queue must be empty")
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, "wat")), "jobs queue must be empty")
assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, "wat")), "inprocess queue must be empty")

// No unique keys.
assert.False(t, exists(pool, job.UniqueKey), "unique keys must be empty")
}
17 changes: 17 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
)

// redisNamespacePrefix returns "<namespace>:" or "" if namespace is empty.
func redisNamespacePrefix(namespace string) string {
l := len(namespace)
if (l > 0) && (namespace[l-1] != ':') {
Expand All @@ -14,6 +15,7 @@ func redisNamespacePrefix(namespace string) string {
return namespace
}

// redisKeyKnownJobs returns "<namespace>:known_jobs".
func redisKeyKnownJobs(namespace string) string {
return redisNamespacePrefix(namespace) + "known_jobs"
}
Expand All @@ -24,54 +26,68 @@ func redisKeyJobsPrefix(namespace string) string {
return redisNamespacePrefix(namespace) + "jobs:"
}

// redisKeyJobs returns "<namespace>:jobs:<jobName>".
func redisKeyJobs(namespace, jobName string) string {
return redisKeyJobsPrefix(namespace) + jobName
}

// redisKeyJobsInProgress returns "<namespace>:jobs:<jobName>:<poolID>:inprogress".
func redisKeyJobsInProgress(namespace, poolID, jobName string) string {
return fmt.Sprintf("%s:%s:inprogress", redisKeyJobs(namespace, jobName), poolID)
}

// redisKeyRetry returns "<namespace>:retry".
func redisKeyRetry(namespace string) string {
return redisNamespacePrefix(namespace) + "retry"
}

// redisKeyDead returns "<namespace>:dead".
func redisKeyDead(namespace string) string {
return redisNamespacePrefix(namespace) + "dead"
}

// redisKeyScheduled returns "<namespace>:scheduled".
func redisKeyScheduled(namespace string) string {
return redisNamespacePrefix(namespace) + "scheduled"
}

// redisKeyWorkerObservation returns "<namespace>:worker:<workerID>".
func redisKeyWorkerObservation(namespace, workerID string) string {
return redisNamespacePrefix(namespace) + "worker:" + workerID
}

// redisKeyWorkerPools returns "<namespace>:worker_pools".
func redisKeyWorkerPools(namespace string) string {
return redisNamespacePrefix(namespace) + "worker_pools"
}

// redisKeyHeartbeat returns "<namespace>:worker_pools:<workerPoolID>".
func redisKeyHeartbeat(namespace, workerPoolID string) string {
return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID
}

// redisKeyJobsPaused returns "<namespace>:jobs:<jobName>:paused".
func redisKeyJobsPaused(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":paused"
}

// redisKeyJobsLock returns "<namespace>:jobs:<jobName>:lock".
func redisKeyJobsLock(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":lock"
}

// redisKeyJobsLockInfo returns "<namespace>:jobs:<jobName>:lock_info".
func redisKeyJobsLockInfo(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":lock_info"
}

// redisKeyJobsConcurrency returns "<namespace>:jobs:<jobName>:max_concurrency".
func redisKeyJobsConcurrency(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":max_concurrency"
}

// redisKeyUniqueJob returns a unique key for a job name and arguments.
// It has the form "<namespace>:unique:<jobName>:<args>".
func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
var buf bytes.Buffer

Expand All @@ -90,6 +106,7 @@ func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (
return buf.String(), nil
}

// redisKeyLastPeriodicEnqueue returns "<namespace>:last_periodic_enqueue".
func redisKeyLastPeriodicEnqueue(namespace string) string {
return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
}
Expand Down
25 changes: 25 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (w *worker) loop() {
}
}

// fetchJob returns a job, or nil if there are no jobs.
// It looks for any of the registered jobs. As soon it finds one, it
// extracts the job from the queue <namespace>:jobs:<jobName> and puts it to the queue
// <namespace>:jobs:<jobName>:<poolID>:inprogress. For more details see redisLuaFetchJob lua script.
// The found job is returned as a Job struct.
func (w *worker) fetchJob() (*Job, error) {
// resort queues
// NOTE: we could optimize this to only resort every second, or something.
Expand Down Expand Up @@ -262,6 +267,26 @@ func (w *worker) getUniqueJob(job *Job) *Job {
return nil
}

// This is a hack to fix the following problem.
// If a job is scheduled with a unique key (EnqueueUniqueInByKey), it's added in 2 places in redis:
// scheduled queue and under unique key.
// A requeuer loop calls a lua script, which extracts the job from the scheduled queue and
// puts it to the jobs queue. Also the script adds a new field to the json body of the job using cjson library.
// It encodes json with a different field order than the golang encoding/json.
// Later on, a worker loop moves the job from the jobs queue to the inprocess queue.
// The worker after processing the job, deletes the job from the inprocess queue and the unique key,
// but
// for deletion it uses rawJson from unique job received from unique key, which doesn't match the json body of the job
// in the inprocess queue. Without this hack we'd get memory leak in redis, because the job would never be deleted
// from the inprocess queue.
//
// EnqueueUniqueInByKey -> scheduled queue -> (json body is modified) -> jobs queue -> inprocess queue -> (handle job) -> delete from inprocess queue
// -> unique key using rawJson from unique key
//
// NOTE: this field is used only to delete the job from the inprocess queue.
// job.rawJSON is the original json body of the job coming from jobs queue.
jobWithArgs.rawJSON = job.rawJSON

return jobWithArgs
}

Expand Down
11 changes: 11 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,17 @@ func getInt64(pool *redis.Pool, key string) int64 {
return v
}

func exists(pool *redis.Pool, key string) bool {
conn := pool.Get()
defer conn.Close()

v, err := redis.Bool(conn.Do("EXISTS", key))
if err != nil {
panic("could not EXISTS: " + err.Error())
}
return v
}

func hgetInt64(pool *redis.Pool, redisKey, hashKey string) int64 {
conn := pool.Get()
defer conn.Close()
Expand Down

0 comments on commit de7d308

Please sign in to comment.