Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit module artifacts as blobs #3184

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 42 additions & 117 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"strings"

"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
Expand All @@ -24,7 +23,7 @@ import (
)

const (
ModuleArtifactPrefix = "ftl/modules/"
ModuleBlobsPrefix = "ftl/modules/"
)

type ContainerConfig struct {
Expand All @@ -35,8 +34,8 @@ type ContainerConfig struct {
}

type ContainerService struct {
host string
repoConnectionBuilder func(container string) (*remote.Repository, error)
host string
repoFactory func() (*remote.Repository, error)

// in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table
Handle *libdal.Handle[ContainerService]
Expand All @@ -51,10 +50,16 @@ type ArtefactRepository struct {
Size int64
}

type ArtefactBlobs struct {
Digest sha256.SHA256
MediaType string
Size int64
}

func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService {
// Connect the registry targeting the specified container
repoConnectionBuilder := func(path string) (*remote.Repository, error) {
ref := fmt.Sprintf("%s/%s", c.Registry, path)
repoFactory := func() (*remote.Repository, error) {
ref := fmt.Sprintf("%s/%s", c.Registry, ModuleBlobsPrefix)
reg, err := remote.NewRepository(ref)
if err != nil {
return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err)
Expand All @@ -74,34 +79,40 @@ func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerSe
}

return &ContainerService{
host: c.Registry,
repoConnectionBuilder: repoConnectionBuilder,
host: c.Registry,
repoFactory: repoFactory,
Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService {
return &ContainerService{
host: c.Registry,
repoConnectionBuilder: repoConnectionBuilder,
Handle: h,
db: sql.New(h.Connection),
host: c.Registry,
repoFactory: repoFactory,
Handle: h,
db: sql.New(h.Connection),
}
}),
}
}

func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
repo, err := s.repoFactory()
if err != nil {
return nil, nil, fmt.Errorf("unable to connect to container registry '%s': %w", s.host, err)
}
set := make(map[sha256.SHA256]bool)
for _, d := range digests {
set[d] = true
}
modules, err := s.DiscoverModuleArtefacts(ctx)
if err != nil {
return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err)
}
keys = make([]ArtefactKey, 0)
for _, m := range modules {
if set[m.ModuleDigest] {
keys = append(keys, ArtefactKey{Digest: m.ModuleDigest})
delete(set, m.ModuleDigest)
blobs := repo.Blobs()
for _, d := range digests {
_, err := blobs.Resolve(ctx, fmt.Sprintf("sha256:%s", d))
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
continue
}
return nil, nil, fmt.Errorf("unable to resolve digest '%s': %w", d, err)
}
keys = append(keys, ArtefactKey{Digest: d})
delete(set, d)
}
missing = make([]sha256.SHA256, 0)
for d := range set {
Expand All @@ -110,43 +121,22 @@ func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.
return keys, missing, nil
}

// Upload uploads the specific artifact as a raw blob and links it to a manifest to prevent GC
func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest)
ms := memory.New()
mediaDescriptor := v1.Descriptor{
MediaType: "application/ftl.module.v1",
Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]),
Size: int64(len(artefact.Content)),
}
err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content))
repo, err := s.repoFactory()
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err)
}
artifactType := "application/ftl.module.artifact"
opts := oras.PackManifestOptions{
Layers: []v1.Descriptor{mediaDescriptor},
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s': %w", s.host, err)
}
tag := "latest"
manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err)
}
if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err)
}
repo, err := s.repoConnectionBuilder(ref)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err)
}
if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err)
desc := content.NewDescriptorFromBytes("application/x-octet-stream", artefact.Content)
if err = repo.Push(ctx, desc, bytes.NewReader(artefact.Content)); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to upload module blob to repository: %w", err)
}
return artefact.Digest, nil
}

func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
ref := createModuleRepositoryPathFromDigest(digest)
registry, err := s.repoConnectionBuilder(ref)
registry, err := s.repoFactory()
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err)
}
Expand All @@ -157,58 +147,6 @@ func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (
return stream, nil
}

func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) {
return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix)
}

