Skip to content

Commit

Permalink
feat: cron service (#3399)
Browse files Browse the repository at this point in the history
Functional but not completely wired up. Tests pass, metrics aren't
hooked up, no main service binary.
  • Loading branch information
alecthomas authored Nov 15, 2024
1 parent 8b8e468 commit a42ae6d
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 5 deletions.
3 changes: 0 additions & 3 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var (
Deployment *DeploymentMetrics
Ingress *IngressMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
Controller *ControllerTracing
Timeline *TimelineMetrics
)
Expand All @@ -34,8 +33,6 @@ func init() {
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)
Controller = initControllerTracing()
Timeline, err = initTimelineMetrics()
errs = errors.Join(errs, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package observability
import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
Expand All @@ -20,6 +21,8 @@ const (

cronJobKilledStatus = "killed"
cronJobFailedStartStatus = "failed_start"

deploymentMeterName = "ftl.deployments.cron"
)

type CronMetrics struct {
Expand All @@ -28,6 +31,16 @@ type CronMetrics struct {
jobLatency metric.Int64Histogram
}

// Cron holds the cron metrics instruments. It is assigned by this file's
// init function from the result of initCronMetrics.
var Cron *CronMetrics

// init populates the package-level Cron metrics and panics if the
// instruments cannot be created.
func init() {
	var initErr error
	if Cron, initErr = initCronMetrics(); initErr != nil {
		panic(fmt.Errorf("could not initialize cron metrics: %w", initErr))
	}
}

func initCronMetrics() (*CronMetrics, error) {
result := &CronMetrics{
jobsActive: noop.Int64UpDownCounter{},
Expand Down Expand Up @@ -108,3 +121,11 @@ func cronAttributes(job model.CronJob, maybeStatus optional.Option[string]) metr
}
return metric.WithAttributes(attributes...)
}

// wrapErr annotates err with the quoted name of the signal that could not be
// created. The returned error wraps err.
func wrapErr(signalName string, err error) error {
	const format = "failed to create %q signal: %w"
	return fmt.Errorf(format, signalName, err)
}

// timeSinceMS returns the time elapsed since start, in whole milliseconds.
func timeSinceMS(start time.Time) int64 {
	elapsed := time.Since(start)
	return elapsed.Milliseconds()
}
204 changes: 204 additions & 0 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package cron

import (
"context"
"fmt"
"sort"
"time"

"connectrpc.com/connect"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/slices"
)

// PullSchemaClient is the single RPC the cron service needs in order to
// stream schema changes. Start passes its PullSchema method to
// rpc.RetryStreamingServerStream.
type PullSchemaClient interface {
	PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
}

// CallClient is the single RPC the cron service needs in order to invoke a
// verb. callCronJob uses it to execute each scheduled job.
type CallClient interface {
	Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
}

// cronJob is a single scheduled verb together with its schedule state.
type cronJob struct {
	// module is the name of the module that declares the verb.
	module string
	// verb is the verb that is called when the job fires.
	verb *schema.Verb
	// cronmd is the cron metadata attached to the verb.
	cronmd *schema.MetadataCronJob
	// pattern is the schedule parsed from cronmd.Cron.
	pattern cron.Pattern
	// next is the time of the next scheduled execution, as computed by cron.Next.
	next time.Time
}

// String renders the job as "module.verb (pattern)". When the next run is
// still in the future, the remaining time is appended.
func (c cronJob) String() string {
	summary := fmt.Sprintf("%s.%s (%s)", c.module, c.verb.Name, c.pattern)
	if time.Until(c.next) <= 0 {
		return summary
	}
	return summary + fmt.Sprintf(" (next run in %s)", time.Until(c.next))
}

// Start the cron service. Blocks until the context is cancelled.
//
// Two goroutines run under an errgroup: one executes the scheduling loop
// (run) and the other streams schema changes from pullSchemaClient into a
// buffered channel consumed by that loop. Any error from the group is
// returned wrapped; nil is returned only if both goroutines finish without
// error.
func Start(ctx context.Context, pullSchemaClient PullSchemaClient, verbClient CallClient) error {
	wg, ctx := errgroup.WithContext(ctx)
	changes := make(chan *ftlv1.PullSchemaResponse, 8)
	// Start processing cron jobs and schema changes.
	wg.Go(func() error {
		return run(ctx, verbClient, changes)
	})
	// Start watching for schema changes.
	wg.Go(func() error {
		rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error {
			// The consumer (run) exits when the context is cancelled. Without
			// this select, a send on a full channel would then block forever
			// and wg.Wait() below would never return.
			select {
			case changes <- resp:
				return nil
			case <-ctx.Done():
				return fmt.Errorf("schema change not delivered: %w", ctx.Err())
			}
		}, rpc.AlwaysRetry())
		return nil
	})
	err := wg.Wait()
	if err != nil {
		return fmt.Errorf("cron service stopped: %w", err)
	}
	return nil
}

// run is the cron scheduling loop. It returns only when ctx is done, with an
// error wrapping ctx.Err().
//
// State is kept in two places: cronJobs maps each module name to its jobs,
// and cronQueue is a flattened copy of those jobs ordered by next execution
// time. Each iteration waits for whichever comes first:
//   - context cancellation;
//   - a schema change on changes, which updates cronJobs and rebuilds the queue;
//   - the head of the queue coming due, which reschedules that job and then
//     calls it synchronously via verbClient.
func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSchemaResponse) error {
	logger := log.FromContext(ctx).Scope("cron")
	// Map of cron jobs for each module.
	cronJobs := map[string][]cronJob{}
	// Cron jobs ordered by next execution.
	cronQueue := []cronJob{}

	logger.Debugf("Starting cron service")

	for {
		next, ok := scheduleNext(cronQueue)
		// A nil channel blocks forever in select, so with an empty queue only
		// cancellation or a schema change can wake the loop.
		var nextCh <-chan time.Time
		if ok {
			logger.Tracef("Next cron job scheduled in %s", next)
			nextCh = time.After(next)
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("cron service stopped: %w", ctx.Err())

		case resp := <-changes:
			if err := updateCronJobs(cronJobs, resp); err != nil {
				logger.Errorf(err, "Failed to update cron jobs")
				continue
			}
			cronQueue = rebuildQueue(cronJobs)

		// Execute scheduled cron job
		case <-nextCh:
			job := cronQueue[0]
			logger.Debugf("Executing cron job %s", job)

			nextRun, err := cron.Next(job.pattern, false)
			if err != nil {
				// NOTE(review): the job stays at the head of the queue with an
				// elapsed time, so the loop will retry it immediately.
				logger.Errorf(err, "Failed to calculate next run time")
				continue
			}
			job.next = nextRun
			cronQueue[0] = job
			orderQueue(cronQueue)

			// The queue holds copies of the jobs in cronJobs. Record the new
			// run time in the map too, otherwise the next rebuildQueue would
			// restore the stale (already elapsed) time and the job would fire
			// again immediately after any schema change.
			moduleJobs := cronJobs[job.module]
			for i := range moduleJobs {
				if moduleJobs[i].verb.Name == job.verb.Name {
					moduleJobs[i].next = nextRun
					break
				}
			}

			if err := callCronJob(ctx, verbClient, job); err != nil {
				logger.Errorf(err, "Failed to execute cron job")
			}
		}
	}
}

// callCronJob invokes the verb behind cronJob with an empty JSON object as
// the request body. It returns an error if the RPC itself fails or if the
// response carries an error payload; any other response is a success.
func callCronJob(ctx context.Context, verbClient CallClient, cronJob cronJob) error {
	target := schema.Ref{Module: cronJob.module, Name: cronJob.verb.Name}
	log.FromContext(ctx).Scope("cron").Debugf("Calling cron job %s", cronJob)

	request := &ftlv1.CallRequest{
		Verb:     target.ToProto().(*schemapb.Ref),
		Body:     []byte(`{}`),
		Metadata: &ftlv1.Metadata{},
	}
	reply, err := verbClient.Call(ctx, connect.NewRequest(request))
	if err != nil {
		return fmt.Errorf("%s: call to cron job failed: %w", target, err)
	}
	if failure, ok := reply.Msg.Response.(*ftlv1.CallResponse_Error_); ok {
		return fmt.Errorf("%s: cron job failed: %s", target, failure.Error.Message)
	}
	return nil
}

// scheduleNext reports how long to wait until the job at the head of
// cronQueue is due. The boolean is false when the queue is empty. The
// duration may be zero or negative if the head job is already due.
func scheduleNext(cronQueue []cronJob) (time.Duration, bool) {
	if len(cronQueue) > 0 {
		head := cronQueue[0]
		return time.Until(head.next), true
	}
	return 0, false
}

// updateCronJobs applies one schema change to the per-module job map.
// A removed deployment drops the module's jobs; an added or changed
// deployment replaces them with the jobs extracted from the new schema.
// Any other change type leaves the map untouched.
func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error {
	if resp.ChangeType == ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED {
		delete(cronJobs, resp.ModuleName)
		return nil
	}

	upsert := resp.ChangeType == ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED ||
		resp.ChangeType == ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED
	if !upsert {
		return nil
	}

	parsed, err := schema.ModuleFromProto(resp.Schema)
	if err != nil {
		return fmt.Errorf("failed to extract module schema: %w", err)
	}
	extracted, err := extractCronJobs(parsed)
	if err != nil {
		return fmt.Errorf("failed to extract cron jobs: %w", err)
	}
	cronJobs[resp.ModuleName] = extracted
	return nil
}

// orderQueue sorts queue in place so that the job with the earliest next
// execution time comes first. The sort is stable, so jobs with equal times
// keep their relative order.
func orderQueue(queue []cronJob) {
	runsEarlier := func(a, b int) bool {
		return queue[b].next.After(queue[a].next)
	}
	sort.SliceStable(queue, runsEarlier)
}

// rebuildQueue flattens the per-module job map into a new slice ordered by
// next execution time. The returned slice holds copies of the jobs.
func rebuildQueue(cronJobs map[string][]cronJob) []cronJob {
	// Capacity hint only: assume 2 cron jobs per module.
	const assumedJobsPerModule = 2
	merged := make([]cronJob, 0, len(cronJobs)*assumedJobsPerModule)
	for module := range cronJobs {
		merged = append(merged, cronJobs[module]...)
	}
	orderQueue(merged)
	return merged
}

// extractCronJobs returns a cronJob for every verb in module that carries
// cron metadata, with its pattern parsed and its first run time computed.
// Verbs without cron metadata are ignored. The first parse or scheduling
// failure aborts extraction; the error is prefixed with the metadata position.
func extractCronJobs(module *schema.Module) ([]cronJob, error) {
	found := make([]cronJob, 0)
	for verb := range slices.FilterVariants[*schema.Verb](module.Decls) {
		md, hasCron := slices.FindVariant[*schema.MetadataCronJob](verb.Metadata)
		if !hasCron {
			continue
		}
		schedule, err := cron.Parse(md.Cron)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", md.Pos, err)
		}
		firstRun, err := cron.Next(schedule, false)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", md.Pos, err)
		}
		job := cronJob{
			module:  module.Name,
			verb:    verb,
			cronmd:  md,
			pattern: schedule,
			next:    firstRun,
		}
		found = append(found, job)
	}
	return found, nil
}
109 changes: 109 additions & 0 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package cron

