Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: new client for each runner after 5 minutes #2134

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@
var _ ftlv1connect.VerbServiceHandler = (*Service)(nil)

type clients struct {
verb ftlv1connect.VerbServiceClient
runner ftlv1connect.RunnerServiceClient
verb ftlv1connect.VerbServiceClient
runner ftlv1connect.RunnerServiceClient
created_at time.Time

Check warning on line 189 in backend/controller/controller.go

View workflow job for this annotation

GitHub Actions / Lint

var-naming: don't use underscores in Go names; struct field created_at should be createdAt (revive)
}

// ControllerListListener is regularly notified of the current list of controllers
Expand Down Expand Up @@ -978,7 +979,7 @@
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
}
route := routes[rand.Intn(len(routes))] //nolint:gosec
client := s.clientsForRunner(route.Runner, route.Endpoint)
client := s.clientsForRunner(ctx, route.Runner, route.Endpoint)

callers, err := headers.GetCallers(req.Header())
if err != nil {
Expand Down Expand Up @@ -1150,14 +1151,16 @@
}

// Return or create the RunnerService and VerbService clients for a Runner.
func (s *Service) clientsForRunner(key model.RunnerKey, endpoint string) clients {
func (s *Service) clientsForRunner(ctx context.Context, key model.RunnerKey, endpoint string) clients {
clientItem := s.clients.Get(key.String())
if clientItem != nil {
if clientItem != nil && clientItem.Value().created_at.Add(time.Minute*5).After(time.Now()) {
return clientItem.Value()
}
log.FromContext(ctx).Tracef("Creating new grpc clients for %s at %s", key, endpoint)
client := clients{
runner: rpc.Dial(ftlv1connect.NewRunnerServiceClient, endpoint, log.Error),
verb: rpc.Dial(ftlv1connect.NewVerbServiceClient, endpoint, log.Error),
runner: rpc.Dial(ftlv1connect.NewRunnerServiceClient, endpoint, log.Error),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually create a new client in terms of the underlying connections? rcp.Dial seems to use a static HTTP client, and AFAIK go clients manage their own connection pools, so I think we may have an implicit connection cache.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The [Client.Transport] typically has internal state (cached TCP
// connections), so Clients should be reused instead of created as
// needed.

It seems so yes. But it also seems that having one http.Client is correct usage 🤔

verb: rpc.Dial(ftlv1connect.NewVerbServiceClient, endpoint, log.Error),
created_at: time.Now(),
}
s.clients.Set(key.String(), client, time.Minute)
return client
Expand Down Expand Up @@ -1452,7 +1455,7 @@
return false, nil
}
runner := runners[rand.Intn(len(runners))] //nolint:gosec
client := s.clientsForRunner(runner.Key, runner.Endpoint)
client := s.clientsForRunner(ctx, runner.Key, runner.Endpoint)
resp, err := client.runner.Terminate(ctx, connect.NewRequest(&ftlv1.TerminateRequest{DeploymentKey: key.String()}))
if err != nil {
return false, err
Expand Down Expand Up @@ -1493,7 +1496,7 @@

err = dal.WithReservation(reservationCtx, claim, func() error {
runner := claim.Runner()
client = s.clientsForRunner(runner.Key, runner.Endpoint)
client = s.clientsForRunner(ctx, runner.Key, runner.Endpoint)
_, err = client.runner.Reserve(reservationCtx, connect.NewRequest(&ftlv1.ReserveRequest{DeploymentKey: reconcile.Key.String()}))
if err != nil {
return fmt.Errorf("failed request to reserve a runner for %s at %s: %w", reconcile.Key, claim.Runner().Endpoint, err)
Expand Down
Loading