-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introducing oci artifact registry impl #2989
Changes from all commits
63b6c30
e7a480d
02d4279
3ae28f8
66552ff
aa8f92c
941e5a3
bc52571
9521ee0
ea77771
2d0618a
6b87750
2cc617a
1d48172
f026f30
3d793c4
0a27669
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
.hermit/ | ||
.vscode/* | ||
.registry/ | ||
!/.vscode/extensions.json | ||
!/.vscode/settings.json | ||
!/.vscode/launch.json | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
package artefacts | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/hex" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"strings" | ||
|
||
"github.com/opencontainers/go-digest" | ||
v1 "github.com/opencontainers/image-spec/specs-go/v1" | ||
"oras.land/oras-go/v2" | ||
"oras.land/oras-go/v2/content/memory" | ||
"oras.land/oras-go/v2/registry/remote" | ||
"oras.land/oras-go/v2/registry/remote/auth" | ||
"oras.land/oras-go/v2/registry/remote/retry" | ||
|
||
"github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql" | ||
"github.com/TBD54566975/ftl/backend/libdal" | ||
"github.com/TBD54566975/ftl/internal/model" | ||
"github.com/TBD54566975/ftl/internal/sha256" | ||
) | ||
|
||
const ( | ||
ModuleArtifactPrefix = "ftl/modules/" | ||
) | ||
|
||
type ContainerConfig struct { | ||
Registry string `help:"OCI container registry host:port" env:"FTL_ARTEFACTS_REGISTRY"` | ||
Username string `help:"OCI container registry username" env:"FTL_ARTEFACTS_USER"` | ||
Password string `help:"OCI container registry password" env:"FTL_ARTEFACTS_PWD"` | ||
AllowPlainHTTP bool `help:"Allows OCI container requests to accept plain HTTP responses" env:"FTL_ARTEFACTS_ALLOW_HTTP"` | ||
} | ||
|
||
type ContainerService struct { | ||
host string | ||
repoConnectionBuilder func(container string) (*remote.Repository, error) | ||
|
||
// in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table | ||
Handle *libdal.Handle[ContainerService] | ||
db sql.Querier | ||
} | ||
|
||
type ArtefactRepository struct { | ||
ModuleDigest sha256.SHA256 | ||
MediaType string | ||
ArtefactType string | ||
RepositoryDigest digest.Digest | ||
Size int64 | ||
} | ||
|
||
func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService { | ||
// Connect the registry targeting the specified container | ||
repoConnectionBuilder := func(path string) (*remote.Repository, error) { | ||
ref := fmt.Sprintf("%s/%s", c.Registry, path) | ||
reg, err := remote.NewRepository(ref) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err) | ||
} | ||
|
||
reg.Client = &auth.Client{ | ||
Client: retry.DefaultClient, | ||
Cache: auth.NewCache(), | ||
Credential: auth.StaticCredential(c.Registry, auth.Credential{ | ||
Username: c.Username, | ||
Password: c.Password, | ||
}), | ||
} | ||
reg.PlainHTTP = c.AllowPlainHTTP | ||
|
||
return reg, nil | ||
} | ||
|
||
return &ContainerService{ | ||
host: c.Registry, | ||
repoConnectionBuilder: repoConnectionBuilder, | ||
Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService { | ||
return &ContainerService{ | ||
host: c.Registry, | ||
repoConnectionBuilder: repoConnectionBuilder, | ||
Handle: h, | ||
db: sql.New(h.Connection), | ||
} | ||
}), | ||
} | ||
} | ||
|
||
func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) { | ||
set := make(map[sha256.SHA256]bool) | ||
for _, d := range digests { | ||
set[d] = true | ||
} | ||
modules, err := s.DiscoverModuleArtefacts(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be explicitly checking the registry for each digest, not listing the full registry then iterating over the list. Ideally Oras should be sending a Even then I am not sure if this is the right approach, we can still store in the database which artifact we know about, as for JVM applications even the HEAD approach will likely be slower than we would want. We could still potentially use the HEAD approach if we don't know about the artifact, as it would allow us to re-use jars from other clusters. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great call and thanks for catching it at this stage. Some rethinking is due here, this ticket will track that. |
||
if err != nil { | ||
return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err) | ||
} | ||
keys = make([]ArtefactKey, 0) | ||
for _, m := range modules { | ||
if set[m.ModuleDigest] { | ||
keys = append(keys, ArtefactKey{Digest: m.ModuleDigest}) | ||
delete(set, m.ModuleDigest) | ||
} | ||
} | ||
missing = make([]sha256.SHA256, 0) | ||
for d := range set { | ||
missing = append(missing, d) | ||
} | ||
return keys, missing, nil | ||
} | ||
|
||
func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) { | ||
ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest) | ||
ms := memory.New() | ||
mediaDescriptor := v1.Descriptor{ | ||
MediaType: "application/ftl.module.v1", | ||
Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]), | ||
Size: int64(len(artefact.Content)), | ||
} | ||
err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content)) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err) | ||
} | ||
artifactType := "application/ftl.module.artifact" | ||
opts := oras.PackManifestOptions{ | ||
Layers: []v1.Descriptor{mediaDescriptor}, | ||
} | ||
tag := "latest" | ||
manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err) | ||
} | ||
if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err) | ||
} | ||
repo, err := s.repoConnectionBuilder(ref) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err) | ||
} | ||
if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err) | ||
} | ||
return artefact.Digest, nil | ||
} | ||
|
||
func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) { | ||
ref := createModuleRepositoryPathFromDigest(digest) | ||
registry, err := s.repoConnectionBuilder(ref) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err) | ||
} | ||
_, stream, err := oras.Fetch(ctx, registry, createModuleRepositoryReferenceFromDigest(s.host, digest), oras.DefaultFetchOptions) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to download artefact: %w", err) | ||
} | ||
return stream, nil | ||
} | ||
|
||
func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) { | ||
return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix) | ||
} | ||
|
||
func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) { | ||
registry, err := remote.NewRegistry(s.host) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err) | ||
} | ||
registry.PlainHTTP = true | ||
result := make([]ArtefactRepository, 0) | ||
err = registry.Repositories(ctx, "", func(repos []string) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this iterating over all repositories? That won't be scalable will it? eg. in Cash's cloud there will be thousands if not hundreds of thousands of repositories. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this operation is actually part of the OCI standard either. The underlying implementation queries Either way there should be a single configured repository where we upload these artifacts, and we should not rely on the discovery APIs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created an issue to track fully spec'ing out the approach. |
||
for _, path := range repos { | ||
if !strings.HasPrefix(path, prefix) { | ||
continue | ||
} | ||
d, err := getDigestFromModuleRepositoryPath(path) | ||
if err != nil { | ||
return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err) | ||
} | ||
repo, err := registry.Repository(ctx, path) | ||
if err != nil { | ||
return fmt.Errorf("unable to connect to repository '%s': %w", path, err) | ||
} | ||
desc, err := repo.Resolve(ctx, "latest") | ||
if err != nil { | ||
return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err) | ||
} | ||
_, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions) | ||
if err != nil { | ||
return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err) | ||
} | ||
var manifest v1.Manifest | ||
if err := json.Unmarshal(data, &manifest); err != nil { | ||
return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err) | ||
} | ||
result = append(result, ArtefactRepository{ | ||
ModuleDigest: d, | ||
MediaType: manifest.Layers[0].MediaType, | ||
ArtefactType: manifest.ArtifactType, | ||
RepositoryDigest: desc.Digest, | ||
Size: desc.Size, | ||
}) | ||
} | ||
return nil | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to discover artefacts: %w", err) | ||
} | ||
return result, nil | ||
} | ||
|
||
func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { | ||
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID) | ||
} | ||
|
||
func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error { | ||
return addReleaseArtefacts(ctx, s.db, key, ra) | ||
} | ||
|
||
// createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registries root | ||
func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string { | ||
return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:])) | ||
} | ||
|
||
// createModuleRepositoryReferenceFromDigest creates the URL used to connect to the repository | ||
func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string { | ||
return fmt.Sprintf("%s/%s", host, createModuleRepositoryPathFromDigest(digest)) | ||
} | ||
|
||
// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/<digest>:latest | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this path actually defined BTW, I don't see it anywhere. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment above, but basically this should be a config option and not per module so we can re-use artifacts. |
||
func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) { | ||
slash := strings.LastIndex(repository, "/") | ||
if slash == -1 { | ||
return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository) | ||
} | ||
d, err := sha256.ParseSHA256(repository[slash+1:]) | ||
if err != nil { | ||
return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err) | ||
} | ||
return d, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"crypto/sha256" | ||
"fmt" | ||
|
||
"github.com/google/uuid" | ||
|
||
"github.com/TBD54566975/ftl/backend/controller/artefacts" | ||
internalobservability "github.com/TBD54566975/ftl/internal/observability" | ||
) | ||
|
||
type releaseCmd struct { | ||
Registry string `help:"Registry host:port" default:"127.0.0.1:5001"` | ||
DSN string `help:"DAL DSN." default:"postgres://127.0.0.1:15432/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"` | ||
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"` | ||
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"` | ||
|
||
Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."` | ||
List releaseListCmd `cmd:"" help:"Lists all published releases."` | ||
} | ||
|
||
type releasePublishCmd struct { | ||
} | ||
|
||
func (d *releasePublishCmd) Run(release *releaseCmd) error { | ||
svc, err := createContainerService(release) | ||
if err != nil { | ||
return fmt.Errorf("failed to create container service: %w", err) | ||
} | ||
content := uuid.New() | ||
contentBytes := content[:] | ||
hash, err := svc.Upload(context.Background(), artefacts.Artefact{ | ||
Digest: sha256.Sum256(contentBytes), | ||
Metadata: artefacts.Metadata{ | ||
Path: fmt.Sprintf("random/%s", content), | ||
}, | ||
Content: contentBytes, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed upload artefact: %w", err) | ||
} | ||
fmt.Printf("Artefact published with hash: \033[31m%s\033[0m\n", hash) | ||
return nil | ||
} | ||
|
||
type releaseListCmd struct { | ||
} | ||
|
||
func (d *releaseListCmd) Run(release *releaseCmd) error { | ||
svc, err := createContainerService(release) | ||
if err != nil { | ||
return fmt.Errorf("failed to create container service: %w", err) | ||
} | ||
modules, err := svc.DiscoverModuleArtefacts(context.Background()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would basically translate to the OCI list operation, which is often really slow and not scalable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds good, captured in #3076 |
||
if err != nil { | ||
return fmt.Errorf("failed to discover module artefacts: %w", err) | ||
} | ||
if len(modules) == 0 { | ||
fmt.Println("No module artefacts found.") | ||
return nil | ||
} | ||
|
||
format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n" | ||
fmt.Printf("Found %d module artefacts:\n", len(modules)) | ||
for i, m := range modules { | ||
fmt.Printf("\033[31m Artefact %d\033[0m\n", i) | ||
fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func createContainerService(release *releaseCmd) (*artefacts.ContainerService, error) { | ||
conn, err := internalobservability.OpenDBAndInstrument(release.DSN) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to open DB connection: %w", err) | ||
} | ||
conn.SetMaxIdleConns(release.MaxIdleDBConnections) | ||
conn.SetMaxOpenConns(release.MaxOpenDBConnections) | ||
|
||
return artefacts.NewContainerService(artefacts.ContainerConfig{ | ||
Registry: release.Registry, | ||
AllowPlainHTTP: true, | ||
}, conn), nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI the typical pattern for config is to tag them with Kong tags, and embed them in the CLI so they can be automatically passed through from the CLI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated