Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: timeline events for scheduled cron jobs #2860

Merged
merged 14 commits into from
Oct 2, 2024
22 changes: 22 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter
eventTypes = append(eventTypes, timeline.EventTypeDeploymentUpdated)
case pbconsole.EventType_EVENT_TYPE_INGRESS:
eventTypes = append(eventTypes, timeline.EventTypeIngress)
case pbconsole.EventType_EVENT_TYPE_CRON_SCHEDULED:
eventTypes = append(eventTypes, timeline.EventTypeCronScheduled)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
Expand Down Expand Up @@ -492,6 +494,26 @@ func eventDALToProto(event timeline.Event) *pbconsole.Event {
},
}

case *timeline.CronScheduledEvent:
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_CronScheduled{
CronScheduled: &pbconsole.CronScheduledEvent{
DeploymentKey: event.DeploymentKey.String(),
VerbRef: &schemapb.Ref{
Module: event.Verb.Module,
Name: event.Verb.Name,
},
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
ScheduledAt: timestamppb.New(event.ScheduledAt),
Schedule: event.Schedule,
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
Expand Down
5 changes: 3 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
runnerScaling: runnerScaling,
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub
Expand All @@ -284,6 +282,9 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

// Use min, max backoff if we are running in production, otherwise use
Expand Down
61 changes: 45 additions & 16 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"database/sql"
"errors"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -20,24 +23,26 @@ import (
)

type Service struct {
key model.ControllerKey
requestSource string
dal dal.DAL
encryption *encryptionsvc.Service
clock clock.Clock
key model.ControllerKey
requestSource string
dal dal.DAL
encryption *encryptionsvc.Service
timelineService *timeline.Service
clock clock.Clock
}

func New(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, conn *sql.DB) *Service {
return NewForTesting(ctx, key, requestSource, encryption, *dal.New(conn), clock.New())
func New(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, timeline *timeline.Service, conn *sql.DB) *Service {
return NewForTesting(ctx, key, requestSource, encryption, timeline, *dal.New(conn), clock.New())
}

func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, dal dal.DAL, clock clock.Clock) *Service {
func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, timeline *timeline.Service, dal dal.DAL, clock clock.Clock) *Service {
svc := &Service{
key: key,
requestSource: requestSource,
dal: dal,
encryption: encryption,
clock: clock,
key: key,
requestSource: requestSource,
dal: dal,
encryption: encryption,
timelineService: timeline,
clock: clock,
}
return svc
}
Expand Down Expand Up @@ -115,8 +120,16 @@ func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
}
logger.Tracef("Scheduling %d cron jobs", len(jobs))
for _, job := range jobs {
err = s.scheduleCronJob(ctx, tx, job)
err = s.scheduleCronJob(ctx, tx, job, now)
if err != nil {
s.timelineService.EnqueueEvent(ctx, &timeline.CronScheduled{
DeploymentKey: job.DeploymentKey,
Verb: job.Verb,
Time: now,
ScheduledAt: job.NextExecution,
Schedule: job.Schedule,
Error: optional.Some(err.Error()),
})
return fmt.Errorf("failed to schedule cron job %q: %w", job.Key, err)
}
}
Expand All @@ -129,6 +142,7 @@ func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
func (s *Service) OnJobCompletion(ctx context.Context, key model.CronJobKey, failed bool) (err error) {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("Cron job %q completed with failed=%v", key, failed)
now := s.clock.Now().UTC()

tx, err := s.dal.Begin(ctx)
if err != nil {
Expand All @@ -140,15 +154,23 @@ func (s *Service) OnJobCompletion(ctx context.Context, key model.CronJobKey, fai
if err != nil {
return fmt.Errorf("failed to get cron job %q: %w", key, err)
}
err = s.scheduleCronJob(ctx, tx, job)
err = s.scheduleCronJob(ctx, tx, job, now)
if err != nil {
s.timelineService.EnqueueEvent(ctx, &timeline.CronScheduled{
DeploymentKey: job.DeploymentKey,
Verb: job.Verb,
Time: now,
ScheduledAt: job.NextExecution,
Schedule: job.Schedule,
Error: optional.Some(err.Error()),
})
return fmt.Errorf("failed to schedule cron job %q: %w", key, err)
}
return nil
}

// scheduleCronJob schedules the next execution of a single cron job.
func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.CronJob) error {
func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.CronJob, startTime time.Time) error {
logger := log.FromContext(ctx).Scope("cron").Module(job.Verb.Module)
now := s.clock.Now().UTC()
pending, err := tx.IsCronJobPending(ctx, job.Key, now)
Expand Down Expand Up @@ -206,5 +228,12 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr
if err != nil {
return fmt.Errorf("failed to update cron job %q: %w", job.Key, err)
}
safeer marked this conversation as resolved.
Show resolved Hide resolved
s.timelineService.EnqueueEvent(ctx, &timeline.CronScheduled{
DeploymentKey: job.DeploymentKey,
Verb: job.Verb,
Time: startTime,
ScheduledAt: nextAttemptForJob,
Schedule: job.Schedule,
})
return nil
}
5 changes: 4 additions & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
Expand Down Expand Up @@ -57,9 +58,11 @@ func TestNewCronJobsForModule(t *testing.T) {
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

// Progress so that start_time is valid
clk.Add(time.Second)
cjs := NewForTesting(ctx, key, "test.com", encryption, *dal, clk)
cjs := NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)
// All jobs need to be scheduled
expectUnscheduledJobs(t, dal, clk, 2)
unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- migrate:up

ALTER TYPE event_type ADD VALUE IF NOT EXISTS 'cron_scheduled';
safeer marked this conversation as resolved.
Show resolved Hide resolved

-- migrate:down

76 changes: 76 additions & 0 deletions backend/controller/timeline/events_cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package timeline

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/alecthomas/types/optional"

ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
)

type CronScheduledEvent struct {
ID int64
Duration time.Duration
CronScheduled
}

func (e *CronScheduledEvent) GetID() int64 { return e.ID }
func (e *CronScheduledEvent) event() {}

type CronScheduled struct {
DeploymentKey model.DeploymentKey
Verb schema.Ref

Time time.Time
ScheduledAt time.Time
Schedule string
Error optional.Option[string]
}

func (*CronScheduled) inEvent() {}

type eventCronScheduledJSON struct {
DurationMS int64 `json:"duration_ms"`
ScheduledAt time.Time `json:"scheduled_at"`
Schedule string `json:"schedule"`
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduled) error {
cronJSON := eventCronScheduledJSON{
DurationMS: time.Since(event.Time).Milliseconds(),
ScheduledAt: event.ScheduledAt,
Schedule: event.Schedule,
Error: event.Error,
}

data, err := json.Marshal(cronJSON)
if err != nil {
return fmt.Errorf("failed to marshal cron JSON: %w", err)
}

var payload ftlencryption.EncryptedTimelineColumn
err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
if err != nil {
return fmt.Errorf("failed to encrypt cron JSON: %w", err)
}

err = libdal.TranslatePGError(querier.InsertTimelineCronScheduledEvent(ctx, sql.InsertTimelineCronScheduledEventParams{
DeploymentKey: event.DeploymentKey,
TimeStamp: event.Time,
Module: event.Verb.Module,
Verb: event.Verb.Name,
Payload: payload,
}))
if err != nil {
return fmt.Errorf("failed to insert cron event: %w", err)
}
return err
}
1 change: 1 addition & 0 deletions backend/controller/timeline/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/timeline/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions backend/controller/timeline/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ VALUES (
sqlc.arg('payload')
);

-- name: InsertTimelineCronScheduledEvent :exec
INSERT INTO timeline (
deployment_id,
time_stamp,
type,
custom_key_1,
custom_key_2,
payload
)
VALUES (
(SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
sqlc.arg('time_stamp')::TIMESTAMPTZ,
'cron_scheduled',
sqlc.arg('module')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('payload')
);

-- name: DeleteOldTimelineEvents :one
WITH deleted AS (
DELETE FROM timeline
Expand Down
38 changes: 38 additions & 0 deletions backend/controller/timeline/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions backend/controller/timeline/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,23 @@ func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.D
ResponseHeader: jsonPayload.ResponseHeader,
Error: jsonPayload.Error,
})
case sql.EventTypeCronScheduled:
var jsonPayload eventCronScheduledJSON
if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt cron scheduled event: %w", err)
}
out = append(out, &CronScheduledEvent{
ID: row.ID,
Duration: time.Duration(jsonPayload.DurationMS) * time.Millisecond,
CronScheduled: CronScheduled{
DeploymentKey: row.DeploymentKey,
Verb: schema.Ref{Module: row.CustomKey1.MustGet(), Name: row.CustomKey2.MustGet()},
Time: row.TimeStamp,
ScheduledAt: jsonPayload.ScheduledAt,
Schedule: jsonPayload.Schedule,
Error: jsonPayload.Error,
},
})

default:
panic("unknown event type: " + row.Type)
Expand Down
Loading
Loading