Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Isolate artifact access code #2755

Merged
merged 13 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions backend/controller/artefacts/artefact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package artefacts

import (
"context"
"io"

"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
)

// Metadata container for an artefact's metadata
type Metadata struct {
Executable bool
Path string
}

// Artefact container for an artefact's payload and metadata
type Artefact struct {
Digest sha256.SHA256
Metadata Metadata
Content []byte
}

type ArtefactKey struct {
Digest sha256.SHA256
id int64
}

type ReleaseArtefact struct {
Artefact ArtefactKey
Path string
Executable bool
}

type Registry interface {
// GetDigestsKeys locates the `digests` corresponding `ArtefactKey`s and identifies the missing ones
GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error)
// Upload pushes the specified media, and metadata, to the registry and returns the computed digest
Upload(context context.Context, artefact Artefact) (sha256.SHA256, error)
// Download performs a streaming download of the artefact identified by the supplied digest
Download(context context.Context, digest sha256.SHA256) (io.ReadCloser, error)
// GetReleaseArtefacts locates the artefacts metadata corresponding with the specified release
GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ArtefactKey, error)
// AddReleaseArtefact associates the given `release` with the artefact associated with the given `digest`
AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error
}
129 changes: 129 additions & 0 deletions backend/controller/artefacts/dal_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package artefacts

import (
"context"
"fmt"
"io"

sets "github.com/deckarep/golang-set/v2"

"github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
"github.com/TBD54566975/ftl/internal/slices"
)

type ArtefactRow struct {
ID int64
Digest []byte
}

type DAL interface {
GetArtefactDigests(ctx context.Context, digests [][]byte) ([]ArtefactRow, error)
CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)
GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error)
}

type Service struct {
*libdal.Handle[Service]
db sql.Querier
}

func New(_ context.Context, conn libdal.Connection) *Service {
jonathanj-square marked this conversation as resolved.
Show resolved Hide resolved
return &Service{
db: sql.New(conn),
Handle: libdal.New(conn, func(h *libdal.Handle[Service]) *Service {
return &Service{
Handle: h,
db: sql.New(h.Connection),
}
}),
}
}

func (s *Service) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
have, err := s.db.GetArtefactDigests(ctx, sha256esToBytes(digests))
if err != nil {
return nil, nil, libdal.TranslatePGError(err)
}
keys = slices.Map(have, func(in sql.GetArtefactDigestsRow) ArtefactKey {
return ArtefactKey{id: in.ID, Digest: sha256.FromBytes(in.Digest)}
})
haveStr := slices.Map(keys, func(in ArtefactKey) sha256.SHA256 {
return in.Digest
})
missing = sets.NewSet(digests...).Difference(sets.NewSet(haveStr...)).ToSlice()
return keys, missing, nil
}

func (s *Service) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
sha256digest := sha256.Sum(artefact.Content)
_, err := s.db.CreateArtefact(ctx, sha256digest[:], artefact.Content)
return sha256digest, libdal.TranslatePGError(err)
}

func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
digests := [][]byte{digest[:]}
rows, err := s.db.GetArtefactDigests(ctx, digests)
if err != nil {
return nil, fmt.Errorf("unable to get artefact digests: %w", libdal.TranslatePGError(err))
}
if len(rows) == 0 {
return nil, fmt.Errorf("no artefact found with digest: %s", digest)
}
return &dalArtefactStream{db: s.db, id: rows[0].ID}, nil
}

func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
rows, err := s.db.GetDeploymentArtefacts(ctx, releaseID)
if err != nil {
return nil, fmt.Errorf("unable to get release artefacts: %w", libdal.TranslatePGError(err))
}
return slices.Map(rows, func(row sql.GetDeploymentArtefactsRow) ReleaseArtefact {
return ReleaseArtefact{
Artefact: ArtefactKey{Digest: sha256.FromBytes(row.Digest), id: row.ID},
Path: row.Path,
Executable: row.Executable,
}
}), nil
}

func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
err := s.db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{
Key: key,
Digest: ra.Artefact.Digest[:],
Executable: ra.Executable,
Path: ra.Path,
})
if err != nil {
return libdal.TranslatePGError(err)
}
return nil
}

type dalArtefactStream struct {
db sql.Querier
id int64
offset int32
}

func (s *dalArtefactStream) Close() error { return nil }

func (s *dalArtefactStream) Read(p []byte) (n int, err error) {
content, err := s.db.GetArtefactContentRange(context.Background(), s.offset+1, int32(len(p)), s.id)
if err != nil {
return 0, libdal.TranslatePGError(err)
}
copy(p, content)
clen := len(content)
s.offset += int32(clen)
if clen == 0 {
err = io.EOF
}
return clen, err
}

func sha256esToBytes(digests []sha256.SHA256) [][]byte {
return slices.Map(digests, func(digest sha256.SHA256) []byte { return digest[:] })
}
31 changes: 31 additions & 0 deletions backend/controller/artefacts/internal/sql/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions backend/controller/artefacts/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions backend/controller/artefacts/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions backend/controller/artefacts/internal/sql/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- name: GetArtefactDigests :many
-- Return the digests that exist in the database.
SELECT id, digest
FROM artefacts
WHERE digest = ANY (@digests::bytea[]);

-- name: CreateArtefact :one
-- Create a new artefact and return the artefact ID.
INSERT INTO artefacts (digest, content)
VALUES ($1, $2)
ON CONFLICT (digest)
DO UPDATE SET digest = EXCLUDED.digest
RETURNING id;

-- name: GetArtefactContentRange :one
SELECT SUBSTRING(a.content FROM @start FOR @count)::BYTEA AS content
FROM artefacts a
WHERE a.id = @id;

-- name: GetDeploymentArtefacts :many
-- Get all artefacts matching the given digests.
SELECT da.created_at, artefact_id AS id, executable, path, digest, executable
FROM deployment_artefacts da
INNER JOIN artefacts ON artefacts.id = da.artefact_id
WHERE deployment_id = $1;

-- name: AssociateArtefactWithDeployment :exec
INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path)
VALUES ((SELECT id FROM deployments WHERE key = @key::deployment_key), (SELECT id FROM artefacts WHERE digest = @digest::bytea), $3, $4);
Loading
Loading