Skip to content

Commit

Permalink
fix: maybe discard crob job if execute job delay
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenzou committed Nov 1, 2023
1 parent 871a6e9 commit f80911a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
4 changes: 2 additions & 2 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func (t *task[T]) ready() bool {

func (t *task[T]) scheduleNextRun() {
t.LastRunTime = t.NextRunTime
t.NextRunTime = t.Expr.Next(t.LastRunTime.In(t.Location)).Truncate(time.Microsecond)
t.NextRunTime = t.Expr.Next(t.LastRunTime.In(t.Location))
}

func (t *task[T]) untilNextRun() time.Duration {
if t.ready() {
return 0
}
return t.NextRunTime.Sub(t.now()).Truncate(time.Microsecond)
return t.NextRunTime.Sub(t.now())
}

type Dispatcher[T any] interface {
Expand Down
32 changes: 18 additions & 14 deletions cron/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,26 @@ func (d *dispatcher[T]) takeReadyTask() (time.Duration, bool) {
d.locker.Lock()
defer d.locker.Unlock()

t, ok := d.heap.Peek()
for {
t, ok := d.heap.Peek()

if !ok {
return maxYieldDuration, false
}
if !t.ready() {
return getYieldDuration(t), false
}
d.logger.Debug("peek task: ",
slog.Int("id", int(t.ID)),
slog.String("next_run_time", t.NextRunTime.String()),
slog.Bool("ready", t.ready()),
)

_, _ = d.heap.Pop()
t.scheduleNextRun()
d.heap.Push(t)
if !ok {
return maxYieldDuration, false
}
if !t.ready() {
return getYieldDuration(t), false
}

_, _ = d.heap.Pop()
t.scheduleNextRun()
d.heap.Push(t)

select {
case d.readyChan <- t.Task:
default:
d.readyChan <- t.Task
}
return 0, true
}
23 changes: 19 additions & 4 deletions example/schedule/main.go → example/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,39 @@ func main() {
callable1 := executors.RunnableFunc(func(ctx context.Context) {
println("1:", time.Now().Format(time.RFC3339))
})

callable2 := executors.RunnableFunc(func(ctx context.Context) {
println("2:", time.Now().Format(time.RFC3339))
})

callable3 := executors.RunnableFunc(func(ctx context.Context) {
println("3:", time.Now().Format(time.RFC3339))
})

_, err := executor.ScheduleAtCronRate(callable1, executors.CRONRule{
Expr: "0 4 * * *",
// Expr: "0/1 * * * *",
// Expr: "*/1 * * * *",
// Expr: "*/1 * * * * * *",
Expr: "*/5 * * * * * *",
Timezone: "Local",
})
if err != nil {
panic(err)
}

_, err = executor.ScheduleAtCronRate(callable2, executors.CRONRule{
Expr: "0 4 * * *",
// Expr: "0 4 * * *",
// Expr: "*/1 * * * *",
Expr: "*/5 * * * * * *",
Timezone: "Local",
})
if err != nil {
panic(err)
}

_, err = executor.ScheduleAtCronRate(callable3, executors.CRONRule{
// Expr: "0 4 * * *",
// Expr: "*/1 * * * *",
// Expr: "*/1 * * * * * *",
Expr: "*/5 * * * * * *",
Timezone: "Local",
})
if err != nil {
Expand Down

0 comments on commit f80911a

Please sign in to comment.