From 1aac63c67636663abb9a123c82fb6e1525036678 Mon Sep 17 00:00:00 2001 From: Airren Date: Mon, 15 Apr 2024 15:37:04 +0800 Subject: [PATCH 1/3] feat: enhance orm by nri Signed-off-by: Airren --- .../app/options/orm/orm_base.go | 29 +- go.mod | 32 ++- go.sum | 65 +++-- pkg/agent/orm/checkpoint_test.go | 2 +- pkg/agent/orm/manager.go | 268 ++++++++++++++++-- pkg/agent/orm/manager_test.go | 261 +++++++++++++++++ pkg/agent/orm/pod_resource_test.go | 17 +- pkg/agent/orm/types.go | 7 + pkg/config/agent/orm/orm_base.go | 14 +- 9 files changed, 621 insertions(+), 74 deletions(-) diff --git a/cmd/katalyst-agent/app/options/orm/orm_base.go b/cmd/katalyst-agent/app/options/orm/orm_base.go index 380adce2d..c39c55532 100644 --- a/cmd/katalyst-agent/app/options/orm/orm_base.go +++ b/cmd/katalyst-agent/app/options/orm/orm_base.go @@ -25,7 +25,8 @@ import ( ) type GenericORMPluginOptions struct { - ORMRconcilePeriod time.Duration + ORMWorkMode string + ORMReconcilePeriod time.Duration ORMResourceNamesMap map[string]string ORMPodNotifyChanLen int TopologyPolicyName string @@ -33,11 +34,16 @@ type GenericORMPluginOptions struct { ORMPodResourcesSocket string ORMDevicesProvider string ORMKubeletPodResourcesEndpoints []string + ORMNRISocketPath string + ORMNRIPluginName string + ORMNRIPluginIndex string + ORMNRIHandleEvents string } func NewGenericORMPluginOptions() *GenericORMPluginOptions { return &GenericORMPluginOptions{ - ORMRconcilePeriod: time.Second * 5, + ORMWorkMode: "bypass", + ORMReconcilePeriod: time.Second * 5, ORMResourceNamesMap: map[string]string{}, ORMPodNotifyChanLen: 10, TopologyPolicyName: "", @@ -45,14 +51,19 @@ func NewGenericORMPluginOptions() *GenericORMPluginOptions { ORMPodResourcesSocket: "unix:/var/lib/katalyst/pod-resources/kubelet.sock", ORMDevicesProvider: "", ORMKubeletPodResourcesEndpoints: []string{"/var/lib/kubelet/pod-resources/kubelet.sock"}, + ORMNRISocketPath: "/var/run/nri/nri.sock", + ORMNRIPluginName: "orm", + ORMNRIPluginIndex: "00", + ORMNRIHandleEvents: "RunPodSandbox,CreateContainer,UpdateContainer,RemovePodSandbox", } } func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs := fss.FlagSet("orm") - fs.DurationVar(&o.ORMRconcilePeriod, "orm-reconcile-period", - o.ORMRconcilePeriod, "orm resource reconcile period") + fs.StringVar(&o.ORMWorkMode, "orm-work-mode", o.ORMWorkMode, "orm work mode, nri or bypass") + fs.DurationVar(&o.ORMReconcilePeriod, "orm-reconcile-period", + o.ORMReconcilePeriod, "orm resource reconcile period") fs.StringToStringVar(&o.ORMResourceNamesMap, "orm-resource-names-map", o.ORMResourceNamesMap, "A set of ResourceName=ResourceQuantity pairs that map resource name during out-of-band Resource Manager allocation period. "+ "e.g. 'resource.katalyst.kubewharf.io/reclaimed_millicpu=cpu,resource.katalyst.kubewharf.io/reclaimed_memory=memory' "+ @@ -70,10 +81,14 @@ func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { "devices provider provides devices resources and allocatable for ORM podResources api") fs.StringSliceVar(&o.ORMKubeletPodResourcesEndpoints, "orm-kubelet-pod-resources-endpoints", o.ORMKubeletPodResourcesEndpoints, "kubelet podResources endpoints for ORM kubelet devices provider") + fs.StringVar(&o.ORMNRIPluginName, "orm-nri-plugin-name", o.ORMNRIPluginName, "orm nri plugin name") + fs.StringVar(&o.ORMNRIPluginIndex, "orm-nri-plugin-index", o.ORMNRIPluginIndex, "orm nri plugin index") + fs.StringVar(&o.ORMNRIHandleEvents, "orm-nri-handle-events", o.ORMNRIHandleEvents, "orm nri handle events") } func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error { - conf.ORMRconcilePeriod = o.ORMRconcilePeriod + conf.ORMWorkMode = o.ORMWorkMode + conf.ORMReconcilePeriod = o.ORMReconcilePeriod conf.ORMResourceNamesMap = o.ORMResourceNamesMap conf.ORMPodNotifyChanLen = o.ORMPodNotifyChanLen conf.TopologyPolicyName = o.TopologyPolicyName @@ -81,6 +96,10 @@ func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguratio conf.ORMPodResourcesSocket = o.ORMPodResourcesSocket conf.ORMDevicesProvider = o.ORMDevicesProvider conf.ORMKubeletPodResourcesEndpoints = o.ORMKubeletPodResourcesEndpoints + conf.ORMNRISocketPath = o.ORMNRISocketPath + conf.ORMNRIPluginName = o.ORMNRIPluginName + conf.ORMNRIPluginIndex = o.ORMNRIPluginIndex + conf.ORMNRIHandleEvents = o.ORMNRIHandleEvents return nil } diff --git a/go.mod b/go.mod index 9e4e89aba..e74a8765c 100644 --- a/go.mod +++ b/go.mod @@ -8,11 +8,12 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/cilium/ebpf v0.7.0 github.com/containerd/cgroups v1.0.1 + github.com/containerd/nri v0.6.0 github.com/evanphx/json-patch v5.6.0+incompatible github.com/fsnotify/fsnotify v1.5.4 github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 - github.com/golang/protobuf v1.5.2 + github.com/golang/protobuf v1.5.3 github.com/google/cadvisor v0.44.2 github.com/google/uuid v1.3.0 github.com/h2non/gock v1.2.0 @@ -38,10 +39,10 @@ require ( go.opentelemetry.io/otel/sdk/export/metric v0.20.0 go.opentelemetry.io/otel/sdk/metric v0.20.0 go.uber.org/atomic v1.9.0 - golang.org/x/sys v0.7.0 + golang.org/x/sys v0.13.0 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 gonum.org/v1/gonum v0.8.2 - google.golang.org/grpc v1.51.0 + google.golang.org/grpc v1.57.1 gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.0.3 k8s.io/api v0.26.1 @@ -52,7 +53,7 @@ require ( k8s.io/client-go v0.26.1 k8s.io/component-base v0.25.0 k8s.io/component-helpers v0.24.16 - k8s.io/cri-api v0.24.6 + k8s.io/cri-api v0.25.3 k8s.io/klog/v2 v2.80.1 k8s.io/kube-aggregator v0.24.6 k8s.io/kubelet v0.24.6 @@ -71,7 +72,8 @@ require ( github.com/OneOfOne/xxhash v1.2.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/containerd/ttrpc v1.2.3-0.20231030150553-baadfd8e7956 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cyphar/filepath-securejoin v0.2.3 // indirect @@ -102,14 +104,14 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect github.com/moby/spdystream v0.2.0 // indirect - github.com/moby/sys/mountinfo v0.6.0 // indirect + github.com/moby/sys/mountinfo v0.6.2 // indirect github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/opencontainers/go-digest v1.0.0 // indirect - github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect + github.com/opencontainers/runtime-spec v1.0.3-0.20220825212826-86290f6a00fb // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect @@ -126,19 +128,21 @@ require ( go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.19.1 // indirect - golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect + golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect - golang.org/x/net v0.9.0 // indirect - golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/oauth2 v0.7.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/term v0.7.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect gomodules.xyz/jsonpatch/v3 v3.0.1 // indirect gomodules.xyz/orderedmap v0.1.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect - google.golang.org/protobuf v1.28.1 // indirect + google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 072c87335..5f15eed2b 100644 --- a/go.sum +++ b/go.sum @@ -21,14 +21,16 @@ cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKP cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go v0.78.0/go.mod h1:QjdrLG0uq+YwhjoVOLsS1t7TW8fs36kLs4XO5R5ECHg= cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb8= -cloud.google.com/go v0.81.0 h1:at8Tk2zUz63cLPR0JPWm5vp77pEZmzxEQBEfRKn1VV8= cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0= +cloud.google.com/go v0.110.4 h1:1JYyxKMN9hd5dR2MYTPWkGUgcoxVVhg0LKNKEo0qvmk= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= +cloud.google.com/go/compute v1.20.1 h1:6aKEtlUiwEpJzM001l0yFkpXmUVXaN8W+fbkb2AZNbg= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= @@ -133,8 +135,9 @@ github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5/go.mod h1:/iP1qXHoty45bqomnu2LM+VVyAEdWN+vtSHGlQgyxbw= github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -154,6 +157,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= @@ -170,7 +174,11 @@ github.com/containerd/containerd v1.4.12/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtM github.com/containerd/continuity v0.1.0/go.mod h1:ICJu0PwR54nI0yPEnJ6jcS+J7CZAUXrLh8lPo2knzsM= github.com/containerd/fifo v1.0.0/go.mod h1:ocF/ME1SX5b1AOlWi9r677YJmCPSwwWnQ9O123vzpE4= github.com/containerd/go-runc v1.0.0/go.mod h1:cNU0ZbCgCQVZK4lgG3P+9tn9/PaJNmoDXPpoJhDR+Ok= +github.com/containerd/nri v0.6.0 h1:hdztxwL0gCS1CrCa9bvD1SoJiFN4jBuRQhplCvCPMj8= +github.com/containerd/nri v0.6.0/go.mod h1:F7OZfO4QTPqw5r87aq+syZJwiVvRYLIlHZiZDBV1W3A= github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= +github.com/containerd/ttrpc v1.2.3-0.20231030150553-baadfd8e7956 h1:BQwXCrKPRdDQvTYfiDatp36FIH/EF7JTBOZU+EPIKWY= +github.com/containerd/ttrpc v1.2.3-0.20231030150553-baadfd8e7956/go.mod h1:ieWsXucbb8Mj9PH0rXCw1i8IunRbbAiDkpXkbfflWBM= github.com/containerd/typeurl v1.0.2/go.mod h1:9trJWW2sRlGub4wZJRTW83VtbOLS6hwcDZXTn6oPz9s= github.com/coredns/caddy v1.1.0/go.mod h1:A6ntJQlAWuQfFlsd9hvigKbo2WS0VUs2l1e2F+BawD4= github.com/coredns/corefile-migration v1.0.14/go.mod h1:XnhgULOEouimnzgn0t4WPuFDN2/PJQcTxdWKC5eXNGE= @@ -240,6 +248,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -369,8 +378,9 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4= github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a/go.mod h1:ryS0uhF+x9jgbj/N71xsEqODy9BN81/GonCZiOzirOk= @@ -618,8 +628,9 @@ github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hx github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/sys/mountinfo v0.5.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdxbhnCLlSvSU= -github.com/moby/sys/mountinfo v0.6.0 h1:gUDhXQx58YNrpHlK4nSL+7y2pxFZkUcXqzFDKWdC0Oo= github.com/moby/sys/mountinfo v0.6.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdxbhnCLlSvSU= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -671,11 +682,12 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108 github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo/v2 v2.5.0 h1:TRtrvv2vdQqzkwrQ1ke6vtXf7IK34RBUJafIy1wMwls= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= +github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= @@ -686,8 +698,9 @@ github.com/opencontainers/runc v1.1.6 h1:XbhB8IfG/EsnhNvZtNdLB0GBw92GYEFvKlhaJk9 github.com/opencontainers/runc v1.1.6/go.mod h1:CbUumNnWCuTGFukNXahoo/RFBZvDAgRh/smNYNOhA50= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.0.3-0.20200929063507-e6143ca7d51d/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= -github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 h1:3snG66yBm59tKhhSPQrQ/0bCrv1LQbKt40LnUPiUxdc= github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/opencontainers/runtime-spec v1.0.3-0.20220825212826-86290f6a00fb h1:1xSVPOd7/UA+39/hXEGnBJ13p6JFB0E1EvQFlrRDOXI= +github.com/opencontainers/runtime-spec v1.0.3-0.20220825212826-86290f6a00fb/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.10.0 h1:rAiKF8hTcgLI3w0DHm6i0ylVVcOrlgR1kK99DRLDhyU= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= @@ -1002,8 +1015,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1111,8 +1124,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.3.1-0.20221206200815-1e63c2f08a10/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1128,8 +1141,9 @@ golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b h1:clP8eMhB30EHdc0bd2Twtq6kgU7yl5ub2cQLSdrv1Dg= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= +golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= +golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1236,13 +1250,13 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= -golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= -golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1254,8 +1268,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1448,8 +1462,12 @@ google.golang.org/genproto v0.0.0-20210429181445-86c259c2b4ab/go.mod h1:P3QM42oQ google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 h1:hrbNEivu7Zn1pxvHk6MBrq9iE22woVILTHqexqBxe6I= -google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= +google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 h1:Au6te5hbKUV8pIYWHqOUZ1pva5qK/rwbIhoXEUB9Lu8= +google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:O9kGHb51iE/nOGvQaDUuadVYqovW56s5emA88lQnj6Y= +google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 h1:s5YSX+ZH5b5vS9rnpGymvIyMpLRJizowqDlOuyjXnTk= +google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d h1:pgIUhmqwKOUlnKna4r6amKdUngdL8DrkpFeV8+VBElY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= @@ -1477,9 +1495,9 @@ google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U= google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= +google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg= +google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1493,9 +1511,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/agent/orm/checkpoint_test.go b/pkg/agent/orm/checkpoint_test.go index efc446959..6c7398a2a 100644 --- a/pkg/agent/orm/checkpoint_test.go +++ b/pkg/agent/orm/checkpoint_test.go @@ -46,7 +46,7 @@ func TestCheckpoint(t *testing.T) { file := m.checkpointFile() assert.Equal(t, file, "/tmp/checkpoint/kubelet_qrm_checkpoint") - allocationInfo := generateResourceAllocationInfo() + allocationInfo := generateCpuSetCpusAllocationInfo() m.podResources.insert("testPod", "testContainer", "cpu", allocationInfo) err = m.writeCheckpoint() diff --git a/pkg/agent/orm/manager.go b/pkg/agent/orm/manager.go index f1d9156f2..615c4f897 100644 --- a/pkg/agent/orm/manager.go +++ b/pkg/agent/orm/manager.go @@ -26,12 +26,14 @@ import ( "sync" "time" + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nri/pkg/stub" "github.com/opencontainers/selinux/go-selinux" - "k8s.io/klog/v2" - "google.golang.org/grpc" + "gopkg.in/yaml.v3" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" "k8s.io/kubelet/pkg/apis/pluginregistration/v1" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -44,9 +46,11 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/orm/server" "github.com/kubewharf/katalyst-core/pkg/agent/orm/server/podresources" "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/metaserver" + metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/bitmask" cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" @@ -54,9 +58,15 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/native" ) +type nriConfig struct { + Events []string `json:"events"` +} + type ManagerImpl struct { ctx context.Context + mode workMode + socketname string socketdir string @@ -69,6 +79,13 @@ type ManagerImpl struct { topologyManager topology.Manager + nriConf nriConfig + // nriStub is the implementation of NRI events handlers + nriStub stub.Stub + // nriMask stores the specific events that need to be hooked + nriMask stub.EventMask + nriOptions []stub.Option + server *grpc.Server wg sync.WaitGroup @@ -88,8 +105,11 @@ type ManagerImpl struct { devicesProvider podresources.DevicesProvider } -func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, config *config.Configuration) (*ManagerImpl, error) { - klog.V(2).Infof("new ORM..., socketPath: %v, resourceNameMap: %v, reconcilePeriod: %v", socketPath, config.ORMResourceNamesMap, config.ORMRconcilePeriod) +func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, + config *config.Configuration, +) (*ManagerImpl, error) { + klog.V(2).Infof("new ORM..., socketPath: %v, resourceNameMap: %v, reconcilePeriod: %v", socketPath, + config.ORMResourceNamesMap, config.ORMReconcilePeriod) if socketPath == "" || !filepath.IsAbs(socketPath) { return nil, fmt.Errorf(errBadSocket+" %s", socketPath) @@ -102,6 +122,7 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me } m := &ManagerImpl{ + mode: workMode(config.ORMWorkMode), socketdir: dir, socketname: file, @@ -110,7 +131,7 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me checkpointManager: checkpointManager, resourceNamesMap: config.ORMResourceNamesMap, - reconcilePeriod: config.ORMRconcilePeriod, + reconcilePeriod: config.ORMReconcilePeriod, podAddChan: make(chan string, config.ORMPodNotifyChanLen), podDeleteChan: make(chan string, config.ORMPodNotifyChanLen), @@ -118,11 +139,9 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me qosConfig: config.QoSConfiguration, podResourceSocket: config.ORMPodResourcesSocket, } - - m.resourceExecutor = executor.NewExecutor(cgroupmgr.GetManager()) - - metaManager := metamanager.NewManager(emitter, m.podResources.pods, metaServer) - m.metaManager = metaManager + m.metaManager = metamanager.NewManager(emitter, m.podResources.pods, metaServer) + // init orm work mode with essential components + m.initORMWorkMode(config) topologyManager, err := topology.NewManager(metaServer.Topology, config.TopologyPolicyName, config.NumericAlignResources) if err != nil { @@ -144,6 +163,43 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me return m, nil } +func (m *ManagerImpl) initORMWorkMode(config *config.Configuration) { + if !m.validateNRIMode(config) { + m.mode = workModeBypass + klog.Infof("[ORM] init ORM work mode with bypass mode") + m.resourceExecutor = executor.NewExecutor(cgroupmgr.GetManager()) + return + } + klog.Infof("[ORM] init ORM work mode with nri mode") + return +} + +func (m *ManagerImpl) validateNRIMode(config *config.Configuration) bool { + var err error + if config.ORMWorkMode != string(workModeNri) { + return false + } + if _, err := os.Stat(config.ORMNRISocketPath); os.IsNotExist(err) { + klog.Errorf("[ORM] nri socket path %q does not exist", config.ORMNRISocketPath) + return false + } + var opts []stub.Option + opts = append(opts, stub.WithPluginName(config.ORMNRIPluginName)) + opts = append(opts, stub.WithPluginIdx(config.ORMNRIPluginIndex)) + opts = append(opts, stub.WithSocketPath(config.ORMNRISocketPath)) + m.nriOptions = opts + + if m.nriMask, err = api.ParseEventMask(config.ORMNRIHandleEvents); err != nil { + klog.Errorf("[ORM] parse nri handle events fail: %v", err) + return false + } + if m.nriStub, err = stub.New(m, append(opts, stub.WithOnClose(m.onClose))...); err != nil { + klog.Errorf("[ORM] create nri stub fail: %v", err) + return false + } + return true +} + func (m *ManagerImpl) Run(ctx context.Context) { klog.V(2).Infof("[ORM] running...") m.ctx = ctx @@ -182,23 +238,34 @@ func (m *ManagerImpl) Run(ctx context.Context) { if err := recover(); err != nil { klog.Fatalf("[ORM] Start recover from err: %v", err) } - s.Close() + _ = s.Close() }() - m.server.Serve(s) + err := m.server.Serve(s) + if err != nil { + klog.Fatalf("[ORM] serve fail: %v", err) + } }() klog.V(5).Infof("[ORM] start serve socketPath %v", socketPath) - go func() { - m.process() - }() - - go wait.Until(m.reconcile, m.reconcilePeriod, m.ctx.Done()) - m.metaManager.RegistPodAddedFunc(m.onPodAdd) - m.metaManager.RegistPodDeletedFunc(m.onPodDelete) + if m.mode == workModeBypass { + go func() { + m.process() + }() + m.metaManager.RegistPodAddedFunc(m.onPodAdd) + m.metaManager.RegistPodDeletedFunc(m.onPodDelete) + } else { + go func() { + klog.Info("[ORM] nri stub run...") + err := m.nriStub.Run(ctx) + if err != nil { + klog.Fatalf("[ORM] nri stub run fail: %v", err) + } + }() + } m.metaManager.Run(ctx, m.reconcilePeriod) - + go wait.Until(m.reconcile, m.reconcilePeriod, m.ctx.Done()) go server.ListenAndServePodResources(m.podResourceSocket, m.metaManager, m, m.devicesProvider, m.emitter) } @@ -298,11 +365,12 @@ func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topology.Top func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error { if pod == nil || container == nil { - return fmt.Errorf("Allocate got nil pod: %v or container: %v", pod, container) + return fmt.Errorf("allocate got nil pod: %v or container: %v", pod, container) } + // allocate resources for current pod, return after resource allocate when run in NRIMode err := m.addContainer(pod, container) - if err != nil { + if err != nil || m.mode == workModeNri { return err } @@ -380,7 +448,16 @@ func (m *ManagerImpl) process() { } func (m *ManagerImpl) processAddPod(podUID string) error { - pod, err := m.metaManager.MetaServer.GetPod(m.ctx, podUID) + var ( + pod *v1.Pod + err error + ) + if m.mode == workModeNri { + nriQueryCtx := context.WithValue(m.ctx, metaserverpod.BypassCacheKey, metaserverpod.BypassCacheTrue) + pod, err = m.metaManager.GetPod(nriQueryCtx, podUID) + } else { + pod, err = m.metaManager.GetPod(m.ctx, podUID) + } if err != nil { klog.Errorf("[ORM] processAddPod getPod fail, podUID: %v, err: %v", podUID, err) return err @@ -393,8 +470,8 @@ func (m *ManagerImpl) processDeletePod(podUID string) error { allSuccess := true m.mutex.Lock() - for resourceName, endpoint := range m.endpoints { - _, err := endpoint.E.RemovePod(m.ctx, &pluginapi.RemovePodRequest{ + for resourceName, ep := range m.endpoints { + _, err := ep.E.RemovePod(m.ctx, &pluginapi.RemovePodRequest{ PodUid: podUID, }) if err != nil { @@ -606,7 +683,17 @@ func (m *ManagerImpl) reconcile() { } } - _ = m.syncContainer(pod, &container) + if m.mode == workModeNri { + containerId, err := native.GetContainerID(pod, container.Name) + if err != nil { + klog.Errorf("[ORM] pod: %s/%s/%s, container: %s, get container id fail: %v", + pod.Namespace, pod.Name, pod.UID, container.Name, err) + continue + } + m.updateContainerByNRI(string(pod.UID), containerId, container.Name) + } else { + _ = m.syncContainer(pod, &container) + } } } @@ -675,6 +762,135 @@ func (m *ManagerImpl) IsContainerRequestResource(container *v1.Container, resour return false, nil } +// ************************************NRI Plugin Interface implement ************************************************** + +func (m *ManagerImpl) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { + klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) + if config == "" { + return m.nriMask, nil + } + + err := yaml.Unmarshal([]byte(config), &m.nriConf) + if err != nil { + return 0, fmt.Errorf("failed to parse provided configuration: %w", err) + } + + m.nriMask, err = api.ParseEventMask(m.nriConf.Events...) + if err != nil { + return 0, fmt.Errorf("failed to parse events in configuration: %w", err) + } + + klog.V(6).Infof("handle NRI Configure successfully, config %s, runtime %s, version %s", + config, runtime, version) + return m.nriMask, nil +} + +func (m *ManagerImpl) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) ( + []*api.ContainerUpdate, error, +) { + // todo: update existed containers resources if orm stared after the Pod create events + return nil, nil +} + +func (m *ManagerImpl) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error { + klog.Infof("[ORM] RunPodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) + klog.V(6).Infof("[ORM] RunPodSandbox, pod annotations: %v", pod.Annotations) + err := m.processAddPod(pod.Uid) + if err != nil { + klog.Errorf("[ORM] RunPodSandbox processAddPod fail, pod: %s/%s/%s, err: %v", + pod.Namespace, pod.Name, pod.Uid, err) + } + return err +} + +func (m *ManagerImpl) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) ( + *api.ContainerAdjustment, []*api.ContainerUpdate, error, +) { + klog.Infof("[ORM] CreateContainer, pod: %s/%s/%s, container: %v", pod.Namespace, pod.Name, pod.Uid, container.Name) + containerAllResources := m.podResources.containerAllResources(pod.Uid, container.Name) + if containerAllResources == nil { + klog.V(5).Infof("[ORM] CreateContainer process failed, pod: %s/%s/%s, container: %v, resources nil", + pod.Namespace, pod.Name, pod.Uid, container.Name) + return nil, nil, nil + } + + adjust := &api.ContainerAdjustment{} + for _, resourceAllocationInfo := range containerAllResources { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if resourceAllocationInfo.AllocationResult != "" { + adjust.SetLinuxCPUSetCPUs(resourceAllocationInfo.AllocationResult) + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + adjust.SetLinuxCPUSetMems(resourceAllocationInfo.AllocationResult) + } + } + } + klog.V(5).Infof("[ORM] handle NRI CreateContainer successfully, pod: %s/%s/%s, container: %s, adjust: %v", + pod.Namespace, pod.Name, pod.Uid, container.Name, adjust) + return adjust, nil, nil +} + +func (m *ManagerImpl) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources, +) ([]*api.ContainerUpdate, error) { + // todo: hook this method to update container resources + return nil, nil + // containerUpdate := m.getNRIContainerUpdate(pod.Uid, container.Id, container.Name) + // klog.V(5).Infof("[ORM] handle NRI UpdateContainer successfully, pod: %s/%s/%s, container: %s, update: %v", + // pod.Namespace, pod.Name, pod.Uid, container.Name, containerUpdate) + // return []*api.ContainerUpdate{containerUpdate}, nil +} + +func (m *ManagerImpl) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error { + klog.Infof("[ORM] RemovePodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) + err := m.processDeletePod(pod.Uid) + if err != nil { + klog.Errorf("[ORM] RemovePodSandbox processDeletePod fail, pod: %s/%s/%s, err: %v", + pod.Namespace, pod.Name, pod.Uid, err) + } + return err +} + +func (m *ManagerImpl) onClose() { + m.nriStub.Stop() + klog.V(6).Infof("NRI server closes") +} + +func (m *ManagerImpl) updateContainerByNRI(podUID, containerId, containerName string) { + klog.V(2).Infof("[ORM] updateContainerByNRI, pod: %v, container: %v", podUID, containerName) + containerUpdate := m.getNRIContainerUpdate(podUID, containerId, containerName) + _, err := m.nriStub.UpdateContainers([]*api.ContainerUpdate{containerUpdate}) + if err != nil { + klog.Errorf("[ORM] updateContainerByNRI fail, pod %v container %v,resource %v, err: %v", podUID, containerName, err) + } +} + +func (m *ManagerImpl) getNRIContainerUpdate(podUID, containerId, containerName string) *api.ContainerUpdate { + containerUpdate := &api.ContainerUpdate{ + ContainerId: containerId, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{Cpus: "", Mems: ""}}}, + } + containerAllResources := m.podResources.containerAllResources(podUID, containerName) + for _, resourceAllocationInfo := range containerAllResources { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if resourceAllocationInfo.AllocationResult != "" { + containerUpdate.Linux.Resources.Cpu.Cpus = resourceAllocationInfo.AllocationResult + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + containerUpdate.Linux.Resources.Cpu.Mems = resourceAllocationInfo.AllocationResult + } + default: + + } + } + return containerUpdate +} + +// ********************************************************************************************************************* + func GetContainerTypeAndIndex(pod *v1.Pod, container *v1.Container) (containerType pluginapi.ContainerType, containerIndex uint64, err error) { if pod == nil || container == nil { err = fmt.Errorf("got nil pod: %v or container: %v", pod, container) diff --git a/pkg/agent/orm/manager_test.go b/pkg/agent/orm/manager_test.go index c34b6905c..a1e9a776d 100644 --- a/pkg/agent/orm/manager_test.go +++ b/pkg/agent/orm/manager_test.go @@ -24,6 +24,10 @@ import ( "testing" "time" + "github.com/containerd/nri/pkg/stub" + + "github.com/containerd/nri/pkg/api" + cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -99,6 +103,7 @@ func TestProcess(t *testing.T) { m := &ManagerImpl{ ctx: ctx, + mode: workModeBypass, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/process", metaManager: metamanager, @@ -182,6 +187,7 @@ func TestReconcile(t *testing.T) { assert.NoError(t, err) m := &ManagerImpl{ + mode: workModeBypass, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/reconcile", metaManager: metamanager, @@ -366,6 +372,7 @@ func TestRun(t *testing.T) { assert.NoError(t, err) m := &ManagerImpl{ + mode: workModeBypass, reconcilePeriod: 2 * time.Second, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/run", @@ -668,3 +675,257 @@ func (m *MockEndpoint) GetTopologyAwareAllocatableResources(c context.Context, r } return nil, nil } + +type fakeNRIStub struct{} + +func (f *fakeNRIStub) Run(ctx context.Context) error { + return nil +} + +func (f *fakeNRIStub) Start(ctx context.Context) error { + return nil +} + +func (f *fakeNRIStub) Stop() { + return +} + +func (f *fakeNRIStub) Wait() { + return +} + +func (f *fakeNRIStub) UpdateContainers(_ []*api.ContainerUpdate) ([]*api.ContainerUpdate, error) { + return nil, nil +} + +func TestManagerImpl_Configure(t *testing.T) { + t.Parallel() + conf := "{\"events\":[\"RunPodSandbox\",\"CreateContainer\",\"UpdateContainer\",\"RemovePodSandbox\"]}" + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + eventMask, err := m.Configure(context.TODO(), conf, "", "") + res := stub.EventMask(141) + assert.NoError(t, err) + assert.Equal(t, eventMask, res) +} + +func TestManagerImpl_Synchronize(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + update, err := m.Synchronize(context.TODO(), []*api.PodSandbox{}, []*api.Container{}) + assert.NoError(t, err) + assert.Nil(t, update) +} + +func TestManagerImpl_RunPodSandbox(t *testing.T) { + t.Parallel() + + pods := []*v1.Pod{ + makePod("testPod1", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + makePod("testPod2", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + } + + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + endpoints: map[string]endpoint.EndpointInfo{}, + podResources: newPodResourcesChk(), + } + topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ + { + Id: 0, + }, + }, "none", nil) + topologyManager.AddHintProvider(m) + m.topologyManager = topologyManager + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + assert.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf, pods) + assert.NoError(t, err) + metaManager := metamanager.NewManager(metrics.DummyMetrics{}, m.podResources.pods, metaServer) + m.metaManager = metaManager + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + m.checkpointManager = checkpointManager + podUID1 := "testPodUID1" + err = m.RunPodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID1}) + assert.Error(t, err, fmt.Errorf("failed to find pod by uid testPod1")) +} + +func TestManagerImpl_CreateContainer(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + } + + // test CpuSetCpus + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + podName1 := "testPodName1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + pod1 := &api.PodSandbox{ + Name: podName1, + Uid: podUID1, + } + container1 := &api.Container{ + Id: containerID1, + Name: containerName1, + } + containerAdjust1, _, err := m.CreateContainer(context.TODO(), pod1, container1) + assert.NoError(t, err) + res1 := &api.ContainerAdjustment{ + Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ + Cpus: "5-6,10", + }}}, + } + assert.Equal(t, containerAdjust1, res1) + + // test CpuSetMems + resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() + podUID2 := "testPodUID2" + podName2 := "testPodName2" + containerName2 := "testContainer2" + containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" + m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) + pod2 := &api.PodSandbox{ + Name: podName2, + Uid: podUID2, + } + container2 := &api.Container{ + Id: containerID2, + Name: containerName2, + } + containerAdjust2, _, err := m.CreateContainer(context.TODO(), pod2, container2) + assert.NoError(t, err) + res2 := &api.ContainerAdjustment{ + Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ + Mems: "7-8,11", + }}}, + } + assert.Equal(t, containerAdjust2, res2) + + // test pod not exist + podNotExist := &api.PodSandbox{ + Name: "PodNotExist", + Uid: "PodUIDNotExist", + } + containerNotExist := &api.Container{ + Id: "ContainerIDNotExist", + Name: "ContainerNotExist", + } + containerAdjust, _, err := m.CreateContainer(context.TODO(), podNotExist, containerNotExist) + assert.Nil(t, containerAdjust) + assert.NoError(t, err) +} + +func TestManagerImpl_UpdateContainer(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + update, err := m.UpdateContainer(context.TODO(), &api.PodSandbox{}, &api.Container{}, &api.LinuxResources{}) + assert.NoError(t, err) + assert.Nil(t, update) +} + +func TestManagerImpl_RemovePodSandbox(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + endpoints: map[string]endpoint.EndpointInfo{}, + podResources: newPodResourcesChk(), + } + topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ + { + Id: 0, + }, + }, "none", nil) + topologyManager.AddHintProvider(m) + m.topologyManager = topologyManager + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + m.checkpointManager = checkpointManager + podUID := "testPodUID" + err = m.RemovePodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID}) + assert.NoError(t, err) +} + +func TestManagerImpl_onClose(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + m.onClose() +} + +func TestManagerImpl_updateContainerByNRI(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + nriStub: &fakeNRIStub{}, + } + + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + m.updateContainerByNRI(podUID1, containerID1, containerName1) +} + +func TestManagerImpl_getNRIContainerUpdate(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + } + + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + containerUpdate1 := m.getNRIContainerUpdate(podUID1, containerID1, containerName1) + res1 := &api.ContainerUpdate{ + ContainerId: containerID1, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ + Cpu: &api.LinuxCPU{ + Cpus: "5-6,10", + }, + }}, + } + assert.Equal(t, containerUpdate1, res1) + + resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() + podUID2 := "testPodUID2" + containerName2 := "testContainer2" + containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" + m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) + containerUpdate2 := m.getNRIContainerUpdate(podUID2, containerID2, containerName2) + res2 := &api.ContainerUpdate{ + ContainerId: containerID2, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ + Cpu: &api.LinuxCPU{ + Mems: "7-8,11", + }, + }}, + } + assert.Equal(t, containerUpdate2, res2) +} diff --git a/pkg/agent/orm/pod_resource_test.go b/pkg/agent/orm/pod_resource_test.go index 787eb0a0a..e952ee705 100644 --- a/pkg/agent/orm/pod_resource_test.go +++ b/pkg/agent/orm/pod_resource_test.go @@ -32,7 +32,7 @@ func TestPodResources(t *testing.T) { podResource := newPodResourcesChk() - resourceAllocationInfo := generateResourceAllocationInfo() + resourceAllocationInfo := generateCpuSetCpusAllocationInfo() podResource.insert("testPod", "testContainer", "cpu", resourceAllocationInfo) @@ -155,7 +155,7 @@ func TestCheckpointMarshal(t *testing.T) { assert.NoError(t, err) } -func generateResourceAllocationInfo() *pluginapi.ResourceAllocationInfo { +func generateCpuSetCpusAllocationInfo() *pluginapi.ResourceAllocationInfo { return &pluginapi.ResourceAllocationInfo{ OciPropertyName: "CpusetCpus", IsNodeResource: true, @@ -167,3 +167,16 @@ func generateResourceAllocationInfo() *pluginapi.ResourceAllocationInfo { ResourceHints: &pluginapi.ListOfTopologyHints{}, } } + +func generateCpuSetMemsAllocationInfo() *pluginapi.ResourceAllocationInfo { + return &pluginapi.ResourceAllocationInfo{ + OciPropertyName: "CpusetMems", + IsNodeResource: true, + IsScalarResource: true, + AllocatedQuantity: 3, + AllocationResult: "7-8,11", + Envs: map[string]string{"mock_key": "mock_env"}, + Annotations: map[string]string{"mock_key": "mock_ano"}, + ResourceHints: &pluginapi.ListOfTopologyHints{}, + } +} diff --git a/pkg/agent/orm/types.go b/pkg/agent/orm/types.go index cba34043d..e69962247 100644 --- a/pkg/agent/orm/types.go +++ b/pkg/agent/orm/types.go @@ -48,3 +48,10 @@ const ( NoneDevicesProvider = "" ) + +type workMode string + +const ( + workModeNri workMode = "nri" + workModeBypass workMode = "bypass" +) diff --git a/pkg/config/agent/orm/orm_base.go b/pkg/config/agent/orm/orm_base.go index adcb7cd28..eb7b2d647 100644 --- a/pkg/config/agent/orm/orm_base.go +++ b/pkg/config/agent/orm/orm_base.go @@ -19,7 +19,8 @@ package orm import "time" type GenericORMConfiguration struct { - ORMRconcilePeriod time.Duration + ORMWorkMode string + ORMReconcilePeriod time.Duration ORMResourceNamesMap map[string]string ORMPodNotifyChanLen int TopologyPolicyName string @@ -27,11 +28,16 @@ type GenericORMConfiguration struct { ORMPodResourcesSocket string ORMDevicesProvider string ORMKubeletPodResourcesEndpoints []string + ORMNRISocketPath string + ORMNRIPluginName string + ORMNRIPluginIndex string + ORMNRIHandleEvents string } func NewGenericORMConfiguration() *GenericORMConfiguration { return &GenericORMConfiguration{ - ORMRconcilePeriod: time.Second * 5, + ORMWorkMode: "bypass", + ORMReconcilePeriod: time.Second * 5, ORMResourceNamesMap: map[string]string{}, ORMPodNotifyChanLen: 10, TopologyPolicyName: "none", @@ -39,5 +45,9 @@ func NewGenericORMConfiguration() *GenericORMConfiguration { ORMPodResourcesSocket: "unix:/var/lib/katalyst/pod-resources/kubelet.sock", ORMDevicesProvider: "", ORMKubeletPodResourcesEndpoints: []string{"/var/lib/kubelet/pod-resources/kubelet.sock"}, + ORMNRISocketPath: "/var/run/nri/nri.sock", + ORMNRIPluginName: "orm", + ORMNRIPluginIndex: "00", + ORMNRIHandleEvents: "RunPodSandbox,CreateContainer,UpdateContainer,RemovePodSandbox", } } From 83f23aaf5b8bad8b5b5814c55148d08be58dfc71 Mon Sep 17 00:00:00 2001 From: caohe Date: Fri, 28 Jun 2024 15:31:38 +0800 Subject: [PATCH 2/3] fix(sysadvisor): fix data race in base server Signed-off-by: caohe --- pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go | 6 ++++++ pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go | 3 +++ .../sysadvisor/plugin/qosaware/server/memory_server.go | 3 +++ 3 files changed, 12 insertions(+) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go index da7154925..746c1709b 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go @@ -24,6 +24,7 @@ import ( "path" "reflect" "strings" + "sync" "time" "google.golang.org/grpc" @@ -58,6 +59,8 @@ const ( ) type baseServer struct { + mutex sync.RWMutex + name string period time.Duration advisorSocketPath string @@ -183,6 +186,9 @@ func (bs *baseServer) dial(unixSocketPath string, timeout time.Duration) (*grpc. } func (bs *baseServer) Stop() error { + bs.mutex.RLock() + defer bs.mutex.RUnlock() + close(bs.stopCh) _ = bs.emitter.StoreInt64(bs.genMetricsName(metricServerStopCalled), int64(bs.period.Seconds()), metrics.MetricTypeNameCount) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go index 3fb9890b3..e46480f7e 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go @@ -65,6 +65,9 @@ func NewCPUServer(recvCh chan types.InternalCPUCalculationResult, sendCh chan ty } func (cs *cpuServer) RegisterAdvisorServer() { + cs.mutex.Lock() + defer cs.mutex.Unlock() + grpcServer := grpc.NewServer() cpuadvisor.RegisterCPUAdvisorServer(grpcServer, cs) cs.grpcServer = grpcServer diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go index 6df3f51c6..f5aa46edb 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go @@ -61,6 +61,9 @@ func NewMemoryServer(recvCh chan types.InternalMemoryCalculationResult, sendCh c } func (ms *memoryServer) RegisterAdvisorServer() { + ms.mutex.Lock() + defer ms.mutex.Unlock() + grpcServer := grpc.NewServer() advisorsvc.RegisterAdvisorServiceServer(grpcServer, ms) ms.grpcServer = grpcServer From 08064743e1fb741433e733cbdb69a7db7d70856a Mon Sep 17 00:00:00 2001 From: Airren Date: Tue, 2 Jul 2024 17:05:38 +0800 Subject: [PATCH 3/3] refactor(orm): make the code more logical Signed-off-by: Airren --- .../app/options/orm/orm_base.go | 7 +- pkg/agent/orm/manager.go | 184 +---------- pkg/agent/orm/manager_nri.go | 188 +++++++++++ pkg/agent/orm/manager_nri_test.go | 293 ++++++++++++++++++ pkg/agent/orm/manager_test.go | 265 +--------------- pkg/agent/orm/types.go | 7 - pkg/config/agent/orm/orm_base.go | 10 +- pkg/consts/orm.go | 24 ++ 8 files changed, 532 insertions(+), 446 deletions(-) create mode 100644 pkg/agent/orm/manager_nri.go create mode 100644 pkg/agent/orm/manager_nri_test.go create mode 100644 pkg/consts/orm.go diff --git a/cmd/katalyst-agent/app/options/orm/orm_base.go b/cmd/katalyst-agent/app/options/orm/orm_base.go index c39c55532..5a9ad2a32 100644 --- a/cmd/katalyst-agent/app/options/orm/orm_base.go +++ b/cmd/katalyst-agent/app/options/orm/orm_base.go @@ -22,10 +22,11 @@ import ( cliflag "k8s.io/component-base/cli/flag" ormconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/orm" + "github.com/kubewharf/katalyst-core/pkg/consts" ) type GenericORMPluginOptions struct { - ORMWorkMode string + ORMWorkMode consts.WorkMode ORMReconcilePeriod time.Duration ORMResourceNamesMap map[string]string ORMPodNotifyChanLen int @@ -42,7 +43,7 @@ type GenericORMPluginOptions struct { func NewGenericORMPluginOptions() *GenericORMPluginOptions { return &GenericORMPluginOptions{ - ORMWorkMode: "bypass", + ORMWorkMode: consts.WorkModeBypass, ORMReconcilePeriod: time.Second * 5, ORMResourceNamesMap: map[string]string{}, ORMPodNotifyChanLen: 10, @@ -61,7 +62,7 @@ func NewGenericORMPluginOptions() *GenericORMPluginOptions { func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs := fss.FlagSet("orm") - fs.StringVar(&o.ORMWorkMode, "orm-work-mode", o.ORMWorkMode, "orm work mode, nri or bypass") + fs.StringVar((*string)(&o.ORMWorkMode), "orm-work-mode", string(o.ORMWorkMode), "orm work mode, nri or bypass") fs.DurationVar(&o.ORMReconcilePeriod, "orm-reconcile-period", o.ORMReconcilePeriod, "orm resource reconcile period") fs.StringToStringVar(&o.ORMResourceNamesMap, "orm-resource-names-map", o.ORMResourceNamesMap, diff --git a/pkg/agent/orm/manager.go b/pkg/agent/orm/manager.go index 615c4f897..455d9e4ac 100644 --- a/pkg/agent/orm/manager.go +++ b/pkg/agent/orm/manager.go @@ -26,11 +26,9 @@ import ( "sync" "time" - "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" "github.com/opencontainers/selinux/go-selinux" "google.golang.org/grpc" - "gopkg.in/yaml.v3" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -46,9 +44,9 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/orm/server" "github.com/kubewharf/katalyst-core/pkg/agent/orm/server/podresources" "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" - "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -58,14 +56,10 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/native" ) -type nriConfig struct { - Events []string `json:"events"` -} - type ManagerImpl struct { ctx context.Context - mode workMode + mode consts.WorkMode socketname string socketdir string @@ -122,7 +116,7 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me } m := &ManagerImpl{ - mode: workMode(config.ORMWorkMode), + mode: config.ORMWorkMode, socketdir: dir, socketname: file, @@ -164,42 +158,17 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me } func (m *ManagerImpl) initORMWorkMode(config *config.Configuration) { - if !m.validateNRIMode(config) { - m.mode = workModeBypass + if m.validateNRIMode(config) { + m.mode = consts.WorkModeNri + klog.Infof("[ORM] init ORM work mode with nri mode") + } else { + m.mode = consts.WorkModeBypass klog.Infof("[ORM] init ORM work mode with bypass mode") m.resourceExecutor = executor.NewExecutor(cgroupmgr.GetManager()) - return } - klog.Infof("[ORM] init ORM work mode with nri mode") return } -func (m *ManagerImpl) validateNRIMode(config *config.Configuration) bool { - var err error - if config.ORMWorkMode != string(workModeNri) { - return false - } - if _, err := os.Stat(config.ORMNRISocketPath); os.IsNotExist(err) { - klog.Errorf("[ORM] nri socket path %q does not exist", config.ORMNRISocketPath) - return false - } - var opts []stub.Option - opts = append(opts, stub.WithPluginName(config.ORMNRIPluginName)) - opts = append(opts, stub.WithPluginIdx(config.ORMNRIPluginIndex)) - opts = append(opts, stub.WithSocketPath(config.ORMNRISocketPath)) - m.nriOptions = opts - - if m.nriMask, err = api.ParseEventMask(config.ORMNRIHandleEvents); err != nil { - klog.Errorf("[ORM] parse nri handle events fail: %v", err) - return false - } - if m.nriStub, err = stub.New(m, append(opts, stub.WithOnClose(m.onClose))...); err != nil { - klog.Errorf("[ORM] create nri stub fail: %v", err) - return false - } - return true -} - func (m *ManagerImpl) Run(ctx context.Context) { klog.V(2).Infof("[ORM] running...") m.ctx = ctx @@ -248,7 +217,7 @@ func (m *ManagerImpl) Run(ctx context.Context) { klog.V(5).Infof("[ORM] start serve socketPath %v", socketPath) - if m.mode == workModeBypass { + if m.mode == consts.WorkModeBypass { go func() { m.process() }() @@ -370,7 +339,7 @@ func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error { // allocate resources for current pod, return after resource allocate when run in NRIMode err := m.addContainer(pod, container) - if err != nil || m.mode == workModeNri { + if err != nil || m.mode == consts.WorkModeNri { return err } @@ -452,7 +421,7 @@ func (m *ManagerImpl) processAddPod(podUID string) error { pod *v1.Pod err error ) - if m.mode == workModeNri { + if m.mode == consts.WorkModeNri { nriQueryCtx := context.WithValue(m.ctx, metaserverpod.BypassCacheKey, metaserverpod.BypassCacheTrue) pod, err = m.metaManager.GetPod(nriQueryCtx, podUID) } else { @@ -683,7 +652,7 @@ func (m *ManagerImpl) reconcile() { } } - if m.mode == workModeNri { + if m.mode == consts.WorkModeNri { containerId, err := native.GetContainerID(pod, container.Name) if err != nil { klog.Errorf("[ORM] pod: %s/%s/%s, container: %s, get container id fail: %v", @@ -762,135 +731,6 @@ func (m *ManagerImpl) IsContainerRequestResource(container *v1.Container, resour return false, nil } -// ************************************NRI Plugin Interface implement ************************************************** - -func (m *ManagerImpl) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { - klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) - if config == "" { - return m.nriMask, nil - } - - err := yaml.Unmarshal([]byte(config), &m.nriConf) - if err != nil { - return 0, fmt.Errorf("failed to parse provided configuration: %w", err) - } - - m.nriMask, err = api.ParseEventMask(m.nriConf.Events...) - if err != nil { - return 0, fmt.Errorf("failed to parse events in configuration: %w", err) - } - - klog.V(6).Infof("handle NRI Configure successfully, config %s, runtime %s, version %s", - config, runtime, version) - return m.nriMask, nil -} - -func (m *ManagerImpl) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) ( - []*api.ContainerUpdate, error, -) { - // todo: update existed containers resources if orm stared after the Pod create events - return nil, nil -} - -func (m *ManagerImpl) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error { - klog.Infof("[ORM] RunPodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) - klog.V(6).Infof("[ORM] RunPodSandbox, pod annotations: %v", pod.Annotations) - err := m.processAddPod(pod.Uid) - if err != nil { - klog.Errorf("[ORM] RunPodSandbox processAddPod fail, pod: %s/%s/%s, err: %v", - pod.Namespace, pod.Name, pod.Uid, err) - } - return err -} - -func (m *ManagerImpl) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) ( - *api.ContainerAdjustment, []*api.ContainerUpdate, error, -) { - klog.Infof("[ORM] CreateContainer, pod: %s/%s/%s, container: %v", pod.Namespace, pod.Name, pod.Uid, container.Name) - containerAllResources := m.podResources.containerAllResources(pod.Uid, container.Name) - if containerAllResources == nil { - klog.V(5).Infof("[ORM] CreateContainer process failed, pod: %s/%s/%s, container: %v, resources nil", - pod.Namespace, pod.Name, pod.Uid, container.Name) - return nil, nil, nil - } - - adjust := &api.ContainerAdjustment{} - for _, resourceAllocationInfo := range containerAllResources { - switch resourceAllocationInfo.OciPropertyName { - case util.OCIPropertyNameCPUSetCPUs: - if resourceAllocationInfo.AllocationResult != "" { - adjust.SetLinuxCPUSetCPUs(resourceAllocationInfo.AllocationResult) - } - case util.OCIPropertyNameCPUSetMems: - if resourceAllocationInfo.AllocationResult != "" { - adjust.SetLinuxCPUSetMems(resourceAllocationInfo.AllocationResult) - } - } - } - klog.V(5).Infof("[ORM] handle NRI CreateContainer successfully, pod: %s/%s/%s, container: %s, adjust: %v", - pod.Namespace, pod.Name, pod.Uid, container.Name, adjust) - return adjust, nil, nil -} - -func (m *ManagerImpl) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources, -) ([]*api.ContainerUpdate, error) { - // todo: hook this method to update container resources - return nil, nil - // containerUpdate := m.getNRIContainerUpdate(pod.Uid, container.Id, container.Name) - // klog.V(5).Infof("[ORM] handle NRI UpdateContainer successfully, pod: %s/%s/%s, container: %s, update: %v", - // pod.Namespace, pod.Name, pod.Uid, container.Name, containerUpdate) - // return []*api.ContainerUpdate{containerUpdate}, nil -} - -func (m *ManagerImpl) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error { - klog.Infof("[ORM] RemovePodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) - err := m.processDeletePod(pod.Uid) - if err != nil { - klog.Errorf("[ORM] RemovePodSandbox processDeletePod fail, pod: %s/%s/%s, err: %v", - pod.Namespace, pod.Name, pod.Uid, err) - } - return err -} - -func (m *ManagerImpl) onClose() { - m.nriStub.Stop() - klog.V(6).Infof("NRI server closes") -} - -func (m *ManagerImpl) updateContainerByNRI(podUID, containerId, containerName string) { - klog.V(2).Infof("[ORM] updateContainerByNRI, pod: %v, container: %v", podUID, containerName) - containerUpdate := m.getNRIContainerUpdate(podUID, containerId, containerName) - _, err := m.nriStub.UpdateContainers([]*api.ContainerUpdate{containerUpdate}) - if err != nil { - klog.Errorf("[ORM] updateContainerByNRI fail, pod %v container %v,resource %v, err: %v", podUID, containerName, err) - } -} - -func (m *ManagerImpl) getNRIContainerUpdate(podUID, containerId, containerName string) *api.ContainerUpdate { - containerUpdate := &api.ContainerUpdate{ - ContainerId: containerId, - Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{Cpus: "", Mems: ""}}}, - } - containerAllResources := m.podResources.containerAllResources(podUID, containerName) - for _, resourceAllocationInfo := range containerAllResources { - switch resourceAllocationInfo.OciPropertyName { - case util.OCIPropertyNameCPUSetCPUs: - if resourceAllocationInfo.AllocationResult != "" { - containerUpdate.Linux.Resources.Cpu.Cpus = resourceAllocationInfo.AllocationResult - } - case util.OCIPropertyNameCPUSetMems: - if resourceAllocationInfo.AllocationResult != "" { - containerUpdate.Linux.Resources.Cpu.Mems = resourceAllocationInfo.AllocationResult - } - default: - - } - } - return containerUpdate -} - -// ********************************************************************************************************************* - func GetContainerTypeAndIndex(pod *v1.Pod, container *v1.Container) (containerType pluginapi.ContainerType, containerIndex uint64, err error) { if pod == nil || container == nil { err = fmt.Errorf("got nil pod: %v or container: %v", pod, container) diff --git a/pkg/agent/orm/manager_nri.go b/pkg/agent/orm/manager_nri.go new file mode 100644 index 000000000..ffc5653de --- /dev/null +++ b/pkg/agent/orm/manager_nri.go @@ -0,0 +1,188 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orm + +import ( + "context" + "fmt" + "os" + + "gopkg.in/yaml.v3" + + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nri/pkg/stub" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/consts" +) + +type nriConfig struct { + Events []string `json:"events"` +} + +func (m *ManagerImpl) validateNRIMode(config *config.Configuration) bool { + var err error + if config.ORMWorkMode != consts.WorkModeNri { + return false + } + if _, err := os.Stat(config.ORMNRISocketPath); os.IsNotExist(err) { + klog.Errorf("[ORM] nri socket path %q does not exist", config.ORMNRISocketPath) + return false + } + var opts []stub.Option + opts = append(opts, stub.WithPluginName(config.ORMNRIPluginName)) + opts = append(opts, stub.WithPluginIdx(config.ORMNRIPluginIndex)) + opts = append(opts, stub.WithSocketPath(config.ORMNRISocketPath)) + m.nriOptions = opts + + if m.nriMask, err = api.ParseEventMask(config.ORMNRIHandleEvents); err != nil { + klog.Errorf("[ORM] parse nri handle events fail: %v", err) + return false + } + if m.nriStub, err = stub.New(m, append(opts, stub.WithOnClose(m.onClose))...); err != nil { + klog.Errorf("[ORM] create nri stub fail: %v", err) + return false + } + return true +} + +func (m *ManagerImpl) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { + klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) + if config == "" { + return m.nriMask, nil + } + + err := yaml.Unmarshal([]byte(config), &m.nriConf) + if err != nil { + return 0, fmt.Errorf("failed to parse provided configuration: %w", err) + } + + m.nriMask, err = api.ParseEventMask(m.nriConf.Events...) + if err != nil { + return 0, fmt.Errorf("failed to parse events in configuration: %w", err) + } + + klog.V(6).Infof("handle NRI Configure successfully, config %s, runtime %s, version %s", + config, runtime, version) + return m.nriMask, nil +} + +func (m *ManagerImpl) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) ( + []*api.ContainerUpdate, error, +) { + // todo: update existed containers resources if orm stared after the Pod create events + return nil, nil +} + +func (m *ManagerImpl) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error { + klog.Infof("[ORM] RunPodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) + klog.V(6).Infof("[ORM] RunPodSandbox, pod annotations: %v", pod.Annotations) + err := m.processAddPod(pod.Uid) + if err != nil { + klog.Errorf("[ORM] RunPodSandbox processAddPod fail, pod: %s/%s/%s, err: %v", + pod.Namespace, pod.Name, pod.Uid, err) + } + return err +} + +func (m *ManagerImpl) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) ( + *api.ContainerAdjustment, []*api.ContainerUpdate, error, +) { + klog.Infof("[ORM] CreateContainer, pod: %s/%s/%s, container: %v", pod.Namespace, pod.Name, pod.Uid, container.Name) + containerAllResources := m.podResources.containerAllResources(pod.Uid, container.Name) + if containerAllResources == nil { + klog.V(5).Infof("[ORM] CreateContainer process failed, pod: %s/%s/%s, container: %v, resources nil", + pod.Namespace, pod.Name, pod.Uid, container.Name) + return nil, nil, nil + } + + adjust := &api.ContainerAdjustment{} + for _, resourceAllocationInfo := range containerAllResources { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if resourceAllocationInfo.AllocationResult != "" { + adjust.SetLinuxCPUSetCPUs(resourceAllocationInfo.AllocationResult) + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + adjust.SetLinuxCPUSetMems(resourceAllocationInfo.AllocationResult) + } + } + } + klog.V(5).Infof("[ORM] handle NRI CreateContainer successfully, pod: %s/%s/%s, container: %s, adjust: %v", + pod.Namespace, pod.Name, pod.Uid, container.Name, adjust) + return adjust, nil, nil +} + +func (m *ManagerImpl) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources, +) ([]*api.ContainerUpdate, error) { + // todo: hook this method to update container resources + return nil, nil + // containerUpdate := m.getNRIContainerUpdate(pod.Uid, container.Id, container.Name) + // klog.V(5).Infof("[ORM] handle NRI UpdateContainer successfully, pod: %s/%s/%s, container: %s, update: %v", + // pod.Namespace, pod.Name, pod.Uid, container.Name, containerUpdate) + // return []*api.ContainerUpdate{containerUpdate}, nil +} + +func (m *ManagerImpl) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error { + klog.Infof("[ORM] RemovePodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) + err := m.processDeletePod(pod.Uid) + if err != nil { + klog.Errorf("[ORM] RemovePodSandbox processDeletePod fail, pod: %s/%s/%s, err: %v", + pod.Namespace, pod.Name, pod.Uid, err) + } + return err +} + +func (m *ManagerImpl) onClose() { + m.nriStub.Stop() + klog.V(6).Infof("NRI server closes") +} + +func (m *ManagerImpl) updateContainerByNRI(podUID, containerId, containerName string) { + klog.V(2).Infof("[ORM] updateContainerByNRI, pod: %v, container: %v", podUID, containerName) + containerUpdate := m.getNRIContainerUpdate(podUID, containerId, containerName) + _, err := m.nriStub.UpdateContainers([]*api.ContainerUpdate{containerUpdate}) + if err != nil { + klog.Errorf("[ORM] updateContainerByNRI fail, pod %v container %v,resource %v, err: %v", podUID, containerName, err) + } +} + +func (m *ManagerImpl) getNRIContainerUpdate(podUID, containerId, containerName string) *api.ContainerUpdate { + containerUpdate := &api.ContainerUpdate{ + ContainerId: containerId, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{Cpus: "", Mems: ""}}}, + } + containerAllResources := m.podResources.containerAllResources(podUID, containerName) + for _, resourceAllocationInfo := range containerAllResources { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if resourceAllocationInfo.AllocationResult != "" { + containerUpdate.Linux.Resources.Cpu.Cpus = resourceAllocationInfo.AllocationResult + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + containerUpdate.Linux.Resources.Cpu.Mems = resourceAllocationInfo.AllocationResult + } + default: + + } + } + return containerUpdate +} diff --git a/pkg/agent/orm/manager_nri_test.go b/pkg/agent/orm/manager_nri_test.go new file mode 100644 index 000000000..120ccefc9 --- /dev/null +++ b/pkg/agent/orm/manager_nri_test.go @@ -0,0 +1,293 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orm + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nri/pkg/stub" + cadvisorapi "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + + "github.com/kubewharf/katalyst-core/pkg/agent/orm/endpoint" + "github.com/kubewharf/katalyst-core/pkg/agent/orm/metamanager" + "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +type fakeNRIStub struct{} + +func (f *fakeNRIStub) Run(ctx context.Context) error { + return nil +} + +func (f *fakeNRIStub) Start(ctx context.Context) error { + return nil +} + +func (f *fakeNRIStub) Stop() { + return +} + +func (f *fakeNRIStub) Wait() { + return +} + +func (f *fakeNRIStub) UpdateContainers(_ []*api.ContainerUpdate) ([]*api.ContainerUpdate, error) { + return nil, nil +} + +func TestManagerImpl_Configure(t *testing.T) { + t.Parallel() + conf := "{\"events\":[\"RunPodSandbox\",\"CreateContainer\",\"UpdateContainer\",\"RemovePodSandbox\"]}" + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + eventMask, err := m.Configure(context.TODO(), conf, "", "") + res := stub.EventMask(141) + assert.NoError(t, err) + assert.Equal(t, eventMask, res) +} + +func TestManagerImpl_Synchronize(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + update, err := m.Synchronize(context.TODO(), []*api.PodSandbox{}, []*api.Container{}) + assert.NoError(t, err) + assert.Nil(t, update) +} + +func TestManagerImpl_RunPodSandbox(t *testing.T) { + t.Parallel() + + pods := []*v1.Pod{ + makePod("testPod1", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + makePod("testPod2", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + } + + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + endpoints: map[string]endpoint.EndpointInfo{}, + podResources: newPodResourcesChk(), + } + topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ + { + Id: 0, + }, + }, "none", nil) + topologyManager.AddHintProvider(m) + m.topologyManager = topologyManager + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + assert.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf, pods) + assert.NoError(t, err) + metaManager := metamanager.NewManager(metrics.DummyMetrics{}, m.podResources.pods, metaServer) + m.metaManager = metaManager + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + m.checkpointManager = checkpointManager + podUID1 := "testPodUID1" + err = m.RunPodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID1}) + assert.Error(t, err, fmt.Errorf("failed to find pod by uid testPod1")) +} + +func TestManagerImpl_CreateContainer(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + } + + // test CpuSetCpus + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + podName1 := "testPodName1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + pod1 := &api.PodSandbox{ + Name: podName1, + Uid: podUID1, + } + container1 := &api.Container{ + Id: containerID1, + Name: containerName1, + } + containerAdjust1, _, err := m.CreateContainer(context.TODO(), pod1, container1) + assert.NoError(t, err) + res1 := &api.ContainerAdjustment{ + Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ + Cpus: "5-6,10", + }}}, + } + assert.Equal(t, containerAdjust1, res1) + + // test CpuSetMems + resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() + podUID2 := "testPodUID2" + podName2 := "testPodName2" + containerName2 := "testContainer2" + containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" + m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) + pod2 := &api.PodSandbox{ + Name: podName2, + Uid: podUID2, + } + container2 := &api.Container{ + Id: containerID2, + Name: containerName2, + } + containerAdjust2, _, err := m.CreateContainer(context.TODO(), pod2, container2) + assert.NoError(t, err) + res2 := &api.ContainerAdjustment{ + Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ + Mems: "7-8,11", + }}}, + } + assert.Equal(t, containerAdjust2, res2) + + // test pod not exist + podNotExist := &api.PodSandbox{ + Name: "PodNotExist", + Uid: "PodUIDNotExist", + } + containerNotExist := &api.Container{ + Id: "ContainerIDNotExist", + Name: "ContainerNotExist", + } + containerAdjust, _, err := m.CreateContainer(context.TODO(), podNotExist, containerNotExist) + assert.Nil(t, containerAdjust) + assert.NoError(t, err) +} + +func TestManagerImpl_UpdateContainer(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + update, err := m.UpdateContainer(context.TODO(), &api.PodSandbox{}, &api.Container{}, &api.LinuxResources{}) + assert.NoError(t, err) + assert.Nil(t, update) +} + +func TestManagerImpl_RemovePodSandbox(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + endpoints: map[string]endpoint.EndpointInfo{}, + podResources: newPodResourcesChk(), + } + topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ + { + Id: 0, + }, + }, "none", nil) + topologyManager.AddHintProvider(m) + m.topologyManager = topologyManager + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + m.checkpointManager = checkpointManager + podUID := "testPodUID" + err = m.RemovePodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID}) + assert.NoError(t, err) +} + +func TestManagerImpl_onClose(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + m.onClose() +} + +func TestManagerImpl_updateContainerByNRI(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + nriStub: &fakeNRIStub{}, + } + + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + m.updateContainerByNRI(podUID1, containerID1, containerName1) +} + +func TestManagerImpl_getNRIContainerUpdate(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + } + + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + containerUpdate1 := m.getNRIContainerUpdate(podUID1, containerID1, containerName1) + res1 := &api.ContainerUpdate{ + ContainerId: containerID1, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ + Cpu: &api.LinuxCPU{ + Cpus: "5-6,10", + }, + }}, + } + assert.Equal(t, containerUpdate1, res1) + + resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() + podUID2 := "testPodUID2" + containerName2 := "testContainer2" + containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" + m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) + containerUpdate2 := m.getNRIContainerUpdate(podUID2, containerID2, containerName2) + res2 := &api.ContainerUpdate{ + ContainerId: containerID2, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ + Cpu: &api.LinuxCPU{ + Mems: "7-8,11", + }, + }}, + } + assert.Equal(t, containerUpdate2, res2) +} diff --git a/pkg/agent/orm/manager_test.go b/pkg/agent/orm/manager_test.go index a1e9a776d..9501d6a2b 100644 --- a/pkg/agent/orm/manager_test.go +++ b/pkg/agent/orm/manager_test.go @@ -24,10 +24,6 @@ import ( "testing" "time" - "github.com/containerd/nri/pkg/stub" - - "github.com/containerd/nri/pkg/api" - cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -47,6 +43,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -103,7 +100,7 @@ func TestProcess(t *testing.T) { m := &ManagerImpl{ ctx: ctx, - mode: workModeBypass, + mode: consts.WorkModeBypass, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/process", metaManager: metamanager, @@ -187,7 +184,7 @@ func TestReconcile(t *testing.T) { assert.NoError(t, err) m := &ManagerImpl{ - mode: workModeBypass, + mode: consts.WorkModeBypass, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/reconcile", metaManager: metamanager, @@ -372,7 +369,7 @@ func TestRun(t *testing.T) { assert.NoError(t, err) m := &ManagerImpl{ - mode: workModeBypass, + mode: consts.WorkModeBypass, reconcilePeriod: 2 * time.Second, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/run", @@ -675,257 +672,3 @@ func (m *MockEndpoint) GetTopologyAwareAllocatableResources(c context.Context, r } return nil, nil } - -type fakeNRIStub struct{} - -func (f *fakeNRIStub) Run(ctx context.Context) error { - return nil -} - -func (f *fakeNRIStub) Start(ctx context.Context) error { - return nil -} - -func (f *fakeNRIStub) Stop() { - return -} - -func (f *fakeNRIStub) Wait() { - return -} - -func (f *fakeNRIStub) UpdateContainers(_ []*api.ContainerUpdate) ([]*api.ContainerUpdate, error) { - return nil, nil -} - -func TestManagerImpl_Configure(t *testing.T) { - t.Parallel() - conf := "{\"events\":[\"RunPodSandbox\",\"CreateContainer\",\"UpdateContainer\",\"RemovePodSandbox\"]}" - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - eventMask, err := m.Configure(context.TODO(), conf, "", "") - res := stub.EventMask(141) - assert.NoError(t, err) - assert.Equal(t, eventMask, res) -} - -func TestManagerImpl_Synchronize(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - update, err := m.Synchronize(context.TODO(), []*api.PodSandbox{}, []*api.Container{}) - assert.NoError(t, err) - assert.Nil(t, update) -} - -func TestManagerImpl_RunPodSandbox(t *testing.T) { - t.Parallel() - - pods := []*v1.Pod{ - makePod("testPod1", v1.ResourceList{ - "cpu": *resource.NewQuantity(2, resource.DecimalSI), - "memory": *resource.NewQuantity(2, resource.DecimalSI), - }), - makePod("testPod2", v1.ResourceList{ - "cpu": *resource.NewQuantity(2, resource.DecimalSI), - "memory": *resource.NewQuantity(2, resource.DecimalSI), - }), - } - - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - endpoints: map[string]endpoint.EndpointInfo{}, - podResources: newPodResourcesChk(), - } - topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ - { - Id: 0, - }, - }, "none", nil) - topologyManager.AddHintProvider(m) - m.topologyManager = topologyManager - - ckDir, err := ioutil.TempDir("", "checkpoint-Test") - assert.NoError(t, err) - defer func() { _ = os.RemoveAll(ckDir) }() - - conf := generateTestConfiguration(ckDir) - metaServer, err := generateTestMetaServer(conf, pods) - assert.NoError(t, err) - metaManager := metamanager.NewManager(metrics.DummyMetrics{}, m.podResources.pods, metaServer) - m.metaManager = metaManager - - checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") - assert.NoError(t, err) - - m.checkpointManager = checkpointManager - podUID1 := "testPodUID1" - err = m.RunPodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID1}) - assert.Error(t, err, fmt.Errorf("failed to find pod by uid testPod1")) -} - -func TestManagerImpl_CreateContainer(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - podResources: newPodResourcesChk(), - } - - // test CpuSetCpus - resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() - podUID1 := "testPodUID1" - podName1 := "testPodName1" - containerName1 := "testContainer1" - containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" - m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) - pod1 := &api.PodSandbox{ - Name: podName1, - Uid: podUID1, - } - container1 := &api.Container{ - Id: containerID1, - Name: containerName1, - } - containerAdjust1, _, err := m.CreateContainer(context.TODO(), pod1, container1) - assert.NoError(t, err) - res1 := &api.ContainerAdjustment{ - Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ - Cpus: "5-6,10", - }}}, - } - assert.Equal(t, containerAdjust1, res1) - - // test CpuSetMems - resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() - podUID2 := "testPodUID2" - podName2 := "testPodName2" - containerName2 := "testContainer2" - containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" - m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) - pod2 := &api.PodSandbox{ - Name: podName2, - Uid: podUID2, - } - container2 := &api.Container{ - Id: containerID2, - Name: containerName2, - } - containerAdjust2, _, err := m.CreateContainer(context.TODO(), pod2, container2) - assert.NoError(t, err) - res2 := &api.ContainerAdjustment{ - Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ - Mems: "7-8,11", - }}}, - } - assert.Equal(t, containerAdjust2, res2) - - // test pod not exist - podNotExist := &api.PodSandbox{ - Name: "PodNotExist", - Uid: "PodUIDNotExist", - } - containerNotExist := &api.Container{ - Id: "ContainerIDNotExist", - Name: "ContainerNotExist", - } - containerAdjust, _, err := m.CreateContainer(context.TODO(), podNotExist, containerNotExist) - assert.Nil(t, containerAdjust) - assert.NoError(t, err) -} - -func TestManagerImpl_UpdateContainer(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - update, err := m.UpdateContainer(context.TODO(), &api.PodSandbox{}, &api.Container{}, &api.LinuxResources{}) - assert.NoError(t, err) - assert.Nil(t, update) -} - -func TestManagerImpl_RemovePodSandbox(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - endpoints: map[string]endpoint.EndpointInfo{}, - podResources: newPodResourcesChk(), - } - topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ - { - Id: 0, - }, - }, "none", nil) - topologyManager.AddHintProvider(m) - m.topologyManager = topologyManager - - checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") - assert.NoError(t, err) - - m.checkpointManager = checkpointManager - podUID := "testPodUID" - err = m.RemovePodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID}) - assert.NoError(t, err) -} - -func TestManagerImpl_onClose(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - m.onClose() -} - -func TestManagerImpl_updateContainerByNRI(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - podResources: newPodResourcesChk(), - nriStub: &fakeNRIStub{}, - } - - resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() - podUID1 := "testPodUID1" - containerName1 := "testContainer1" - containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" - m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) - m.updateContainerByNRI(podUID1, containerID1, containerName1) -} - -func TestManagerImpl_getNRIContainerUpdate(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - podResources: newPodResourcesChk(), - } - - resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() - podUID1 := "testPodUID1" - containerName1 := "testContainer1" - containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" - m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) - containerUpdate1 := m.getNRIContainerUpdate(podUID1, containerID1, containerName1) - res1 := &api.ContainerUpdate{ - ContainerId: containerID1, - Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ - Cpu: &api.LinuxCPU{ - Cpus: "5-6,10", - }, - }}, - } - assert.Equal(t, containerUpdate1, res1) - - resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() - podUID2 := "testPodUID2" - containerName2 := "testContainer2" - containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" - m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) - containerUpdate2 := m.getNRIContainerUpdate(podUID2, containerID2, containerName2) - res2 := &api.ContainerUpdate{ - ContainerId: containerID2, - Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ - Cpu: &api.LinuxCPU{ - Mems: "7-8,11", - }, - }}, - } - assert.Equal(t, containerUpdate2, res2) -} diff --git a/pkg/agent/orm/types.go b/pkg/agent/orm/types.go index e69962247..cba34043d 100644 --- a/pkg/agent/orm/types.go +++ b/pkg/agent/orm/types.go @@ -48,10 +48,3 @@ const ( NoneDevicesProvider = "" ) - -type workMode string - -const ( - workModeNri workMode = "nri" - workModeBypass workMode = "bypass" -) diff --git a/pkg/config/agent/orm/orm_base.go b/pkg/config/agent/orm/orm_base.go index eb7b2d647..ddde9deb3 100644 --- a/pkg/config/agent/orm/orm_base.go +++ b/pkg/config/agent/orm/orm_base.go @@ -16,10 +16,14 @@ limitations under the License. package orm -import "time" +import ( + "time" + + "github.com/kubewharf/katalyst-core/pkg/consts" +) type GenericORMConfiguration struct { - ORMWorkMode string + ORMWorkMode consts.WorkMode ORMReconcilePeriod time.Duration ORMResourceNamesMap map[string]string ORMPodNotifyChanLen int @@ -36,7 +40,7 @@ type GenericORMConfiguration struct { func NewGenericORMConfiguration() *GenericORMConfiguration { return &GenericORMConfiguration{ - ORMWorkMode: "bypass", + ORMWorkMode: consts.WorkModeBypass, ORMReconcilePeriod: time.Second * 5, ORMResourceNamesMap: map[string]string{}, ORMPodNotifyChanLen: 10, diff --git a/pkg/consts/orm.go b/pkg/consts/orm.go new file mode 100644 index 000000000..dc30bcd0e --- /dev/null +++ b/pkg/consts/orm.go @@ -0,0 +1,24 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consts + +type WorkMode string + +const ( + WorkModeNri WorkMode = "nri" + WorkModeBypass WorkMode = "bypass" +)