diff --git a/backend/controller/artefacts/oci_registry.go b/backend/controller/artefacts/oci_registry.go index b8cdb881d3..5180870203 100644 --- a/backend/controller/artefacts/oci_registry.go +++ b/backend/controller/artefacts/oci_registry.go @@ -4,15 +4,14 @@ import ( "bytes" "context" "encoding/hex" - "encoding/json" + "errors" "fmt" "io" - "strings" "github.com/opencontainers/go-digest" - v1 "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2" - "oras.land/oras-go/v2/content/memory" + "oras.land/oras-go/v2/content" + "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" "oras.land/oras-go/v2/registry/remote/retry" @@ -24,7 +23,7 @@ import ( ) const ( - ModuleArtifactPrefix = "ftl/modules/" + ModuleBlobsPrefix = "ftl/modules/" ) type ContainerConfig struct { @@ -35,8 +34,8 @@ type ContainerConfig struct { } type ContainerService struct { - host string - repoConnectionBuilder func(container string) (*remote.Repository, error) + host string + repoFactory func() (*remote.Repository, error) // in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table Handle *libdal.Handle[ContainerService] @@ -51,10 +50,16 @@ type ArtefactRepository struct { Size int64 } +type ArtefactBlobs struct { + Digest sha256.SHA256 + MediaType string + Size int64 +} + func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService { // Connect the registry targeting the specified container - repoConnectionBuilder := func(path string) (*remote.Repository, error) { - ref := fmt.Sprintf("%s/%s", c.Registry, path) + repoFactory := func() (*remote.Repository, error) { + ref := fmt.Sprintf("%s/%s", c.Registry, ModuleBlobsPrefix) reg, err := remote.NewRepository(ref) if err != nil { return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err) @@ -74,34 +79,40 @@ func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerSe } return &ContainerService{ - host: c.Registry, - repoConnectionBuilder: repoConnectionBuilder, + host: c.Registry, + repoFactory: repoFactory, Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService { return &ContainerService{ - host: c.Registry, - repoConnectionBuilder: repoConnectionBuilder, - Handle: h, - db: sql.New(h.Connection), + host: c.Registry, + repoFactory: repoFactory, + Handle: h, + db: sql.New(h.Connection), } }), } } func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) { + repo, err := s.repoFactory() + if err != nil { + return nil, nil, fmt.Errorf("unable to connect to container registry '%s': %w", s.host, err) + } set := make(map[sha256.SHA256]bool) for _, d := range digests { set[d] = true } - modules, err := s.DiscoverModuleArtefacts(ctx) - if err != nil { - return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err) - } keys = make([]ArtefactKey, 0) - for _, m := range modules { - if set[m.ModuleDigest] { - keys = append(keys, ArtefactKey{Digest: m.ModuleDigest}) - delete(set, m.ModuleDigest) + blobs := repo.Blobs() + for _, d := range digests { + _, err := blobs.Resolve(ctx, fmt.Sprintf("sha256:%s", d)) + if err != nil { + if errors.Is(err, errdef.ErrNotFound) { + continue + } + return nil, nil, fmt.Errorf("unable to resolve digest '%s': %w", d, err) } + keys = append(keys, ArtefactKey{Digest: d}) + delete(set, d) } missing = make([]sha256.SHA256, 0) for d := range set { @@ -110,43 +121,22 @@ func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256. return keys, missing, nil } +// Upload uploads the specific artifact as a raw blob and links it to a manifest to prevent GC func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) { - ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest) - ms := memory.New() - mediaDescriptor := v1.Descriptor{ - MediaType: "application/ftl.module.v1", - Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]), - Size: int64(len(artefact.Content)), - } - err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content)) + repo, err := s.repoFactory() if err != nil { - return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err) - } - artifactType := "application/ftl.module.artifact" - opts := oras.PackManifestOptions{ - Layers: []v1.Descriptor{mediaDescriptor}, + return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s': %w", s.host, err) } - tag := "latest" - manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts) - if err != nil { - return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err) - } - if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil { - return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err) - } - repo, err := s.repoConnectionBuilder(ref) - if err != nil { - return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err) - } - if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil { - return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err) + desc := content.NewDescriptorFromBytes("application/x-octet-stream", artefact.Content) + if err = repo.Push(ctx, desc, bytes.NewReader(artefact.Content)); err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to upload module blob to repository: %w", err) } return artefact.Digest, nil } func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) { ref := createModuleRepositoryPathFromDigest(digest) - registry, err := s.repoConnectionBuilder(ref) + registry, err := s.repoFactory() if err != nil { return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err) } @@ -157,58 +147,6 @@ func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) ( return stream, nil } -func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) { - return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix) -} - -func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) { - registry, err := remote.NewRegistry(s.host) - if err != nil { - return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err) - } - registry.PlainHTTP = true - result := make([]ArtefactRepository, 0) - err = registry.Repositories(ctx, "", func(repos []string) error { - for _, path := range repos { - if !strings.HasPrefix(path, prefix) { - continue - } - d, err := getDigestFromModuleRepositoryPath(path) - if err != nil { - return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err) - } - repo, err := registry.Repository(ctx, path) - if err != nil { - return fmt.Errorf("unable to connect to repository '%s': %w", path, err) - } - desc, err := repo.Resolve(ctx, "latest") - if err != nil { - return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err) - } - _, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions) - if err != nil { - return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err) - } - var manifest v1.Manifest - if err := json.Unmarshal(data, &manifest); err != nil { - return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err) - } - result = append(result, ArtefactRepository{ - ModuleDigest: d, - MediaType: manifest.Layers[0].MediaType, - ArtefactType: manifest.ArtifactType, - RepositoryDigest: desc.Digest, - Size: desc.Size, - }) - } - return nil - }) - if err != nil { - return nil, fmt.Errorf("unable to discover artefacts: %w", err) - } - return result, nil -} - func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { return getDatabaseReleaseArtefacts(ctx, s.db, releaseID) } @@ -219,23 +157,10 @@ func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.Dep // createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registries root func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string { - return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:])) + return fmt.Sprintf("%s/%s:latest", ModuleBlobsPrefix, hex.EncodeToString(digest[:])) } // createModuleRepositoryReferenceFromDigest creates the URL used to connect to the repository func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string { return fmt.Sprintf("%s/%s", host, createModuleRepositoryPathFromDigest(digest)) } - -// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/:latest -func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) { - slash := strings.LastIndex(repository, "/") - if slash == -1 { - return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository) - } - d, err := sha256.ParseSHA256(repository[slash+1:]) - if err != nil { - return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err) - } - return d, nil -} diff --git a/frontend/cli/cmd_release.go b/frontend/cli/cmd_release.go index 0c5f23f525..8b2def3b8d 100644 --- a/frontend/cli/cmd_release.go +++ b/frontend/cli/cmd_release.go @@ -9,6 +9,8 @@ import ( "github.com/TBD54566975/ftl/backend/controller/artefacts" internalobservability "github.com/TBD54566975/ftl/internal/observability" + sh "github.com/TBD54566975/ftl/internal/sha256" + "github.com/TBD54566975/ftl/internal/slices" ) type releaseCmd struct { @@ -18,7 +20,7 @@ type releaseCmd struct { MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"` Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."` - List releaseListCmd `cmd:"" help:"Lists all published releases."` + Exists releaseExistsCmd `cmd:"" help:"Indicates whether modules, with the specified digests, have been published."` } type releasePublishCmd struct { @@ -45,30 +47,30 @@ func (d *releasePublishCmd) Run(release *releaseCmd) error { return nil } -type releaseListCmd struct { +type releaseExistsCmd struct { + Digests []string `help:"Digest sha256:hex" default:""` } -func (d *releaseListCmd) Run(release *releaseCmd) error { +func (d *releaseExistsCmd) Run(release *releaseCmd) error { svc, err := createContainerService(release) if err != nil { return fmt.Errorf("failed to create container service: %w", err) } - modules, err := svc.DiscoverModuleArtefacts(context.Background()) + digests := slices.Map(slices.Unique(d.Digests), sh.MustParseSHA256) + keys, missing, err := svc.GetDigestsKeys(context.Background(), digests) if err != nil { - return fmt.Errorf("failed to discover module artefacts: %w", err) + return fmt.Errorf("failed to get keys: %w", err) } - if len(modules) == 0 { - fmt.Println("No module artefacts found.") - return nil + fmt.Printf("\033[31m%d\033[0m FTL module blobs located\n", len(keys)) + for i, key := range keys { + fmt.Printf(" \u001B[34m%02d\u001B[0m - sha256:\u001B[32m%s\u001B[0m\n", i+1, key.Digest) } - - format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n" - fmt.Printf("Found %d module artefacts:\n", len(modules)) - for i, m := range modules { - fmt.Printf("\033[31m Artefact %d\033[0m\n", i) - fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType) + if len(missing) > 0 { + fmt.Printf("\033[31m%d\033[0m FTL module blobs keys \033[31mnot found\033[0m\n", len(missing)) + for i, key := range missing { + fmt.Printf(" \u001B[34m%02d\u001B[0m - sha256:\u001B[31m%s\u001B[0m\n", i+1, key) + } } - return nil } diff --git a/go.mod b/go.mod index 704b62c616..1afc8e73f1 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,6 @@ require ( github.com/mattn/go-isatty v0.0.20 github.com/multiformats/go-base36 v0.2.0 github.com/opencontainers/go-digest v1.0.0 - github.com/opencontainers/image-spec v1.1.0 github.com/otiai10/copy v1.14.0 github.com/posener/complete v1.2.3 github.com/radovskyb/watcher v1.0.7 @@ -134,6 +133,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/onsi/gomega v1.33.1 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect