From c7b251b0af227a4f9bcbc8cc1fdab1bfafd51fb6 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 31 Oct 2024 13:09:52 -0700 Subject: [PATCH 1/8] feat: pre-pull images and sideload them into the cluster --- internal/cmd/images/manifest_cmd.go | 29 ++++++----- internal/cmd/local/helm/helm.go | 22 +++++---- internal/cmd/local/k8s/cluster.go | 74 +++++++++++++++++++++++++++++ internal/cmd/local/local/install.go | 13 +++++ internal/cmd/local/local_install.go | 2 + 5 files changed, 118 insertions(+), 22 deletions(-) diff --git a/internal/cmd/images/manifest_cmd.go b/internal/cmd/images/manifest_cmd.go index 3ac3d5d..50b95d9 100644 --- a/internal/cmd/images/manifest_cmd.go +++ b/internal/cmd/images/manifest_cmd.go @@ -7,12 +7,11 @@ import ( "strings" "github.com/airbytehq/abctl/internal/cmd/local/helm" - "github.com/airbytehq/abctl/internal/cmd/local/k8s" + "github.com/airbytehq/abctl/internal/common" "github.com/airbytehq/abctl/internal/trace" helmlib "github.com/mittwald/go-helm-client" "helm.sh/helm/v3/pkg/repo" - "github.com/airbytehq/abctl/internal/common" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -27,16 +26,11 @@ type ManifestCmd struct { Values string `type:"existingfile" help:"An Airbyte helm chart values file to configure helm."` } -func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error { +func (c *ManifestCmd) Run(ctx context.Context) error { ctx, span := trace.NewSpan(ctx, "images manifest") defer span.End() - client, err := helm.New(provider.Kubeconfig, provider.Context, common.AirbyteNamespace) - if err != nil { - return err - } - - images, err := c.findAirbyteImages(ctx, client) + images, err := c.findAirbyteImages(ctx) if err != nil { return err } @@ -48,7 +42,7 @@ func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error { return nil } -func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) ([]string, error) { +func (c *ManifestCmd) 
findAirbyteImages(ctx context.Context) ([]string, error) { valuesYaml, err := helm.BuildAirbyteValues(ctx, helm.ValuesOpts{ ValuesFile: c.Values, }) @@ -57,11 +51,20 @@ func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) } airbyteChartLoc := helm.LocateLatestAirbyteChart(c.ChartVersion, c.Chart) - return findImagesFromChart(client, valuesYaml, airbyteChartLoc, c.ChartVersion) + return FindImagesFromChart(valuesYaml, airbyteChartLoc, c.ChartVersion) } -func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion string) ([]string, error) { - err := client.AddOrUpdateChartRepo(repo.Entry{ +func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, error) { + + // sharing a helm client with the install code causes some weird issues, + // and templating the chart doesn't need details about the k8s provider, + // we create a throwaway helm client here. + client, err := helmlib.New(helm.ClientOptions(common.AirbyteNamespace)) + if err != nil { + return nil, err + } + + err = client.AddOrUpdateChartRepo(repo.Entry{ Name: common.AirbyteRepoName, URL: common.AirbyteRepoURL, }) diff --git a/internal/cmd/local/helm/helm.go b/internal/cmd/local/helm/helm.go index fab4ff3..9aaf782 100644 --- a/internal/cmd/local/helm/helm.go +++ b/internal/cmd/local/helm/helm.go @@ -26,6 +26,18 @@ type Client interface { TemplateChart(spec *helmclient.ChartSpec, options *helmclient.HelmTemplateOptions) ([]byte, error) } +func ClientOptions(namespace string) *helmclient.Options { + logger := helmLogger{} + return &helmclient.Options{ + Namespace: namespace, + Output: logger, + DebugLog: logger.Debug, + Debug: true, + RepositoryCache: paths.HelmRepoCache, + RepositoryConfig: paths.HelmRepoConfig, + } +} + // New returns the default helm client func New(kubecfg, kubectx, namespace string) (Client, error) { k8sCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( @@ -38,16 +50,8 @@ func New(kubecfg, kubectx, namespace 
string) (Client, error) { return nil, fmt.Errorf("%w: unable to create rest config: %w", localerr.ErrKubernetes, err) } - logger := helmLogger{} helm, err := helmclient.NewClientFromRestConf(&helmclient.RestConfClientOptions{ - Options: &helmclient.Options{ - Namespace: namespace, - Output: logger, - DebugLog: logger.Debug, - Debug: true, - RepositoryCache: paths.HelmRepoCache, - RepositoryConfig: paths.HelmRepoConfig, - }, + Options: ClientOptions(namespace), RestConfig: restCfg, }) if err != nil { diff --git a/internal/cmd/local/k8s/cluster.go b/internal/cmd/local/k8s/cluster.go index 072510e..7367721 100644 --- a/internal/cmd/local/k8s/cluster.go +++ b/internal/cmd/local/k8s/cluster.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "os" + "os/exec" + "path/filepath" + "sync" "time" "github.com/airbytehq/abctl/internal/cmd/local/k8s/kind" @@ -13,7 +16,9 @@ import ( "github.com/pterm/pterm" "gopkg.in/yaml.v3" "sigs.k8s.io/kind/pkg/cluster" + "sigs.k8s.io/kind/pkg/cluster/nodeutils" kindExec "sigs.k8s.io/kind/pkg/exec" + "sigs.k8s.io/kind/pkg/fs" ) // ExtraVolumeMount defines a host volume mount for the Kind cluster @@ -30,6 +35,7 @@ type Cluster interface { Delete(ctx context.Context) error // Exists returns true if the cluster exists, false otherwise. Exists(ctx context.Context) bool + LoadImages(ctx context.Context, images []string) } // interface sanity check @@ -110,6 +116,74 @@ func (k *kindCluster) Exists(ctx context.Context) bool { return false } +// LoadImages pulls images from Docker Hub, and loads them into the kind cluster. +// This is a best-effort optimization, which is why it doesn't an error; +// it's possible that only some images will be loaded. +// TODO this should probably take a context, and handle cancellation. 
+func (k *kindCluster) LoadImages(ctx context.Context, images []string) { + err := k.loadImages(ctx, images) + pterm.Debug.Printfln("failed to load images: %s", err) +} + +func (k *kindCluster) loadImages(ctx context.Context, images []string) error { + // Get a list of Kind nodes. + nodes, err := k.p.ListNodes(k.clusterName) + if err != nil { + return fmt.Errorf("listing nodes: %w", err) + } + + // Setup the tar path where the images will be saved. + dir, err := fs.TempDir("", "images-tar-") + if err != nil { + return err + } + defer os.RemoveAll(dir) + + // Pull all the images via "docker pull", in parallel. + var wg sync.WaitGroup + wg.Add(len(images)) + for _, img := range images { + pterm.Debug.Printfln("pulling image %s", img) + + go func(img string) { + defer wg.Done() + out, err := exec.CommandContext(ctx, "docker", "pull", img).CombinedOutput() + if err != nil { + pterm.Debug.Printfln("error pulling image %s", out) + // don't return the error here, because other image pulls might succeed. + } + }(img) + } + wg.Wait() + + // The context could be canceled by now. If so, return early. + if ctx.Err() != nil { + return ctx.Err() + } + + // Save all the images to an archive, images.tar + imagesTarPath := filepath.Join(dir, "images.tar") + pterm.Debug.Printfln("saving image archive to %s", imagesTarPath) + + out, err := exec.CommandContext(ctx, "docker", append([]string{"save", "-o", imagesTarPath}, images...)...).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to run 'docker save': %s", out) + } + + // Load the image archive into the Kind nodes. 
+ f, err := os.Open(imagesTarPath) + if err != nil { + return err + } + defer f.Close() + + for _, n := range nodes { + pterm.Debug.Printfln("loading image archive into kind node %s", n) + nodeutils.LoadImageArchive(n, f) + } + return nil +} + func formatKindErr(err error) error { var kindErr *kindExec.RunError if errors.As(err, &kindErr) { diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index fa06afa..2517275 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/airbytehq/abctl/internal/cmd/images" "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/helm" "github.com/airbytehq/abctl/internal/cmd/local/k8s" @@ -151,6 +152,18 @@ func (c *Command) persistentVolumeClaim(ctx context.Context, namespace, name, vo return nil } +// PrepImages determines the docker images needed by the chart, pulls them, and loads them into the cluster. +// This is best effort, so errors are dropped here. 
+func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts) { + manifest, err := images.FindImagesFromChart(opts.HelmValuesYaml, opts.AirbyteChartLoc, opts.HelmChartVersion) + if err != nil { + pterm.Debug.Printfln("error building image manifest: %s", err) + return + } + + cluster.LoadImages(ctx, manifest) +} + // Install handles the installation of Airbyte func (c *Command) Install(ctx context.Context, opts *InstallOpts) error { ctx, span := trace.NewSpan(ctx, "command.Install") diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 51efb6b..ed7bcdd 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -164,6 +164,8 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return fmt.Errorf("unable to initialize local command: %w", err) } + lc.PrepImages(ctx, cluster, opts) + if err := lc.Install(ctx, opts); err != nil { spinner.Fail("Unable to install Airbyte locally") return err From 396c78d803f0a70c94950512c2277db40dcfdd18 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 31 Oct 2024 13:16:48 -0700 Subject: [PATCH 2/8] fix test --- internal/cmd/images/manifest_cmd_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/internal/cmd/images/manifest_cmd_test.go b/internal/cmd/images/manifest_cmd_test.go index b7b0baf..3196000 100644 --- a/internal/cmd/images/manifest_cmd_test.go +++ b/internal/cmd/images/manifest_cmd_test.go @@ -20,11 +20,10 @@ func getHelmTestClient(t *testing.T) helm.Client { } func TestManifestCmd(t *testing.T) { - client := getHelmTestClient(t) cmd := ManifestCmd{ ChartVersion: "1.1.0", } - actual, err := cmd.findAirbyteImages(context.Background(), client) + actual, err := cmd.findAirbyteImages(context.Background()) if err != nil { t.Fatal(err) } @@ -48,12 +47,11 @@ func TestManifestCmd(t *testing.T) { } func TestManifestCmd_Enterprise(t *testing.T) { - client := 
getHelmTestClient(t) cmd := ManifestCmd{ ChartVersion: "1.1.0", Values: "testdata/enterprise.values.yaml", } - actual, err := cmd.findAirbyteImages(context.Background(), client) + actual, err := cmd.findAirbyteImages(context.Background()) if err != nil { t.Fatal(err) } @@ -81,13 +79,12 @@ func TestManifestCmd_Enterprise(t *testing.T) { } func TestManifestCmd_Nightly(t *testing.T) { - client := getHelmTestClient(t) cmd := ManifestCmd{ // This version includes chart fixes that expose images more consistently and completely. ChartVersion: "1.1.0-nightly-1728428783-9025e1a46e", Values: "testdata/enterprise.values.yaml", } - actual, err := cmd.findAirbyteImages(context.Background(), client) + actual, err := cmd.findAirbyteImages(context.Background()) if err != nil { t.Fatal(err) } From 7d864b7be81be6ef5682591065ed20934201013b Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 31 Oct 2024 13:20:38 -0700 Subject: [PATCH 3/8] remove TODO --- internal/cmd/local/k8s/cluster.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/cmd/local/k8s/cluster.go b/internal/cmd/local/k8s/cluster.go index 7367721..8799f7e 100644 --- a/internal/cmd/local/k8s/cluster.go +++ b/internal/cmd/local/k8s/cluster.go @@ -119,7 +119,6 @@ func (k *kindCluster) Exists(ctx context.Context) bool { // LoadImages pulls images from Docker Hub, and loads them into the kind cluster. // This is a best-effort optimization, which is why it doesn't an error; // it's possible that only some images will be loaded. -// TODO this should probably take a context, and handle cancellation. 
func (k *kindCluster) LoadImages(ctx context.Context, images []string) { err := k.loadImages(ctx, images) pterm.Debug.Printfln("failed to load images: %s", err) From 437f32e25f2b49e40255739d0754a6331aef6ec8 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 31 Oct 2024 13:22:28 -0700 Subject: [PATCH 4/8] add info log --- internal/cmd/local/local/install.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index 2517275..b49d1d4 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -161,6 +161,7 @@ func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *Ins return } + pterm.Info.Println("pulling airbyte images") cluster.LoadImages(ctx, manifest) } From a5266484a222efc26bdbe89fb8c68e629d54124c Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 31 Oct 2024 13:29:48 -0700 Subject: [PATCH 5/8] fix comment --- internal/cmd/local/k8s/cluster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cmd/local/k8s/cluster.go b/internal/cmd/local/k8s/cluster.go index 8799f7e..c04b994 100644 --- a/internal/cmd/local/k8s/cluster.go +++ b/internal/cmd/local/k8s/cluster.go @@ -117,8 +117,8 @@ func (k *kindCluster) Exists(ctx context.Context) bool { } // LoadImages pulls images from Docker Hub, and loads them into the kind cluster. -// This is a best-effort optimization, which is why it doesn't an error; -// it's possible that only some images will be loaded. +// This is a best-effort optimization, which is why it doesn't return an error. +// It's possible that only some images will be loaded. 
func (k *kindCluster) LoadImages(ctx context.Context, images []string) { err := k.loadImages(ctx, images) pterm.Debug.Printfln("failed to load images: %s", err) From 90c6ec72522fb736cd9f69c1c0fdb6cea67f4bd9 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 1 Nov 2024 07:29:49 -0700 Subject: [PATCH 6/8] use internal docker client --- internal/cmd/local/docker/docker.go | 1 + .../cmd/local/docker/dockertest/dockertest.go | 5 ++ internal/cmd/local/k8s/cluster.go | 65 +++++++++++++------ internal/cmd/local/local/install.go | 2 +- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/internal/cmd/local/docker/docker.go b/internal/cmd/local/docker/docker.go index 9fb5500..b602e36 100644 --- a/internal/cmd/local/docker/docker.go +++ b/internal/cmd/local/docker/docker.go @@ -43,6 +43,7 @@ type Client interface { ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error) ImagePull(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error) + ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) ServerVersion(ctx context.Context) (types.Version, error) VolumeInspect(ctx context.Context, volumeID string) (volume.Volume, error) diff --git a/internal/cmd/local/docker/dockertest/dockertest.go b/internal/cmd/local/docker/dockertest/dockertest.go index 42412ae..46a1a39 100644 --- a/internal/cmd/local/docker/dockertest/dockertest.go +++ b/internal/cmd/local/docker/dockertest/dockertest.go @@ -26,6 +26,7 @@ type MockClient struct { FnContainerExecStart func(ctx context.Context, execID string, config container.ExecStartOptions) error FnImageList func(ctx context.Context, options image.ListOptions) ([]image.Summary, error) FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error) + FnImageSave func(ctx context.Context, imageIDs []string) (io.ReadCloser, error) FnServerVersion func(ctx context.Context) (types.Version, error) FnVolumeInspect func(ctx 
context.Context, volumeID string) (volume.Volume, error) FnInfo func(ctx context.Context) (system.Info, error) @@ -82,6 +83,10 @@ func (m MockClient) ImagePull(ctx context.Context, refStr string, options image. return m.FnImagePull(ctx, refStr, options) } +func (m MockClient) ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) { + return m.ImageSave(ctx, imageIDs) +} + func (m MockClient) ServerVersion(ctx context.Context) (types.Version, error) { return m.FnServerVersion(ctx) } diff --git a/internal/cmd/local/k8s/cluster.go b/internal/cmd/local/k8s/cluster.go index c04b994..9b2abbb 100644 --- a/internal/cmd/local/k8s/cluster.go +++ b/internal/cmd/local/k8s/cluster.go @@ -4,15 +4,17 @@ import ( "context" "errors" "fmt" + "io" "os" - "os/exec" "path/filepath" "sync" "time" + "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/k8s/kind" "github.com/airbytehq/abctl/internal/cmd/local/paths" "github.com/airbytehq/abctl/internal/trace" + "github.com/docker/docker/api/types/image" "github.com/pterm/pterm" "gopkg.in/yaml.v3" "sigs.k8s.io/kind/pkg/cluster" @@ -35,7 +37,7 @@ type Cluster interface { Delete(ctx context.Context) error // Exists returns true if the cluster exists, false otherwise. Exists(ctx context.Context) bool - LoadImages(ctx context.Context, images []string) + LoadImages(ctx context.Context, dockerClient docker.Client, images []string) } // interface sanity check @@ -119,25 +121,18 @@ func (k *kindCluster) Exists(ctx context.Context) bool { // LoadImages pulls images from Docker Hub, and loads them into the kind cluster. // This is a best-effort optimization, which is why it doesn't return an error. // It's possible that only some images will be loaded. 
-func (k *kindCluster) LoadImages(ctx context.Context, images []string) { - err := k.loadImages(ctx, images) +func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) { + err := k.loadImages(ctx, dockerClient, images) pterm.Debug.Printfln("failed to load images: %s", err) } -func (k *kindCluster) loadImages(ctx context.Context, images []string) error { +func (k *kindCluster) loadImages(ctx context.Context, dockerClient docker.Client, images []string) error { // Get a list of Kind nodes. nodes, err := k.p.ListNodes(k.clusterName) if err != nil { return fmt.Errorf("listing nodes: %w", err) } - // Setup the tar path where the images will be saved. - dir, err := fs.TempDir("", "images-tar-") - if err != nil { - return err - } - defer os.RemoveAll(dir) - // Pull all the images via "docker pull", in parallel. var wg sync.WaitGroup wg.Add(len(images)) @@ -146,11 +141,13 @@ func (k *kindCluster) loadImages(ctx context.Context, images []string) error { go func(img string) { defer wg.Done() - out, err := exec.CommandContext(ctx, "docker", "pull", img).CombinedOutput() + r, err := dockerClient.ImagePull(ctx, img, image.PullOptions{}) if err != nil { - pterm.Debug.Printfln("error pulling image %s", out) - // don't return the error here, because other image pulls might succeed. + pterm.Debug.Printfln("error pulling image %s", err) + // image pull errors are intentionally dropped because we're in a goroutine, + // and because we don't want to interrupt other image pulls. 
} + defer r.Close() }(img) } wg.Wait() @@ -161,14 +158,11 @@ func (k *kindCluster) loadImages(ctx context.Context, images []string) error { } // Save all the images to an archive, images.tar - imagesTarPath := filepath.Join(dir, "images.tar") - pterm.Debug.Printfln("saving image archive to %s", imagesTarPath) - - out, err := exec.CommandContext(ctx, "docker", append([]string{"save", "-o", imagesTarPath}, images...)...).CombinedOutput() + imagesTarPath, err := saveImageArchive(ctx, dockerClient, images) if err != nil { - return fmt.Errorf("failed to run 'docker save': %s", out) + return fmt.Errorf("failed to save image archive: %w", err) } - + // Load the image archive into the Kind nodes. f, err := os.Open(imagesTarPath) if err != nil { @@ -183,6 +177,35 @@ func (k *kindCluster) loadImages(ctx context.Context, images []string) error { return nil } +func saveImageArchive(ctx context.Context, dockerClient docker.Client, images []string) (string, error) { + + // Setup the tar path where the images will be saved. 
+ dir, err := fs.TempDir("", "images-tar-") + if err != nil { + return "", err + } + defer os.RemoveAll(dir) + imagesTarPath := filepath.Join(dir, "images.tar") + pterm.Debug.Printfln("saving image archive to %s", imagesTarPath) + + wf, err := os.Create(imagesTarPath) + if err != nil { + return "", err + } + defer wf.Close() + + r, err := dockerClient.ImageSave(ctx, images) + if err != nil { + return "", err + } + + if _, err := io.Copy(wf, r); err != nil { + return "", err + } + + return imagesTarPath, nil +} + func formatKindErr(err error) error { var kindErr *kindExec.RunError if errors.As(err, &kindErr) { diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index b49d1d4..a365237 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -162,7 +162,7 @@ func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *Ins } pterm.Info.Println("pulling airbyte images") - cluster.LoadImages(ctx, manifest) + cluster.LoadImages(ctx, c.docker.Client, manifest) } // Install handles the installation of Airbyte From 617fa214c2c82458c0c076fe59d4314bada33b82 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Tue, 5 Nov 2024 10:49:42 -0800 Subject: [PATCH 7/8] only load images that don't already exist on the node --- internal/cmd/images/manifest_cmd.go | 29 +--- internal/cmd/local/k8s/cluster.go | 85 +---------- internal/cmd/local/k8s/load_images.go | 208 ++++++++++++++++++++++++++ internal/cmd/local/local/install.go | 1 - internal/cmd/local/local_install.go | 3 +- internal/common/set.go | 32 ++++ 6 files changed, 251 insertions(+), 107 deletions(-) create mode 100644 internal/cmd/local/k8s/load_images.go create mode 100644 internal/common/set.go diff --git a/internal/cmd/images/manifest_cmd.go b/internal/cmd/images/manifest_cmd.go index 50b95d9..3892771 100644 --- a/internal/cmd/images/manifest_cmd.go +++ b/internal/cmd/images/manifest_cmd.go @@ -91,7 +91,7 @@ func 
FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, // It returns a unique, sorted list of images found. func findAllImages(chartYaml string) []string { objs := decodeK8sResources(chartYaml) - imageSet := set[string]{} + imageSet := common.Set[string]{} for _, obj := range objs { @@ -101,7 +101,7 @@ func findAllImages(chartYaml string) []string { if strings.HasSuffix(z.Name, "airbyte-env") { for k, v := range z.Data { if strings.HasSuffix(k, "_IMAGE") { - imageSet.add(v) + imageSet.Add(v) } } } @@ -119,15 +119,15 @@ func findAllImages(chartYaml string) []string { } for _, c := range podSpec.InitContainers { - imageSet.add(c.Image) + imageSet.Add(c.Image) } for _, c := range podSpec.Containers { - imageSet.add(c.Image) + imageSet.Add(c.Image) } } var out []string - for _, k := range imageSet.items() { + for _, k := range imageSet.Items() { if k != "" { out = append(out, k) } @@ -152,22 +152,3 @@ func decodeK8sResources(renderedYaml string) []runtime.Object { } return out } - -type set[T comparable] struct { - vals map[T]struct{} -} - -func (s *set[T]) add(v T) { - if s.vals == nil { - s.vals = map[T]struct{}{} - } - s.vals[v] = struct{}{} -} - -func (s *set[T]) items() []T { - out := make([]T, len(s.vals)) - for k := range s.vals { - out = append(out, k) - } - return out -} diff --git a/internal/cmd/local/k8s/cluster.go b/internal/cmd/local/k8s/cluster.go index 9b2abbb..d10af5a 100644 --- a/internal/cmd/local/k8s/cluster.go +++ b/internal/cmd/local/k8s/cluster.go @@ -4,23 +4,17 @@ import ( "context" "errors" "fmt" - "io" "os" - "path/filepath" - "sync" "time" "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/k8s/kind" "github.com/airbytehq/abctl/internal/cmd/local/paths" "github.com/airbytehq/abctl/internal/trace" - "github.com/docker/docker/api/types/image" "github.com/pterm/pterm" "gopkg.in/yaml.v3" "sigs.k8s.io/kind/pkg/cluster" - "sigs.k8s.io/kind/pkg/cluster/nodeutils" kindExec 
"sigs.k8s.io/kind/pkg/exec" - "sigs.k8s.io/kind/pkg/fs" ) // ExtraVolumeMount defines a host volume mount for the Kind cluster @@ -122,88 +116,17 @@ func (k *kindCluster) Exists(ctx context.Context) bool { // This is a best-effort optimization, which is why it doesn't return an error. // It's possible that only some images will be loaded. func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) { - err := k.loadImages(ctx, dockerClient, images) - pterm.Debug.Printfln("failed to load images: %s", err) -} - -func (k *kindCluster) loadImages(ctx context.Context, dockerClient docker.Client, images []string) error { // Get a list of Kind nodes. nodes, err := k.p.ListNodes(k.clusterName) if err != nil { - return fmt.Errorf("listing nodes: %w", err) - } - - // Pull all the images via "docker pull", in parallel. - var wg sync.WaitGroup - wg.Add(len(images)) - for _, img := range images { - pterm.Debug.Printfln("pulling image %s", img) - - go func(img string) { - defer wg.Done() - r, err := dockerClient.ImagePull(ctx, img, image.PullOptions{}) - if err != nil { - pterm.Debug.Printfln("error pulling image %s", err) - // image pull errors are intentionally dropped because we're in a goroutine, - // and because we don't want to interrupt other image pulls. - } - defer r.Close() - }(img) - } - wg.Wait() - - // The context could be canceled by now. If so, return early. - if ctx.Err() != nil { - return ctx.Err() - } - - // Save all the images to an archive, images.tar - imagesTarPath, err := saveImageArchive(ctx, dockerClient, images) - if err != nil { - return fmt.Errorf("failed to save image archive: %w", err) - } - - // Load the image archive into the Kind nodes. 
- f, err := os.Open(imagesTarPath) - if err != nil { - return err - } - defer f.Close() - - for _, n := range nodes { - pterm.Debug.Printfln("loading image archive into kind node %s", n) - nodeutils.LoadImageArchive(n, f) - } - return nil -} - -func saveImageArchive(ctx context.Context, dockerClient docker.Client, images []string) (string, error) { - - // Setup the tar path where the images will be saved. - dir, err := fs.TempDir("", "images-tar-") - if err != nil { - return "", err + pterm.Debug.Printfln("failed to load images: %s", err) + return } - defer os.RemoveAll(dir) - imagesTarPath := filepath.Join(dir, "images.tar") - pterm.Debug.Printfln("saving image archive to %s", imagesTarPath) - wf, err := os.Create(imagesTarPath) + err = loadImages(ctx, dockerClient, nodes, images) if err != nil { - return "", err + pterm.Debug.Printfln("failed to load images: %s", err) } - defer wf.Close() - - r, err := dockerClient.ImageSave(ctx, images) - if err != nil { - return "", err - } - - if _, err := io.Copy(wf, r); err != nil { - return "", err - } - - return imagesTarPath, nil } func formatKindErr(err error) error { diff --git a/internal/cmd/local/k8s/load_images.go b/internal/cmd/local/k8s/load_images.go new file mode 100644 index 0000000..2caeffe --- /dev/null +++ b/internal/cmd/local/k8s/load_images.go @@ -0,0 +1,208 @@ +package k8s + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/airbytehq/abctl/internal/cmd/local/docker" + "github.com/airbytehq/abctl/internal/common" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/image" + "github.com/pterm/pterm" + nodeslib "sigs.k8s.io/kind/pkg/cluster/nodes" + "sigs.k8s.io/kind/pkg/cluster/nodeutils" + "sigs.k8s.io/kind/pkg/exec" + "sigs.k8s.io/kind/pkg/fs" +) + +// loadImages pulls and loads images into the kind cluster. 
+// It will pull all images in parallel, skip any images that already exist on the nodes, +// save the rest to an image archive (tar file), and load archive onto the nodes. +func loadImages(ctx context.Context, dockerClient docker.Client, nodes []nodeslib.Node, images []string) error { + + // Pull all the images via "docker pull", in parallel. + var wg sync.WaitGroup + wg.Add(len(images)) + for _, img := range images { + pterm.Info.Printfln("Pulling image %s", img) + + go func(img string) { + defer wg.Done() + r, err := dockerClient.ImagePull(ctx, img, image.PullOptions{}) + if err != nil { + pterm.Debug.Printfln("error pulling image %s", err) + // image pull errors are intentionally dropped because we're in a goroutine, + // and because we don't want to interrupt other image pulls. + } + defer r.Close() + io.Copy(io.Discard, r) + }(img) + } + wg.Wait() + + // The context could be canceled by now. If so, return early. + if ctx.Err() != nil { + return ctx.Err() + } + + // Determine which images need to be loaded onto the nodes. 
+ needed := determineImagesForLoading(ctx, dockerClient, images, nodes) + if len(needed) == 0 { + return nil + } + + // for _, node := range nodes { + // for _, img := range needed { + + // pterm.Debug.Printfln("saving image %s into node %s", img, node) + + // r, err := dockerClient.ImageSave(ctx, images) + // if err != nil { + // pterm.Debug.Printfln("error loading image %s into node %s: %s", img, node, err) + // continue + // } + + // pterm.Debug.Printfln("loading image %s into node %s", img, node) + + // err = nodeutils.LoadImageArchive(node, r) + // if err != nil { + // pterm.Debug.Printfln("error loading image %s into node %s: %s", img, node, err) + // } + // } + // } + + // Save all the images to an archive, images.tar + imagesTarPath, err := saveImageArchive(ctx, dockerClient, needed) + if err != nil { + return fmt.Errorf("failed to save image archive: %w", err) + } + // defer os.RemoveAll(imagesTarPath) + + // Load the image archive into the Kind nodes. + f, err := os.Open(imagesTarPath) + if err != nil { + return fmt.Errorf("failed to open image archive: %w", err) + } + defer f.Close() + + for _, n := range nodes { + pterm.Debug.Printfln("loading image archive into kind node %s", n) + err := nodeutils.LoadImageArchive(n, f) + if err != nil { + pterm.Debug.Printfln("%s", err) + } + } + return nil +} + +// getExistingImageDigests returns the set of images that already exist on the nodes. +func getExistingImageDigests(ctx context.Context, nodes []nodeslib.Node) common.Set[string] { + existingByNode := map[string]int{} + + for _, n := range nodes { + + out, err := exec.CombinedOutputLines(n.CommandContext(ctx, "ctr", "--namespace=k8s.io", "images", "list")) + if err != nil { + // ignore the error because discovering these images is just an optimization. + pterm.Debug.Printfln("error discovering existing images: %s %s", err, out) + continue + } + if len(out) < 1 { + continue + } + + // the first line is a header. 
verify the columns we expect, just in case the format ever changes. + header := strings.Fields(out[0]) + if len(header) < 1 || header[0] != "REF" { + pterm.Debug.Printfln("unexpected format from ctr image list. skipping node %s.", n) + continue + } + + // skip the first line, which is a header. + for _, l := range out[1:] { + ref := strings.Fields(l)[0] + pterm.Debug.Printfln("found existing image with ref %s", ref) + existingByNode[ref] += 1 + } + } + + existing := common.Set[string]{} + for ref, count := range existingByNode { + if count == len(nodes) { + existing.Add(ref) + } + } + return existing +} + +// determineImagesForLoading gets the IDs of the desired images (using "docker images"), +// subtracts the images that already exist on the nodes, and returns the resulting list. +func determineImagesForLoading(ctx context.Context, dockerClient docker.Client, images []string, nodes []nodeslib.Node) []string { + + // Get the digests of the images that already exist on the nodes. + existing := getExistingImageDigests(ctx, nodes) + if existing.Len() == 0 { + return images + } + + // Get the digests of the requested images, so we can compare them to the existing images. + imgFilter := filters.NewArgs() + for _, img := range images { + imgFilter.Add("reference", img) + } + + imgList, err := dockerClient.ImageList(ctx, image.ListOptions{Filters: imgFilter}) + if err != nil { + // ignore errors from the image digest list – it's an optimization. + pterm.Debug.Printfln("error getting image digests: %s", err) + return images + } + + // Subtract the images that already exist on the nodes. 
+ var needed []string + for _, img := range imgList { + if !existing.Contains(img.ID) { + pterm.Debug.Printfln("image does not exist: %s %v", img.ID, img.RepoTags) + for _, tag := range img.RepoTags { + needed = append(needed, tag) + } + } else { + pterm.Debug.Printfln("image already exists: %s", img.ID) + } + } + return needed +} + +func saveImageArchive(ctx context.Context, dockerClient docker.Client, images []string) (string, error) { + + // Setup the tar path where the images will be saved. + dir, err := fs.TempDir("", "images-tar-") + if err != nil { + return "", err + } + + imagesTarPath := filepath.Join(dir, "images.tar") + pterm.Debug.Printfln("saving image archive to %s", imagesTarPath) + + wf, err := os.Create(imagesTarPath) + if err != nil { + return "", err + } + defer wf.Close() + + r, err := dockerClient.ImageSave(ctx, images) + if err != nil { + return "", err + } + + if _, err := io.Copy(wf, r); err != nil { + return "", err + } + + return imagesTarPath, nil +} diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index a365237..aae939b 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -161,7 +161,6 @@ func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *Ins return } - pterm.Info.Println("pulling airbyte images") cluster.LoadImages(ctx, c.docker.Client, manifest) } diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index ed7bcdd..d33f113 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -164,8 +164,9 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return fmt.Errorf("unable to initialize local command: %w", err) } + spinner.UpdateText("Pulling images") lc.PrepImages(ctx, cluster, opts) - + if err := lc.Install(ctx, opts); err != nil { spinner.Fail("Unable to install Airbyte locally") return err diff --git a/internal/common/set.go 
// Set is an unordered collection of unique values.
//
// The zero value is an empty set that is ready to use; the backing map is
// allocated on the first call to Add.
type Set[T comparable] struct {
	vals map[T]struct{}
}

// Add inserts v into the set. Adding a value that is already present is a no-op.
func (s *Set[T]) Add(v T) {
	if s.vals == nil {
		s.vals = map[T]struct{}{}
	}
	s.vals[v] = struct{}{}
}

// Contains reports whether v is in the set.
func (s *Set[T]) Contains(v T) bool {
	// Reading from a nil map is safe and reports the key as absent.
	_, ok := s.vals[v]
	return ok
}

// Len returns the number of values in the set.
func (s *Set[T]) Len() int {
	return len(s.vals)
}

// Items returns the values of the set as a new slice, in unspecified order.
// The result is always non-nil and has exactly Len() elements.
func (s *Set[T]) Items() []T {
	// Length 0 with capacity len(s.vals): a non-zero length here would leave
	// that many zero values in front of the appended items.
	out := make([]T, 0, len(s.vals))
	for k := range s.vals {
		out = append(out, k)
	}
	return out
}
defer os.RemoveAll(imagesTarPath) + defer os.RemoveAll(imagesTarPath) // Load the image archive into the Kind nodes. f, err := os.Open(imagesTarPath) @@ -125,7 +105,11 @@ func getExistingImageDigests(ctx context.Context, nodes []nodeslib.Node) common. // skip the first line, which is a header. for _, l := range out[1:] { - ref := strings.Fields(l)[0] + fields := strings.Fields(l) + if len(fields) < 1 { + continue + } + ref := fields[0] pterm.Debug.Printfln("found existing image with ref %s", ref) existingByNode[ref] += 1 }