Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support image cache copying #9845

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/resource/definitions/cri/cri.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "resource/definitions/enums/enums.proto";
message ImageCacheConfigSpec {
  // Overall readiness status of the image cache.
  talos.resource.definitions.enums.CriImageCacheStatus status = 1;
  // Filesystem roots which hold the image cache contents.
  repeated string roots = 2;
  // Status of copying the image cache from the ISO volume to the disk volume.
  talos.resource.definitions.enums.CriImageCacheCopyStatus copy_status = 3;
}

// RegistriesConfigSpec describes status of rendered secrets.
Expand Down
8 changes: 8 additions & 0 deletions api/resource/definitions/enums/enums.proto
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,14 @@ enum CriImageCacheStatus {
IMAGE_CACHE_STATUS_READY = 3;
}

// CriImageCacheCopyStatus describes image cache copy status type.
enum CriImageCacheCopyStatus {
  // Copy status is not known yet (volume statuses not analyzed).
  IMAGE_CACHE_COPY_STATUS_UNKNOWN = 0;
  // Copy is not going to happen (no ISO cache source, or no disk volume configured).
  IMAGE_CACHE_COPY_STATUS_SKIPPED = 1;
  // Copy is waiting for its preconditions (both volumes mounted and ready).
  IMAGE_CACHE_COPY_STATUS_PENDING = 2;
  // Copy has finished.
  IMAGE_CACHE_COPY_STATUS_READY = 3;
}

