Commit
Merge branch 'main' into dli/decl-type-filter
deniseli authored Sep 27, 2024
2 parents ed5bcfc + 909706c commit 691700d
Showing 64 changed files with 2,354 additions and 2,082 deletions.
12 changes: 7 additions & 5 deletions Justfile
@@ -36,7 +36,7 @@ live-rebuild:

# Run "ftl dev" with live-reloading whenever source changes.
dev *args:
- watchexec -r {{WATCHEXEC_ARGS}} -- "just build-sqlc && ftl dev {{args}}"
+ watchexec -r {{WATCHEXEC_ARGS}} -- "just build-sqlc && ftl dev --plain {{args}}"

# Build everything
build-all: build-protos-unconditionally build-backend build-backend-tests build-frontend build-generate build-sqlc build-zips lsp-generate build-java generate-kube-migrations
@@ -82,11 +82,13 @@ export DATABASE_URL := "postgres://postgres:secret@localhost:15432/ftl?sslmode=d
init-db:
dbmate drop || true
dbmate create
- dbmate --migrations-dir backend/controller/sql/schema up
+ dbmate --no-dump-schema --migrations-dir backend/controller/sql/schema up

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
- @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
+ @mk $(eval echo $(yq '.sql[].gen.go.out + "/{db.go,models.go,querier.go,queries.sql.go}"' sqlc.yaml)) \
+ : sqlc.yaml $(yq '.sql[].queries[]' sqlc.yaml) \
+ -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips:
@@ -138,14 +140,14 @@ integration-tests *test:
#!/bin/bash
set -euo pipefail
testName=${1:-}
- for i in {1..3}; do go test -fullpath -count 1 -v -tags integration -run "$testName" -p 1 $(find . -type f -name '*_test.go' -print0 | xargs -0 grep -r -l "$testName" | xargs grep -l '//go:build integration' | xargs -I {} dirname './{}') && break; done
+ for i in {1..3}; do go test -fullpath -count 1 -v -tags integration -run "$testName" -p 1 $(find . -type f -name '*_test.go' -print0 | xargs -0 grep -r -l "$testName" | xargs grep -l '//go:build integration' | xargs -I {} dirname './{}') && break || true; done

# Run integration test(s)
infrastructure-tests *test:
#!/bin/bash
set -euo pipefail
testName=${1:-}
- for i in {1..3}; do go test -fullpath -count 1 -v -tags infrastructure -run "$testName" -p 1 $(find . -type f -name '*_test.go' -print0 | xargs -0 grep -r -l "$testName" | xargs grep -l '//go:build infrastructure' | xargs -I {} dirname './{}') && break; done
+ for i in {1..3}; do go test -fullpath -count 1 -v -tags infrastructure -run "$testName" -p 1 $(find . -type f -name '*_test.go' -print0 | xargs -0 grep -r -l "$testName" | xargs grep -l '//go:build infrastructure' | xargs -I {} dirname './{}') && break || true; done

# Run README doc tests
test-readme *args:
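Two Justfile changes above are easy to miss. In the integration-tests and infrastructure-tests recipes, the retry loop gains "|| true": under "set -euo pipefail" a failing go test would abort the script on the first attempt, while "&& break || true" lets the loop proceed to the remaining retries. And build-sqlc now derives both its targets and its dependencies from sqlc.yaml via yq instead of a hard-coded file list. The Go program below is a minimal sketch, not part of the commit: it computes the same target paths the yq-plus-brace-expansion pipeline produces, assuming each .sql[] entry in sqlc.yaml has a list-valued "queries" field and a "gen.go.out" directory.

// sqlc_targets.go: a sketch of what the yq expressions in build-sqlc compute,
// i.e. .sql[].gen.go.out + "/{db.go,models.go,querier.go,queries.sql.go}"
// after shell brace expansion, plus the query-file dependencies.
package main

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

type sqlcConfig struct {
	SQL []struct {
		Queries []string `yaml:"queries"` // assumed list-valued, as the yq path implies
		Gen     struct {
			Go struct {
				Out string `yaml:"out"`
			} `yaml:"go"`
		} `yaml:"gen"`
	} `yaml:"sql"`
}

func main() {
	data, err := os.ReadFile("sqlc.yaml")
	if err != nil {
		panic(err)
	}
	var cfg sqlcConfig
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}
	for _, entry := range cfg.SQL {
		// Targets: the four files sqlc generates per package.
		for _, f := range []string{"db.go", "models.go", "querier.go", "queries.sql.go"} {
			fmt.Println("target:", entry.Gen.Go.Out+"/"+f)
		}
		// Dependencies: the query files mk should watch.
		for _, q := range entry.Queries {
			fmt.Println("dep:", q)
		}
	}
}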
159 changes: 70 additions & 89 deletions backend/controller/controller.go
@@ -64,7 +64,6 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/modulecontext"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
- ftlreflect "github.com/TBD54566975/ftl/internal/reflect"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/sha256"
@@ -229,9 +228,8 @@ type Service struct {
clients *ttlcache.Cache[string, clients]

// Complete schema synchronised from the database.
- schema atomic.Value[*schema.Schema]
+ schemaState atomic.Value[schemaState]

- routes atomic.Value[map[string]Route]
config Config

increaseReplicaFailures map[string]int
@@ -270,9 +268,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
- svc.routes.Store(map[string]Route{})
- svc.schema.Store(&schema.Schema{})
-
+ svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

@@ -286,15 +282,22 @@

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

- go svc.syncSchema(ctx)
-
// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job so that its next
// run time is capped at maxNext.
- maybeDevelTask := func(job scheduledtask.Job, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) (backoff.Backoff, scheduledtask.Job) {
+ maybeDevelTask := func(job scheduledtask.Job, name string, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) (backoff.Backoff, scheduledtask.Job) {
if len(develBackoff) > 1 {
panic("too many devel backoffs")
}
+ chain := job
+
+ // Trace controller operations
+ job = func(ctx context.Context) (time.Duration, error) {
+ ctx, span := observability.Controller.BeginSpan(ctx, name)
+ defer span.End()
+ return chain(ctx)
+ }
+
if devel {
chain := job
job = func(ctx context.Context) (time.Duration, error) {
@@ -310,22 +313,32 @@ return makeBackoff(minDelay, maxDelay), job
return makeBackoff(minDelay, maxDelay), job
}

+ parallelTask := func(job scheduledtask.Job, name string, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) {
+ retry, wrapped := maybeDevelTask(job, name, maxNext, minDelay, maxDelay, develBackoff...)
+ svc.tasks.Parallel(name, retry, wrapped)
+ }
+
+ singletonTask := func(job scheduledtask.Job, name string, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) {
+ retry, wrapped := maybeDevelTask(job, name, maxNext, minDelay, maxDelay, develBackoff...)
+ svc.tasks.Singleton(name, retry, wrapped)
+ }
+
// Parallel tasks.
- svc.tasks.Parallel(maybeDevelTask(svc.syncRoutes, time.Second, time.Second, time.Second*5))
- svc.tasks.Parallel(maybeDevelTask(svc.heartbeatController, time.Second, time.Second*3, time.Second*5))
- svc.tasks.Parallel(maybeDevelTask(svc.updateControllersList, time.Second, time.Second*5, time.Second*5))
- svc.tasks.Parallel(maybeDevelTask(svc.executeAsyncCalls, time.Second, time.Second*5, time.Second*10))
+ parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5)
+ parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5)
+ parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5)
+ parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10)

// This should be a singleton task, but because this is the task that
// actually expires the leases used to run singleton tasks, it must be
// parallel.
- svc.tasks.Parallel(maybeDevelTask(svc.expireStaleLeases, time.Second*2, time.Second, time.Second*5))
+ parallelTask(svc.expireStaleLeases, "expire-stale-leases", time.Second*2, time.Second, time.Second*5)

// Singleton tasks use leases to only run on a single controller.
- svc.tasks.Singleton(maybeDevelTask(svc.reapStaleControllers, time.Second*2, time.Second*20, time.Second*20))
- svc.tasks.Singleton(maybeDevelTask(svc.reapStaleRunners, time.Second*2, time.Second, time.Second*10))
- svc.tasks.Singleton(maybeDevelTask(svc.reapCallEvents, time.Minute*5, time.Minute, time.Minute*30))
- svc.tasks.Singleton(maybeDevelTask(svc.reapAsyncCalls, time.Second*5, time.Second, time.Second*5))
+ singletonTask(svc.reapStaleControllers, "reap-stale-controllers", time.Second*2, time.Second*20, time.Second*20)
+ singletonTask(svc.reapStaleRunners, "reap-stale-runners", time.Second*2, time.Second, time.Second*10)
+ singletonTask(svc.reapCallEvents, "reap-call-events", time.Minute*5, time.Minute, time.Minute*30)
+ singletonTask(svc.reapAsyncCalls, "reap-async-calls", time.Second*5, time.Second, time.Second*5)
return svc, nil
}
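The reworked maybeDevelTask threads a task name through so every scheduled job runs inside a tracing span, and the parallelTask/singletonTask helpers remove the per-call-site boilerplate. Below is a self-contained sketch of the same wrapping pattern; the Job type and the printed "span" lines are simplified stand-ins for scheduledtask.Job and observability.Controller.BeginSpan, not FTL's actual API.

// wrap_sketch.go: composing a tracing wrapper and a devel-mode delay cap
// around a scheduled job, as New does above. Requires Go 1.21 for min.
package main

import (
	"context"
	"fmt"
	"time"
)

// Job mirrors the shape of scheduledtask.Job: the returned duration is the
// delay until the job's next run.
type Job func(ctx context.Context) (time.Duration, error)

func wrap(job Job, name string, maxNext time.Duration, devel bool) Job {
	chain := job
	// Run every execution inside a (stand-in) tracing span named after the task.
	job = func(ctx context.Context) (time.Duration, error) {
		fmt.Printf("span start: %s\n", name)
		defer fmt.Printf("span end: %s\n", name)
		return chain(ctx)
	}
	if devel {
		inner := job
		// In devel mode, cap the next-run delay so changes are picked up quickly.
		job = func(ctx context.Context) (time.Duration, error) {
			next, err := inner(ctx)
			return min(next, maxNext), err
		}
	}
	return job
}

func main() {
	job := wrap(func(ctx context.Context) (time.Duration, error) {
		return 10 * time.Second, nil // the job asks to run again in 10s
	}, "sync-routes-and-schema", time.Second, true)
	next, _ := job(context.Background())
	fmt.Println("next run in:", next) // capped at 1s in devel mode
}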

@@ -342,14 +355,8 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("failed to resolve route from dal"))
return
}
- sch, err := s.dal.GetActiveSchema(r.Context())
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("could not get active schema"))
- return
- }
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
- ingress.Handle(start, sch, requestKey, routes, w, r, s.timeline, s.callWithRequest)
+ ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
@@ -392,7 +399,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
- sroutes := s.routes.Load()
+ sroutes := s.schemaState.Load().routes
routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) {
return &ftlv1.StatusResponse_Route{
Module: route.Module,
@@ -621,7 +628,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
deferredDeregistration = true
}
- _, err = s.syncRoutes(ctx)
+ _, err = s.syncRoutesAndSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not sync routes: %w", err)
}
@@ -691,7 +698,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
}

// It's not actually ready until it is in the routes table
- routes := s.routes.Load()
+ routes := s.schemaState.Load().routes
var missing []string
for _, module := range s.config.WaitFor {
if _, ok := routes[module]; !ok {
@@ -861,7 +868,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *d

var candidates []string

- sch := s.schema.Load()
+ sch := s.schemaState.Load().schema

updateCandidates := func(ref *schema.Ref) (brk bool, err error) {
verb := &schema.Verb{}
@@ -928,7 +935,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)
- sch := s.schema.Load()
+ sch := s.schemaState.Load().schema
msg := req.Msg
fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg)
if err != nil {
@@ -990,16 +997,13 @@ func (s *Service) callWithRequest(
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required"))
}

- sch, err := s.dal.GetActiveSchema(ctx)
- if err != nil {
- observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("schema retrieval failed"))
- return nil, err
- }
+ sstate := s.schemaState.Load()
+ sch := sstate.schema

verbRef := schema.RefFromProto(req.Msg.Verb)
verb := &schema.Verb{}

- if err = sch.ResolveToType(verbRef, verb); err != nil {
+ if err := sch.ResolveToType(verbRef, verb); err != nil {
if errors.Is(err, schema.ErrNotFound) {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb not found"))
return nil, connect.NewError(connect.CodeNotFound, err)
@@ -1008,14 +1012,14 @@
return nil, err
}

- err = ingress.ValidateCallBody(req.Msg.Body, verb, sch)
+ err := ingress.ValidateCallBody(req.Msg.Body, verb, sch)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body"))
return nil, err
}

module := verbRef.Module
- route, ok := s.routes.Load()[module]
+ route, ok := sstate.routes[module]
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
@@ -1365,7 +1369,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

- sch := s.schema.Load()
+ sch := s.schemaState.Load().schema

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
@@ -1543,7 +1547,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
return nil
}

- sch := s.schema.Load()
+ sch := s.schemaState.Load().schema

fsm := &schema.FSM{}
err = sch.ResolveToType(origin.FSM.ToRef(), fsm)
@@ -1578,7 +1582,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
}

func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) {
- sch := s.schema.Load()
+ sch := s.schemaState.Load().schema

fsm = &schema.FSM{}
if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil {
@@ -1757,8 +1761,10 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs)
}

- // Periodically sync the routing table from the DB.
- func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) {
+ // Periodically sync the routing table and schema from the DB.
+ // We do this in a single function so that the routing table and schema are always
+ // consistent, and because they both need the same information from the DB.
+ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) {
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dalmodel.Deployment{}
@@ -1771,8 +1777,15 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
defer tx.CommitOrRollback(ctx, &err)

- old := s.routes.Load()
+ old := s.schemaState.Load().routes
newRoutes := map[string]Route{}
+ modulesByName := map[string]*schema.Module{}
+
+ builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert
+ modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins)
+ if err != nil {
+ return 0, fmt.Errorf("failed to convert builtins to schema: %w", err)
+ }
for _, v := range deployments {
deploymentLogger := s.getDeploymentLogger(ctx, v.Key)
deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String())
@@ -1811,54 +1824,17 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
+ modulesByName[v.Module] = v.Schema
}
}
- s.routes.Store(newRoutes)
- return time.Second, nil
- }
-
- // Synchronises Service.schema from the database.
- func (s *Service) syncSchema(ctx context.Context) {
- logger := log.FromContext(ctx)
- modulesByName := map[string]*schema.Module{}
- retry := backoff.Backoff{Max: time.Second * 5}
- for {
- err := s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error {
- switch response.ChangeType {
- case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
- moduleSchema, err := schema.ModuleFromProto(response.Schema)
- if err != nil {
- return err
- }
- modulesByName[moduleSchema.Name] = moduleSchema
-
- case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
- delete(modulesByName, response.ModuleName)
- }
-
- orderedModules := maps.Values(modulesByName)
- sort.SliceStable(orderedModules, func(i, j int) bool {
- return orderedModules[i].Name < orderedModules[j].Name
- })
- combined := &schema.Schema{Modules: orderedModules}
- s.schema.Store(ftlreflect.DeepCopy(combined))
- return nil
- })
- if err != nil {
- next := retry.Duration()
- if ctx.Err() == nil {
- // Don't log when the context is done
- logger.Warnf("Failed to watch module changes, retrying in %s: %s", next, err)
- }
- select {
- case <-time.After(next):
- case <-ctx.Done():
- return
- }
- } else {
- retry.Reset()
- }
- }
+ orderedModules := maps.Values(modulesByName)
+ sort.SliceStable(orderedModules, func(i, j int) bool {
+ return orderedModules[i].Name < orderedModules[j].Name
+ })
+ combined := &schema.Schema{Modules: orderedModules}
+ s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
+ return time.Second, nil
+ }

func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
@@ -1927,6 +1903,11 @@ type Route struct {
Endpoint string
}

+ type schemaState struct {
+ schema *schema.Schema
+ routes map[string]Route
+ }
+
func (r Route) String() string {
return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint)
}
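The change that threads through most of the controller.go hunks is replacing two independently updated atomic values, the schema and the routing table, with the single schemaState above, stored once per sync pass. With two separate values a reader could observe a schema from one sync and routes from another; with one value a single load always yields a matched pair. Below is a minimal sketch of the pattern using the standard library's sync/atomic; FTL itself uses a generic atomic.Value[T] wrapper, so the types here are simplified stand-ins.

// snapshot_sketch.go: publishing schema and routes as one atomic snapshot.
package main

import (
	"fmt"
	"sync/atomic"
)

type Route struct{ Endpoint string }

type Schema struct{ Modules []string }

type schemaState struct {
	schema *Schema
	routes map[string]Route
}

func main() {
	var state atomic.Value // always holds a schemaState
	state.Store(schemaState{schema: &Schema{}, routes: map[string]Route{}})

	// Writer: build a complete new snapshot, then publish it in one store.
	next := schemaState{
		schema: &Schema{Modules: []string{"time"}},
		routes: map[string]Route{"time": {Endpoint: "http://127.0.0.1:8893"}},
	}
	state.Store(next)

	// Reader: one load yields a schema and routes that are consistent with
	// each other, which two separate atomic values cannot guarantee.
	snap := state.Load().(schemaState)
	fmt.Println(snap.schema.Modules, snap.routes["time"].Endpoint)
}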