generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introducing oci artifact registry impl (#2989)
fixes #2954 Introduces the OCI implementation of the module registry. The controller still uses the database module registry; so to exercise the change the `ftl release` commands have been introduced to demonstrate pushing module artifacts to the registry and discovering them (and metadata). The CLI implementations will change with the subsequent controller integration work. --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
e563ce1
commit b4606de
Showing
8 changed files
with
351 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
.hermit/ | ||
.vscode/* | ||
.registry/ | ||
!/.vscode/extensions.json | ||
!/.vscode/settings.json | ||
!/.vscode/launch.json | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
package artefacts | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/hex" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"strings" | ||
|
||
"github.com/opencontainers/go-digest" | ||
v1 "github.com/opencontainers/image-spec/specs-go/v1" | ||
"oras.land/oras-go/v2" | ||
"oras.land/oras-go/v2/content/memory" | ||
"oras.land/oras-go/v2/registry/remote" | ||
"oras.land/oras-go/v2/registry/remote/auth" | ||
"oras.land/oras-go/v2/registry/remote/retry" | ||
|
||
"github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql" | ||
"github.com/TBD54566975/ftl/backend/libdal" | ||
"github.com/TBD54566975/ftl/internal/model" | ||
"github.com/TBD54566975/ftl/internal/sha256" | ||
) | ||
|
||
const ( | ||
ModuleArtifactPrefix = "ftl/modules/" | ||
) | ||
|
||
type ContainerConfig struct { | ||
Registry string `help:"OCI container registry host:port" env:"FTL_ARTEFACTS_REGISTRY"` | ||
Username string `help:"OCI container registry username" env:"FTL_ARTEFACTS_USER"` | ||
Password string `help:"OCI container registry password" env:"FTL_ARTEFACTS_PWD"` | ||
AllowPlainHTTP bool `help:"Allows OCI container requests to accept plain HTTP responses" env:"FTL_ARTEFACTS_ALLOW_HTTP"` | ||
} | ||
|
||
type ContainerService struct { | ||
host string | ||
repoConnectionBuilder func(container string) (*remote.Repository, error) | ||
|
||
// in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table | ||
Handle *libdal.Handle[ContainerService] | ||
db sql.Querier | ||
} | ||
|
||
type ArtefactRepository struct { | ||
ModuleDigest sha256.SHA256 | ||
MediaType string | ||
ArtefactType string | ||
RepositoryDigest digest.Digest | ||
Size int64 | ||
} | ||
|
||
func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService { | ||
// Connect the registry targeting the specified container | ||
repoConnectionBuilder := func(path string) (*remote.Repository, error) { | ||
ref := fmt.Sprintf("%s/%s", c.Registry, path) | ||
reg, err := remote.NewRepository(ref) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err) | ||
} | ||
|
||
reg.Client = &auth.Client{ | ||
Client: retry.DefaultClient, | ||
Cache: auth.NewCache(), | ||
Credential: auth.StaticCredential(c.Registry, auth.Credential{ | ||
Username: c.Username, | ||
Password: c.Password, | ||
}), | ||
} | ||
reg.PlainHTTP = c.AllowPlainHTTP | ||
|
||
return reg, nil | ||
} | ||
|
||
return &ContainerService{ | ||
host: c.Registry, | ||
repoConnectionBuilder: repoConnectionBuilder, | ||
Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService { | ||
return &ContainerService{ | ||
host: c.Registry, | ||
repoConnectionBuilder: repoConnectionBuilder, | ||
Handle: h, | ||
db: sql.New(h.Connection), | ||
} | ||
}), | ||
} | ||
} | ||
|
||
func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) { | ||
set := make(map[sha256.SHA256]bool) | ||
for _, d := range digests { | ||
set[d] = true | ||
} | ||
modules, err := s.DiscoverModuleArtefacts(ctx) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err) | ||
} | ||
keys = make([]ArtefactKey, 0) | ||
for _, m := range modules { | ||
if set[m.ModuleDigest] { | ||
keys = append(keys, ArtefactKey{Digest: m.ModuleDigest}) | ||
delete(set, m.ModuleDigest) | ||
} | ||
} | ||
missing = make([]sha256.SHA256, 0) | ||
for d := range set { | ||
missing = append(missing, d) | ||
} | ||
return keys, missing, nil | ||
} | ||
|
||
func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) { | ||
ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest) | ||
ms := memory.New() | ||
mediaDescriptor := v1.Descriptor{ | ||
MediaType: "application/ftl.module.v1", | ||
Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]), | ||
Size: int64(len(artefact.Content)), | ||
} | ||
err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content)) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err) | ||
} | ||
artifactType := "application/ftl.module.artifact" | ||
opts := oras.PackManifestOptions{ | ||
Layers: []v1.Descriptor{mediaDescriptor}, | ||
} | ||
tag := "latest" | ||
manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err) | ||
} | ||
if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err) | ||
} | ||
repo, err := s.repoConnectionBuilder(ref) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err) | ||
} | ||
if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err) | ||
} | ||
return artefact.Digest, nil | ||
} | ||
|
||
func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) { | ||
ref := createModuleRepositoryPathFromDigest(digest) | ||
registry, err := s.repoConnectionBuilder(ref) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err) | ||
} | ||
_, stream, err := oras.Fetch(ctx, registry, createModuleRepositoryReferenceFromDigest(s.host, digest), oras.DefaultFetchOptions) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to download artefact: %w", err) | ||
} | ||
return stream, nil | ||
} | ||
|
||
func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) { | ||
return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix) | ||
} | ||
|
||
func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) { | ||
registry, err := remote.NewRegistry(s.host) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err) | ||
} | ||
registry.PlainHTTP = true | ||
result := make([]ArtefactRepository, 0) | ||
err = registry.Repositories(ctx, "", func(repos []string) error { | ||
for _, path := range repos { | ||
if !strings.HasPrefix(path, prefix) { | ||
continue | ||
} | ||
d, err := getDigestFromModuleRepositoryPath(path) | ||
if err != nil { | ||
return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err) | ||
} | ||
repo, err := registry.Repository(ctx, path) | ||
if err != nil { | ||
return fmt.Errorf("unable to connect to repository '%s': %w", path, err) | ||
} | ||
desc, err := repo.Resolve(ctx, "latest") | ||
if err != nil { | ||
return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err) | ||
} | ||
_, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions) | ||
if err != nil { | ||
return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err) | ||
} | ||
var manifest v1.Manifest | ||
if err := json.Unmarshal(data, &manifest); err != nil { | ||
return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err) | ||
} | ||
result = append(result, ArtefactRepository{ | ||
ModuleDigest: d, | ||
MediaType: manifest.Layers[0].MediaType, | ||
ArtefactType: manifest.ArtifactType, | ||
RepositoryDigest: desc.Digest, | ||
Size: desc.Size, | ||
}) | ||
} | ||
return nil | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to discover artefacts: %w", err) | ||
} | ||
return result, nil | ||
} | ||
|
||
func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { | ||
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID) | ||
} | ||
|
||
func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error { | ||
return addReleaseArtefacts(ctx, s.db, key, ra) | ||
} | ||
|
||
// createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registries root | ||
func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string { | ||
return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:])) | ||
} | ||
|
||
// createModuleRepositoryReferenceFromDigest creates the URL used to connect to the repository | ||
func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string { | ||
return fmt.Sprintf("%s/%s", host, createModuleRepositoryPathFromDigest(digest)) | ||
} | ||
|
||
// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/<digest>:latest | ||
func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) { | ||
slash := strings.LastIndex(repository, "/") | ||
if slash == -1 { | ||
return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository) | ||
} | ||
d, err := sha256.ParseSHA256(repository[slash+1:]) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err) | ||
} | ||
return d, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"crypto/sha256" | ||
"fmt" | ||
|
||
"github.com/google/uuid" | ||
|
||
"github.com/TBD54566975/ftl/backend/controller/artefacts" | ||
internalobservability "github.com/TBD54566975/ftl/internal/observability" | ||
) | ||
|
||
type releaseCmd struct { | ||
Registry string `help:"Registry host:port" default:"127.0.0.1:5001"` | ||
DSN string `help:"DAL DSN." default:"postgres://127.0.0.1:15432/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"` | ||
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"` | ||
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"` | ||
|
||
Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."` | ||
List releaseListCmd `cmd:"" help:"Lists all published releases."` | ||
} | ||
|
||
type releasePublishCmd struct { | ||
} | ||
|
||
func (d *releasePublishCmd) Run(release *releaseCmd) error { | ||
svc, err := createContainerService(release) | ||
if err != nil { | ||
return fmt.Errorf("failed to create container service: %w", err) | ||
} | ||
content := uuid.New() | ||
contentBytes := content[:] | ||
hash, err := svc.Upload(context.Background(), artefacts.Artefact{ | ||
Digest: sha256.Sum256(contentBytes), | ||
Metadata: artefacts.Metadata{ | ||
Path: fmt.Sprintf("random/%s", content), | ||
}, | ||
Content: contentBytes, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed upload artefact: %w", err) | ||
} | ||
fmt.Printf("Artefact published with hash: \033[31m%s\033[0m\n", hash) | ||
return nil | ||
} | ||
|
||
type releaseListCmd struct { | ||
} | ||
|
||
func (d *releaseListCmd) Run(release *releaseCmd) error { | ||
svc, err := createContainerService(release) | ||
if err != nil { | ||
return fmt.Errorf("failed to create container service: %w", err) | ||
} | ||
modules, err := svc.DiscoverModuleArtefacts(context.Background()) | ||
if err != nil { | ||
return fmt.Errorf("failed to discover module artefacts: %w", err) | ||
} | ||
if len(modules) == 0 { | ||
fmt.Println("No module artefacts found.") | ||
return nil | ||
} | ||
|
||
format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n" | ||
fmt.Printf("Found %d module artefacts:\n", len(modules)) | ||
for i, m := range modules { | ||
fmt.Printf("\033[31m Artefact %d\033[0m\n", i) | ||
fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func createContainerService(release *releaseCmd) (*artefacts.ContainerService, error) { | ||
conn, err := internalobservability.OpenDBAndInstrument(release.DSN) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to open DB connection: %w", err) | ||
} | ||
conn.SetMaxIdleConns(release.MaxIdleDBConnections) | ||
conn.SetMaxOpenConns(release.MaxOpenDBConnections) | ||
|
||
return artefacts.NewContainerService(artefacts.ContainerConfig{ | ||
Registry: release.Registry, | ||
AllowPlainHTTP: true, | ||
}, conn), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.