Skip to content

Commit

Permalink
fix: GetActiveDeployments fix
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Sep 12, 2024
1 parent 2547351 commit 6d62bdf
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 21 deletions.
21 changes: 6 additions & 15 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,15 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
initialised = true
}

maybeDeployment, err := model.ParseDeploymentKey(msg.Deployment)
deploymentKey, err := model.ParseDeploymentKey(msg.Deployment)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
err = s.dal.UpsertRunner(ctx, dal.Runner{
Key: runnerKey,
Endpoint: msg.Endpoint,
State: dal.RunnerStateFromProto(msg.State),
Deployment: maybeDeployment,
Deployment: deploymentKey,
Labels: msg.Labels.AsMap(),
})
if errors.Is(err, libdal.ErrConflict) {
Expand All @@ -601,7 +601,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
deferredDeregistration = true
}

routes, err := s.dal.GetRoutingTable(ctx, nil)
if errors.Is(err, libdal.ErrNotFound) {
routes = map[string][]dal.Route{}
Expand Down Expand Up @@ -687,21 +686,13 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

// Check if all required deployments are active.
modules, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return nil, err
}
//It's not actually ready until it is in the routes table

Check failure on line 689 in backend/controller/controller.go

View workflow job for this annotation

GitHub Actions / Lint

commentFormatting: put a space between `//` and comment text (gocritic)
routes := s.routes.Load()
var missing []string
nextModule:
for _, module := range s.config.WaitFor {
for _, m := range modules {
replicas, ok := m.Replicas.Get()
if ok && replicas > 0 && m.Module == module {
continue nextModule
}
if _, ok := routes[module]; !ok {
missing = append(missing, module)
}
missing = append(missing, module)
}
if len(missing) == 0 {
return connect.NewResponse(&ftlv1.PingResponse{}), nil
Expand Down
1 change: 0 additions & 1 deletion backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ type RunnerState string
// Runner states.
const (
RunnerStateNew = RunnerState(dalsql.RunnerStateNew)
RunnerStateReserved = RunnerState(dalsql.RunnerStateReserved)
RunnerStateAssigned = RunnerState(dalsql.RunnerStateAssigned)
RunnerStateDead = RunnerState(dalsql.RunnerStateDead)
)
Expand Down
1 change: 0 additions & 1 deletion backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func TestDAL(t *testing.T) {
Key: runnerID,
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateReserved,
Deployment: deploymentKey,
}

Expand Down
1 change: 1 addition & 0 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ ORDER BY r.key;
SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas
FROM deployments d
JOIN modules m ON d.module_id = m.id
LEFT JOIN runners r ON d.id = r.deployment_id
WHERE min_replicas > 0 AND r.state = 'assigned'
LEFT JOIN runners r ON d.id = r.deployment_id AND r.state = 'assigned'
WHERE min_replicas > 0
GROUP BY d.id, m.name, m.language;

-- name: GetDeploymentsWithMinReplicas :many
Expand Down Expand Up @@ -631,12 +631,20 @@ VALUES (
-- Results may not be ready to be scheduled yet due to event consumption delay
-- Sorting ensures that brand new events (that may not be ready for consumption)
-- don't prevent older events from being consumed
-- We also make sure that the subscription belongs to a deployment that has at least one runner
WITH runner_count AS (
SELECT count(r.deployment_id) as runner_count,
r.deployment_id as deployment
FROM runners r WHERE r.state = 'assigned'
GROUP BY deployment HAVING count(r.deployment_id) > 0
)
SELECT
subs.key::subscription_key as key,
curser.key as cursor,
topics.key::topic_key as topic,
subs.name
FROM topic_subscriptions subs
JOIN runner_count on subs.deployment_id = runner_count.deployment
LEFT JOIN topics ON subs.topic_id = topics.id
LEFT JOIN topic_events curser ON subs.cursor = curser.id
WHERE subs.cursor IS DISTINCT FROM topics.head
Expand Down
12 changes: 10 additions & 2 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6d62bdf

Please sign in to comment.