Skip to content

Commit

Permalink
only load images that don't already exist on the node
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-airbyte committed Nov 5, 2024
1 parent 90c6ec7 commit 617fa21
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 107 deletions.
29 changes: 5 additions & 24 deletions internal/cmd/images/manifest_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string,
// It returns a unique, sorted list of images found.
func findAllImages(chartYaml string) []string {
objs := decodeK8sResources(chartYaml)
imageSet := set[string]{}
imageSet := common.Set[string]{}

for _, obj := range objs {

Expand All @@ -101,7 +101,7 @@ func findAllImages(chartYaml string) []string {
if strings.HasSuffix(z.Name, "airbyte-env") {
for k, v := range z.Data {
if strings.HasSuffix(k, "_IMAGE") {
imageSet.add(v)
imageSet.Add(v)
}
}
}
Expand All @@ -119,15 +119,15 @@ func findAllImages(chartYaml string) []string {
}

for _, c := range podSpec.InitContainers {
imageSet.add(c.Image)
imageSet.Add(c.Image)
}
for _, c := range podSpec.Containers {
imageSet.add(c.Image)
imageSet.Add(c.Image)
}
}

var out []string
for _, k := range imageSet.items() {
for _, k := range imageSet.Items() {
if k != "" {
out = append(out, k)
}
Expand All @@ -152,22 +152,3 @@ func decodeK8sResources(renderedYaml string) []runtime.Object {
}
return out
}

type set[T comparable] struct {
vals map[T]struct{}
}

func (s *set[T]) add(v T) {
if s.vals == nil {
s.vals = map[T]struct{}{}
}
s.vals[v] = struct{}{}
}

func (s *set[T]) items() []T {
out := make([]T, len(s.vals))
for k := range s.vals {
out = append(out, k)
}
return out
}
85 changes: 4 additions & 81 deletions internal/cmd/local/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/docker"
"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
"github.com/airbytehq/abctl/internal/cmd/local/paths"
"github.com/airbytehq/abctl/internal/trace"
"github.com/docker/docker/api/types/image"
"github.com/pterm/pterm"
"gopkg.in/yaml.v3"
"sigs.k8s.io/kind/pkg/cluster"
"sigs.k8s.io/kind/pkg/cluster/nodeutils"
kindExec "sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/fs"
)

// ExtraVolumeMount defines a host volume mount for the Kind cluster
Expand Down Expand Up @@ -122,88 +116,17 @@ func (k *kindCluster) Exists(ctx context.Context) bool {
// This is a best-effort optimization, which is why it doesn't return an error.
// It's possible that only some images will be loaded.
func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) {
err := k.loadImages(ctx, dockerClient, images)
pterm.Debug.Printfln("failed to load images: %s", err)
}

func (k *kindCluster) loadImages(ctx context.Context, dockerClient docker.Client, images []string) error {
// Get a list of Kind nodes.
nodes, err := k.p.ListNodes(k.clusterName)
if err != nil {
return fmt.Errorf("listing nodes: %w", err)
}

// Pull all the images via "docker pull", in parallel.
var wg sync.WaitGroup
wg.Add(len(images))
for _, img := range images {
pterm.Debug.Printfln("pulling image %s", img)

go func(img string) {
defer wg.Done()
r, err := dockerClient.ImagePull(ctx, img, image.PullOptions{})
if err != nil {
pterm.Debug.Printfln("error pulling image %s", err)
// image pull errors are intentionally dropped because we're in a goroutine,
// and because we don't want to interrupt other image pulls.
}
defer r.Close()
}(img)
}
wg.Wait()

// The context could be canceled by now. If so, return early.
if ctx.Err() != nil {
return ctx.Err()
}

// Save all the images to an archive, images.tar
imagesTarPath, err := saveImageArchive(ctx, dockerClient, images)
if err != nil {
return fmt.Errorf("failed to save image archive: %w", err)
}

// Load the image archive into the Kind nodes.
f, err := os.Open(imagesTarPath)
if err != nil {
return err
}
defer f.Close()

for _, n := range nodes {
pterm.Debug.Printfln("loading image archive into kind node %s", n)
nodeutils.LoadImageArchive(n, f)
}
return nil
}

func saveImageArchive(ctx context.Context, dockerClient docker.Client, images []string) (string, error) {

// Setup the tar path where the images will be saved.
dir, err := fs.TempDir("", "images-tar-")
if err != nil {
return "", err
pterm.Debug.Printfln("failed to load images: %s", err)
return
}
defer os.RemoveAll(dir)
imagesTarPath := filepath.Join(dir, "images.tar")
pterm.Debug.Printfln("saving image archive to %s", imagesTarPath)

wf, err := os.Create(imagesTarPath)
err = loadImages(ctx, dockerClient, nodes, images)
if err != nil {
return "", err
pterm.Debug.Printfln("failed to load images: %s", err)
}
defer wf.Close()

r, err := dockerClient.ImageSave(ctx, images)
if err != nil {
return "", err
}

if _, err := io.Copy(wf, r); err != nil {
return "", err
}

return imagesTarPath, nil
}

func formatKindErr(err error) error {
Expand Down
208 changes: 208 additions & 0 deletions internal/cmd/local/k8s/load_images.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package k8s

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"

"github.com/airbytehq/abctl/internal/cmd/local/docker"
"github.com/airbytehq/abctl/internal/common"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/image"
"github.com/pterm/pterm"
nodeslib "sigs.k8s.io/kind/pkg/cluster/nodes"
"sigs.k8s.io/kind/pkg/cluster/nodeutils"
"sigs.k8s.io/kind/pkg/exec"
"sigs.k8s.io/kind/pkg/fs"
)

// loadImages pulls and loads images into the kind cluster.
// It will pull all images in parallel, skip any images that already exist on the nodes,
// save the rest to an image archive (tar file), and load archive onto the nodes.
func loadImages(ctx context.Context, dockerClient docker.Client, nodes []nodeslib.Node, images []string) error {

// Pull all the images via "docker pull", in parallel.
var wg sync.WaitGroup
wg.Add(len(images))
for _, img := range images {
pterm.Info.Printfln("Pulling image %s", img)

go func(img string) {
defer wg.Done()
r, err := dockerClient.ImagePull(ctx, img, image.PullOptions{})
if err != nil {
pterm.Debug.Printfln("error pulling image %s", err)
// image pull errors are intentionally dropped because we're in a goroutine,
// and because we don't want to interrupt other image pulls.
}
defer r.Close()
io.Copy(io.Discard, r)
}(img)
}
wg.Wait()

// The context could be canceled by now. If so, return early.
if ctx.Err() != nil {
return ctx.Err()
}

// Determine which images need to be loaded onto the nodes.
needed := determineImagesForLoading(ctx, dockerClient, images, nodes)
if len(needed) == 0 {
return nil
}

// for _, node := range nodes {
// for _, img := range needed {

// pterm.Debug.Printfln("saving image %s into node %s", img, node)

// r, err := dockerClient.ImageSave(ctx, images)
// if err != nil {
// pterm.Debug.Printfln("error loading image %s into node %s: %s", img, node, err)
// continue
// }

// pterm.Debug.Printfln("loading image %s into node %s", img, node)

// err = nodeutils.LoadImageArchive(node, r)
// if err != nil {
// pterm.Debug.Printfln("error loading image %s into node %s: %s", img, node, err)
// }
// }
// }

// Save all the images to an archive, images.tar
imagesTarPath, err := saveImageArchive(ctx, dockerClient, needed)
if err != nil {
return fmt.Errorf("failed to save image archive: %w", err)
}
// defer os.RemoveAll(imagesTarPath)

// Load the image archive into the Kind nodes.
f, err := os.Open(imagesTarPath)
if err != nil {
return fmt.Errorf("failed to open image archive: %w", err)
}
defer f.Close()

for _, n := range nodes {
pterm.Debug.Printfln("loading image archive into kind node %s", n)
err := nodeutils.LoadImageArchive(n, f)
if err != nil {
pterm.Debug.Printfln("%s", err)
}
}
return nil
}

// getExistingImageDigests returns the set of images that already exist on the nodes.
func getExistingImageDigests(ctx context.Context, nodes []nodeslib.Node) common.Set[string] {
existingByNode := map[string]int{}

for _, n := range nodes {

out, err := exec.CombinedOutputLines(n.CommandContext(ctx, "ctr", "--namespace=k8s.io", "images", "list"))
if err != nil {
// ignore the error because discovering these images is just an optimization.
pterm.Debug.Printfln("error discovering existing images: %s %s", err, out)
continue
}
if len(out) < 1 {
continue
}

// the first line is a header. verify the columns we expect, just in case the format ever changes.
header := strings.Fields(out[0])
if len(header) < 1 || header[0] != "REF" {
pterm.Debug.Printfln("unexpected format from ctr image list. skipping node %s.", n)
continue
}

// skip the first line, which is a header.
for _, l := range out[1:] {
ref := strings.Fields(l)[0]
pterm.Debug.Printfln("found existing image with ref %s", ref)
existingByNode[ref] += 1
}
}

existing := common.Set[string]{}
for ref, count := range existingByNode {
if count == len(nodes) {
existing.Add(ref)
}
}
return existing
}

// determineImagesForLoading gets the IDs of the desired images (using "docker images"),
// subtracts the images that already exist on the nodes, and returns the resulting list.
func determineImagesForLoading(ctx context.Context, dockerClient docker.Client, images []string, nodes []nodeslib.Node) []string {

// Get the digests of the images that already exist on the nodes.
existing := getExistingImageDigests(ctx, nodes)
if existing.Len() == 0 {
return images
}

// Get the digests of the requested images, so we can compare them to the existing images.
imgFilter := filters.NewArgs()
for _, img := range images {
imgFilter.Add("reference", img)
}

imgList, err := dockerClient.ImageList(ctx, image.ListOptions{Filters: imgFilter})
if err != nil {
// ignore errors from the image digest list – it's an optimization.
pterm.Debug.Printfln("error getting image digests: %s", err)
return images
}

// Subtract the images that already exist on the nodes.
var needed []string
for _, img := range imgList {
if !existing.Contains(img.ID) {
pterm.Debug.Printfln("image does not exist: %s %v", img.ID, img.RepoTags)
for _, tag := range img.RepoTags {
needed = append(needed, tag)
}
} else {
pterm.Debug.Printfln("image already exists: %s", img.ID)
}
}
return needed
}

func saveImageArchive(ctx context.Context, dockerClient docker.Client, images []string) (string, error) {

// Setup the tar path where the images will be saved.
dir, err := fs.TempDir("", "images-tar-")
if err != nil {
return "", err
}

imagesTarPath := filepath.Join(dir, "images.tar")
pterm.Debug.Printfln("saving image archive to %s", imagesTarPath)

wf, err := os.Create(imagesTarPath)
if err != nil {
return "", err
}
defer wf.Close()

r, err := dockerClient.ImageSave(ctx, images)
if err != nil {
return "", err
}

if _, err := io.Copy(wf, r); err != nil {
return "", err
}

return imagesTarPath, nil
}
1 change: 0 additions & 1 deletion internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *Ins
return
}

pterm.Info.Println("pulling airbyte images")
cluster.LoadImages(ctx, c.docker.Client, manifest)
}

Expand Down
Loading

0 comments on commit 617fa21

Please sign in to comment.