Skip to content

Commit

Permalink
fix: added cron
Browse files Browse the repository at this point in the history
  • Loading branch information
morlay committed May 8, 2024
1 parent d24de22 commit 12069ad
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/stretchr/testify v1.9.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ github.com/prometheus/prometheus v0.51.2 h1:U0faf1nT4CB9DkBW87XLJCBi2s8nwWXdTbyz
github.com/prometheus/prometheus v0.51.2/go.mod h1:yv4MwOn3yHMQ6MZGHPg/U7Fcyqf+rxqiZfSur6myVtc=
github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0 h1:sadMIsgmHpEOGbUs6VtHBXRR1OHevnj7hLx9ZcdNGW4=
github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
Expand Down
89 changes: 89 additions & 0 deletions pkg/cron/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package cron

import (
"context"
"log/slog"
"time"

"github.com/go-courier/logr"
"github.com/innoai-tech/infra/pkg/configuration"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
)

type IntervalSchedule struct {
Interval time.Duration
}

func (i IntervalSchedule) Next(t time.Time) time.Time {
return t.Add(i.Interval)
}

type Job struct {
Cron string `flag:",omitempty"`

schedule cron.Schedule
timer *time.Timer

name string
action func(ctx context.Context)
}

func (j *Job) SetDefaults() {
if j.Cron == "" {
// 每周一
// "https://crontab.guru/#0_0_*_*_1"
j.Cron = "0 0 * * 1"
}
}

func (j *Job) ApplyAction(name string, action func(ctx context.Context)) {
j.name = name
j.action = action
}

func (j *Job) Init(ctx context.Context) error {
schedule, err := cron.ParseStandard(j.Cron)
if err != nil {
return errors.Wrapf(err, "parse cron failed: %s", j.Cron)
}
j.schedule = schedule
return nil
}

var _ configuration.Server = (*Job)(nil)

func (j *Job) Serve(ctx context.Context) error {
ci := configuration.ContextInjectorFromContext(ctx)

logr.FromContext(ctx).WithValues(
slog.String("name", j.name),
slog.String("cron", j.Cron),
).Info("waiting")

j.timer = time.NewTimer(5 * time.Second)

for {
now := time.Now()

j.timer.Reset(j.schedule.Next(now).Sub(now))

select {
case <-ctx.Done():
return ctx.Err()
case now = <-j.timer.C:
if j.action != nil {
go func() {
j.action(ci.InjectContext(context.Background()))
}()
}
}
}
}

func (j *Job) Shutdown(context.Context) error {
if j.timer != nil {
j.timer.Stop()
}
return nil
}
41 changes: 41 additions & 0 deletions pkg/cron/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cron

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
)

func TestJob(t *testing.T) {
job := &Job{}
_ = job.Init(context.Background())
job.schedule = IntervalSchedule{
Interval: 50 * time.Millisecond,
}

t.Cleanup(func() {
_ = job.Shutdown(context.Background())
})

v := int64(0)
done := make(chan struct{})

job.ApplyAction("test", func(ctx context.Context) {
defer func() {
if atomic.LoadInt64(&v) >= 5 {
done <- struct{}{}
}
}()

atomic.AddInt64(&v, 1)
fmt.Println(atomic.LoadInt64(&v))
})

go func() {
_ = job.Serve(context.Background())
}()

<-done
}
41 changes: 41 additions & 0 deletions pkg/cron/zz_generated.runtimedoc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Package cron GENERATED BY gengo:runtimedoc
DON'T EDIT THIS FILE
*/
package cron

// nolint:deadcode,unused
func runtimeDoc(v any, names ...string) ([]string, bool) {
if c, ok := v.(interface {
RuntimeDoc(names ...string) ([]string, bool)
}); ok {
return c.RuntimeDoc(names...)
}
return nil, false
}

func (v IntervalSchedule) RuntimeDoc(names ...string) ([]string, bool) {
if len(names) > 0 {
switch names[0] {
case "Interval":
return []string{}, true

}

return nil, false
}
return []string{}, true
}

func (v Job) RuntimeDoc(names ...string) ([]string, bool) {
if len(names) > 0 {
switch names[0] {
case "Cron":
return []string{}, true

}

return nil, false
}
return []string{}, true
}

0 comments on commit 12069ad

Please sign in to comment.