From 12069adfe35c6cc6f8b222a7c3c50e8365ae0533 Mon Sep 17 00:00:00 2001 From: Morlay Date: Wed, 8 May 2024 12:10:32 +0800 Subject: [PATCH] fix: added cron --- go.mod | 1 + go.sum | 2 + pkg/cron/job.go | 89 +++++++++++++++++++++++++++++ pkg/cron/job_test.go | 41 +++++++++++++ pkg/cron/zz_generated.runtimedoc.go | 41 +++++++++++++ 5 files changed, 174 insertions(+) create mode 100644 pkg/cron/job.go create mode 100644 pkg/cron/job_test.go create mode 100644 pkg/cron/zz_generated.runtimedoc.go diff --git a/go.mod b/go.mod index bb4b19a..8d6b13b 100644 --- a/go.mod +++ b/go.mod @@ -84,6 +84,7 @@ require ( github.com/prometheus/common v0.53.0 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/procfs v0.14.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/stretchr/testify v1.9.0 // indirect go.opentelemetry.io/proto/otlp v1.2.0 // indirect go.uber.org/atomic v1.11.0 // indirect diff --git a/go.sum b/go.sum index 0380ccb..c46ffb6 100644 --- a/go.sum +++ b/go.sum @@ -416,6 +416,8 @@ github.com/prometheus/prometheus v0.51.2 h1:U0faf1nT4CB9DkBW87XLJCBi2s8nwWXdTbyz github.com/prometheus/prometheus v0.51.2/go.mod h1:yv4MwOn3yHMQ6MZGHPg/U7Fcyqf+rxqiZfSur6myVtc= github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0 h1:sadMIsgmHpEOGbUs6VtHBXRR1OHevnj7hLx9ZcdNGW4= github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= diff --git a/pkg/cron/job.go b/pkg/cron/job.go new file mode 100644 index 0000000..661daea --- /dev/null +++ b/pkg/cron/job.go @@ -0,0 +1,89 @@ +package cron + +import ( + "context" + "log/slog" + "time" + + "github.com/go-courier/logr" + "github.com/innoai-tech/infra/pkg/configuration" + "github.com/pkg/errors" + "github.com/robfig/cron/v3" +) + +type IntervalSchedule struct { + Interval time.Duration +} + +func (i IntervalSchedule) Next(t time.Time) time.Time { + return t.Add(i.Interval) +} + +type Job struct { + Cron string `flag:",omitempty"` + + schedule cron.Schedule + timer *time.Timer + + name string + action func(ctx context.Context) +} + +func (j *Job) SetDefaults() { + if j.Cron == "" { + // 每周一 + // "https://crontab.guru/#0_0_*_*_1" + j.Cron = "0 0 * * 1" + } +} + +func (j *Job) ApplyAction(name string, action func(ctx context.Context)) { + j.name = name + j.action = action +} + +func (j *Job) Init(ctx context.Context) error { + schedule, err := cron.ParseStandard(j.Cron) + if err != nil { + return errors.Wrapf(err, "parse cron failed: %s", j.Cron) + } + j.schedule = schedule + return nil +} + +var _ configuration.Server = (*Job)(nil) + +func (j *Job) Serve(ctx context.Context) error { + ci := configuration.ContextInjectorFromContext(ctx) + + logr.FromContext(ctx).WithValues( + slog.String("name", j.name), + slog.String("cron", j.Cron), + ).Info("waiting") + + j.timer = time.NewTimer(5 * time.Second) + + for { + now := time.Now() + + j.timer.Reset(j.schedule.Next(now).Sub(now)) + + select { + case <-ctx.Done(): + return ctx.Err() + case now = <-j.timer.C: + if j.action != nil { + go func() { + j.action(ci.InjectContext(context.Background())) + }() + } + } + } +} + +func (j *Job) Shutdown(context.Context) error { + if j.timer != nil { + j.timer.Stop() + } + return nil +} diff --git a/pkg/cron/job_test.go b/pkg/cron/job_test.go new file mode 100644 index 0000000..846890d --- /dev/null +++ b/pkg/cron/job_test.go @@ -0,0 +1,41 @@ +package cron + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" +) + +func TestJob(t *testing.T) { + job := &Job{} + _ = job.Init(context.Background()) + job.schedule = IntervalSchedule{ + Interval: 50 * time.Millisecond, + } + + t.Cleanup(func() { + _ = job.Shutdown(context.Background()) + }) + + v := int64(0) + done := make(chan struct{}) + + job.ApplyAction("test", func(ctx context.Context) { + defer func() { + if atomic.LoadInt64(&v) >= 5 { + done <- struct{}{} + } + }() + + atomic.AddInt64(&v, 1) + fmt.Println(atomic.LoadInt64(&v)) + }) + + go func() { + _ = job.Serve(context.Background()) + }() + + <-done +} diff --git a/pkg/cron/zz_generated.runtimedoc.go b/pkg/cron/zz_generated.runtimedoc.go new file mode 100644 index 0000000..9f93aee --- /dev/null +++ b/pkg/cron/zz_generated.runtimedoc.go @@ -0,0 +1,41 @@ +/* +Package cron GENERATED BY gengo:runtimedoc +DON'T EDIT THIS FILE +*/ +package cron + +// nolint:deadcode,unused +func runtimeDoc(v any, names ...string) ([]string, bool) { + if c, ok := v.(interface { + RuntimeDoc(names ...string) ([]string, bool) + }); ok { + return c.RuntimeDoc(names...) + } + return nil, false +} + +func (v IntervalSchedule) RuntimeDoc(names ...string) ([]string, bool) { + if len(names) > 0 { + switch names[0] { + case "Interval": + return []string{}, true + + } + + return nil, false + } + return []string{}, true +} + +func (v Job) RuntimeDoc(names ...string) ([]string, bool) { + if len(names) > 0 { + switch names[0] { + case "Cron": + return []string{}, true + + } + + return nil, false + } + return []string{}, true +}