// KubespanPeerState is KubeSpan peer current state.
enum KubespanPeerState {
PEER_STATE_UNKNOWN = 0;
Expand Down
5 changes: 5 additions & 0 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ search default.svc.cluster.local svc.cluster.local cluster.local my-custom-searc
nameserver 10.96.0.10
options ndots:5
```
"""

[notes.image-cache]
title = "Image Cache"
description = """\
Talos now supports providing a local [Image Cache](https://www.talos.dev/v1.9/talos-guides/configuration/image-cache/) for container images.
"""

[make_deps]
Expand Down
264 changes: 240 additions & 24 deletions internal/app/machined/pkg/controllers/cri/image_cache_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ package cri
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/dustin/go-humanize"
"github.com/google/cel-go/common/operators"
"github.com/google/cel-go/common/types"
"github.com/siderolabs/gen/optional"
Expand Down Expand Up @@ -42,6 +46,10 @@ type ServiceManager interface {
type ImageCacheConfigController struct {
V1Alpha1ServiceManager ServiceManager
VolumeMounter func(label string, opts ...mountv2.NewPointOption) error

DisableCacheCopy bool // used for testing

cacheCopyDone bool
}

// Name implements controller.StatsController interface.
Expand Down Expand Up @@ -100,7 +108,7 @@ const (
// Run implements controller.StatsController interface.
//
//nolint:gocyclo,cyclop
func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
Expand All @@ -122,12 +130,15 @@ func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Ru
imageCacheDisabled := cfg == nil || cfg.Config().Machine() == nil || !cfg.Config().Machine().Features().ImageCache().LocalEnabled()

var (
status cri.ImageCacheStatus
roots []string
status cri.ImageCacheStatus
copyStatus cri.ImageCacheCopyStatus
roots []string
allReady bool
)

if imageCacheDisabled {
status = cri.ImageCacheStatusDisabled
copyStatus = cri.ImageCacheCopyStatusSkipped
} else {
status = cri.ImageCacheStatusPreparing

Expand All @@ -140,22 +151,15 @@ func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Ru
return fmt.Errorf("error creating volume config: %w", err)
}

allReady := false

// analyze volume statuses, and build the roots
for _, volumeID := range []string{VolumeImageCacheISO, VolumeImageCacheDISK} {
root, ready, err := ctrl.getImageCacheRoot(ctx, r, volumeID)
if err != nil {
return fmt.Errorf("error getting image cache root: %w", err)
}

allReady = allReady && ready

if rootPath, ok := root.Get(); ok {
roots = append(roots, rootPath)
}
cacheVolumeStatus, err := ctrl.analyzeImageCacheVolumes(ctx, logger, r)
if err != nil {
return fmt.Errorf("error analyzing image cache volumes: %w", err)
}

allReady = cacheVolumeStatus.allReady
roots = cacheVolumeStatus.roots
copyStatus = cacheVolumeStatus.copyStatus

if allReady && len(roots) == 0 {
// all volumes identified, but no roots found
status = cri.ImageCacheStatusDisabled
Expand All @@ -181,6 +185,7 @@ func (ctrl *ImageCacheConfigController) Run(ctx context.Context, r controller.Ru

if err = safe.WriterModify(ctx, r, cri.NewImageCacheConfig(), func(cfg *cri.ImageCacheConfig) error {
cfg.TypedSpec().Status = status
cfg.TypedSpec().CopyStatus = copyStatus
cfg.TypedSpec().Roots = roots

return nil
Expand Down Expand Up @@ -272,6 +277,7 @@ func (ctrl *ImageCacheConfigController) createVolumeConfigDisk(ctx context.Conte
}

if extraCfg, ok := cfg.Volumes().ByName(constants.ImageCachePartitionLabel); ok {
volumeCfg.TypedSpec().Provisioning.Wave = block.WaveSystemDisk
volumeCfg.TypedSpec().Provisioning.DiskSelector.Match = extraCfg.Provisioning().DiskSelector().ValueOr(*diskExpr)
volumeCfg.TypedSpec().Provisioning.PartitionSpec.Grow = extraCfg.Provisioning().Grow().ValueOr(false)
volumeCfg.TypedSpec().Provisioning.PartitionSpec.MinSize = extraCfg.Provisioning().MinSize().ValueOr(MinImageCacheSize)
Expand All @@ -287,16 +293,116 @@ func (ctrl *ImageCacheConfigController) createVolumeConfigDisk(ctx context.Conte
})
}

func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r controller.Reader, volumeID string) (optional.Optional[string], bool, error) {
volumeStatus, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, r, volumeID)
if err != nil {
if state.IsNotFoundError(err) {
return optional.None[string](), false, nil
// imageCacheVolumeStatus is the aggregate result of analyzing the image cache volumes.
type imageCacheVolumeStatus struct {
	// roots holds the mount paths of the ready cache volumes, disk cache first
	// (it is preferred over the ISO cache).
	roots []string
	// allReady is true once every cache volume has been analyzed and is ready.
	allReady bool
	// copyStatus reports progress of copying the image cache from the ISO to the disk volume.
	copyStatus cri.ImageCacheCopyStatus
}

//nolint:gocyclo,cyclop
func (ctrl *ImageCacheConfigController) analyzeImageCacheVolumes(ctx context.Context, logger *zap.Logger, r controller.Reader) (*imageCacheVolumeStatus, error) {
volumeIDs := []string{VolumeImageCacheDISK, VolumeImageCacheISO} // prefer disk cache over ISO cache
volumeStatuses := make([]*block.VolumeStatus, 0, len(volumeIDs))

for _, volumeID := range volumeIDs {
volumeStatus, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, r, volumeID)
if err != nil {
if state.IsNotFoundError(err) {
// wait for volume statuses to be present
return &imageCacheVolumeStatus{}, nil
}

return nil, fmt.Errorf("error getting volume status: %w", err)
}

volumeStatuses = append(volumeStatuses, volumeStatus)
}

// we need to ensure that we first wait for the ISO to be either missing or ready,
// so that we can make a decision on copying the image cache from an ISO to the disk volume
var isoStatus, diskStatus block.VolumePhase

for _, volumeStatus := range volumeStatuses {
switch volumeStatus.Metadata().ID() {
case VolumeImageCacheISO:
isoStatus = volumeStatus.TypedSpec().Phase
case VolumeImageCacheDISK:
diskStatus = volumeStatus.TypedSpec().Phase
}
}

if isoStatus != block.VolumePhaseMissing && isoStatus != block.VolumePhaseReady {
return &imageCacheVolumeStatus{}, nil
}

isoPresent := isoStatus == block.VolumePhaseReady
diskMissing := diskStatus == block.VolumePhaseMissing

roots := make([]string, 0, len(volumeIDs))

var (
allReady, isoReady, diskReady bool
copySource, copyTarget string
)

// analyze volume statuses, and build the roots
for _, volumeStatus := range volumeStatuses {
DmitriyMV marked this conversation as resolved.
Show resolved Hide resolved
// mount as rw only disk cache if the ISO cache is present
root, ready, err := ctrl.getImageCacheRoot(ctx, r, volumeStatus, !(volumeStatus.Metadata().ID() == VolumeImageCacheDISK && isoPresent))
if err != nil {
return nil, fmt.Errorf("error getting image cache root: %w", err)
}

if ready {
switch volumeStatus.Metadata().ID() {
case VolumeImageCacheISO:
isoReady = true
copySource = root.ValueOr("")
case VolumeImageCacheDISK:
diskReady = true
copyTarget = root.ValueOr("")
}
}

allReady = allReady && ready
DmitriyMV marked this conversation as resolved.
Show resolved Hide resolved

if rootPath, ok := root.Get(); ok {
roots = append(roots, rootPath)
}
}

var copyStatus cri.ImageCacheCopyStatus

switch {
case !isoPresent:
// if there's no ISO, we don't need to copy anything
copyStatus = cri.ImageCacheCopyStatusSkipped
case diskMissing:
// if the disk volume is not configured, we can't copy the image cache
copyStatus = cri.ImageCacheCopyStatusSkipped
case ctrl.cacheCopyDone:
// if the copy has already been done, we don't need to do it again
copyStatus = cri.ImageCacheCopyStatusReady
case isoReady && diskReady:
// ready to copy
if err := ctrl.copyImageCache(ctx, logger, copySource, copyTarget); err != nil {
return nil, fmt.Errorf("error copying image cache: %w", err)
}

return optional.None[string](), false, fmt.Errorf("error getting volume status: %w", err)
copyStatus = cri.ImageCacheCopyStatusReady
default:
// waiting for copy preconditions
copyStatus = cri.ImageCacheCopyStatusPending
}

return &imageCacheVolumeStatus{
roots: roots,
allReady: allReady,
copyStatus: copyStatus,
}, nil
}

func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r controller.Reader, volumeStatus *block.VolumeStatus, mountReadyOnly bool) (optional.Optional[string], bool, error) {
DmitriyMV marked this conversation as resolved.
Show resolved Hide resolved
switch volumeStatus.TypedSpec().Phase { //nolint:exhaustive
case block.VolumePhaseMissing:
// image cache is missing
Expand All @@ -308,12 +414,20 @@ func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r
return optional.None[string](), false, nil
}

volumeID := volumeStatus.Metadata().ID()

volumeConfig, err := safe.ReaderGetByID[*block.VolumeConfig](ctx, r, volumeID)
if err != nil {
return optional.None[string](), false, fmt.Errorf("error getting volume config: %w", err)
}

if err = ctrl.VolumeMounter(volumeID, mountv2.WithReadonly()); err != nil {
var mountOpts []mountv2.NewPointOption

if mountReadyOnly {
mountOpts = append(mountOpts, mountv2.WithReadonly())
}

if err = ctrl.VolumeMounter(volumeID, mountOpts...); err != nil {
return optional.None[string](), false, fmt.Errorf("error mounting volume: %w", err)
}

Expand All @@ -326,3 +440,105 @@ func (ctrl *ImageCacheConfigController) getImageCacheRoot(ctx context.Context, r

return optional.Some(targetPath), true, nil
}

func (ctrl *ImageCacheConfigController) copyImageCache(ctx context.Context, logger *zap.Logger, source, target string) error {
logger.Info("copying image cache", zap.String("source", source), zap.String("target", target))

if ctrl.DisableCacheCopy {
// used for testing
return nil
}

var bytesCopied int64

if err := filepath.WalkDir(source, func(path string, d fs.DirEntry, err error) error {
DmitriyMV marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error walking source directory: %w", err)
}

select {
case <-ctx.Done():
return ctx.Err()
default:
}

relPath, err := filepath.Rel(source, path)
if err != nil {
return fmt.Errorf("error getting relative path: %w", err)
}

targetPath := filepath.Join(target, relPath)

info, err := d.Info()
if err != nil {
return fmt.Errorf("error getting file info: %w", err)
}

// we only support directories and files
switch {
case info.Mode().IsDir():
if err := os.MkdirAll(targetPath, 0o755); err != nil {
return fmt.Errorf("error creating directory: %w", err)
}

return nil
case info.Mode().IsRegular():
bytesCopied += info.Size()

return copyFileSafe(path, targetPath)
default:
return fmt.Errorf("unsupported file type %s: %s", info.Mode(), path)
}
}); err != nil {
return fmt.Errorf("error copying image cache: %w", err)
}

logger.Info("image cache copied", zap.String("size", humanize.IBytes(uint64(bytesCopied))))

ctrl.cacheCopyDone = true

return nil
}

func copyFileSafe(src, dst string) error {
srcStat, err := os.Stat(src)
if err != nil {
return fmt.Errorf("error getting source file info: %w", err)
}

dstStat, err := os.Stat(dst)
if err == nil && srcStat.Size() == dstStat.Size() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two files could haave the same size, but hashing is expensive

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be outside of our control I guess, we just try to skip copying what is already there

// skipping copy
return nil
}

srcFile, err := os.Open(src)
if err != nil {
return fmt.Errorf("error opening source file: %w", err)
}

defer srcFile.Close() //nolint:errcheck

tempPath := dst + ".tmp"
DmitriyMV marked this conversation as resolved.
Show resolved Hide resolved

dstFile, err := os.Create(tempPath)
if err != nil {
return fmt.Errorf("error creating destination file: %w", err)
}

defer dstFile.Close() //nolint:errcheck

if _, err = io.Copy(dstFile, srcFile); err != nil {
return fmt.Errorf("error copying file: %w", err)
}

if err = dstFile.Close(); err != nil {
return fmt.Errorf("error closing destination file: %w", err)
}

if err = os.Rename(tempPath, dst); err != nil {
return fmt.Errorf("error renaming file: %w", err)
}

return nil
}
Loading