Skip to content

Commit

Permalink
feat: support image cache copying
Browse files Browse the repository at this point in the history
Fixes #9615

The are no integration tests, this is to be addressed later.
I did manual tests so far.

Also includes first draft of the documentation.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Nov 29, 2024
1 parent 60e4561 commit 7208d00
Show file tree
Hide file tree
Showing 17 changed files with 840 additions and 206 deletions.
1 change: 1 addition & 0 deletions api/resource/definitions/cri/cri.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "resource/definitions/enums/enums.proto";
message ImageCacheConfigSpec {
talos.resource.definitions.enums.CriImageCacheStatus status = 1;
repeated string roots = 2;
talos.resource.definitions.enums.CriImageCacheCopyStatus copy_status = 3;
}

// RegistriesConfigSpec describes status of rendered secrets.
Expand Down
8 changes: 8 additions & 0 deletions api/resource/definitions/enums/enums.proto
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,14 @@ enum CriImageCacheStatus {
IMAGE_CACHE_STATUS_READY = 3;
}

// CriImageCacheCopyStatus describes image cache copy status type.
enum CriImageCacheCopyStatus {
IMAGE_CACHE_COPY_STATUS_UNKNOWN = 0;
IMAGE_CACHE_COPY_STATUS_SKIPPED = 1;
IMAGE_CACHE_COPY_STATUS_PENDING = 2;
IMAGE_CACHE_COPY_STATUS_READY = 3;
}

