From 4346c409b8c2e13d64deb6174f770b4c467ff6c7 Mon Sep 17 00:00:00 2001 From: Jon Johnson <113393155+jonathanj-square@users.noreply.github.com> Date: Tue, 1 Oct 2024 21:56:46 -0700 Subject: [PATCH] refactor: Isolate artifact access code (#2755) Co-authored-by: github-actions[bot] --- backend/controller/artefacts/artefact.go | 45 ++++++ backend/controller/artefacts/dal_registry.go | 129 +++++++++++++++ .../controller/artefacts/internal/sql/db.go | 31 ++++ .../artefacts/internal/sql/models.go | 5 + .../artefacts/internal/sql/querier.go | 22 +++ .../artefacts/internal/sql/queries.sql | 29 ++++ .../artefacts/internal/sql/queries.sql.go | 147 ++++++++++++++++++ backend/controller/controller.go | 8 +- backend/controller/dal/dal.go | 94 ++++------- backend/controller/dal/dal_test.go | 7 +- .../controller/dal/internal/sql/querier.go | 8 - .../controller/dal/internal/sql/queries.sql | 30 ---- .../dal/internal/sql/queries.sql.go | 133 ---------------- backend/controller/timeline/timeline_test.go | 7 +- sqlc.yaml | 7 + 15 files changed, 462 insertions(+), 240 deletions(-) create mode 100644 backend/controller/artefacts/artefact.go create mode 100644 backend/controller/artefacts/dal_registry.go create mode 100644 backend/controller/artefacts/internal/sql/db.go create mode 100644 backend/controller/artefacts/internal/sql/models.go create mode 100644 backend/controller/artefacts/internal/sql/querier.go create mode 100644 backend/controller/artefacts/internal/sql/queries.sql create mode 100644 backend/controller/artefacts/internal/sql/queries.sql.go diff --git a/backend/controller/artefacts/artefact.go b/backend/controller/artefacts/artefact.go new file mode 100644 index 0000000000..51b60e378c --- /dev/null +++ b/backend/controller/artefacts/artefact.go @@ -0,0 +1,45 @@ +package artefacts + +import ( + "context" + "io" + + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/sha256" +) + +// Metadata container for an artefact's metadata +type Metadata struct { + Executable bool + Path string +} + +// Artefact container for an artefact's payload and metadata +type Artefact struct { + Digest sha256.SHA256 + Metadata Metadata + Content []byte +} + +type ArtefactKey struct { + Digest sha256.SHA256 +} + +type ReleaseArtefact struct { + Artefact ArtefactKey + Path string + Executable bool +} + +type Registry interface { + // GetDigestsKeys locates the `digests` corresponding `ArtefactKey`s and identifies the missing ones + GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) + // Upload pushes the specified media, and metadata, to the registry and returns the computed digest + Upload(context context.Context, artefact Artefact) (sha256.SHA256, error) + // Download performs a streaming download of the artefact identified by the supplied digest + Download(context context.Context, digest sha256.SHA256) (io.ReadCloser, error) + // GetReleaseArtefacts locates the artefacts metadata corresponding with the specified release + GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ArtefactKey, error) + // AddReleaseArtefact associates the given `release` with the artefact associated with the given `digest` + AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error +} diff --git a/backend/controller/artefacts/dal_registry.go b/backend/controller/artefacts/dal_registry.go new file mode 100644 index 0000000000..42e7e9728e --- /dev/null +++ b/backend/controller/artefacts/dal_registry.go @@ -0,0 +1,129 @@ +package artefacts + +import ( + "context" + "fmt" + "io" + + sets "github.com/deckarep/golang-set/v2" + + "github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql" + "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/sha256" + "github.com/TBD54566975/ftl/internal/slices" +) + +type ArtefactRow struct { + ID int64 + Digest []byte +} + +type DAL interface { + GetArtefactDigests(ctx context.Context, digests [][]byte) ([]ArtefactRow, error) + CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) + GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) +} + +type Service struct { + *libdal.Handle[Service] + db sql.Querier +} + +func New(conn libdal.Connection) *Service { + return &Service{ + db: sql.New(conn), + Handle: libdal.New(conn, func(h *libdal.Handle[Service]) *Service { + return &Service{ + Handle: h, + db: sql.New(h.Connection), + } + }), + } +} + +func (s *Service) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) { + have, err := s.db.GetArtefactDigests(ctx, sha256esToBytes(digests)) + if err != nil { + return nil, nil, libdal.TranslatePGError(err) + } + keys = slices.Map(have, func(in sql.GetArtefactDigestsRow) ArtefactKey { + return ArtefactKey{Digest: sha256.FromBytes(in.Digest)} + }) + haveStr := slices.Map(keys, func(in ArtefactKey) sha256.SHA256 { + return in.Digest + }) + missing = sets.NewSet(digests...).Difference(sets.NewSet(haveStr...)).ToSlice() + return keys, missing, nil +} + +func (s *Service) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) { + sha256digest := sha256.Sum(artefact.Content) + _, err := s.db.CreateArtefact(ctx, sha256digest[:], artefact.Content) + return sha256digest, libdal.TranslatePGError(err) +} + +func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) { + digests := [][]byte{digest[:]} + rows, err := s.db.GetArtefactDigests(ctx, digests) + if err != nil { + return nil, fmt.Errorf("unable to get artefact digests: %w", libdal.TranslatePGError(err)) + } + if len(rows) == 0 { + return nil, fmt.Errorf("no artefact found with digest: %s", digest) + } + return &dalArtefactStream{db: s.db, id: rows[0].ID}, nil +} + +func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { + rows, err := s.db.GetDeploymentArtefacts(ctx, releaseID) + if err != nil { + return nil, fmt.Errorf("unable to get release artefacts: %w", libdal.TranslatePGError(err)) + } + return slices.Map(rows, func(row sql.GetDeploymentArtefactsRow) ReleaseArtefact { + return ReleaseArtefact{ + Artefact: ArtefactKey{Digest: sha256.FromBytes(row.Digest)}, + Path: row.Path, + Executable: row.Executable, + } + }), nil +} + +func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error { + err := s.db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{ + Key: key, + Digest: ra.Artefact.Digest[:], + Executable: ra.Executable, + Path: ra.Path, + }) + if err != nil { + return libdal.TranslatePGError(err) + } + return nil +} + +type dalArtefactStream struct { + db sql.Querier + id int64 + offset int32 +} + +func (s *dalArtefactStream) Close() error { return nil } + +func (s *dalArtefactStream) Read(p []byte) (n int, err error) { + content, err := s.db.GetArtefactContentRange(context.Background(), s.offset+1, int32(len(p)), s.id) + if err != nil { + return 0, libdal.TranslatePGError(err) + } + copy(p, content) + clen := len(content) + s.offset += int32(clen) + if clen == 0 { + err = io.EOF + } + return clen, err +} + +func sha256esToBytes(digests []sha256.SHA256) [][]byte { + return slices.Map(digests, func(digest sha256.SHA256) []byte { return digest[:] }) +} diff --git a/backend/controller/artefacts/internal/sql/db.go b/backend/controller/artefacts/internal/sql/db.go new file mode 100644 index 0000000000..0e0973111c --- /dev/null +++ b/backend/controller/artefacts/internal/sql/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/backend/controller/artefacts/internal/sql/models.go b/backend/controller/artefacts/internal/sql/models.go new file mode 100644 index 0000000000..76baffe98e --- /dev/null +++ b/backend/controller/artefacts/internal/sql/models.go @@ -0,0 +1,5 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql diff --git a/backend/controller/artefacts/internal/sql/querier.go b/backend/controller/artefacts/internal/sql/querier.go new file mode 100644 index 0000000000..e51f7cef28 --- /dev/null +++ b/backend/controller/artefacts/internal/sql/querier.go @@ -0,0 +1,22 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" +) + +type Querier interface { + AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error + // Create a new artefact and return the artefact ID. + CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) + GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) + // Return the digests that exist in the database. + GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) + // Get all artefacts matching the given digests. + GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) +} + +var _ Querier = (*Queries)(nil) diff --git a/backend/controller/artefacts/internal/sql/queries.sql b/backend/controller/artefacts/internal/sql/queries.sql new file mode 100644 index 0000000000..9938764fc4 --- /dev/null +++ b/backend/controller/artefacts/internal/sql/queries.sql @@ -0,0 +1,29 @@ +-- name: GetArtefactDigests :many +-- Return the digests that exist in the database. +SELECT id, digest +FROM artefacts +WHERE digest = ANY (@digests::bytea[]); + +-- name: CreateArtefact :one +-- Create a new artefact and return the artefact ID. +INSERT INTO artefacts (digest, content) +VALUES ($1, $2) + ON CONFLICT (digest) +DO UPDATE SET digest = EXCLUDED.digest + RETURNING id; + +-- name: GetArtefactContentRange :one +SELECT SUBSTRING(a.content FROM @start FOR @count)::BYTEA AS content +FROM artefacts a +WHERE a.id = @id; + +-- name: GetDeploymentArtefacts :many +-- Get all artefacts matching the given digests. +SELECT da.created_at, artefact_id AS id, executable, path, digest, executable +FROM deployment_artefacts da + INNER JOIN artefacts ON artefacts.id = da.artefact_id +WHERE deployment_id = $1; + +-- name: AssociateArtefactWithDeployment :exec +INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path) +VALUES ((SELECT id FROM deployments WHERE key = @key::deployment_key), (SELECT id FROM artefacts WHERE digest = @digest::bytea), $3, $4); \ No newline at end of file diff --git a/backend/controller/artefacts/internal/sql/queries.sql.go b/backend/controller/artefacts/internal/sql/queries.sql.go new file mode 100644 index 0000000000..d42d7e1ad1 --- /dev/null +++ b/backend/controller/artefacts/internal/sql/queries.sql.go @@ -0,0 +1,147 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: queries.sql + +package sql + +import ( + "context" + "time" + + "github.com/TBD54566975/ftl/internal/model" + "github.com/lib/pq" +) + +const associateArtefactWithDeployment = `-- name: AssociateArtefactWithDeployment :exec +INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path) +VALUES ((SELECT id FROM deployments WHERE key = $1::deployment_key), (SELECT id FROM artefacts WHERE digest = $2::bytea), $3, $4) +` + +type AssociateArtefactWithDeploymentParams struct { + Key model.DeploymentKey + Digest []byte + Executable bool + Path string +} + +func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error { + _, err := q.db.ExecContext(ctx, associateArtefactWithDeployment, + arg.Key, + arg.Digest, + arg.Executable, + arg.Path, + ) + return err +} + +const createArtefact = `-- name: CreateArtefact :one +INSERT INTO artefacts (digest, content) +VALUES ($1, $2) + ON CONFLICT (digest) +DO UPDATE SET digest = EXCLUDED.digest + RETURNING id +` + +// Create a new artefact and return the artefact ID. +func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) { + row := q.db.QueryRowContext(ctx, createArtefact, digest, content) + var id int64 + err := row.Scan(&id) + return id, err +} + +const getArtefactContentRange = `-- name: GetArtefactContentRange :one +SELECT SUBSTRING(a.content FROM $1 FOR $2)::BYTEA AS content +FROM artefacts a +WHERE a.id = $3 +` + +func (q *Queries) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) { + row := q.db.QueryRowContext(ctx, getArtefactContentRange, start, count, iD) + var content []byte + err := row.Scan(&content) + return content, err +} + +const getArtefactDigests = `-- name: GetArtefactDigests :many +SELECT id, digest +FROM artefacts +WHERE digest = ANY ($1::bytea[]) +` + +type GetArtefactDigestsRow struct { + ID int64 + Digest []byte +} + +// Return the digests that exist in the database. +func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) { + rows, err := q.db.QueryContext(ctx, getArtefactDigests, pq.Array(digests)) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetArtefactDigestsRow + for rows.Next() { + var i GetArtefactDigestsRow + if err := rows.Scan(&i.ID, &i.Digest); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getDeploymentArtefacts = `-- name: GetDeploymentArtefacts :many +SELECT da.created_at, artefact_id AS id, executable, path, digest, executable +FROM deployment_artefacts da + INNER JOIN artefacts ON artefacts.id = da.artefact_id +WHERE deployment_id = $1 +` + +type GetDeploymentArtefactsRow struct { + CreatedAt time.Time + ID int64 + Executable bool + Path string + Digest []byte + Executable_2 bool +} + +// Get all artefacts matching the given digests. +func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) { + rows, err := q.db.QueryContext(ctx, getDeploymentArtefacts, deploymentID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetDeploymentArtefactsRow + for rows.Next() { + var i GetDeploymentArtefactsRow + if err := rows.Scan( + &i.CreatedAt, + &i.ID, + &i.Executable, + &i.Path, + &i.Digest, + &i.Executable_2, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/backend/controller/controller.go b/backend/controller/controller.go index ac05aabd7b..75dae70dca 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -35,6 +35,7 @@ import ( "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/controller/admin" + "github.com/TBD54566975/ftl/backend/controller/artefacts" "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/console" "github.com/TBD54566975/ftl/backend/controller/cronjobs" @@ -221,6 +222,7 @@ type Service struct { tasks *scheduledtask.Scheduler cronJobs *cronjobs.Service pubSub *pubsub.Service + registry *artefacts.Service timeline *timeline.Service controllerListListeners []ControllerListListener @@ -275,6 +277,8 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc)) svc.pubSub = pubSub + svc.registry = artefacts.New(conn) + svc.dal = dal.New(ctx, conn, encryption, pubSub) timelineSvc := timeline.New(ctx, conn, encryption) @@ -1098,7 +1102,7 @@ func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftl if err != nil { return nil, err } - need, err := s.dal.GetMissingArtefacts(ctx, byteDigests) + _, need, err := s.registry.GetDigestsKeys(ctx, byteDigests) if err != nil { return nil, err } @@ -1109,7 +1113,7 @@ func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftl func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1.UploadArtefactRequest]) (*connect.Response[ftlv1.UploadArtefactResponse], error) { logger := log.FromContext(ctx) - digest, err := s.dal.CreateArtefact(ctx, req.Msg.Content) + digest, err := s.registry.Upload(ctx, artefacts.Artefact{Content: req.Msg.Content}) if err != nil { return nil, err } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index f0cd7ff317..cf2a43a40f 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -6,16 +6,14 @@ import ( "encoding/json" "errors" "fmt" - "io" - "strings" "time" "github.com/alecthomas/types/optional" inprocesspubsub "github.com/alecthomas/types/pubsub" - sets "github.com/deckarep/golang-set/v2" xmaps "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" + aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts" dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" @@ -55,10 +53,12 @@ type Reservation interface { func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL { var d *DAL + db := dalsql.New(conn) d = &DAL{ leaser: dbleaser.NewDatabaseLeaser(conn), - db: dalsql.New(conn), + db: db, encryption: encryption, + registry: aregistry.New(conn), Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { return &DAL{ Handle: h, @@ -66,6 +66,7 @@ func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Ser leaser: dbleaser.NewDatabaseLeaser(h.Connection), pubsub: pubsub, encryption: d.encryption, + registry: aregistry.New(h.Connection), DeploymentChanges: d.DeploymentChanges, } }), @@ -82,6 +83,7 @@ type DAL struct { leaser *dbleaser.DatabaseLeaser pubsub *pubsub.Service encryption *encryption.Service + registry *aregistry.Service // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification] @@ -194,25 +196,6 @@ func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err erro return libdal.TranslatePGError(err) } -// GetMissingArtefacts returns the digests of the artefacts that are missing from the database. -func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error) { - have, err := d.db.GetArtefactDigests(ctx, sha256esToBytes(digests)) - if err != nil { - return nil, libdal.TranslatePGError(err) - } - haveStr := slices.Map(have, func(in dalsql.GetArtefactDigestsRow) sha256.SHA256 { - return sha256.FromBytes(in.Digest) - }) - return sets.NewSet(digests...).Difference(sets.NewSet(haveStr...)).ToSlice(), nil -} - -// CreateArtefact inserts a new artefact into the database and returns its ID. -func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error) { - sha256digest := sha256.Sum(content) - _, err = d.db.CreateArtefact(ctx, sha256digest[:], content) - return sha256digest, libdal.TranslatePGError(err) -} - type IngressRoutingEntry struct { Verb string Method string @@ -281,22 +264,23 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem return model.DeploymentKey{}, fmt.Errorf("failed to create deployment: %w", libdal.TranslatePGError(err)) } - uploadedDigests := slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) []byte { return in.Digest[:] }) - artefactDigests, err := tx.db.GetArtefactDigests(ctx, uploadedDigests) + uploadedDigests := slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) sha256.SHA256 { return sha256.FromBytes(in.Digest[:]) }) + keys, missing, err := tx.registry.GetDigestsKeys(ctx, uploadedDigests) if err != nil { return model.DeploymentKey{}, fmt.Errorf("failed to get artefact digests: %w", err) } - if len(artefactDigests) != len(artefacts) { - missingDigests := strings.Join(slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) string { return in.Digest.String() }), ", ") - return model.DeploymentKey{}, fmt.Errorf("missing %d artefacts: %s", len(artefacts)-len(artefactDigests), missingDigests) + if len(missing) > 0 { + m := slices.Reduce(missing, "", func(join string, in sha256.SHA256) string { + return fmt.Sprintf("%s, %s", join, in.String()) + }) + return model.DeploymentKey{}, fmt.Errorf("missing digests %s", m) } // Associate the artefacts with the deployment - for _, row := range artefactDigests { - artefact := artefactsByDigest[sha256.FromBytes(row.Digest)] - err = tx.db.AssociateArtefactWithDeployment(ctx, dalsql.AssociateArtefactWithDeploymentParams{ - Key: deploymentKey, - ArtefactID: row.ID, + for _, row := range keys { + artefact := artefactsByDigest[row.Digest] + err = tx.registry.AddReleaseArtefact(ctx, deploymentKey, aregistry.ReleaseArtefact{ + Artefact: aregistry.ArtefactKey{Digest: row.Digest}, Executable: artefact.Executable, Path: artefact.Path, }) @@ -701,18 +685,26 @@ func (d *DAL) loadDeployment(ctx context.Context, deployment dalsql.GetDeploymen Key: deployment.Deployment.Key, Schema: deployment.Deployment.Schema, } - artefacts, err := d.db.GetDeploymentArtefacts(ctx, deployment.Deployment.ID) + ras, err := d.registry.GetReleaseArtefacts(ctx, deployment.Deployment.ID) + if err != nil { return nil, libdal.TranslatePGError(err) } - out.Artefacts = slices.Map(artefacts, func(row dalsql.GetDeploymentArtefactsRow) *model.Artefact { - return &model.Artefact{ - Path: row.Path, - Executable: row.Executable, - Content: &artefactReader{id: row.ID, db: d.db}, - Digest: sha256.FromBytes(row.Digest), + out.Artefacts, err = slices.MapErr(ras, func(ra aregistry.ReleaseArtefact) (*model.Artefact, error) { + content, err := d.registry.Download(ctx, ra.Artefact.Digest) + if err != nil { + return nil, fmt.Errorf("artefact download failed: %w", libdal.TranslatePGError(err)) } + return &model.Artefact{ + Path: ra.Path, + Executable: ra.Executable, + Content: content, + Digest: ra.Artefact.Digest, + }, nil }) + if err != nil { + return nil, fmt.Errorf("an artefact download failed: %w", err) + } return out, nil } @@ -779,25 +771,3 @@ func (*DAL) checkForExistingDeployments(ctx context.Context, tx *DAL, moduleSche func sha256esToBytes(digests []sha256.SHA256) [][]byte { return slices.Map(digests, func(digest sha256.SHA256) []byte { return digest[:] }) } - -type artefactReader struct { - id int64 - db dalsql.Querier - offset int32 -} - -func (r *artefactReader) Close() error { return nil } - -func (r *artefactReader) Read(p []byte) (n int, err error) { - content, err := r.db.GetArtefactContentRange(context.Background(), r.offset+1, int32(len(p)), r.id) - if err != nil { - return 0, libdal.TranslatePGError(err) - } - copy(p, content) - clen := len(content) - r.offset += int32(clen) - if clen == 0 { - err = io.EOF - } - return clen, err -} diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index 8cd24b9d52..e909b99eab 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -3,6 +3,7 @@ package dal import ( "bytes" "context" + "github.com/TBD54566975/ftl/backend/controller/artefacts" "io" "sync" "testing" @@ -56,7 +57,7 @@ func TestDAL(t *testing.T) { var testSha sha256.SHA256 t.Run("CreateArtefact", func(t *testing.T) { - testSha, err = dal.CreateArtefact(ctx, testContent) + testSha, err = dal.registry.Upload(ctx, artefacts.Artefact{Content: testContent}) assert.NoError(t, err) }) @@ -100,7 +101,7 @@ func TestDAL(t *testing.T) { t.Run("GetMissingArtefacts", func(t *testing.T) { misshingSHA := sha256.MustParseSHA256("fae7e4cbdca7167bbea4098c05d596f50bbb18062b61c1dfca3705b4a6c2888c") - missing, err := dal.GetMissingArtefacts(ctx, []sha256.SHA256{testSHA, misshingSHA}) + _, missing, err := dal.registry.GetDigestsKeys(ctx, []sha256.SHA256{testSHA, misshingSHA}) assert.NoError(t, err) assert.Equal(t, []sha256.SHA256{misshingSHA}, missing) }) @@ -203,7 +204,7 @@ func TestCreateArtefactConflict(t *testing.T) { defer wg.Done() tx1, err := dal.Begin(ctx) assert.NoError(t, err) - digest, err := tx1.CreateArtefact(ctx, []byte("content")) + digest, err := dal.registry.Upload(ctx, artefacts.Artefact{Content: []byte("content")}) assert.NoError(t, err) time.Sleep(time.Second * 2) err = tx1.Commit(ctx) diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index b295e277a3..b8047ac6f2 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -19,12 +19,9 @@ type Querier interface { // Reserve a pending async call for execution, returning the associated lease // reservation key and accompanying metadata. AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error) - AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error AsyncCallQueueDepth(ctx context.Context) (int64, error) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error CompleteEventForSubscription(ctx context.Context, name string, module string) error - // Create a new artefact and return the artefact ID. - CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error @@ -43,13 +40,8 @@ type Querier interface { GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error) - GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) - // Return the digests that exist in the database. - GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error) - // Get all artefacts matching the given digests. - GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error) // Get all deployments that have artefacts matching the given digests. GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error) diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index 29987c422e..ec47c6fc8e 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -18,31 +18,6 @@ WHERE id = ANY (@ids::BIGINT[]); INSERT INTO deployments (module_id, "schema", "key") VALUES ((SELECT id FROM modules WHERE name = @module_name::TEXT LIMIT 1), @schema::BYTEA, @key::deployment_key); --- name: GetArtefactDigests :many --- Return the digests that exist in the database. -SELECT id, digest -FROM artefacts -WHERE digest = ANY (@digests::bytea[]); - --- name: GetDeploymentArtefacts :many --- Get all artefacts matching the given digests. -SELECT da.created_at, artefact_id AS id, executable, path, digest, executable -FROM deployment_artefacts da - INNER JOIN artefacts ON artefacts.id = da.artefact_id -WHERE deployment_id = $1; - --- name: CreateArtefact :one --- Create a new artefact and return the artefact ID. -INSERT INTO artefacts (digest, content) -VALUES ($1, $2) -ON CONFLICT (digest) -DO UPDATE SET digest = EXCLUDED.digest -RETURNING id; - --- name: AssociateArtefactWithDeployment :exec -INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path) -VALUES ((SELECT id FROM deployments WHERE key = @key::deployment_key), $2, $3, $4); - -- name: GetDeployment :one SELECT sqlc.embed(d), m.language, m.name AS module_name, d.min_replicas FROM deployments d @@ -63,11 +38,6 @@ WHERE EXISTS (SELECT 1 HAVING COUNT(*) = @count::BIGINT -- Number of unique digests provided ); --- name: GetArtefactContentRange :one -SELECT SUBSTRING(a.content FROM @start FOR @count)::BYTEA AS content -FROM artefacts a -WHERE a.id = @id; - -- name: UpsertRunner :one -- Upsert a runner and return the deployment ID that it is assigned to, if any. WITH deployment_rel AS ( diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index 9ae6a2fffd..79043ef9e7 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -105,28 +105,6 @@ func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) ( return i, err } -const associateArtefactWithDeployment = `-- name: AssociateArtefactWithDeployment :exec -INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path) -VALUES ((SELECT id FROM deployments WHERE key = $1::deployment_key), $2, $3, $4) -` - -type AssociateArtefactWithDeploymentParams struct { - Key model.DeploymentKey - ArtefactID int64 - Executable bool - Path string -} - -func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error { - _, err := q.db.ExecContext(ctx, associateArtefactWithDeployment, - arg.Key, - arg.ArtefactID, - arg.Executable, - arg.Path, - ) - return err -} - const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec WITH event AS ( SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context @@ -161,22 +139,6 @@ func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, return err } -const createArtefact = `-- name: CreateArtefact :one -INSERT INTO artefacts (digest, content) -VALUES ($1, $2) -ON CONFLICT (digest) -DO UPDATE SET digest = EXCLUDED.digest -RETURNING id -` - -// Create a new artefact and return the artefact ID. -func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) { - row := q.db.QueryRowContext(ctx, createArtefact, digest, content) - var id int64 - err := row.Scan(&id) - return id, err -} - const createCronJob = `-- name: CreateCronJob :exec INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) VALUES ( @@ -670,54 +632,6 @@ func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, return items, nil } -const getArtefactContentRange = `-- name: GetArtefactContentRange :one -SELECT SUBSTRING(a.content FROM $1 FOR $2)::BYTEA AS content -FROM artefacts a -WHERE a.id = $3 -` - -func (q *Queries) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) { - row := q.db.QueryRowContext(ctx, getArtefactContentRange, start, count, iD) - var content []byte - err := row.Scan(&content) - return content, err -} - -const getArtefactDigests = `-- name: GetArtefactDigests :many -SELECT id, digest -FROM artefacts -WHERE digest = ANY ($1::bytea[]) -` - -type GetArtefactDigestsRow struct { - ID int64 - Digest []byte -} - -// Return the digests that exist in the database. -func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) { - rows, err := q.db.QueryContext(ctx, getArtefactDigests, pq.Array(digests)) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetArtefactDigestsRow - for rows.Next() { - var i GetArtefactDigestsRow - if err := rows.Scan(&i.ID, &i.Digest); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const getCronJobByKey = `-- name: GetCronJobByKey :one SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j @@ -788,53 +702,6 @@ func (q *Queries) GetDeployment(ctx context.Context, key model.DeploymentKey) (G return i, err } -const getDeploymentArtefacts = `-- name: GetDeploymentArtefacts :many -SELECT da.created_at, artefact_id AS id, executable, path, digest, executable -FROM deployment_artefacts da - INNER JOIN artefacts ON artefacts.id = da.artefact_id -WHERE deployment_id = $1 -` - -type GetDeploymentArtefactsRow struct { - CreatedAt time.Time - ID int64 - Executable bool - Path string - Digest []byte - Executable_2 bool -} - -// Get all artefacts matching the given digests. -func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentArtefacts, deploymentID) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetDeploymentArtefactsRow - for rows.Next() { - var i GetDeploymentArtefactsRow - if err := rows.Scan( - &i.CreatedAt, - &i.ID, - &i.Executable, - &i.Path, - &i.Digest, - &i.Executable_2, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const getDeploymentsByID = `-- name: GetDeploymentsByID :many SELECT id, created_at, module_id, key, schema, labels, min_replicas FROM deployments diff --git a/backend/controller/timeline/timeline_test.go b/backend/controller/timeline/timeline_test.go index b58a9d346e..c61843d0f9 100644 --- a/backend/controller/timeline/timeline_test.go +++ b/backend/controller/timeline/timeline_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "github.com/TBD54566975/ftl/backend/controller/artefacts" "io" "net/http" "net/url" @@ -34,6 +35,7 @@ func TestTimeline(t *testing.T) { assert.NoError(t, err) timeline := New(ctx, conn, encryption) + registry := artefacts.New(conn) scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub) @@ -48,7 +50,7 @@ func TestTimeline(t *testing.T) { var testSha sha256.SHA256 t.Run("CreateArtefact", func(t *testing.T) { - testSha, err = controllerDAL.CreateArtefact(ctx, testContent) + testSha, err = registry.Upload(ctx, artefacts.Artefact{Content: testContent}) assert.NoError(t, err) }) @@ -221,6 +223,7 @@ func TestDeleteOldEvents(t *testing.T) { assert.NoError(t, err) timeline := New(ctx, conn, encryption) + registry := artefacts.New(conn) scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub) @@ -229,7 +232,7 @@ func TestDeleteOldEvents(t *testing.T) { var testSha sha256.SHA256 t.Run("CreateArtefact", func(t *testing.T) { - testSha, err = controllerDAL.CreateArtefact(ctx, testContent) + testSha, err = registry.Upload(ctx, artefacts.Artefact{Content: testContent}) assert.NoError(t, err) }) diff --git a/sqlc.yaml b/sqlc.yaml index 3ac3d13605..f98ef0aee9 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -168,6 +168,13 @@ sql: go: <<: *gengo out: "internal/configuration/dal/internal/sql" + - <<: *daldir + queries: + - backend/controller/artefacts/internal/sql/queries.sql + gen: + go: + <<: *gengo + out: "backend/controller/artefacts/internal/sql" - <<: *daldir queries: - backend/controller/leases/dbleaser/internal/sql/queries.sql