import (
"context"
"os"
"sort"
"testing"
"time"

"connectrpc.com/connect"
"golang.org/x/sync/errgroup"

"github.com/alecthomas/assert/v2"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
)

// verbClient is a test double for CallClient. Its Call method forwards every
// request message it receives to the requests channel.
type verbClient struct {
	requests chan *ftlv1.CallRequest
}

var _ CallClient = (*verbClient)(nil)

// Call publishes the request message on v.requests and then replies with an
// empty JSON object body. It never returns an error.
func (v *verbClient) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
	v.requests <- req.Msg
	body := &ftlv1.CallResponse_Body{Body: []byte("{}")}
	reply := connect.NewResponse(&ftlv1.CallResponse{Response: body})
	return reply, nil
}

func TestCron(t *testing.T) {
changes := make(chan *ftlv1.PullSchemaResponse, 8)
module := &schema.Module{
Name: "echo",
Decls: []schema.Decl{
&schema.Verb{
Name: "echo",
Request: &schema.Unit{},
Response: &schema.Unit{},
Metadata: []schema.Metadata{
&schema.MetadataCronJob{Cron: "*/2 * * * * *"},
},
},
&schema.Verb{
Name: "time",
Request: &schema.Unit{},
Response: &schema.Unit{},
Metadata: []schema.Metadata{
&schema.MetadataCronJob{Cron: "*/2 * * * * *"},
},
},
},
}
changes <- &ftlv1.PullSchemaResponse{
ModuleName: "echo",
Schema: module.ToProto().(*schemapb.Module), //nolint:forcetypeassert
}

ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
t.Cleanup(cancel)

wg, ctx := errgroup.WithContext(ctx)

requestsch := make(chan *ftlv1.CallRequest, 8)
client := &verbClient{
requests: requestsch,
}

wg.Go(func() error { return run(ctx, client, changes) })

requests := make([]*ftlv1.CallRequest, 0, 2)

done:
for range 2 {
select {
case <-ctx.Done():
t.Fatalf("timed out: %s", ctx.Err())

case request := <-requestsch:
requests = append(requests, request)
if len(requests) == 2 {
break done
}
}
}

cancel()

sort.SliceStable(requests, func(i, j int) bool {
return requests[i].Verb.Name < requests[j].Verb.Name
})
assert.Equal(t, []*ftlv1.CallRequest{
{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.Ref{Module: "echo", Name: "echo"},
Body: []byte("{}"),
},
{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.Ref{Module: "echo", Name: "time"},
Body: []byte("{}"),
},
}, requests, assert.Exclude[*schemapb.Position]())

err := wg.Wait()
assert.IsError(t, err, context.Canceled)
}
1 change: 1 addition & 0 deletions internal/cron/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
parser = participle.MustBuild[Pattern](parserOptions...)
)

// Pattern represents a cron schedule.
type Pattern struct {
Duration *string `parser:"@(Number ('s' | 'm' | 'h'))"`
DayOfWeek *DayOfWeek `parser:"| @('Mon' | 'Tue' | 'Wed' | 'Thu' | 'Fri' | 'Sat' | 'Sun')"`
Expand Down
4 changes: 2 additions & 2 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func AlwaysRetry() func(error) bool {
}

// RetryStreamingServerStream will repeatedly call handler with responses from
// the stream returned by "rpc" until handler returns an error or the context is
// cancelled.
// the stream returned by "rpc" until either the context is cancelled or the
// errorRetryCallback returns false.
func RetryStreamingServerStream[Req, Resp any](
ctx context.Context,
name string,
Expand Down
Loading

0 comments on commit a42ae6d

Please sign in to comment.