Skip to content

Commit

Permalink
feat: cron job service
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 8ca5926 commit 920ad92
Show file tree
Hide file tree
Showing 12 changed files with 859 additions and 73 deletions.
48 changes: 41 additions & 7 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/scaling"
Expand All @@ -49,10 +50,11 @@ import (

// CommonConfig between the production controller and development server.
type CommonConfig struct {
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
NoConsole bool `help:"Disable the console."`
IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
NoConsole bool `help:"Disable the console."`
IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
CronJobTimeout time.Duration `help:"Timeout for cron jobs." default:"5m"`
}

type Config struct {
Expand Down Expand Up @@ -138,12 +140,18 @@ type clients struct {
runner ftlv1connect.RunnerServiceClient
}

type ControllerListListener interface {
UpdatedControllerList(ctx context.Context, controllers []dal.Controller)
}

type Service struct {
dal *dal.DAL
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink

tasks *scheduledtask.Scheduler
tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
controllerListListeners []ControllerListListener

// Map from endpoint to client.
clients *ttlcache.Cache[string, clients]
Expand All @@ -163,7 +171,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
}
config.SetDefaults()
svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
tasks: scheduledtask.New(ctx, key),
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand All @@ -174,8 +182,14 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
increaseReplicaFailures: map[string]int{},
}

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc)
_, _ = svc.updateControllersList(ctx)

svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController)
svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, svc.updateControllersList)
svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners)
svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations)
svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments)
Expand Down Expand Up @@ -422,6 +436,9 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
return nil, fmt.Errorf("could not replace deployment: %w", err)
}
}

s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey)

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}

Expand Down Expand Up @@ -732,11 +749,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil)
cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
if err != nil {
logger.Errorf(err, "Could not generate cron jobs for new deployment")
return nil, fmt.Errorf("%s: %w", "could not generate cron jobs for new deployment", err)
}

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
}

deploymentLogger := s.getDeploymentLogger(ctx, dkey)
deploymentLogger.Debugf("Created deployment %s", dkey)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
Expand Down Expand Up @@ -999,7 +1023,17 @@ func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error
return 0, fmt.Errorf("failed to heartbeat controller: %w", err)
}
return time.Second * 3, nil
}

func (s *Service) updateControllersList(ctx context.Context) (time.Duration, error) {
controllers, err := s.dal.GetControllers(ctx, false)
if err != nil {
return 0, err
}
for _, listener := range s.controllerListListeners {
listener.UpdatedControllerList(ctx, controllers)
}
return time.Second * 5, nil
}

func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
Expand Down
Loading

0 comments on commit 920ad92

Please sign in to comment.