Skip to content

Commit

Permalink
chore(db): transparently transcode module schema to/from proto bytes (#…
Browse files Browse the repository at this point in the history
…863)

Just a nice little QoL change. I'll be needing to store schema Types
soon for config, and will use the same pattern there.
  • Loading branch information
alecthomas authored Feb 1, 2024
1 parent 485e256 commit 3b36e78
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 55 deletions.
44 changes: 4 additions & 40 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/schema"
)

var (
Expand Down Expand Up @@ -283,14 +282,6 @@ func (d *DAL) GetStatus(
return Status{}, fmt.Errorf("%s: %w", "could not get routing table", translatePGError(err))
}
statusDeployments, err := slices.MapErr(deployments, func(in sql.GetActiveDeploymentsRow) (Deployment, error) {
protoSchema := &schemapb.Module{}
if err := proto.Unmarshal(in.Deployment.Schema, protoSchema); err != nil {
return Deployment{}, fmt.Errorf("%q: could not unmarshal schema: %w", in.ModuleName, err)
}
modelSchema, err := schema.ModuleFromProto(protoSchema)
if err != nil {
return Deployment{}, fmt.Errorf("%q: invalid schema in database: %w", in.ModuleName, err)
}
labels := model.Labels{}
err = json.Unmarshal(in.Deployment.Labels, &labels)
if err != nil {
Expand All @@ -301,7 +292,7 @@ func (d *DAL) GetStatus(
Module: in.ModuleName,
Language: in.Language,
MinReplicas: int(in.Deployment.MinReplicas),
Schema: modelSchema,
Schema: in.Deployment.Schema,
Labels: labels,
}, nil
})
Expand Down Expand Up @@ -730,20 +721,12 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error) {
return nil, translatePGError(err)
}
return slices.MapErr(rows, func(in sql.GetActiveDeploymentsRow) (Deployment, error) {
protoSchema := &schemapb.Module{}
if err := proto.Unmarshal(in.Deployment.Schema, protoSchema); err != nil {
return Deployment{}, fmt.Errorf("%q: could not unmarshal schema: %w", in.ModuleName, err)
}
modelSchema, err := schema.ModuleFromProto(protoSchema)
if err != nil {
return Deployment{}, fmt.Errorf("%q: invalid schema in database: %w", in.ModuleName, err)
}
return Deployment{
Name: in.Deployment.Name,
Module: in.ModuleName,
Language: in.Language,
MinReplicas: int(in.Deployment.MinReplicas),
Schema: modelSchema,
Schema: in.Deployment.Schema,
CreatedAt: in.Deployment.CreatedAt,
}, nil
})
Expand All @@ -754,17 +737,7 @@ func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module,
if err != nil {
return nil, fmt.Errorf("%s: %w", "could not get active deployments", translatePGError(err))
}
return slices.MapErr(rows, func(in sql.GetActiveDeploymentSchemasRow) (*schema.Module, error) {
protoSchema := &schemapb.Module{}
if err := proto.Unmarshal(in.Schema, protoSchema); err != nil {
return nil, fmt.Errorf("%q: could not unmarshal schema: %w", in.Name, err)
}
modelSchema, err := schema.ModuleFromProto(protoSchema)
if err != nil {
return nil, fmt.Errorf("%q: invalid schema in database: %w", in.Name, err)
}
return modelSchema, nil
})
return slices.MapErr(rows, func(in sql.GetActiveDeploymentSchemasRow) (*schema.Module, error) { return in.Schema, nil })
}

type ProcessRunner struct {
Expand Down Expand Up @@ -917,20 +890,11 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
}

func (d *DAL) loadDeployment(ctx context.Context, deployment sql.GetDeploymentRow) (*model.Deployment, error) {
pm := &schemapb.Module{}
err := proto.Unmarshal(deployment.Deployment.Schema, pm)
if err != nil {
return nil, err
}
module, err := schema.ModuleFromProto(pm)
if err != nil {
return nil, err
}
out := &model.Deployment{
Module: deployment.ModuleName,
Language: deployment.Language,
Name: deployment.Deployment.Name,
Schema: module,
Schema: deployment.Deployment.Schema,
}
artefacts, err := d.db.GetDeploymentArtefacts(ctx, deployment.Deployment.ID)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/schema"
)

// NotificationPayload is a row from the database.
Expand Down Expand Up @@ -100,15 +99,11 @@ func (d *DAL) publishNotification(ctx context.Context, notification event, logge
if err != nil {
return Deployment{}, optional.None[model.DeploymentName](), translatePGError(err)
}
moduleSchema, err := schema.ModuleFromBytes(row.Deployment.Schema)
if err != nil {
return Deployment{}, optional.None[model.DeploymentName](), err
}
return Deployment{
CreatedAt: row.Deployment.CreatedAt,
Name: row.Deployment.Name,
Module: row.ModuleName,
Schema: moduleSchema,
Schema: row.Deployment.Schema,
MinReplicas: int(row.Deployment.MinReplicas),
Language: row.Language,
}, optional.None[model.DeploymentName](), nil
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/sql/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

//go:embed schema
var schema embed.FS
var migrationSchema embed.FS

// Migrate the database.
func Migrate(ctx context.Context, dsn string) error {
Expand All @@ -30,7 +30,7 @@ func Migrate(ctx context.Context, dsn string) error {
defer conn.Close()

db := dbmate.New(u)
db.FS = schema
db.FS = migrationSchema
db.Log = log.FromContext(ctx).Scope("migrate").WriterAt(log.Debug)
db.MigrationsDir = []string{"schema"}
err = db.CreateAndMigrate()
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ CREATE TABLE modules
name VARCHAR UNIQUE NOT NULL
);

-- Proto-encoded module schema.
CREATE DOMAIN module_schema_pb AS BYTEA;

CREATE TABLE deployments
(
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
module_id BIGINT NOT NULL REFERENCES modules (id) ON DELETE CASCADE,
-- Unique name for this deployment in the form <module-name>-<random>.
"name" VARCHAR UNIQUE NOT NULL,
-- Proto-encoded module schema.
"schema" BYTEA NOT NULL,
"schema" module_schema_pb NOT NULL,
-- Labels are used to match deployments to runners.
"labels" JSONB NOT NULL DEFAULT '{}',
min_replicas INT NOT NULL DEFAULT 0
Expand Down
19 changes: 19 additions & 0 deletions backend/schema/module.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package schema

import (
"database/sql"
"database/sql/driver"
"fmt"
"sort"
"strings"
Expand All @@ -22,6 +24,23 @@ type Module struct {

var _ Node = (*Module)(nil)
var _ Decl = (*Module)(nil)
var _ sql.Scanner = (*Module)(nil)
var _ driver.Valuer = (*Module)(nil)

func (m *Module) Value() (driver.Value, error) { return proto.Marshal(m.ToProto()) }
func (m *Module) Scan(src any) error {
switch src := src.(type) {
case []byte:
module, err := ModuleFromBytes(src)
if err != nil {
return err
}
*m = *module
return nil
default:
return fmt.Errorf("cannot scan %T", src)
}
}

// Scope returns a scope containing all the declarations in this module.
func (m *Module) Scope() Scope {
Expand Down
2 changes: 2 additions & 0 deletions sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ sql:
nullable: true
go_type:
type: "NullDuration"
- db_type: "module_schema_pb"
go_type: "*github.com/TBD54566975/ftl/backend/schema.Module"
- db_type: "timestamptz"
nullable: true
go_type:
Expand Down

0 comments on commit 3b36e78

Please sign in to comment.