generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: Isolate artifact access code (#2755)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
31053ff
commit 4346c40
Showing
15 changed files
with
462 additions
and
240 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package artefacts | ||
|
||
import ( | ||
"context" | ||
"io" | ||
|
||
"github.com/TBD54566975/ftl/internal/model" | ||
"github.com/TBD54566975/ftl/internal/sha256" | ||
) | ||
|
||
// Metadata container for an artefact's metadata | ||
type Metadata struct { | ||
Executable bool | ||
Path string | ||
} | ||
|
||
// Artefact container for an artefact's payload and metadata | ||
type Artefact struct { | ||
Digest sha256.SHA256 | ||
Metadata Metadata | ||
Content []byte | ||
} | ||
|
||
type ArtefactKey struct { | ||
Digest sha256.SHA256 | ||
} | ||
|
||
type ReleaseArtefact struct { | ||
Artefact ArtefactKey | ||
Path string | ||
Executable bool | ||
} | ||
|
||
type Registry interface { | ||
// GetDigestsKeys locates the `digests` corresponding `ArtefactKey`s and identifies the missing ones | ||
GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) | ||
// Upload pushes the specified media, and metadata, to the registry and returns the computed digest | ||
Upload(context context.Context, artefact Artefact) (sha256.SHA256, error) | ||
// Download performs a streaming download of the artefact identified by the supplied digest | ||
Download(context context.Context, digest sha256.SHA256) (io.ReadCloser, error) | ||
// GetReleaseArtefacts locates the artefacts metadata corresponding with the specified release | ||
GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ArtefactKey, error) | ||
// AddReleaseArtefact associates the given `release` with the artefact associated with the given `digest` | ||
AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package artefacts | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
|
||
sets "github.com/deckarep/golang-set/v2" | ||
|
||
"github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql" | ||
"github.com/TBD54566975/ftl/backend/libdal" | ||
"github.com/TBD54566975/ftl/internal/model" | ||
"github.com/TBD54566975/ftl/internal/sha256" | ||
"github.com/TBD54566975/ftl/internal/slices" | ||
) | ||
|
||
type ArtefactRow struct { | ||
ID int64 | ||
Digest []byte | ||
} | ||
|
||
type DAL interface { | ||
GetArtefactDigests(ctx context.Context, digests [][]byte) ([]ArtefactRow, error) | ||
CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) | ||
GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) | ||
} | ||
|
||
type Service struct { | ||
*libdal.Handle[Service] | ||
db sql.Querier | ||
} | ||
|
||
func New(conn libdal.Connection) *Service { | ||
return &Service{ | ||
db: sql.New(conn), | ||
Handle: libdal.New(conn, func(h *libdal.Handle[Service]) *Service { | ||
return &Service{ | ||
Handle: h, | ||
db: sql.New(h.Connection), | ||
} | ||
}), | ||
} | ||
} | ||
|
||
func (s *Service) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) { | ||
have, err := s.db.GetArtefactDigests(ctx, sha256esToBytes(digests)) | ||
if err != nil { | ||
return nil, nil, libdal.TranslatePGError(err) | ||
} | ||
keys = slices.Map(have, func(in sql.GetArtefactDigestsRow) ArtefactKey { | ||
return ArtefactKey{Digest: sha256.FromBytes(in.Digest)} | ||
}) | ||
haveStr := slices.Map(keys, func(in ArtefactKey) sha256.SHA256 { | ||
return in.Digest | ||
}) | ||
missing = sets.NewSet(digests...).Difference(sets.NewSet(haveStr...)).ToSlice() | ||
return keys, missing, nil | ||
} | ||
|
||
func (s *Service) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) { | ||
sha256digest := sha256.Sum(artefact.Content) | ||
_, err := s.db.CreateArtefact(ctx, sha256digest[:], artefact.Content) | ||
return sha256digest, libdal.TranslatePGError(err) | ||
} | ||
|
||
func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) { | ||
digests := [][]byte{digest[:]} | ||
rows, err := s.db.GetArtefactDigests(ctx, digests) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to get artefact digests: %w", libdal.TranslatePGError(err)) | ||
} | ||
if len(rows) == 0 { | ||
return nil, fmt.Errorf("no artefact found with digest: %s", digest) | ||
} | ||
return &dalArtefactStream{db: s.db, id: rows[0].ID}, nil | ||
} | ||
|
||
func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { | ||
rows, err := s.db.GetDeploymentArtefacts(ctx, releaseID) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to get release artefacts: %w", libdal.TranslatePGError(err)) | ||
} | ||
return slices.Map(rows, func(row sql.GetDeploymentArtefactsRow) ReleaseArtefact { | ||
return ReleaseArtefact{ | ||
Artefact: ArtefactKey{Digest: sha256.FromBytes(row.Digest)}, | ||
Path: row.Path, | ||
Executable: row.Executable, | ||
} | ||
}), nil | ||
} | ||
|
||
func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error { | ||
err := s.db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{ | ||
Key: key, | ||
Digest: ra.Artefact.Digest[:], | ||
Executable: ra.Executable, | ||
Path: ra.Path, | ||
}) | ||
if err != nil { | ||
return libdal.TranslatePGError(err) | ||
} | ||
return nil | ||
} | ||
|
||
type dalArtefactStream struct { | ||
db sql.Querier | ||
id int64 | ||
offset int32 | ||
} | ||
|
||
func (s *dalArtefactStream) Close() error { return nil } | ||
|
||
func (s *dalArtefactStream) Read(p []byte) (n int, err error) { | ||
content, err := s.db.GetArtefactContentRange(context.Background(), s.offset+1, int32(len(p)), s.id) | ||
if err != nil { | ||
return 0, libdal.TranslatePGError(err) | ||
} | ||
copy(p, content) | ||
clen := len(content) | ||
s.offset += int32(clen) | ||
if clen == 0 { | ||
err = io.EOF | ||
} | ||
return clen, err | ||
} | ||
|
||
func sha256esToBytes(digests []sha256.SHA256) [][]byte { | ||
return slices.Map(digests, func(digest sha256.SHA256) []byte { return digest[:] }) | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
-- name: GetArtefactDigests :many | ||
-- Return the digests that exist in the database. | ||
SELECT id, digest | ||
FROM artefacts | ||
WHERE digest = ANY (@digests::bytea[]); | ||
|
||
-- name: CreateArtefact :one | ||
-- Create a new artefact and return the artefact ID. | ||
INSERT INTO artefacts (digest, content) | ||
VALUES ($1, $2) | ||
ON CONFLICT (digest) | ||
DO UPDATE SET digest = EXCLUDED.digest | ||
RETURNING id; | ||
|
||
-- name: GetArtefactContentRange :one | ||
SELECT SUBSTRING(a.content FROM @start FOR @count)::BYTEA AS content | ||
FROM artefacts a | ||
WHERE a.id = @id; | ||
|
||
-- name: GetDeploymentArtefacts :many | ||
-- Get all artefacts matching the given digests. | ||
SELECT da.created_at, artefact_id AS id, executable, path, digest, executable | ||
FROM deployment_artefacts da | ||
INNER JOIN artefacts ON artefacts.id = da.artefact_id | ||
WHERE deployment_id = $1; | ||
|
||
-- name: AssociateArtefactWithDeployment :exec | ||
INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path) | ||
VALUES ((SELECT id FROM deployments WHERE key = @key::deployment_key), (SELECT id FROM artefacts WHERE digest = @digest::bytea), $3, $4); |
Oops, something went wrong.