diff --git a/.gitignore b/.gitignore index 3f570ba540..1d52528b7f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .hermit/ .vscode/* +.registry/ !/.vscode/extensions.json !/.vscode/settings.json !/.vscode/launch.json diff --git a/backend/controller/artefacts/dal_registry.go b/backend/controller/artefacts/dal_registry.go index 42e7e9728e..43236a4ff0 100644 --- a/backend/controller/artefacts/dal_registry.go +++ b/backend/controller/artefacts/dal_registry.go @@ -76,7 +76,11 @@ func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCl } func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { - rows, err := s.db.GetDeploymentArtefacts(ctx, releaseID) + return getDatabaseReleaseArtefacts(ctx, s.db, releaseID) +} + +func getDatabaseReleaseArtefacts(ctx context.Context, db sql.Querier, releaseID int64) ([]ReleaseArtefact, error) { + rows, err := db.GetDeploymentArtefacts(ctx, releaseID) if err != nil { return nil, fmt.Errorf("unable to get release artefacts: %w", libdal.TranslatePGError(err)) } @@ -90,7 +94,11 @@ func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]R } func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error { - err := s.db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{ + return addReleaseArtefacts(ctx, s.db, key, ra) +} + +func addReleaseArtefacts(ctx context.Context, db sql.Querier, key model.DeploymentKey, ra ReleaseArtefact) error { + err := db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{ Key: key, Digest: ra.Artefact.Digest[:], Executable: ra.Executable, diff --git a/backend/controller/artefacts/oci_registry.go b/backend/controller/artefacts/oci_registry.go new file mode 100644 index 0000000000..b8cdb881d3 --- /dev/null +++ b/backend/controller/artefacts/oci_registry.go @@ -0,0 +1,241 @@ +package artefacts + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "strings" + + "github.com/opencontainers/go-digest" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras-go/v2" + "oras.land/oras-go/v2/content/memory" + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" + "oras.land/oras-go/v2/registry/remote/retry" + + "github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql" + "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/sha256" +) + +const ( + ModuleArtifactPrefix = "ftl/modules/" +) + +type ContainerConfig struct { + Registry string `help:"OCI container registry host:port" env:"FTL_ARTEFACTS_REGISTRY"` + Username string `help:"OCI container registry username" env:"FTL_ARTEFACTS_USER"` + Password string `help:"OCI container registry password" env:"FTL_ARTEFACTS_PWD"` + AllowPlainHTTP bool `help:"Allows OCI container requests to accept plain HTTP responses" env:"FTL_ARTEFACTS_ALLOW_HTTP"` +} + +type ContainerService struct { + host string + repoConnectionBuilder func(container string) (*remote.Repository, error) + + // in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table + Handle *libdal.Handle[ContainerService] + db sql.Querier +} + +type ArtefactRepository struct { + ModuleDigest sha256.SHA256 + MediaType string + ArtefactType string + RepositoryDigest digest.Digest + Size int64 +} + +func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService { + // Connect the registry targeting the specified container + repoConnectionBuilder := func(path string) (*remote.Repository, error) { + ref := fmt.Sprintf("%s/%s", c.Registry, path) + reg, err := remote.NewRepository(ref) + if err != nil { + return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err) + } + + reg.Client = &auth.Client{ + Client: retry.DefaultClient, + Cache: auth.NewCache(), + Credential: auth.StaticCredential(c.Registry, auth.Credential{ + Username: c.Username, + Password: c.Password, + }), + } + reg.PlainHTTP = c.AllowPlainHTTP + + return reg, nil + } + + return &ContainerService{ + host: c.Registry, + repoConnectionBuilder: repoConnectionBuilder, + Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService { + return &ContainerService{ + host: c.Registry, + repoConnectionBuilder: repoConnectionBuilder, + Handle: h, + db: sql.New(h.Connection), + } + }), + } +} + +func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) { + set := make(map[sha256.SHA256]bool) + for _, d := range digests { + set[d] = true + } + modules, err := s.DiscoverModuleArtefacts(ctx) + if err != nil { + return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err) + } + keys = make([]ArtefactKey, 0) + for _, m := range modules { + if set[m.ModuleDigest] { + keys = append(keys, ArtefactKey{Digest: m.ModuleDigest}) + delete(set, m.ModuleDigest) + } + } + missing = make([]sha256.SHA256, 0) + for d := range set { + missing = append(missing, d) + } + return keys, missing, nil +} + +func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) { + ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest) + ms := memory.New() + mediaDescriptor := v1.Descriptor{ + MediaType: "application/ftl.module.v1", + Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]), + Size: int64(len(artefact.Content)), + } + err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content)) + if err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err) + } + artifactType := "application/ftl.module.artifact" + opts := oras.PackManifestOptions{ + Layers: []v1.Descriptor{mediaDescriptor}, + } + tag := "latest" + manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts) + if err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err) + } + if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err) + } + repo, err := s.repoConnectionBuilder(ref) + if err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err) + } + if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err) + } + return artefact.Digest, nil +} + +func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) { + ref := createModuleRepositoryPathFromDigest(digest) + registry, err := s.repoConnectionBuilder(ref) + if err != nil { + return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err) + } + _, stream, err := oras.Fetch(ctx, registry, createModuleRepositoryReferenceFromDigest(s.host, digest), oras.DefaultFetchOptions) + if err != nil { + return nil, fmt.Errorf("unable to download artefact: %w", err) + } + return stream, nil +} + +func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) { + return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix) +} + +func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) { + registry, err := remote.NewRegistry(s.host) + if err != nil { + return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err) + } + registry.PlainHTTP = true + result := make([]ArtefactRepository, 0) + err = registry.Repositories(ctx, "", func(repos []string) error { + for _, path := range repos { + if !strings.HasPrefix(path, prefix) { + continue + } + d, err := getDigestFromModuleRepositoryPath(path) + if err != nil { + return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err) + } + repo, err := registry.Repository(ctx, path) + if err != nil { + return fmt.Errorf("unable to connect to repository '%s': %w", path, err) + } + desc, err := repo.Resolve(ctx, "latest") + if err != nil { + return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err) + } + _, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions) + if err != nil { + return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err) + } + var manifest v1.Manifest + if err := json.Unmarshal(data, &manifest); err != nil { + return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err) + } + result = append(result, ArtefactRepository{ + ModuleDigest: d, + MediaType: manifest.Layers[0].MediaType, + ArtefactType: manifest.ArtifactType, + RepositoryDigest: desc.Digest, + Size: desc.Size, + }) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("unable to discover artefacts: %w", err) + } + return result, nil +} + +func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) { + return getDatabaseReleaseArtefacts(ctx, s.db, releaseID) +} + +func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error { + return addReleaseArtefacts(ctx, s.db, key, ra) +} + +// createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registries root +func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string { + return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:])) +} + +// createModuleRepositoryReferenceFromDigest creates the URL used to connect to the repository +func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string { + return fmt.Sprintf("%s/%s", host, createModuleRepositoryPathFromDigest(digest)) +} + +// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/:latest +func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) { + slash := strings.LastIndex(repository, "/") + if slash == -1 { + return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository) + } + d, err := sha256.ParseSHA256(repository[slash+1:]) + if err != nil { + return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err) + } + return d, nil +} diff --git a/docker-compose.yml b/docker-compose.yml index 51b3c7b0ac..58fc482c75 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,12 @@ services: environment: SERVICES: secretsmanager DEBUG: 1 + registry: + image: registry:2 + ports: + - "5001:5000" + volumes: + - ./.registry:/var/lib/registry volumes: grafana-storage: {} diff --git a/frontend/cli/cmd_release.go b/frontend/cli/cmd_release.go new file mode 100644 index 0000000000..0c5f23f525 --- /dev/null +++ b/frontend/cli/cmd_release.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "crypto/sha256" + "fmt" + + "github.com/google/uuid" + + "github.com/TBD54566975/ftl/backend/controller/artefacts" + internalobservability "github.com/TBD54566975/ftl/internal/observability" +) + +type releaseCmd struct { + Registry string `help:"Registry host:port" default:"127.0.0.1:5001"` + DSN string `help:"DAL DSN." default:"postgres://127.0.0.1:15432/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"` + MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"` + MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"` + + Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."` + List releaseListCmd `cmd:"" help:"Lists all published releases."` +} + +type releasePublishCmd struct { +} + +func (d *releasePublishCmd) Run(release *releaseCmd) error { + svc, err := createContainerService(release) + if err != nil { + return fmt.Errorf("failed to create container service: %w", err) + } + content := uuid.New() + contentBytes := content[:] + hash, err := svc.Upload(context.Background(), artefacts.Artefact{ + Digest: sha256.Sum256(contentBytes), + Metadata: artefacts.Metadata{ + Path: fmt.Sprintf("random/%s", content), + }, + Content: contentBytes, + }) + if err != nil { + return fmt.Errorf("failed upload artefact: %w", err) + } + fmt.Printf("Artefact published with hash: \033[31m%s\033[0m\n", hash) + return nil +} + +type releaseListCmd struct { +} + +func (d *releaseListCmd) Run(release *releaseCmd) error { + svc, err := createContainerService(release) + if err != nil { + return fmt.Errorf("failed to create container service: %w", err) + } + modules, err := svc.DiscoverModuleArtefacts(context.Background()) + if err != nil { + return fmt.Errorf("failed to discover module artefacts: %w", err) + } + if len(modules) == 0 { + fmt.Println("No module artefacts found.") + return nil + } + + format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n" + fmt.Printf("Found %d module artefacts:\n", len(modules)) + for i, m := range modules { + fmt.Printf("\033[31m Artefact %d\033[0m\n", i) + fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType) + } + + return nil +} + +func createContainerService(release *releaseCmd) (*artefacts.ContainerService, error) { + conn, err := internalobservability.OpenDBAndInstrument(release.DSN) + if err != nil { + return nil, fmt.Errorf("failed to open DB connection: %w", err) + } + conn.SetMaxIdleConns(release.MaxIdleDBConnections) + conn.SetMaxOpenConns(release.MaxOpenDBConnections) + + return artefacts.NewContainerService(artefacts.ContainerConfig{ + Registry: release.Registry, + AllowPlainHTTP: true, + }, conn), nil +} diff --git a/frontend/cli/main.go b/frontend/cli/main.go index f042bf5e01..3c73ac2200 100644 --- a/frontend/cli/main.go +++ b/frontend/cli/main.go @@ -56,6 +56,7 @@ type InteractiveCLI struct { Secret secretCmd `cmd:"" help:"Manage secrets."` Config configCmd `cmd:"" help:"Manage configuration."` Pubsub pubsubCmd `cmd:"" help:"Manage pub/sub."` + Release releaseCmd `cmd:"" help:"Manage releases."` } type CLI struct { diff --git a/go.mod b/go.mod index e382c9f269..15d2610b31 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,8 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/mattn/go-isatty v0.0.20 github.com/multiformats/go-base36 v0.2.0 + github.com/opencontainers/go-digest v1.0.0 + github.com/opencontainers/image-spec v1.1.0 github.com/otiai10/copy v1.14.0 github.com/posener/complete v1.2.3 github.com/radovskyb/watcher v1.0.7 @@ -82,6 +84,7 @@ require ( k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 modernc.org/sqlite v1.33.1 + oras.land/oras-go/v2 v2.5.0 sigs.k8s.io/yaml v1.4.0 ) @@ -130,8 +133,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/onsi/gomega v1.33.1 // indirect - github.com/opencontainers/go-digest v1.0.0 // indirect - github.com/opencontainers/image-spec v1.1.0 // indirect github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect diff --git a/go.sum b/go.sum index 858a8a6d46..d40ad7510b 100644 --- a/go.sum +++ b/go.sum @@ -510,6 +510,8 @@ modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c= +oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=