func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) {
registry, err := remote.NewRegistry(s.host)
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err)
}
registry.PlainHTTP = true
result := make([]ArtefactRepository, 0)
err = registry.Repositories(ctx, "", func(repos []string) error {
for _, path := range repos {
if !strings.HasPrefix(path, prefix) {
continue
}
d, err := getDigestFromModuleRepositoryPath(path)
if err != nil {
return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err)
}
repo, err := registry.Repository(ctx, path)
if err != nil {
return fmt.Errorf("unable to connect to repository '%s': %w", path, err)
}
desc, err := repo.Resolve(ctx, "latest")
if err != nil {
return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err)
}
_, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions)
if err != nil {
return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err)
}
var manifest v1.Manifest
if err := json.Unmarshal(data, &manifest); err != nil {
return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err)
}
result = append(result, ArtefactRepository{
ModuleDigest: d,
MediaType: manifest.Layers[0].MediaType,
ArtefactType: manifest.ArtifactType,
RepositoryDigest: desc.Digest,
Size: desc.Size,
})
}
return nil
})
if err != nil {
return nil, fmt.Errorf("unable to discover artefacts: %w", err)
}
return result, nil
}

func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}
Expand All @@ -219,23 +157,10 @@ func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.Dep

// createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registries root
func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string {
return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:]))
return fmt.Sprintf("%s/%s:latest", ModuleBlobsPrefix, hex.EncodeToString(digest[:]))
}

// createModuleRepositoryReferenceFromDigest creates the URL used to connect to the repository
func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string {
return fmt.Sprintf("%s/%s", host, createModuleRepositoryPathFromDigest(digest))
}

// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/<digest>:latest
func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) {
slash := strings.LastIndex(repository, "/")
if slash == -1 {
return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository)
}
d, err := sha256.ParseSHA256(repository[slash+1:])
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err)
}
return d, nil
}
32 changes: 17 additions & 15 deletions frontend/cli/cmd_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/TBD54566975/ftl/backend/controller/artefacts"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
sh "github.com/TBD54566975/ftl/internal/sha256"
"github.com/TBD54566975/ftl/internal/slices"
)

type releaseCmd struct {
Expand All @@ -18,7 +20,7 @@ type releaseCmd struct {
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`

Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."`
List releaseListCmd `cmd:"" help:"Lists all published releases."`
Exists releaseExistsCmd `cmd:"" help:"Indicates whether modules, with the specified digests, have been published."`
}

type releasePublishCmd struct {
Expand All @@ -45,30 +47,30 @@ func (d *releasePublishCmd) Run(release *releaseCmd) error {
return nil
}

type releaseListCmd struct {
type releaseExistsCmd struct {
Digests []string `help:"Digest sha256:hex" default:""`
}

func (d *releaseListCmd) Run(release *releaseCmd) error {
func (d *releaseExistsCmd) Run(release *releaseCmd) error {
svc, err := createContainerService(release)
if err != nil {
return fmt.Errorf("failed to create container service: %w", err)
}
modules, err := svc.DiscoverModuleArtefacts(context.Background())
digests := slices.Map(slices.Unique(d.Digests), sh.MustParseSHA256)
keys, missing, err := svc.GetDigestsKeys(context.Background(), digests)
if err != nil {
return fmt.Errorf("failed to discover module artefacts: %w", err)
return fmt.Errorf("failed to get keys: %w", err)
}
if len(modules) == 0 {
fmt.Println("No module artefacts found.")
return nil
fmt.Printf("\033[31m%d\033[0m FTL module blobs located\n", len(keys))
for i, key := range keys {
fmt.Printf(" \u001B[34m%02d\u001B[0m - sha256:\u001B[32m%s\u001B[0m\n", i+1, key.Digest)
}

format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n"
fmt.Printf("Found %d module artefacts:\n", len(modules))
for i, m := range modules {
fmt.Printf("\033[31m Artefact %d\033[0m\n", i)
fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType)
if len(missing) > 0 {
fmt.Printf("\033[31m%d\033[0m FTL module blobs keys \033[31mnot found\033[0m\n", len(missing))
for i, key := range missing {
fmt.Printf(" \u001B[34m%02d\u001B[0m - sha256:\u001B[31m%s\u001B[0m\n", i+1, key)
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ require (
github.com/mattn/go-isatty v0.0.20
github.com/multiformats/go-base36 v0.2.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
github.com/otiai10/copy v1.14.0
github.com/posener/complete v1.2.3
github.com/radovskyb/watcher v1.0.7
Expand Down Expand Up @@ -134,6 +133,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/onsi/gomega v1.33.1 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
Expand Down