From 6d62bdff6f6e5561217cd2c44a5c8cc257d79e01 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 12 Sep 2024 12:36:16 +1000 Subject: [PATCH] fix: GetActiveDeployments fix --- backend/controller/controller.go | 21 ++++++------------- backend/controller/dal/dal.go | 1 - backend/controller/dal/dal_test.go | 1 - .../controller/dal/internal/sql/querier.go | 1 + .../controller/dal/internal/sql/queries.sql | 12 +++++++++-- .../dal/internal/sql/queries.sql.go | 12 +++++++++-- 6 files changed, 27 insertions(+), 21 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 0a5ae5ace6..e59c795dc4 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -575,7 +575,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre initialised = true } - maybeDeployment, err := model.ParseDeploymentKey(msg.Deployment) + deploymentKey, err := model.ParseDeploymentKey(msg.Deployment) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) } @@ -583,7 +583,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre Key: runnerKey, Endpoint: msg.Endpoint, State: dal.RunnerStateFromProto(msg.State), - Deployment: maybeDeployment, + Deployment: deploymentKey, Labels: msg.Labels.AsMap(), }) if errors.Is(err, libdal.ErrConflict) { @@ -601,7 +601,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre }() deferredDeregistration = true } - routes, err := s.dal.GetRoutingTable(ctx, nil) if errors.Is(err, libdal.ErrNotFound) { routes = map[string][]dal.Route{} @@ -687,21 +686,13 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque return connect.NewResponse(&ftlv1.PingResponse{}), nil } - // Check if all required deployments are active. - modules, err := s.dal.GetActiveDeployments(ctx) - if err != nil { - return nil, err - } + //It's not actually ready until it is in the routes table + routes := s.routes.Load() var missing []string -nextModule: for _, module := range s.config.WaitFor { - for _, m := range modules { - replicas, ok := m.Replicas.Get() - if ok && replicas > 0 && m.Module == module { - continue nextModule - } + if _, ok := routes[module]; !ok { + missing = append(missing, module) } - missing = append(missing, module) } if len(missing) == 0 { return connect.NewResponse(&ftlv1.PingResponse{}), nil diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 943de4d4ea..ca20a7c46f 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -114,7 +114,6 @@ type RunnerState string // Runner states. const ( RunnerStateNew = RunnerState(dalsql.RunnerStateNew) - RunnerStateReserved = RunnerState(dalsql.RunnerStateReserved) RunnerStateAssigned = RunnerState(dalsql.RunnerStateAssigned) RunnerStateDead = RunnerState(dalsql.RunnerStateDead) ) diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index a8b648e3c2..ffe5255a21 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -130,7 +130,6 @@ func TestDAL(t *testing.T) { Key: runnerID, Labels: labels, Endpoint: "http://localhost:8080", - State: RunnerStateReserved, Deployment: deploymentKey, } diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index b8b39d5f1e..eb0d5074ce 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -76,6 +76,7 @@ type Querier interface { // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) // don't prevent older events from being consumed + // We also make sure that the subscription belongs to a deployment that has at least one runner GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index f7da6b4c88..715f2d4961 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -123,8 +123,8 @@ ORDER BY r.key; SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas FROM deployments d JOIN modules m ON d.module_id = m.id - LEFT JOIN runners r ON d.id = r.deployment_id -WHERE min_replicas > 0 AND r.state = 'assigned' + LEFT JOIN runners r ON d.id = r.deployment_id AND r.state = 'assigned' +WHERE min_replicas > 0 GROUP BY d.id, m.name, m.language; -- name: GetDeploymentsWithMinReplicas :many @@ -631,12 +631,20 @@ VALUES ( -- Results may not be ready to be scheduled yet due to event consumption delay -- Sorting ensures that brand new events (that may not be ready for consumption) -- don't prevent older events from being consumed +-- We also make sure that the subscription belongs to a deployment that has at least one runner +WITH runner_count AS ( + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r WHERE r.state = 'assigned' + GROUP BY deployment HAVING count(r.deployment_id) > 0 +) SELECT subs.key::subscription_key as key, curser.key as cursor, topics.key::topic_key as topic, subs.name FROM topic_subscriptions subs +JOIN runner_count on subs.deployment_id = runner_count.deployment LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index d13bbcfc63..becf8e13ab 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -557,8 +557,8 @@ const getActiveDeployments = `-- name: GetActiveDeployments :many SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, m.name AS module_name, m.language, COUNT(r.id) AS replicas FROM deployments d JOIN modules m ON d.module_id = m.id - LEFT JOIN runners r ON d.id = r.deployment_id -WHERE min_replicas > 0 AND r.state = 'assigned' + LEFT JOIN runners r ON d.id = r.deployment_id AND r.state = 'assigned' +WHERE min_replicas > 0 GROUP BY d.id, m.name, m.language ` @@ -1530,12 +1530,19 @@ func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 s } const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many +WITH runner_count AS ( + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r WHERE r.state = 'assigned' + GROUP BY deployment HAVING count(r.deployment_id) > 0 +) SELECT subs.key::subscription_key as key, curser.key as cursor, topics.key::topic_key as topic, subs.name FROM topic_subscriptions subs +JOIN runner_count on subs.deployment_id = runner_count.deployment LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head @@ -1555,6 +1562,7 @@ type GetSubscriptionsNeedingUpdateRow struct { // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) // don't prevent older events from being consumed +// We also make sure that the subscription belongs to a deployment that has at least one runner func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { rows, err := q.db.QueryContext(ctx, getSubscriptionsNeedingUpdate) if err != nil {