// KubespanPeerState is KubeSpan peer current state.
enum KubespanPeerState {
PEER_STATE_UNKNOWN = 0;
Expand Down
5 changes: 5 additions & 0 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ search default.svc.cluster.local svc.cluster.local cluster.local my-custom-searc
nameserver 10.96.0.10
options ndots:5
```
"""

[notes.image-cache]
title = "Image Cache"
description = """\
Talos now supports providing a local [Image Cache](https://www.talos.dev/v1.9/talos-guides/configuration/image-cache/) for container images.
"""

[make_deps]
Expand Down
264 changes: 240 additions & 24 deletions internal/app/machined/pkg/controllers/cri/image_cache_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ package cri
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/dustin/go-humanize"
"github.com/google/cel-go/common/operators"
"github.com/google/cel-go/common/types"
"github.com/siderolabs/gen/optional"
Expand Down Expand Up @@ -42,6 +46,10 @@ type ServiceManager interface {
type ImageCacheConfigController struct {
V1Alpha1ServiceManager ServiceManager
VolumeMounter func(label string, opts ...mountv2.NewPointOption) error

DisableCacheCopy bool // used for testing

cacheCopyDone bool
}

// Name implements controller.StatsController interface.
Expand Down Expand Up @@ -100,7 +108,7 @@ const (
// Run implements controller.StatsController interface.
//
//nolint:gocyclo,cyclop
func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
Expand All @@ -122,12 +130,15 @@ func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Ru
imageCacheDisabled := cfg == nil || cfg.Config().Machine() == nil || !cfg.Config().Machine().Features().ImageCache().LocalEnabled()

var (
status cri.ImageCacheStatus
roots []string
status cri.ImageCacheStatus
copyStatus cri.ImageCacheCopyStatus
roots []string
allReady bool
)

if imageCacheDisabled {
status = cri.ImageCacheStatusDisabled
copyStatus = cri.ImageCacheCopyStatusSkipped
} else {
status = cri.ImageCacheStatusPreparing

Expand All @@ -140,22 +151,15 @@ func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Ru
return fmt.Errorf("error creating volume config: %w", err)
}

allReady := false

// analyze volume statuses, and build the roots
for _, volumeID := range []string{VolumeImageCacheISO, VolumeImageCacheDISK} {
root, ready, err := ctrl.getImageCacheRoot(ctx, r, volumeID)
if err != nil {
return fmt.Errorf("error getting image cache root: %w", err)
}

allReady = allReady && ready

if rootPath, ok := root.Get(); ok {
roots = append(roots, rootPath)
}
cacheVolumeStatus, err := ctrl.analyzeImageCacheVolumes(ctx, logger, r)
if err != nil {
return fmt.Errorf("error analyzing image cache volumes: %w", err)
}

allReady = cacheVolumeStatus.allReady
roots = cacheVolumeStatus.roots
copyStatus = cacheVolumeStatus.copyStatus

if allReady && len(roots) == 0 {
// all volumes identified, but no roots found
status = cri.ImageCacheStatusDisabled
Expand All @@ -181,6 +185,7 @@ func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Ru

if err = safe.WriterModify(ctx, r, cri.NewImageCacheConfig(), func(cfg *cri.ImageCacheConfig) error {
cfg.TypedSpec().Status = status
cfg.TypedSpec().CopyStatus = copyStatus
cfg.TypedSpec().Roots = roots

return nil
Expand Down Expand Up @@ -272,6 +277,7 @@ func (ctrl *ImageCacheConfigController) createVolumeConfigDisk(ctx context.Conte
}

if extraCfg, ok := cfg.Volumes().ByName(constants.ImageCachePartitionLabel); ok {
volumeCfg.TypedSpec().Provisioning.Wave = block.WaveSystemDisk
volumeCfg.TypedSpec().Provisioning.DiskSelector.Match = extraCfg.Provisioning().DiskSelector().ValueOr(*diskExpr)
volumeCfg.TypedSpec().Provisioning.PartitionSpec.Grow = extraCfg.Provisioning().Grow().ValueOr(false)
volumeCfg.TypedSpec().Provisioning.PartitionSpec.MinSize = extraCfg.Provisioning().MinSize().ValueOr(MinImageCacheSize)
Expand All @@ -287,16 +293,116 @@ func (ctrl *ImageCacheConfigController) createVolumeConfigDisk(ctx context.Conte
})
}

func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r controller.Reader, volumeID string) (optional.Optional[string], bool, error) {
volumeStatus, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, r, volumeID)
if err != nil {
if state.IsNotFoundError(err) {
return optional.None[string](), false, nil
type imageCacheVolumeStatus struct {
roots []string
allReady bool
copyStatus cri.ImageCacheCopyStatus
}

//nolint:gocyclo,cyclop
func (ctrl *ImageCacheConfigController) analyzeImageCacheVolumes(ctx context.Context, logger *zap.Logger, r controller.Reader) (*imageCacheVolumeStatus, error) {
volumeIDs := []string{VolumeImageCacheDISK, VolumeImageCacheISO} // prefer disk cache over ISO cache
volumeStatuses := make([]*block.VolumeStatus, 0, len(volumeIDs))

for _, volumeID := range volumeIDs {
volumeStatus, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, r, volumeID)
if err != nil {
if state.IsNotFoundError(err) {
// wait for volume statuses to be present
return &imageCacheVolumeStatus{}, nil
}

return nil, fmt.Errorf("error getting volume status: %w", err)
}

volumeStatuses = append(volumeStatuses, volumeStatus)
}

// we need to ensure that we first wait for the ISO to be either missing or ready,
// so that we can make a decision on copying the image cache from an ISO to the disk volume
var isoStatus, diskStatus block.VolumePhase

for _, volumeStatus := range volumeStatuses {
switch volumeStatus.Metadata().ID() {
case VolumeImageCacheISO:
isoStatus = volumeStatus.TypedSpec().Phase
case VolumeImageCacheDISK:
diskStatus = volumeStatus.TypedSpec().Phase
}
}

if isoStatus != block.VolumePhaseMissing && isoStatus != block.VolumePhaseReady {
return &imageCacheVolumeStatus{}, nil
}

isoPresent := isoStatus == block.VolumePhaseReady
diskMissing := diskStatus == block.VolumePhaseMissing

roots := make([]string, 0, len(volumeIDs))

var (
allReady, isoReady, diskReady bool
copySource, copyTarget string
)

// analyze volume statuses, and build the roots
for _, volumeStatus := range volumeStatuses {
// mount as rw only disk cache if the ISO cache is present
root, ready, err := ctrl.getImageCacheRoot(ctx, r, volumeStatus, !(volumeStatus.Metadata().ID() == VolumeImageCacheDISK && isoPresent))
if err != nil {
return nil, fmt.Errorf("error getting image cache root: %w", err)
}

if ready {
switch volumeStatus.Metadata().ID() {
case VolumeImageCacheISO:
isoReady = true
copySource = root.ValueOr("")
case VolumeImageCacheDISK:
diskReady = true
copyTarget = root.ValueOr("")
}
}

allReady = allReady && ready

if rootPath, ok := root.Get(); ok {
roots = append(roots, rootPath)
}
}

var copyStatus cri.ImageCacheCopyStatus

switch {
case !isoPresent:
// if there's no ISO, we don't need to copy anything
copyStatus = cri.ImageCacheCopyStatusSkipped
case diskMissing:
// if the disk volume is not configured, we can't copy the image cache
copyStatus = cri.ImageCacheCopyStatusSkipped
case ctrl.cacheCopyDone:
// if the copy has already been done, we don't need to do it again
copyStatus = cri.ImageCacheCopyStatusReady
case isoReady && diskReady:
// ready to copy
if err := ctrl.copyImageCache(ctx, logger, copySource, copyTarget); err != nil {
return nil, fmt.Errorf("error copying image cache: %w", err)
}

return optional.None[string](), false, fmt.Errorf("error getting volume status: %w", err)
copyStatus = cri.ImageCacheCopyStatusReady
default:
// waiting for copy preconditions
copyStatus = cri.ImageCacheCopyStatusPending
}

return &imageCacheVolumeStatus{
roots: roots,
allReady: allReady,
copyStatus: copyStatus,
}, nil
}

func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r controller.Reader, volumeStatus *block.VolumeStatus, mountReadyOnly bool) (optional.Optional[string], bool, error) {
switch volumeStatus.TypedSpec().Phase { //nolint:exhaustive
case block.VolumePhaseMissing:
// image cache is missing
Expand All @@ -308,12 +414,20 @@ func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r
return optional.None[string](), false, nil
}

volumeID := volumeStatus.Metadata().ID()

volumeConfig, err := safe.ReaderGetByID[*block.VolumeConfig](ctx, r, volumeID)
if err != nil {
return optional.None[string](), false, fmt.Errorf("error getting volume config: %w", err)
}

if err = ctrl.VolumeMounter(volumeID, mountv2.WithReadonly()); err != nil {
var mountOpts []mountv2.NewPointOption

if mountReadyOnly {
mountOpts = append(mountOpts, mountv2.WithReadonly())
}

if err = ctrl.VolumeMounter(volumeID, mountOpts...); err != nil {
return optional.None[string](), false, fmt.Errorf("error mounting volume: %w", err)
}

Expand All @@ -326,3 +440,105 @@ func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r

return optional.Some(targetPath), true, nil
}

func (ctrl *ImageCacheConfigController) copyImageCache(ctx context.Context, logger *zap.Logger, source, target string) error {
logger.Info("copying image cache", zap.String("source", source), zap.String("target", target))

if ctrl.DisableCacheCopy {
// used for testing
return nil
}

var bytesCopied int64

if err := filepath.WalkDir(source, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return fmt.Errorf("error walking source directory: %w", err)
}

select {
case <-ctx.Done():
return ctx.Err()
default:
}

relPath, err := filepath.Rel(source, path)
if err != nil {
return fmt.Errorf("error getting relative path: %w", err)
}

targetPath := filepath.Join(target, relPath)

info, err := d.Info()
if err != nil {
return fmt.Errorf("error getting file info: %w", err)
}

// we only support directories and files
switch {
case info.Mode().IsDir():
if err := os.MkdirAll(targetPath, 0o755); err != nil {
return fmt.Errorf("error creating directory: %w", err)
}

return nil
case info.Mode().IsRegular():
bytesCopied += info.Size()

return copyFileSafe(path, targetPath)
default:
return fmt.Errorf("unsupported file type %s: %s", info.Mode(), path)
}
}); err != nil {
return fmt.Errorf("error copying image cache: %w", err)
}

logger.Info("image cache copied", zap.String("size", humanize.IBytes(uint64(bytesCopied))))

ctrl.cacheCopyDone = true

return nil
}

func copyFileSafe(src, dst string) error {
srcStat, err := os.Stat(src)
if err != nil {
return fmt.Errorf("error getting source file info: %w", err)
}

dstStat, err := os.Stat(dst)
if err == nil && srcStat.Size() == dstStat.Size() {
// skipping copy
return nil
}

srcFile, err := os.Open(src)
if err != nil {
return fmt.Errorf("error opening source file: %w", err)
}

defer srcFile.Close() //nolint:errcheck

tempPath := dst + ".tmp"

dstFile, err := os.Create(tempPath)
if err != nil {
return fmt.Errorf("error creating destination file: %w", err)
}

defer dstFile.Close() //nolint:errcheck

if _, err = io.Copy(dstFile, srcFile); err != nil {
return fmt.Errorf("error copying file: %w", err)
}

if err = dstFile.Close(); err != nil {
return fmt.Errorf("error closing destination file: %w", err)
}

if err = os.Rename(tempPath, dst); err != nil {
return fmt.Errorf("error renaming file: %w", err)
}

return nil
}
Loading

0 comments on commit 7208d00

Please sign in to comment.