Skip to content

Commit

Permalink
feat: deploymentname as struct
Browse files Browse the repository at this point in the history
See also PR for renaming deployment name -> deployment key: [1080](#1080)

Fixes: #1072
  • Loading branch information
matt2e committed Mar 15, 2024
1 parent e564ab1 commit a9c069f
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 72 deletions.
60 changes: 33 additions & 27 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefa
func runnerFromDB(row sql.GetRunnerRow) Runner {
var deployment optional.Option[model.DeploymentName]
if name, ok := row.DeploymentName.Get(); ok {
deployment = optional.Some(model.DeploymentName(name))
parsed, err := model.ParseDeploymentName(name)
if err != nil {
return Runner{}
}
deployment = optional.Some(parsed)
}
attrs := model.Labels{}
if err := json.Unmarshal(row.Labels, &attrs); err != nil {
Expand Down Expand Up @@ -303,7 +307,11 @@ func (d *DAL) GetStatus(
domainRunners, err := slices.MapErr(runners, func(in sql.GetActiveRunnersRow) (Runner, error) {
var deployment optional.Option[model.DeploymentName]
if name, ok := in.DeploymentName.Get(); ok {
deployment = optional.Some(model.DeploymentName(name))
parsed, err := model.ParseDeploymentName(name)
if err != nil {
return Runner{}, fmt.Errorf("invalid deployment name %q: %w", name, err)
}
deployment = optional.Some(parsed)
}
attrs := model.Labels{}
if err := json.Unmarshal(in.Labels, &attrs); err != nil {
Expand Down Expand Up @@ -408,15 +416,16 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
// Start the transaction
tx, err := d.db.Begin(ctx)
if err != nil {
return "", fmt.Errorf("%s: %w", "could not start transaction", err)
return model.DeploymentName{}, fmt.Errorf("%s: %w", "could not start transaction", err)
}

defer tx.CommitOrRollback(ctx, &err)

existingDeployment, err := d.checkForExistingDeployments(ctx, tx, moduleSchema, artefacts)
var zero model.DeploymentName
if err != nil {
return "", err
} else if existingDeployment != "" {
return model.DeploymentName{}, err
} else if existingDeployment != zero {
logger.Tracef("Returning existing deployment %s", existingDeployment)
return existingDeployment, nil
}
Expand All @@ -427,30 +436,31 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem

schemaBytes, err := proto.Marshal(moduleSchema.ToProto())
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to marshal schema", err)
return model.DeploymentName{}, fmt.Errorf("%s: %w", "failed to marshal schema", err)
}

// TODO(aat): "schema" containing language?
_, err = tx.UpsertModule(ctx, language, moduleSchema.Name)
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to upsert module", translatePGError(err))
return model.DeploymentName{}, fmt.Errorf("%s: %w", "failed to upsert module", translatePGError(err))
}

deploymentName := model.NewDeploymentName(moduleSchema.Name)

// Create the deployment
err = tx.CreateDeployment(ctx, deploymentName, moduleSchema.Name, schemaBytes)
err = tx.CreateDeployment(ctx, moduleSchema.Name, schemaBytes, deploymentName)
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to create deployment", translatePGError(err))
return model.DeploymentName{}, fmt.Errorf("%s: %w", "failed to create deployment", translatePGError(err))
}

uploadedDigests := slices.Map(artefacts, func(in DeploymentArtefact) []byte { return in.Digest[:] })
artefactDigests, err := tx.GetArtefactDigests(ctx, uploadedDigests)
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to get artefact digests", err)
return model.DeploymentName{}, fmt.Errorf("%s: %w", "failed to get artefact digests", err)
}
if len(artefactDigests) != len(artefacts) {
missingDigests := strings.Join(slices.Map(artefacts, func(in DeploymentArtefact) string { return in.Digest.String() }), ", ")
return "", fmt.Errorf("missing %d artefacts: %s", len(artefacts)-len(artefactDigests), missingDigests)
return model.DeploymentName{}, fmt.Errorf("missing %d artefacts: %s", len(artefacts)-len(artefactDigests), missingDigests)
}

// Associate the artefacts with the deployment
Expand All @@ -463,7 +473,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
Path: artefact.Path,
})
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to associate artefact with deployment", translatePGError(err))
return model.DeploymentName{}, fmt.Errorf("%s: %w", "failed to associate artefact with deployment", translatePGError(err))
}
}

