Skip to content

Commit

Permalink
feat: rolling deployments (#2705)
Browse files Browse the repository at this point in the history
fixes #1687
  • Loading branch information
stuartwdouglas authored Sep 18, 2024
1 parent 2e719cf commit 97ba901
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 40 deletions.
30 changes: 27 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,32 +1729,56 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
}

// Periodically sync the routing table from the DB.
func (s *Service) syncRoutes(ctx context.Context) (time.Duration, error) {
func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) {
logger := log.FromContext(ctx)
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dal.Deployment{}
} else if err != nil {
return 0, err
}
tx, err := s.dal.Begin(ctx)
if err != nil {
return 0, fmt.Errorf("failed to start transaction %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

old := s.routes.Load()
newRoutes := map[string]Route{}
for _, v := range deployments {
logger.Tracef("processing deployment %s for route table", v.Key.String())
// Deployments are in order, oldest to newest
// If we see a newer one overwrite an old one that means the new one is read
// And we set its replicas to zero
// It may seem a bit odd to do this here but this is where we are actually updating the routing table
// Which is what makes as a deployment 'live' from a clients POV
optURI, err := s.runnerScaling.GetEndpointForDeployment(ctx, v.Module, v.Key.String())
if err != nil {
logger.Errorf(err, "Failed to get updated endpoint for deployment %s", v.Key.String())
continue
} else if uri, ok := optURI.Get(); ok {
// Check if this is a new route
targetEndpoint := uri.String()
if _, ok := old[v.Module]; !ok {
if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() {
// If it is a new route we only add it if we can ping it
// Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready.
_, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err != nil {
logger.Warnf("Unable to ping %s, not adding to route table", v.Key.String())
logger.Tracef("Unable to ping %s, not adding to route table", v.Key.String())
continue
}
logger.Debugf("Adding %s to route table", v.Key.String())
}
if prev, ok := newRoutes[v.Module]; ok {
// We have already seen a route for this module, the existing route must be an old one
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
logger.Debugf("Setting %s to zero replicas", v.Key.String())
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
logger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
}
Expand Down
22 changes: 13 additions & 9 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
sets "github.com/deckarep/golang-set/v2"
xmaps "golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
Expand Down Expand Up @@ -597,10 +598,6 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
if oldDeployment.Key.String() == newDeploymentKey.String() {
return fmt.Errorf("replace deployment failed: deployment already exists from %v to %v: %w", oldDeployment.Key, newDeploymentKey, ErrReplaceDeploymentAlreadyActive)
}
err = tx.db.SetDeploymentDesiredReplicas(ctx, oldDeployment.Key, 0)
if err != nil {
return fmt.Errorf("replace deployment failed to set old deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, libdal.TranslatePGError(err))
}
err = tx.db.SetDeploymentDesiredReplicas(ctx, newDeploymentKey, int32(minReplicas))
if err != nil {
return fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, libdal.TranslatePGError(err))
Expand Down Expand Up @@ -694,11 +691,18 @@ func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
if err != nil {
return nil, err
}
sch, err := schema.ValidateSchema(&schema.Schema{
Modules: slices.Map(deployments, func(d Deployment) *schema.Module {
return d.Schema
}),
})

schemaMap := map[string]*schema.Module{}
for _, dep := range deployments {
if _, ok := schemaMap[dep.Module]; !ok {
// We only take the older ones
// If new ones exist they are not live yet
// Or the old ones would be gone
schemaMap[dep.Module] = dep.Schema
}
}
fullSchema := &schema.Schema{Modules: xmaps.Values(schemaMap)}
sch, err := schema.ValidateSchema(fullSchema)
if err != nil {
return nil, fmt.Errorf("could not validate schema: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ FROM deployments d
JOIN modules m ON d.module_id = m.id
LEFT JOIN runners r ON d.id = r.deployment_id
WHERE min_replicas > 0
GROUP BY d.id, m.name, m.language;
GROUP BY d.id, m.name, m.language
ORDER BY d.created_at;

-- name: GetDeploymentsWithMinReplicas :many
SELECT sqlc.embed(d), m.name AS module_name, m.language
FROM deployments d
INNER JOIN modules m on d.module_id = m.id
WHERE min_replicas > 0
ORDER BY d.key;
ORDER BY d.created_at,d.key;

-- name: GetActiveDeploymentSchemas :many
SELECT key, schema FROM deployments WHERE min_replicas > 0;
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dal

import (
"bytes"
"context"
"crypto/sha256"
"encoding"
Expand Down Expand Up @@ -72,7 +71,7 @@ func (d *DAL) PollDeployments(ctx context.Context) {
deployments, err := d.GetDeploymentsWithMinReplicas(ctx)
if err != nil {
if ctx.Err() == context.Canceled {
logger.Debugf("Polling stopped: %v", ctx.Err())
logger.Tracef("Polling stopped: %v", ctx.Err())
return
}
logger.Errorf(err, "failed to get deployments when polling")
Expand All @@ -88,16 +87,16 @@ func (d *DAL) PollDeployments(ctx context.Context) {
logger.Errorf(err, "failed to compute deployment state")
continue
}
deploymentName := deployment.Key.String()
currentDeployments[deploymentName] = state

currentDeployments[name] = state

previousState, exists := previousDeployments[name]
previousState, exists := previousDeployments[deploymentName]
if !exists {
logger.Tracef("New deployment: %s", name)
d.DeploymentChanges.Publish(DeploymentNotification{
Message: optional.Some(deployment),
})
} else if !bytes.Equal(previousState.schemaHash, state.schemaHash) || previousState.minReplicas != state.minReplicas || !bytes.Equal(previousState.Key.Suffix, state.Key.Suffix) {
} else if previousState.minReplicas != state.minReplicas {
logger.Tracef("Changed deployment: %s", name)
d.DeploymentChanges.Publish(DeploymentNotification{
Message: optional.Some(deployment),
Expand Down
21 changes: 8 additions & 13 deletions backend/controller/scaling/k8sscaling/deployment_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ type DeploymentProvisioner struct {
Client *kubernetes.Clientset
MyDeploymentName string
Namespace string
// Map of modules to known deployments
// This needs to be re-done when we have proper rolling deployments
KnownModules map[string]string
FTLEndpoint string
// Map of known deployments
KnownDeployments map[string]bool
FTLEndpoint string
}

func (r *DeploymentProvisioner) updateDeployment(ctx context.Context, name string, mod func(deployment *kubeapps.Deployment)) error {
Expand Down Expand Up @@ -122,18 +121,18 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
// Note that a change is now currently usually and add and a delete
// As it should really be called a module changed, not a deployment changed
// This will need to be fixed as part of the support for rolling deployments
r.KnownModules[msg.ModuleName] = msg.DeploymentKey
r.KnownDeployments[msg.DeploymentKey] = true
if deploymentExists {
logger.Infof("updating deployment %s", msg.DeploymentKey)
return r.handleExistingDeployment(ctx, deployment, msg.Schema)
} else {
return r.handleNewDeployment(ctx, msg.Schema, msg.DeploymentKey)
}
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
delete(r.KnownModules, msg.ModuleName)
delete(r.KnownDeployments, msg.DeploymentKey)
if deploymentExists {
logger.Infof("deleting deployment %s", msg.ModuleName)
err := deploymentClient.Delete(ctx, msg.ModuleName, v1.DeleteOptions{})
err := deploymentClient.Delete(ctx, msg.DeploymentKey, v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete deployment %s: %w", msg.ModuleName, err)
}
Expand Down Expand Up @@ -357,14 +356,10 @@ func (r *DeploymentProvisioner) deleteMissingDeployments(ctx context.Context) {
logger.Errorf(err, "failed to list deployments")
return
}
knownDeployments := map[string]bool{}
for _, deployment := range r.KnownModules {
knownDeployments[deployment] = true
}

for _, deployment := range list.Items {
if !knownDeployments[deployment.Name] {
logger.Infof("deleting deployment %s", deployment.Name)
if !r.KnownDeployments[deployment.Name] {
logger.Infof("deleting deployment %s as it is not a known module", deployment.Name)
err := deploymentClient.Delete(ctx, deployment.Name, v1.DeleteOptions{})
if err != nil {
logger.Errorf(err, "failed to delete deployment %s", deployment.Name)
Expand Down
8 changes: 4 additions & 4 deletions backend/controller/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func (k k8sScaling) Start(ctx context.Context, controller url.URL, leaser leases
}
logger.Infof("using namespace %s", namespace)
deploymentReconciler := &DeploymentProvisioner{
Client: clientset,
Namespace: namespace,
KnownModules: map[string]string{},
FTLEndpoint: controller.String(),
Client: clientset,
Namespace: namespace,
KnownDeployments: map[string]bool{},
FTLEndpoint: controller.String(),
}
scaling.BeginGrpcScaling(ctx, controller, leaser, deploymentReconciler.HandleSchemaChange)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@ package k8sscaling_test

import (
"context"
"fmt"
"strings"
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/atomic"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

in "github.com/TBD54566975/ftl/internal/integration"
)

func TestKubeScaling(t *testing.T) {
failure := atomic.Value[error]{}
done := atomic.Value[bool]{}
done.Store(false)
in.Run(t,
in.WithKubernetes(),
in.CopyModule("echo"),
Expand All @@ -25,10 +30,28 @@ func TestKubeScaling(t *testing.T) {
in.EditFile("echo", func(content []byte) []byte {
return []byte(strings.ReplaceAll(string(content), "Hello", "Bye"))
}, "echo.go"),
func(t testing.TB, ic in.TestContext) {
// Hit the verb constantly to test rolling updates.
go func() {
for !done.Load() {
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
if !strings.Contains(response, "Bob") {
failure.Store(fmt.Errorf("unexpected response: %s", response))
return
}
})(t, ic)
}
}()
},
in.Deploy("echo"),
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
assert.Equal(t, "Bye, Bob!!!", response)
}),
func(t testing.TB, ic in.TestContext) {
done.Store(true)
err := failure.Load()
assert.NoError(t, err)
},
in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) {
deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{})
assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- migrate:up
DROP INDEX deployments_unique_idx;
CREATE INDEX deployments_active_idx ON public.deployments USING btree (module_id) WHERE (min_replicas > 0);
-- migrate:down

1 change: 1 addition & 0 deletions deployment/base/db-migrate/kustomization.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ configMapGenerator:
- ./schema/20240913041619_encrypted_topic_events_payload.sql
- ./schema/20240916015906_remove_runner_state.sql
- ./schema/20240916190209_rename_controller_to_controllers.sql
- ./schema/20240917062716_change_deployments_index.sql
8 changes: 6 additions & 2 deletions internal/buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,11 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
}
}
case change := <-schemaChanges:
if change.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED {
if change.ChangeType == ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED {
continue
}
existingHash, ok := moduleHashes[change.Name]
if !ok {
continue
}

Expand All @@ -460,7 +464,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
continue
}

if bytes.Equal(hash, moduleHashes[change.Name]) {
if bytes.Equal(hash, existingHash) {
logger.Tracef("schema for %s has not changed", change.Name)
continue
}
Expand Down

0 comments on commit 97ba901

Please sign in to comment.