generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: simple in memory provisioning scheduler (#2815)
Rough domain model for provisioning deployments. - `Task` is a single executable task to bring the infra from one state to another - `Deployment` is a list of sequentially executable tasks - `Provisioner` is an interface for calling provisioners, wrapping calls to provisioner plugins This is a WIP, and there are a lot of `TODO`s around, but I wanted to get this out to keep PRs small. --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
93de931
commit 1297472
Showing
6 changed files
with
461 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
package deployment | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" | ||
) | ||
|
||
type TaskState string | ||
|
||
const ( | ||
TaskStatePending TaskState = "" | ||
TaskStateRunning TaskState = "running" | ||
TaskStateDone TaskState = "done" | ||
TaskStateFailed TaskState = "failed" | ||
) | ||
|
||
// Task is a unit of work for a deployment | ||
type Task struct { | ||
Handler Provisioner | ||
Module string | ||
State TaskState | ||
Desired []*provisioner.Resource | ||
Existing []*provisioner.Resource | ||
// populated only when the task is done | ||
Output []*provisioner.Resource | ||
|
||
// set if the task is currently running | ||
RunningToken string | ||
} | ||
|
||
func (t *Task) Start(ctx context.Context) error { | ||
if t.State != TaskStatePending { | ||
return fmt.Errorf("task state is not pending: %s", t.State) | ||
} | ||
t.State = TaskStateRunning | ||
token, err := t.Handler.Provision(ctx, t.Module, t.constructResourceContext(t.Desired), t.Existing) | ||
if err != nil { | ||
t.State = TaskStateFailed | ||
return fmt.Errorf("error provisioning resources: %w", err) | ||
} | ||
if token == "" { | ||
// no changes | ||
t.State = TaskStateDone | ||
t.Output = t.Desired | ||
} | ||
t.RunningToken = token | ||
return nil | ||
} | ||
|
||
func (t *Task) constructResourceContext(r []*provisioner.Resource) []*provisioner.ResourceContext { | ||
result := make([]*provisioner.ResourceContext, len(r)) | ||
for i, res := range r { | ||
result[i] = &provisioner.ResourceContext{ | ||
Resource: res, | ||
// TODO: Collect previously constructed resources from a dependency graph here | ||
} | ||
} | ||
return result | ||
} | ||
|
||
func (t *Task) Progress(ctx context.Context) error { | ||
if t.State != TaskStateRunning { | ||
return fmt.Errorf("task state is not running: %s", t.State) | ||
} | ||
state, output, err := t.Handler.State(ctx, t.RunningToken, t.Desired) | ||
if err != nil { | ||
return fmt.Errorf("error getting state: %w", err) | ||
} | ||
if state == TaskStateDone { | ||
t.State = TaskStateDone | ||
t.Output = output | ||
} | ||
return nil | ||
} | ||
|
||
// Deployment is a single deployment of resources for a single module | ||
type Deployment struct { | ||
Module string | ||
Tasks []*Task | ||
} | ||
|
||
// next running or pending task. Nil if all tasks are done. | ||
func (d *Deployment) next() *Task { | ||
for _, t := range d.Tasks { | ||
if t.State == TaskStatePending || t.State == TaskStateRunning || t.State == TaskStateFailed { | ||
return t | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Progress the deployment. Returns true if there are still tasks running or pending. | ||
func (d *Deployment) Progress(ctx context.Context) (bool, error) { | ||
next := d.next() | ||
if next == nil { | ||
return false, nil | ||
} | ||
|
||
if next.State == TaskStatePending { | ||
err := next.Start(ctx) | ||
if err != nil { | ||
return true, err | ||
} | ||
} | ||
err := next.Progress(ctx) | ||
return d.next() != nil, err | ||
} | ||
|
||
type DeploymentState struct { | ||
Pending []*Task | ||
Running *Task | ||
Failed *Task | ||
Done []*Task | ||
} | ||
|
||
func (d *Deployment) State() *DeploymentState { | ||
result := &DeploymentState{} | ||
for _, t := range d.Tasks { | ||
switch t.State { | ||
case TaskStatePending: | ||
result.Pending = append(result.Pending, t) | ||
case TaskStateRunning: | ||
result.Running = t | ||
case TaskStateFailed: | ||
result.Failed = t | ||
case TaskStateDone: | ||
result.Done = append(result.Done, t) | ||
} | ||
} | ||
return result | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package deployment_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" | ||
"github.com/TBD54566975/ftl/backend/provisioner/deployment" | ||
"github.com/alecthomas/assert/v2" | ||
) | ||
|
||
// MockProvisioner is a mock implementation of the Provisioner interface | ||
type MockProvisioner struct { | ||
Token string | ||
stateCalls int | ||
} | ||
|
||
var _ deployment.Provisioner = (*MockProvisioner)(nil) | ||
|
||
func (m *MockProvisioner) Provision( | ||
ctx context.Context, | ||
module string, | ||
desired []*provisioner.ResourceContext, | ||
existing []*provisioner.Resource, | ||
) (string, error) { | ||
return m.Token, nil | ||
} | ||
|
||
func (m *MockProvisioner) State( | ||
ctx context.Context, | ||
token string, | ||
desired []*provisioner.Resource, | ||
) (deployment.TaskState, []*provisioner.Resource, error) { | ||
m.stateCalls++ | ||
if m.stateCalls <= 1 { | ||
return deployment.TaskStateRunning, nil, nil | ||
} | ||
return deployment.TaskStateDone, desired, nil | ||
} | ||
|
||
func TestDeployment_Progress(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
t.Run("no tasks", func(t *testing.T) { | ||
deployment := &deployment.Deployment{} | ||
progress, err := deployment.Progress(ctx) | ||
assert.NoError(t, err) | ||
assert.False(t, progress) | ||
}) | ||
|
||
t.Run("progresses each provisioner in order", func(t *testing.T) { | ||
registry := deployment.ProvisionerRegistry{} | ||
registry.Register(&MockProvisioner{Token: "foo"}, deployment.ResourceTypePostgres) | ||
registry.Register(&MockProvisioner{Token: "bar"}, deployment.ResourceTypeMysql) | ||
|
||
dpl := registry.CreateDeployment( | ||
"test-module", | ||
[]*provisioner.Resource{{ | ||
ResourceId: "a", | ||
Resource: &provisioner.Resource_Mysql{}, | ||
}, { | ||
ResourceId: "b", | ||
Resource: &provisioner.Resource_Postgres{}, | ||
}}, | ||
[]*provisioner.Resource{}, | ||
) | ||
|
||
assert.Equal(t, 2, len(dpl.State().Pending)) | ||
|
||
_, err := dpl.Progress(ctx) | ||
assert.NoError(t, err) | ||
assert.Equal(t, 1, len(dpl.State().Pending)) | ||
assert.NotZero(t, dpl.State().Running) | ||
|
||
_, err = dpl.Progress(ctx) | ||
assert.NoError(t, err) | ||
assert.Equal(t, 1, len(dpl.State().Pending)) | ||
assert.Zero(t, dpl.State().Running) | ||
assert.Equal(t, 1, len(dpl.State().Done)) | ||
|
||
_, err = dpl.Progress(ctx) | ||
assert.NoError(t, err) | ||
assert.Equal(t, 0, len(dpl.State().Pending)) | ||
assert.NotZero(t, dpl.State().Running) | ||
assert.Equal(t, 1, len(dpl.State().Done)) | ||
|
||
running, err := dpl.Progress(ctx) | ||
assert.NoError(t, err) | ||
assert.Equal(t, 2, len(dpl.State().Done)) | ||
assert.False(t, running) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
package deployment | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"connectrpc.com/connect" | ||
|
||
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" | ||
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect" | ||
"github.com/TBD54566975/ftl/backend/schema" | ||
"github.com/TBD54566975/ftl/common/plugin" | ||
"github.com/TBD54566975/ftl/internal/log" | ||
) | ||
|
||
// ResourceType is a type of resource used to configure provisioners | ||
type ResourceType string | ||
|
||
const ( | ||
ResourceTypeUnknown ResourceType = "unknown" | ||
ResourceTypePostgres ResourceType = "postgres" | ||
ResourceTypeMysql ResourceType = "mysql" | ||
) | ||
|
||
// Provisioner is a runnable process to provision resources | ||
type Provisioner interface { | ||
Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) | ||
State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) | ||
} | ||
|
||
type provisionerConfig struct { | ||
provisioner Provisioner | ||
types []ResourceType | ||
} | ||
|
||
// ProvisionerRegistry contains all known resource handlers in the order they should be executed | ||
type ProvisionerRegistry struct { | ||
Provisioners []*provisionerConfig | ||
} | ||
|
||
// Register to the registry, to be executed after all the previously added handlers | ||
func (reg *ProvisionerRegistry) Register(handler Provisioner, types ...ResourceType) { | ||
reg.Provisioners = append(reg.Provisioners, &provisionerConfig{ | ||
provisioner: handler, | ||
types: types, | ||
}) | ||
} | ||
|
||
// CreateDeployment to take the system to the desired state | ||
func (reg *ProvisionerRegistry) CreateDeployment(module string, desiredResources, existingResources []*provisioner.Resource) *Deployment { | ||
var result []*Task | ||
|
||
existingByHandler := reg.groupByProvisioner(existingResources) | ||
desiredByHandler := reg.groupByProvisioner(desiredResources) | ||
|
||
for handler, desired := range desiredByHandler { | ||
existing := existingByHandler[handler] | ||
result = append(result, &Task{ | ||
Handler: handler, | ||
Desired: desired, | ||
Existing: existing, | ||
}) | ||
} | ||
return &Deployment{Tasks: result, Module: module} | ||
} | ||
|
||
// ExtractResources from a module schema | ||
func ExtractResources(sch *schema.Module) ([]*provisioner.Resource, error) { | ||
var result []*provisioner.Resource | ||
for _, decl := range sch.Decls { | ||
if db, ok := decl.(*schema.Database); ok { | ||
if db.Type == "postgres" { | ||
result = append(result, &provisioner.Resource{ | ||
ResourceId: decl.GetName(), | ||
Resource: &provisioner.Resource_Postgres{}, | ||
}) | ||
} else if db.Type == "mysql" { | ||
result = append(result, &provisioner.Resource{ | ||
ResourceId: decl.GetName(), | ||
Resource: &provisioner.Resource_Mysql{}, | ||
}) | ||
} else { | ||
return nil, fmt.Errorf("unknown db type: %s", db.Type) | ||
} | ||
} | ||
} | ||
return result, nil | ||
} | ||
|
||
func (reg *ProvisionerRegistry) groupByProvisioner(resources []*provisioner.Resource) map[Provisioner][]*provisioner.Resource { | ||
result := map[Provisioner][]*provisioner.Resource{} | ||
for _, r := range resources { | ||
for _, cfg := range reg.Provisioners { | ||
for _, t := range cfg.types { | ||
typed := typeOf(r) | ||
if t == typed { | ||
result[cfg.provisioner] = append(result[cfg.provisioner], r) | ||
break | ||
} | ||
} | ||
} | ||
} | ||
return result | ||
} | ||
|
||
func typeOf(r *provisioner.Resource) ResourceType { | ||
if _, ok := r.Resource.(*provisioner.Resource_Mysql); ok { | ||
return ResourceTypeMysql | ||
} else if _, ok := r.Resource.(*provisioner.Resource_Postgres); ok { | ||
return ResourceTypePostgres | ||
} | ||
return ResourceTypeUnknown | ||
} | ||
|
||
// PluginProvisioner delegates provisioning to an external plugin | ||
type PluginProvisioner struct { | ||
cmdCtx context.Context | ||
client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient] | ||
} | ||
|
||
func NewPluginProvisioner(ctx context.Context, name, dir, exe string) (*PluginProvisioner, error) { | ||
client, cmdCtx, err := plugin.Spawn( | ||
ctx, | ||
log.Debug, | ||
name, | ||
dir, | ||
exe, | ||
provisionerconnect.NewProvisionerPluginServiceClient, | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("error spawning plugin: %w", err) | ||
} | ||
|
||
return &PluginProvisioner{ | ||
cmdCtx: cmdCtx, | ||
client: client, | ||
}, nil | ||
} | ||
|
||
func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) { | ||
resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{ | ||
DesiredResources: desired, | ||
ExistingResources: existing, | ||
FtlClusterId: "ftl", | ||
Module: module, | ||
})) | ||
if err != nil { | ||
return "", fmt.Errorf("error calling plugin: %w", err) | ||
} | ||
if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED { | ||
return resp.Msg.ProvisioningToken, nil | ||
} | ||
return "", nil | ||
} | ||
|
||
func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) { | ||
resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{ | ||
ProvisioningToken: token, | ||
})) | ||
if err != nil { | ||
return "", nil, fmt.Errorf("error getting status from plugin: %w", err) | ||
} | ||
if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok { | ||
return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage) | ||
} else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok { | ||
return TaskStateDone, success.Success.UpdatedResources, nil | ||
} | ||
return TaskStateRunning, nil, nil | ||
} | ||
|
||
var _ Provisioner = (*PluginProvisioner)(nil) |
Oops, something went wrong.