Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pre-pull images and sideload them into the cluster #143

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 21 additions & 37 deletions internal/cmd/images/manifest_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"strings"

"github.com/airbytehq/abctl/internal/cmd/local/helm"
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
"github.com/airbytehq/abctl/internal/common"
"github.com/airbytehq/abctl/internal/trace"
helmlib "github.com/mittwald/go-helm-client"
"helm.sh/helm/v3/pkg/repo"

"github.com/airbytehq/abctl/internal/common"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -27,16 +26,11 @@ type ManifestCmd struct {
Values string `type:"existingfile" help:"An Airbyte helm chart values file to configure helm."`
}

func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
func (c *ManifestCmd) Run(ctx context.Context) error {
ctx, span := trace.NewSpan(ctx, "images manifest")
defer span.End()

client, err := helm.New(provider.Kubeconfig, provider.Context, common.AirbyteNamespace)
if err != nil {
return err
}

images, err := c.findAirbyteImages(ctx, client)
images, err := c.findAirbyteImages(ctx)
if err != nil {
return err
}
Expand All @@ -48,7 +42,7 @@ func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error {
return nil
}

func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) ([]string, error) {
func (c *ManifestCmd) findAirbyteImages(ctx context.Context) ([]string, error) {
valuesYaml, err := helm.BuildAirbyteValues(ctx, helm.ValuesOpts{
ValuesFile: c.Values,
})
Expand All @@ -57,11 +51,20 @@ func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client)
}

airbyteChartLoc := helm.LocateLatestAirbyteChart(c.ChartVersion, c.Chart)
return findImagesFromChart(client, valuesYaml, airbyteChartLoc, c.ChartVersion)
return FindImagesFromChart(valuesYaml, airbyteChartLoc, c.ChartVersion)
}