Expand All @@ -476,7 +486,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
Verb: ingressRoute.Verb,
})
if err != nil {
return "", fmt.Errorf("%s: %w", "failed to create ingress route", translatePGError(err))
return model.DeploymentName{}, fmt.Errorf("%s: %w", "failed to create ingress route", translatePGError(err))
}
}

Expand All @@ -496,10 +506,6 @@ func (d *DAL) GetDeployment(ctx context.Context, name model.DeploymentName) (*mo
// ErrConflict will be returned if a runner with the same endpoint and a
// different key already exists.
func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error {
var pgDeploymentName optional.Option[string]
if dkey, ok := runner.Deployment.Get(); ok {
pgDeploymentName = optional.Some(dkey.String())
}
attrBytes, err := json.Marshal(runner.Labels)
if err != nil {
return fmt.Errorf("%s: %w", "failed to JSON encode runner labels", err)
Expand All @@ -508,7 +514,7 @@ func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error {
Key: runner.Key,
Endpoint: runner.Endpoint,
State: sql.RunnerState(runner.State),
DeploymentName: pgDeploymentName,
DeploymentName: runner.Deployment,
Labels: attrBytes,
})
if err != nil {
Expand Down Expand Up @@ -609,7 +615,7 @@ func (p *postgresClaim) Rollback(ctx context.Context) error {
func (p *postgresClaim) Runner() Runner { return p.runner }

// SetDeploymentReplicas activates the given deployment.
func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentName, minReplicas int) error {
func (d *DAL) SetDeploymentReplicas(ctx context.Context, name model.DeploymentName, minReplicas int) error {
// Start the transaction
tx, err := d.db.Begin(ctx)
if err != nil {
Expand All @@ -618,18 +624,18 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentNam

defer tx.CommitOrRollback(ctx, &err)

deployment, err := d.db.GetDeployment(ctx, key)
deployment, err := d.db.GetDeployment(ctx, name)
if err != nil {
return translatePGError(err)
}

err = d.db.SetDeploymentDesiredReplicas(ctx, key, int32(minReplicas))
err = d.db.SetDeploymentDesiredReplicas(ctx, name, int32(minReplicas))
if err != nil {
return translatePGError(err)
}

err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{
DeploymentName: string(key),
DeploymentName: name,
MinReplicas: int32(minReplicas),
PrevMinReplicas: deployment.MinReplicas,
})
Expand Down Expand Up @@ -678,7 +684,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentName model.Dep
}

err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{
DeploymentName: newDeploymentName.String(),
DeploymentName: newDeploymentName,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
MinReplicas: int32(minReplicas),
Expand Down Expand Up @@ -955,7 +961,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
requestName = optional.Some(string(rn))
}
return translatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{
DeploymentName: call.DeploymentName.String(),
DeploymentName: call.DeploymentName,
RequestName: requestName,
TimeStamp: call.Time,
SourceModule: sourceModule,
Expand Down Expand Up @@ -984,20 +990,20 @@ func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) {
func (*DAL) checkForExistingDeployments(ctx context.Context, tx *sql.Tx, moduleSchema *schema.Module, artefacts []DeploymentArtefact) (model.DeploymentName, error) {
schemaBytes, err := schema.ModuleToBytes(moduleSchema)
if err != nil {
return "", fmt.Errorf("failed to marshal schema: %w", err)
return model.DeploymentName{}, fmt.Errorf("failed to marshal schema: %w", err)
}
existing, err := tx.GetDeploymentsWithArtefacts(ctx,
sha256esToBytes(slices.Map(artefacts, func(in DeploymentArtefact) sha256.SHA256 { return in.Digest })),
schemaBytes,
int64(len(artefacts)),
)
if err != nil {
return "", fmt.Errorf("%s: %w", "couldn't check for existing deployment", err)
return model.DeploymentName{}, fmt.Errorf("%s: %w", "couldn't check for existing deployment", err)
}
if len(existing) > 0 {
return existing[0].DeploymentName, nil
}
return "", nil
return model.DeploymentName{}, nil
}

func sha256esToBytes(digests []sha256.SHA256) [][]byte {
Expand Down
4 changes: 3 additions & 1 deletion backend/controller/dal/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter
if err := rows.Scan(&id, &name); err != nil {
return nil, err
}
deploymentName, _ := model.ParseDeploymentName(name)

deploymentIDs = append(deploymentIDs, id)
deploymentNames[id] = model.DeploymentName(name)
deploymentNames[id] = deploymentName
}

q += fmt.Sprintf(` AND e.deployment_id = ANY($%d::BIGINT[])`, param(deploymentIDs))
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ FROM modules
WHERE id = ANY (@ids::BIGINT[]);

-- name: CreateDeployment :exec
INSERT INTO deployments (module_id, "schema", name)
VALUES ((SELECT id FROM modules WHERE name = @module_name::TEXT LIMIT 1), @schema::BYTEA, $1);
INSERT INTO deployments (module_id, "schema", "name")
VALUES ((SELECT id FROM modules WHERE name = @module_name::TEXT LIMIT 1), @schema::BYTEA, sqlc.arg('name'));

-- name: GetArtefactDigests :many
-- Return the digests that exist in the database.
Expand Down Expand Up @@ -86,11 +86,11 @@ WITH deployment_rel AS (
-- there is no corresponding deployment, then the deployment ID is -1
-- and the parent statement will fail due to a foreign key constraint.
SELECT CASE
WHEN sqlc.narg('deployment_name')::TEXT IS NULL
WHEN sqlc.narg('deployment_name')::deployment_name IS NULL
THEN NULL
ELSE COALESCE((SELECT id
FROM deployments d
WHERE d.name = sqlc.narg('deployment_name')
WHERE d.name = sqlc.narg('deployment_name')::deployment_name
LIMIT 1), -1) END AS id)
INSERT
INTO runners (key, endpoint, state, labels, deployment_id, last_seen)
Expand Down Expand Up @@ -210,7 +210,7 @@ SET state = 'reserved',
-- and the update will fail due to a FK constraint.
deployment_id = COALESCE((SELECT id
FROM deployments d
WHERE d.name = sqlc.arg('deployment_name')
WHERE d.name = sqlc.arg('deployment_name')::deployment_name
LIMIT 1), -1)
WHERE id = (SELECT id
FROM runners r
Expand Down Expand Up @@ -274,7 +274,7 @@ FROM rows;

-- name: InsertLogEvent :exec
INSERT INTO events (deployment_id, request_id, time_stamp, custom_key_1, type, payload)
VALUES ((SELECT id FROM deployments d WHERE d.name = sqlc.arg('deployment_name') LIMIT 1),
VALUES ((SELECT id FROM deployments d WHERE d.name = sqlc.arg('deployment_name')::deployment_name LIMIT 1),
(CASE
WHEN sqlc.narg('request_name')::TEXT IS NULL THEN NULL
ELSE (SELECT id FROM requests ir WHERE ir.name = sqlc.narg('request_name')::TEXT LIMIT 1)
Expand All @@ -293,7 +293,7 @@ VALUES ((SELECT id FROM deployments d WHERE d.name = sqlc.arg('deployment_name')
INSERT INTO events (deployment_id, type, custom_key_1, custom_key_2, payload)
VALUES ((SELECT id
FROM deployments
WHERE deployments.name = sqlc.arg('deployment_name')::TEXT),
WHERE deployments.name = sqlc.arg('deployment_name')::deployment_name),
'deployment_created',
sqlc.arg('language')::TEXT,
sqlc.arg('module_name')::TEXT,
Expand All @@ -306,7 +306,7 @@ VALUES ((SELECT id
INSERT INTO events (deployment_id, type, custom_key_1, custom_key_2, payload)
VALUES ((SELECT id
FROM deployments
WHERE deployments.name = sqlc.arg('deployment_name')::TEXT),
WHERE deployments.name = sqlc.arg('deployment_name')::deployment_name),
'deployment_updated',
sqlc.arg('language')::TEXT,
sqlc.arg('module_name')::TEXT,
Expand All @@ -318,7 +318,7 @@ VALUES ((SELECT id
-- name: InsertCallEvent :exec
INSERT INTO events (deployment_id, request_id, time_stamp, type,
custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload)
VALUES ((SELECT id FROM deployments WHERE deployments.name = sqlc.arg('deployment_name')::TEXT),
VALUES ((SELECT id FROM deployments WHERE deployments.name = sqlc.arg('deployment_name')::deployment_name),
(CASE
WHEN sqlc.narg('request_name')::TEXT IS NULL THEN NULL
ELSE (SELECT id FROM requests ir WHERE ir.name = sqlc.narg('request_name')::TEXT)
Expand Down
30 changes: 15 additions & 15 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a9c069f

Please sign in to comment.