Skip to content

Commit

Permalink
controller inserts into requests db
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 74e6afe commit d42180a
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 37 deletions.
19 changes: 6 additions & 13 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/atomic"
"github.com/alecthomas/types/optional"
Expand Down Expand Up @@ -58,7 +57,6 @@ type hashRingState struct {

type DAL interface {
GetCronJobs(ctx context.Context) ([]dal.CronJob, error)
CreateCronRequest(ctx context.Context, job dal.CronJob, addr string) (model.RequestKey, error)
StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error)
EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error)
GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error)
Expand All @@ -70,7 +68,7 @@ type Scheduler interface {
}

type CallExecuter interface {
Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
CallWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error)
}

type Service struct {
Expand Down Expand Up @@ -193,18 +191,13 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
Body: requestJSON,
})

requestKey, err := s.dal.CreateCronRequest(ctx, job, s.originURL.Host)
requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Module, job.Verb))

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()
_, err = s.executor.CallWithRequest(callCtx, req, optional.Some(requestKey), s.originURL.Host)
if err != nil {
logger.Errorf(err, "failed to execute cron job %v:%v", job.DeploymentKey, job.Verb)
} else {
headers.SetRequestName(req.Header(), requestKey)

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()
_, err = s.executor.Call(callCtx, req)
if err != nil {
logger.Errorf(err, "failed to execute cron job %v:%v", job.DeploymentKey, job.Verb)
}
}

schedule, err := cron.Parse(job.Schedule)
Expand Down
7 changes: 2 additions & 5 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
)
Expand Down Expand Up @@ -62,10 +63,6 @@ func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) {
return -1, fmt.Errorf("job not found")
}

func (d *mockDAL) CreateCronRequest(ctx context.Context, job dal.CronJob, addr string) (model.RequestKey, error) {
return model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Module, job.Verb)), nil
}

func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) {
d.lock.Lock()
defer d.lock.Unlock()
Expand Down Expand Up @@ -147,7 +144,7 @@ type mockExecutor struct {
clock *clock.Mock
}

func (e *mockExecutor) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
func (e *mockExecutor) CallWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(req.Msg.Verb)

e.lock.Lock()
Expand Down
4 changes: 0 additions & 4 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,10 +1058,6 @@ func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr stri
return d.db.CreateRequest(ctx, sql.OriginIngress, key, addr)
}

func (d *DAL) CreateCronRequest(ctx context.Context, job CronJob, addr string) (model.RequestKey, error) {
key := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Module, job.Verb))
err := d.db.CreateIngressRequest(ctx, sql.OriginCron, key, addr)
return key, err
func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error) {
routes, err := d.db.GetIngressRoutes(ctx, method)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,6 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: CreateCronRequest :exec
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: UpsertController :one
INSERT INTO controller (key, endpoint)
VALUES ($1, $2)
Expand Down
20 changes: 10 additions & 10 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d42180a

Please sign in to comment.