func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion string) ([]string, error) {
err := client.AddOrUpdateChartRepo(repo.Entry{
func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, error) {

// sharing a helm client with the install code causes some weird issues,
// and templating the chart doesn't need details about the k8s provider,
// we create a throwaway helm client here.
client, err := helmlib.New(helm.ClientOptions(common.AirbyteNamespace))
if err != nil {
return nil, err
}

err = client.AddOrUpdateChartRepo(repo.Entry{
Name: common.AirbyteRepoName,
URL: common.AirbyteRepoURL,
})
Expand All @@ -88,7 +91,7 @@ func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion
// It returns a unique, sorted list of images found.
func findAllImages(chartYaml string) []string {
objs := decodeK8sResources(chartYaml)
imageSet := set[string]{}
imageSet := common.Set[string]{}

for _, obj := range objs {

Expand All @@ -98,7 +101,7 @@ func findAllImages(chartYaml string) []string {
if strings.HasSuffix(z.Name, "airbyte-env") {
for k, v := range z.Data {
if strings.HasSuffix(k, "_IMAGE") {
imageSet.add(v)
imageSet.Add(v)
}
}
}
Expand All @@ -116,15 +119,15 @@ func findAllImages(chartYaml string) []string {
}

for _, c := range podSpec.InitContainers {
imageSet.add(c.Image)
imageSet.Add(c.Image)
}
for _, c := range podSpec.Containers {
imageSet.add(c.Image)
imageSet.Add(c.Image)
}
}

var out []string
for _, k := range imageSet.items() {
for _, k := range imageSet.Items() {
if k != "" {
out = append(out, k)
}
Expand All @@ -149,22 +152,3 @@ func decodeK8sResources(renderedYaml string) []runtime.Object {
}
return out
}

type set[T comparable] struct {
vals map[T]struct{}
}

func (s *set[T]) add(v T) {
if s.vals == nil {
s.vals = map[T]struct{}{}
}
s.vals[v] = struct{}{}
}

func (s *set[T]) items() []T {
out := make([]T, len(s.vals))
for k := range s.vals {
out = append(out, k)
}
return out
}
9 changes: 3 additions & 6 deletions internal/cmd/images/manifest_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ func getHelmTestClient(t *testing.T) helm.Client {
}

func TestManifestCmd(t *testing.T) {
client := getHelmTestClient(t)
cmd := ManifestCmd{
ChartVersion: "1.1.0",
}
actual, err := cmd.findAirbyteImages(context.Background(), client)
actual, err := cmd.findAirbyteImages(context.Background())
if err != nil {
t.Fatal(err)
}
Expand All @@ -48,12 +47,11 @@ func TestManifestCmd(t *testing.T) {
}

func TestManifestCmd_Enterprise(t *testing.T) {
client := getHelmTestClient(t)
cmd := ManifestCmd{
ChartVersion: "1.1.0",
Values: "testdata/enterprise.values.yaml",
}
actual, err := cmd.findAirbyteImages(context.Background(), client)
actual, err := cmd.findAirbyteImages(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -81,13 +79,12 @@ func TestManifestCmd_Enterprise(t *testing.T) {
}

func TestManifestCmd_Nightly(t *testing.T) {
client := getHelmTestClient(t)
cmd := ManifestCmd{
// This version includes chart fixes that expose images more consistently and completely.
ChartVersion: "1.1.0-nightly-1728428783-9025e1a46e",
Values: "testdata/enterprise.values.yaml",
}
actual, err := cmd.findAirbyteImages(context.Background(), client)
actual, err := cmd.findAirbyteImages(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/local/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Client interface {

ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error)
ImagePull(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error)
ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error)

ServerVersion(ctx context.Context) (types.Version, error)
VolumeInspect(ctx context.Context, volumeID string) (volume.Volume, error)
Expand Down
5 changes: 5 additions & 0 deletions internal/cmd/local/docker/dockertest/dockertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type MockClient struct {
FnContainerExecStart func(ctx context.Context, execID string, config container.ExecStartOptions) error
FnImageList func(ctx context.Context, options image.ListOptions) ([]image.Summary, error)
FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error)
FnImageSave func(ctx context.Context, imageIDs []string) (io.ReadCloser, error)
FnServerVersion func(ctx context.Context) (types.Version, error)
FnVolumeInspect func(ctx context.Context, volumeID string) (volume.Volume, error)
FnInfo func(ctx context.Context) (system.Info, error)
Expand Down Expand Up @@ -82,6 +83,10 @@ func (m MockClient) ImagePull(ctx context.Context, refStr string, options image.
return m.FnImagePull(ctx, refStr, options)
}

func (m MockClient) ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) {
return m.ImageSave(ctx, imageIDs)
}

func (m MockClient) ServerVersion(ctx context.Context) (types.Version, error) {
return m.FnServerVersion(ctx)
}
Expand Down
22 changes: 13 additions & 9 deletions internal/cmd/local/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ type Client interface {
TemplateChart(spec *helmclient.ChartSpec, options *helmclient.HelmTemplateOptions) ([]byte, error)
}

func ClientOptions(namespace string) *helmclient.Options {
logger := helmLogger{}
return &helmclient.Options{
Namespace: namespace,
Output: logger,
DebugLog: logger.Debug,
Debug: true,
RepositoryCache: paths.HelmRepoCache,
RepositoryConfig: paths.HelmRepoConfig,
}
}

// New returns the default helm client
func New(kubecfg, kubectx, namespace string) (Client, error) {
k8sCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
Expand All @@ -38,16 +50,8 @@ func New(kubecfg, kubectx, namespace string) (Client, error) {
return nil, fmt.Errorf("%w: unable to create rest config: %w", localerr.ErrKubernetes, err)
}

logger := helmLogger{}
helm, err := helmclient.NewClientFromRestConf(&helmclient.RestConfClientOptions{
Options: &helmclient.Options{
Namespace: namespace,
Output: logger,
DebugLog: logger.Debug,
Debug: true,
RepositoryCache: paths.HelmRepoCache,
RepositoryConfig: paths.HelmRepoConfig,
},
Options: ClientOptions(namespace),
RestConfig: restCfg,
})
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions internal/cmd/local/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/docker"
"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
"github.com/airbytehq/abctl/internal/cmd/local/paths"
"github.com/airbytehq/abctl/internal/trace"
Expand All @@ -30,6 +31,7 @@ type Cluster interface {
Delete(ctx context.Context) error
// Exists returns true if the cluster exists, false otherwise.
Exists(ctx context.Context) bool
LoadImages(ctx context.Context, dockerClient docker.Client, images []string)
}

// interface sanity check
Expand Down Expand Up @@ -110,6 +112,23 @@ func (k *kindCluster) Exists(ctx context.Context) bool {
return false
}

// LoadImages pulls images from Docker Hub, and loads them into the kind cluster.
// This is a best-effort optimization, which is why it doesn't return an error.
// It's possible that only some images will be loaded.
func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) {
// Get a list of Kind nodes.
nodes, err := k.p.ListNodes(k.clusterName)
if err != nil {
pterm.Debug.Printfln("failed to load images: %s", err)
return
}

err = loadImages(ctx, dockerClient, nodes, images)
if err != nil {
pterm.Debug.Printfln("failed to load images: %s", err)
}
}

func formatKindErr(err error) error {
var kindErr *kindExec.RunError
if errors.As(err, &kindErr) {
Expand Down
Loading