Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introducing oci artifact registry impl #2989

Merged
merged 17 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.hermit/
.vscode/*
.registry/
!/.vscode/extensions.json
!/.vscode/settings.json
!/.vscode/launch.json
Expand Down
12 changes: 10 additions & 2 deletions backend/controller/artefacts/dal_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCl
}

func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
rows, err := s.db.GetDeploymentArtefacts(ctx, releaseID)
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}

func getDatabaseReleaseArtefacts(ctx context.Context, db sql.Querier, releaseID int64) ([]ReleaseArtefact, error) {
rows, err := db.GetDeploymentArtefacts(ctx, releaseID)
if err != nil {
return nil, fmt.Errorf("unable to get release artefacts: %w", libdal.TranslatePGError(err))
}
Expand All @@ -90,7 +94,11 @@ func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]R
}

func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
err := s.db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{
return addReleaseArtefacts(ctx, s.db, key, ra)
}

func addReleaseArtefacts(ctx context.Context, db sql.Querier, key model.DeploymentKey, ra ReleaseArtefact) error {
err := db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{
Key: key,
Digest: ra.Artefact.Digest[:],
Executable: ra.Executable,
Expand Down
242 changes: 242 additions & 0 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package artefacts

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"strings"

"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"

"github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
)

const (
ModuleArtifactPrefix = "ftl/modules/"
)

// ContainerConfig holds the settings NewContainerService uses to reach the
// container registry.
type ContainerConfig struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the typical pattern for config is to tag them with Kong tags, and embed them in the CLI so they can be automatically passed through from the CLI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

// Registry is the registry address; it is prefixed to every repository path.
Registry string
// Username and Password are handed to the registry client as static credentials.
Username string
Password string
// AllowPlainHTTP is copied onto the repository client's PlainHTTP setting.
AllowPlainHTTP bool
}

// ContainerService stores and retrieves artefacts in a container registry
// while still using the database to link artefacts to releases.
type ContainerService struct {
// host is the registry address taken from ContainerConfig.Registry.
host string
// repoConnectionBuilder returns a repository client for the given path
// underneath the registry address.
repoConnectionBuilder func(container string) (*remote.Repository, error)

// In the interim, releases and artefacts will continue to be linked via the `deployment_artefacts` table.
Handle *libdal.Handle[ContainerService]
db sql.Querier
}

// ArtefactRepository describes one artefact repository found by DiscoverArtefacts.
type ArtefactRepository struct {
// ModuleDigest is the digest parsed from the last segment of the repository path.
ModuleDigest sha256.SHA256
// MediaType is the media type of the manifest's first layer.
MediaType string
// ArtefactType is the manifest's artifact type.
ArtefactType string
// RepositoryDigest is the digest the repository's "latest" tag resolves to.
RepositoryDigest digest.Digest
// Size is the size reported by the descriptor the "latest" tag resolves to.
Size int64
}

// NewContainerService creates a ContainerService that reaches the registry
// described by c and uses conn for database access.
//
// Both the returned service and the copies produced through its Handle carry
// the registry host, the repository connection builder and a database
// querier, so every method is usable on either of them.
func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService {
	// Builds a client for the repository at "<registry>/<path>", configured with
	// the static credentials and plain-HTTP setting from c.
	repoConnectionBuilder := func(path string) (*remote.Repository, error) {
		ref := fmt.Sprintf("%s/%s", c.Registry, path)
		reg, err := remote.NewRepository(ref)
		if err != nil {
			return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err)
		}

		reg.Client = &auth.Client{
			Client: retry.DefaultClient,
			Cache:  auth.NewCache(),
			Credential: auth.StaticCredential(c.Registry, auth.Credential{
				Username: c.Username,
				Password: c.Password,
			}),
		}
		reg.PlainHTTP = c.AllowPlainHTTP

		return reg, nil
	}

	return &ContainerService{
		host:                  c.Registry,
		repoConnectionBuilder: repoConnectionBuilder,
		// The root service needs its own querier: without it GetReleaseArtefacts
		// and AddReleaseArtefact would call methods on a nil sql.Querier.
		db: sql.New(conn),
		Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService {
			return &ContainerService{
				host:                  c.Registry,
				repoConnectionBuilder: repoConnectionBuilder,
				Handle:                h,
				db:                    sql.New(h.Connection),
			}
		}),
	}
}

// GetDigestsKeys splits digests into those that match a discovered module
// artefact (returned as keys) and those that do not (returned as missing).
//
// It lists every module artefact through DiscoverModuleArtefacts and matches
// on ModuleDigest; an error from the discovery is wrapped and returned.
func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
set := make(map[sha256.SHA256]bool)
for _, d := range digests {
set[d] = true
}
modules, err := s.DiscoverModuleArtefacts(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be explicitly checking the registry for each digest, not listing the full registry then iterating over the list. Ideally Oras should be sending a HEAD request to /v2/<name>/blobs/<digest> for each artifact to check.

Even then I am not sure if this is the right approach, we can still store in the database which artifact we know about, as for JVM applications even the HEAD approach will likely be slower than we would want.

We could still potentially use the HEAD approach if we don't know about the artifact, as it would allow us to re-use jars from other clusters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great call and thanks for catching it at this stage. Some rethinking is due here, this ticket will track that.
#3076

if err != nil {
return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err)
}
keys = make([]ArtefactKey, 0)
for _, m := range modules {
if set[m.ModuleDigest] {
keys = append(keys, ArtefactKey{Digest: m.ModuleDigest})
// Remove matched digests so that whatever remains in set is missing.
delete(set, m.ModuleDigest)
}
}
missing = make([]sha256.SHA256, 0)
for d := range set {
missing = append(missing, d)
}
return keys, missing, nil
}

// Upload pushes the artefact content to the registry and returns the hash of
// that content.
//
// The content is staged in an in-memory store, packed into a version 1.1
// manifest, tagged "latest" and then copied to the repository whose path is
// derived from the content hash.
//
// NOTE(review): the repository path is built here from a literal prefix,
// independently of createModuleRepositoryPathFromDigest which Download uses;
// confirm the two always produce the same repository.
func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
hash := sha256.Sum(artefact.Content)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't Artefact already have a sha256 sum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right you are, using that instead

ref := fmt.Sprintf("ftl/modules/%s", hash)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a config option. Basically the general location format should probably be something like <host>/<configured-repo>:ftl-artifact-<digest>.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design doc should probably be updated to cover exactly how these artifacts are stored in the registry in terms of the naming scheme.

Copy link
Contributor Author

@jonathanj-square jonathanj-square Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, I need to be more explicit about how release and module compilation artifacts (binaries, jars, schema DSLs) get mapped to OCI format concepts and their layout:

Created a tracking item here: #3077

ms := memory.New()
// The single layer descriptor: the raw artefact content, addressed by its hash.
mediaDescriptor := v1.Descriptor{
MediaType: "application/ftl.module.v1",
Digest: digest.NewDigestFromBytes(digest.SHA256, hash[:]),
Size: int64(len(artefact.Content)),
}
err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content))
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err)
}
artifactType := "application/ftl.module.artifact"
opts := oras.PackManifestOptions{
Layers: []v1.Descriptor{mediaDescriptor},
}
tag := "latest"
manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err)
}
if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err)
}
repo, err := s.repoConnectionBuilder(ref)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err)
}
// Copy the tagged manifest (and the layer it references) from staging to the remote repository.
if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err)
}
return hash, nil
}

// Download fetches the artefact identified by digest from the registry.
//
// It connects to the repository derived from the digest and returns the
// stream produced by the fetch; the caller receives it as an io.ReadCloser.
// Connection and fetch failures are wrapped and returned.
func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
	repoPath := createModuleRepositoryPathFromDigest(digest)
	repo, connErr := s.repoConnectionBuilder(repoPath)
	if connErr != nil {
		return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, repoPath, connErr)
	}

	// The fetch is addressed with the host-qualified reference.
	target := createModuleRepositoryReferenceFromDigest(s.host, digest)
	_, body, fetchErr := oras.Fetch(ctx, repo, target, oras.DefaultFetchOptions)
	if fetchErr != nil {
		return nil, fmt.Errorf("unable to download artefact: %w", fetchErr)
	}
	return body, nil
}

// DiscoverModuleArtefacts returns the artefact repositories whose path starts
// with ModuleArtifactPrefix, by delegating to DiscoverArtefacts.
func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) {
return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix)
}

// DiscoverArtefacts walks the registry's repository listing and describes
// every repository whose path starts with prefix.
//
// For each match it parses the digest from the path, resolves the "latest"
// tag, fetches and decodes the manifest, and records an ArtefactRepository.
// The first error encountered aborts the whole listing and is returned.
//
// NOTE(review): the registry client here always uses plain HTTP and is not
// given the credentials that NewContainerService configures for repository
// clients — confirm this is intended outside local development.
func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) {
registry, err := remote.NewRegistry(s.host)
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err)
}
registry.PlainHTTP = true
result := make([]ArtefactRepository, 0)
err = registry.Repositories(ctx, "", func(repos []string) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this iterating over all repositories? That won't be scalable will it? eg. in Cash's cloud there will be thousands if not hundreds of thousands of repositories.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this operation is actually part of the OCI standard either. The underlying implementation queries "%s://%s/v2/_catalog" which is not part of the OCI distribution spec.

Either way there should be a single configured repository where we upload these artifacts, and we should not rely on the discovery APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue to track fully spec'ing out the approach.

for _, path := range repos {
if !strings.HasPrefix(path, prefix) {
continue
}
d, err := getDigestFromModuleRepositoryPath(path)
if err != nil {
return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err)
}
repo, err := registry.Repository(ctx, path)
if err != nil {
return fmt.Errorf("unable to connect to repository '%s': %w", path, err)
}
desc, err := repo.Resolve(ctx, "latest")
if err != nil {
return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err)
}
_, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions)
if err != nil {
return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err)
}
var manifest v1.Manifest
if err := json.Unmarshal(data, &manifest); err != nil {
return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err)
}
// NOTE(review): manifest.Layers[0] below panics when the manifest has no
// layers; consider checking len(manifest.Layers) first.
result = append(result, ArtefactRepository{
ModuleDigest: d,
MediaType: manifest.Layers[0].MediaType,
ArtefactType: manifest.ArtifactType,
RepositoryDigest: desc.Digest,
Size: desc.Size,
})
}
return nil
})
if err != nil {
return nil, fmt.Errorf("unable to discover artefacts: %w", err)
}
return result, nil
}

// GetReleaseArtefacts returns the artefacts associated with releaseID. The
// lookup is done in the database via getDatabaseReleaseArtefacts, not in the
// container registry.
func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}

// AddReleaseArtefact associates the release artefact ra with the deployment
// identified by key. The association is written to the database via
// addReleaseArtefacts, not to the container registry.
func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
return addReleaseArtefacts(ctx, s.db, key, ra)
}

// createModuleRepositoryPathFromDigest creates the path to the repository,
// relative to the registry's root, in the form
// "<module prefix>/<hex digest>:latest".
//
// ModuleArtifactPrefix already ends in a slash, so its trailing separator is
// trimmed before joining; otherwise the result would contain an empty path
// component ("ftl/modules//<hex digest>:latest").
func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string {
	prefix := strings.TrimSuffix(ModuleArtifactPrefix, "/")
	return fmt.Sprintf("%s/%s:latest", prefix, hex.EncodeToString(digest[:]))
}

// createModuleRepositoryReferenceFromDigest builds the host-qualified
// reference for the module repository of digest: the host, a slash, then the
// repository path produced by createModuleRepositoryPathFromDigest.
func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string {
	repoPath := createModuleRepositoryPathFromDigest(digest)
	return host + "/" + repoPath
}

// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/<digest>:latest
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be /ftl/modules/<module>/<digest>, no?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this path actually defined BTW, I don't see it anywhere.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above, but basically this should be a config option and not per module so we can re-use artifacts.

// getDigestFromModuleRepositoryPath extracts the digest from the last segment
// of a module repository path, e.g. "ftl/modules/<digest>" or
// "ftl/modules/<digest>:latest".
//
// An optional ":<tag>" suffix on the last segment is ignored so that tagged
// paths, such as those built by createModuleRepositoryPathFromDigest, parse
// as well. It returns an error when the path contains no slash or when the
// last segment is not a valid SHA-256 digest.
func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) {
	slash := strings.LastIndex(repository, "/")
	if slash == -1 {
		return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository)
	}
	// Only the last segment is inspected, so a "host:port" prefix is unaffected.
	name, _, _ := strings.Cut(repository[slash+1:], ":")
	d, err := sha256.ParseSHA256(name)
	if err != nil {
		return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err)
	}
	return d, nil
}
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ services:
environment:
SERVICES: secretsmanager
DEBUG: 1
registry:
image: registry:2
ports:
- "5001:5000"
volumes:
- ./.registry:/var/lib/registry

volumes:
grafana-storage: {}
90 changes: 90 additions & 0 deletions frontend/cli/cmd_release.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"context"
"crypto/sha256"
"fmt"

"github.com/google/uuid"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
)

// releaseCmd groups the release-related CLI subcommands.
type releaseCmd struct {
Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."`
List releaseListCmd `cmd:"" help:"Lists all published releases."`
}

// releasePublishCmd carries the registry and database settings for the
// publish subcommand; its fields are passed to createContainerService.
type releasePublishCmd struct {
Registry string `help:"Registry host:port" default:"127.0.0.1:5001"`
DSN string `help:"DAL DSN." default:"postgres://127.0.0.1:15432/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"`
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"`
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`
}

// Run uploads a placeholder artefact to the registry and prints the hash
// returned by the upload.
//
// The artefact content is the bytes of a freshly generated UUID rather than a
// packaged project; failures to create the service or to upload are wrapped
// and returned.
func (d *releasePublishCmd) Run() error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to do 'releases' to a container registry we will need to push some kind of manifest with the contents of the release, as releases are multiple files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, the transition to the OCI registries involves multiple steps, the first is to simply dump the payloads there instead of the database. from there it will be refined to take advantage of the container formats capabilities

svc, err := createContainerService(d.DSN, d.Registry, d.MaxOpenDBConnections, d.MaxIdleDBConnections)
if err != nil {
return fmt.Errorf("failed to create container service: %w", err)
}
// Random content stands in for a real release payload.
content := uuid.New()
contentBytes := content[:]
hash, err := svc.Upload(context.Background(), artefacts.Artefact{
Digest: sha256.Sum256(contentBytes),
Metadata: artefacts.Metadata{
Path: fmt.Sprintf("random/%s", content),
},
Content: contentBytes,
})
if err != nil {
return fmt.Errorf("failed upload artefact: %w", err)
}
fmt.Printf("Artefact published with hash: \033[31m%s\033[0m\n", hash)
return nil
}

// releaseListCmd carries the registry and database settings for the list
// subcommand; its fields are passed to createContainerService.
type releaseListCmd struct {
Registry string `help:"Registry host:port" default:"127.0.0.1:5001"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These all seem to be shared, just put them in releaseCmd and inject that into the Run() method as a parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat, updated

DSN string `help:"DAL DSN." default:"postgres://127.0.0.1:15432/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"`
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"`
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`
}

// Run discovers the module artefacts in the registry and prints, for each
// one, its digest, size, repository digest, media type and artefact type.
//
// When nothing is found it prints a short message and returns nil; failures
// to create the service or to discover artefacts are wrapped and returned.
func (d *releaseListCmd) Run() error {
svc, err := createContainerService(d.DSN, d.Registry, d.MaxOpenDBConnections, d.MaxIdleDBConnections)
if err != nil {
return fmt.Errorf("failed to create container service: %w", err)
}
modules, err := svc.DiscoverModuleArtefacts(context.Background())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would basically translate to the OCI list operation, which is often really slow and not scalable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, captured in #3076

if err != nil {
return fmt.Errorf("failed to discover module artefacts: %w", err)
}
if len(modules) == 0 {
fmt.Println("No module artefacts found.")
return nil
}

format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n"
fmt.Printf("Found %d module artefacts:\n", len(modules))
for i, m := range modules {
// The heading is wrapped in ANSI escape codes to colour it on terminals.
fmt.Printf("\033[31m Artefact %d\033[0m\n", i)
fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType)
}

return nil
}

// createContainerService opens an instrumented database connection for dsn,
// applies the idle and open connection limits, and returns a container
// service for the registry at reg.
//
// The service is always configured to allow plain HTTP. An error is returned
// only when the database connection cannot be opened.
func createContainerService(dsn string, reg string, maxOpenConn int, maxIdleCon int) (*artefacts.ContainerService, error) {
	db, err := internalobservability.OpenDBAndInstrument(dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to open DB connection: %w", err)
	}
	db.SetMaxIdleConns(maxIdleCon)
	db.SetMaxOpenConns(maxOpenConn)

	cfg := artefacts.ContainerConfig{
		Registry:       reg,
		AllowPlainHTTP: true,
	}
	return artefacts.NewContainerService(cfg, db), nil
}
1 change: 1 addition & 0 deletions frontend/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type InteractiveCLI struct {
Secret secretCmd `cmd:"" help:"Manage secrets."`
Config configCmd `cmd:"" help:"Manage configuration."`
Pubsub pubsubCmd `cmd:"" help:"Manage pub/sub."`
Release releaseCmd `cmd:"" help:"Manage releases."`
}

type CLI struct {
Expand Down
Loading