diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 0a5ae5ace6..96bec9e845 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -575,7 +575,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre initialised = true } - maybeDeployment, err := model.ParseDeploymentKey(msg.Deployment) + deploymentKey, err := model.ParseDeploymentKey(msg.Deployment) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) } @@ -583,7 +583,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre Key: runnerKey, Endpoint: msg.Endpoint, State: dal.RunnerStateFromProto(msg.State), - Deployment: maybeDeployment, + Deployment: deploymentKey, Labels: msg.Labels.AsMap(), }) if errors.Is(err, libdal.ErrConflict) { @@ -601,7 +601,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre }() deferredDeregistration = true } - routes, err := s.dal.GetRoutingTable(ctx, nil) if errors.Is(err, libdal.ErrNotFound) { routes = map[string][]dal.Route{} @@ -687,21 +686,13 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque return connect.NewResponse(&ftlv1.PingResponse{}), nil } - // Check if all required deployments are active. - modules, err := s.dal.GetActiveDeployments(ctx) - if err != nil { - return nil, err - } + // It's not actually ready until it is in the routes table + routes := s.routes.Load() var missing []string -nextModule: for _, module := range s.config.WaitFor { - for _, m := range modules { - replicas, ok := m.Replicas.Get() - if ok && replicas > 0 && m.Module == module { - continue nextModule - } + if _, ok := routes[module]; !ok { + missing = append(missing, module) } - missing = append(missing, module) } if len(missing) == 0 { return connect.NewResponse(&ftlv1.PingResponse{}), nil diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 943de4d4ea..ca20a7c46f 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -114,7 +114,6 @@ type RunnerState string // Runner states. const ( RunnerStateNew = RunnerState(dalsql.RunnerStateNew) - RunnerStateReserved = RunnerState(dalsql.RunnerStateReserved) RunnerStateAssigned = RunnerState(dalsql.RunnerStateAssigned) RunnerStateDead = RunnerState(dalsql.RunnerStateDead) ) diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index a8b648e3c2..ffe5255a21 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -130,7 +130,6 @@ func TestDAL(t *testing.T) { Key: runnerID, Labels: labels, Endpoint: "http://localhost:8080", - State: RunnerStateReserved, Deployment: deploymentKey, } diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index b8b39d5f1e..eb0d5074ce 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -76,6 +76,7 @@ type Querier interface { // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) // don't prevent older events from being consumed + // We also make sure that the subscription belongs to a deployment that has at least one runner GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index f7da6b4c88..715f2d4961 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -123,8 +123,8 @@ ORDER BY r.key; SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas FROM deployments d JOIN modules m ON d.module_id = m.id - LEFT JOIN runners r ON d.id = r.deployment_id -WHERE min_replicas > 0 AND r.state = 'assigned' + LEFT JOIN runners r ON d.id = r.deployment_id AND r.state = 'assigned' +WHERE min_replicas > 0 GROUP BY d.id, m.name, m.language; -- name: GetDeploymentsWithMinReplicas :many @@ -631,12 +631,20 @@ VALUES ( -- Results may not be ready to be scheduled yet due to event consumption delay -- Sorting ensures that brand new events (that may not be ready for consumption) -- don't prevent older events from being consumed +-- We also make sure that the subscription belongs to a deployment that has at least one runner +WITH runner_count AS ( + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r WHERE r.state = 'assigned' + GROUP BY deployment HAVING count(r.deployment_id) > 0 +) SELECT subs.key::subscription_key as key, curser.key as cursor, topics.key::topic_key as topic, subs.name FROM topic_subscriptions subs +JOIN runner_count on subs.deployment_id = runner_count.deployment LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index d13bbcfc63..becf8e13ab 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -557,8 +557,8 @@ const getActiveDeployments = `-- name: GetActiveDeployments :many SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, m.name AS module_name, m.language, COUNT(r.id) AS replicas FROM deployments d JOIN modules m ON d.module_id = m.id - LEFT JOIN runners r ON d.id = r.deployment_id -WHERE min_replicas > 0 AND r.state = 'assigned' + LEFT JOIN runners r ON d.id = r.deployment_id AND r.state = 'assigned' +WHERE min_replicas > 0 GROUP BY d.id, m.name, m.language ` @@ -1530,12 +1530,19 @@ func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 s } const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many +WITH runner_count AS ( + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r WHERE r.state = 'assigned' + GROUP BY deployment HAVING count(r.deployment_id) > 0 +) SELECT subs.key::subscription_key as key, curser.key as cursor, topics.key::topic_key as topic, subs.name FROM topic_subscriptions subs +JOIN runner_count on subs.deployment_id = runner_count.deployment LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head @@ -1555,6 +1562,7 @@ type GetSubscriptionsNeedingUpdateRow struct { // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) // don't prevent older events from being consumed +// We also make sure that the subscription belongs to a deployment that has at least one runner func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { rows, err := q.db.QueryContext(ctx, getSubscriptionsNeedingUpdate) if err != nil { diff --git a/backend/controller/leases/lease_integration_test.go b/backend/controller/leases/lease_integration_test.go index 6dacd74ffd..bb7ddb23c2 100644 --- a/backend/controller/leases/lease_integration_test.go +++ b/backend/controller/leases/lease_integration_test.go @@ -35,6 +35,9 @@ func TestLease(t *testing.T) { Verb: &schemapb.Ref{Module: "leases", Name: "acquire"}, Body: []byte("{}"), })) + if err != nil { + return err + } assert.NoError(t, err) if respErr := resp.Msg.GetError(); respErr != nil { return fmt.Errorf("received error on first call: %v", respErr) diff --git a/internal/buildengine/deploy.go b/internal/buildengine/deploy.go index 1955cc6a4a..60e1cbbbd0 100644 --- a/internal/buildengine/deploy.go +++ b/internal/buildengine/deploy.go @@ -96,7 +96,7 @@ func Deploy(ctx context.Context, module Module, replicas int32, waitForDeployOnl if waitForDeployOnline { logger.Debugf("Waiting for deployment %s to become ready", resp.Msg.DeploymentKey) - err = checkReadiness(ctx, client, resp.Msg.DeploymentKey, replicas) + err = checkReadiness(ctx, client, resp.Msg.DeploymentKey, replicas, moduleSchema) if err != nil { return err } @@ -235,10 +235,17 @@ func relToCWD(path string) string { return rel } -func checkReadiness(ctx context.Context, client DeployClient, deploymentKey string, replicas int32) error { - ticker := time.NewTicker(time.Second) +func checkReadiness(ctx context.Context, client DeployClient, deploymentKey string, replicas int32, schema *schemapb.Module) error { + ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() + hasVerbs := false + for _, dec := range schema.Decls { + if dec.GetVerb() != nil { + hasVerbs = true + break + } + } for { select { case <-ticker.C: @@ -250,7 +257,17 @@ func checkReadiness(ctx context.Context, client DeployClient, deploymentKey stri for _, deployment := range status.Msg.Deployments { if deployment.Key == deploymentKey { if deployment.Replicas >= replicas { - return nil + if hasVerbs { + // Also verify the routing table is ready + for _, route := range status.Msg.Routes { + if route.Deployment == deploymentKey { + return nil + } + } + + } else { + return nil + } } } }