Skip to content

Commit

Permalink
Merge pull request #1 from golang-cz/update
Browse files Browse the repository at this point in the history
update
  • Loading branch information
david-littlefarmer authored May 27, 2024
2 parents 74e72fb + 3790ff7 commit 64609f0
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 63 deletions.
7 changes: 0 additions & 7 deletions locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ var (
ErrFailedToCheckLockExistence = errors.New("looper - failed to check lock existence")
)

type lockerKind int

const (
lockerNop lockerKind = iota
lockerRedis
)

// Lock if an error is returned by lock, the job will not be scheduled.
type locker interface {
lock(ctx context.Context, key string, timeout time.Duration) (lock, error)
Expand Down
124 changes: 68 additions & 56 deletions looper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func SetPanicHandler(handler PanicHandlerFunc) {
// Looper
type Looper struct {
running bool
jobs []*Job
startupTime time.Duration
locker locker
hooks hooks
mu sync.RWMutex
jobs []*Job
locker locker
}

type (
Expand All @@ -46,18 +46,13 @@ type hooks struct {
}

type Config struct {
// Locker for jobs
//
// Options:
// PostgresLocker(ctx context.Context, db *sql.DB, table string)
// RedisLocker(ctx context.Context, rc redis.UniversalClient)
Locker locker

// Startup time ensuring a consistent delay between registered jobs on start of looper.
//
// StartupTime = 1 second; 5 registered jobs; Jobs would be initiated
// with 200ms delay
StartupTime time.Duration

Locker locker
}

type JobFn func(ctx context.Context) error
Expand Down Expand Up @@ -226,7 +221,7 @@ func (l *Looper) StartJobByName(jobName string) error {
found = true
if j.Active && !j.Started {
j.Started = true
go j.start()
go j.startLoop()
}
}

Expand Down Expand Up @@ -263,7 +258,7 @@ func (l *Looper) startJobs() {
j.mu.Lock()
if j.Active && !j.Started {
j.Started = true
go j.start()
go j.startLoop()
time.Sleep(delay)
}

Expand Down Expand Up @@ -299,18 +294,14 @@ func (l *Looper) Stop() {
l.mu.Unlock()
}

func (j *Job) start() {
func (j *Job) startLoop() {
defer func() {
j.mu.Lock()
j.Started = false
j.contextCancel = nil
j.mu.Unlock()
}()

var errLock error
var err error
ctxLock := context.Background()

for {
j.mu.RLock()
if !j.Active || !j.Started {
Expand All @@ -319,65 +310,86 @@ func (j *Job) start() {
}
j.mu.RUnlock()

ctx, cancel := context.WithTimeout(context.Background(), j.Timeout)

j.mu.Lock()
j.contextCancel = cancel
j.Running = true

var lo lock

if j.WithLocker {
lo, errLock = j.locker.lock(ctxLock, j.Name, j.Timeout)
if errors.Is(errLock, ErrFailedToObtainLock) {
time.Sleep(time.Duration(time.Second))
j.Running = false
cancel()
j.mu.Unlock()
continue
}
start := time.Now()

if errLock != nil {
err = errLock
}
j.BeforeJob(j.Name)
err := j.start()
if err != nil {
j.AfterJobError(j.Name, time.Since(start), err)
time.Sleep(j.WaitAfterError)
} else {
j.AfterJob(j.Name, time.Since(start))
time.Sleep(j.WaitAfterSuccess)
}
}
}

j.BeforeJob(j.Name)
func (j *Job) start() error {
defer func() {
j.mu.Lock()
j.Running = false
j.mu.Unlock()
}()

start := time.Now()
if err == nil {
err = j.Run(ctx)
}
j.mu.Lock()
j.Running = true
j.mu.Unlock()

if j.WithLocker && errLock == nil {
errLock = lo.unlock(ctxLock)
lo, err := j.lock()
if err != nil {
if errors.Is(err, ErrFailedToObtainLock) {
time.Sleep(time.Second)
return nil
}

if err != nil || errLock != nil {
if err != nil {
j.AfterJobError(j.Name, time.Since(start), err)
} else {
j.AfterJobError(j.Name, time.Since(start), errLock)
}
return err
}

time.Sleep(j.WaitAfterError)
} else {
j.AfterJob(j.Name, time.Since(start))
time.Sleep(j.WaitAfterSuccess)
ctx, cancel := context.WithTimeout(context.Background(), j.Timeout)
defer cancel()

j.contextCancel = cancel

err = j.run(ctx)
if err != nil {
errLock := j.unlock(lo)
if errLock != nil {
return errors.Join(err, errLock)
}

cancel()
return err
}

err = j.unlock(lo)
if err != nil {
return err
}

return nil
}

func (j *Job) lock() (lo lock, err error) {
if j.WithLocker {
lo, err = j.locker.lock(context.Background(), j.Name, j.Timeout)
}

return lo, err
}

func (j *Job) Run(ctx context.Context) (err error) {
func (j *Job) unlock(lo lock) (err error) {
if j.WithLocker {
return lo.unlock(context.Background())
}

return nil
}

func (j *Job) run(ctx context.Context) (err error) {
defer func() {
j.mu.Lock()
defer j.mu.Unlock()

j.LastRun = time.Now()
j.Running = false

r := recover()
if r != nil {
Expand Down

0 comments on commit 64609f0

Please sign in to comment.