diff --git a/cmd/katalyst-agent/app/options/qrm/memory_plugin.go b/cmd/katalyst-agent/app/options/qrm/memory_plugin.go index 64f16160d..62d0805a1 100644 --- a/cmd/katalyst-agent/app/options/qrm/memory_plugin.go +++ b/cmd/katalyst-agent/app/options/qrm/memory_plugin.go @@ -33,6 +33,7 @@ type MemoryOptions struct { OOMPriorityPinnedMapAbsPath string SockMemOptions + MemLow } type SockMemOptions struct { @@ -43,6 +44,11 @@ type SockMemOptions struct { SetCgroupTCPMemRatio int } +type MemLow struct { + EnableSettingMemLow bool + MemLowQoSLevelConfigFile string +} + func NewMemoryOptions() *MemoryOptions { return &MemoryOptions{ PolicyName: "dynamic", @@ -56,6 +62,10 @@ func NewMemoryOptions() *MemoryOptions { SetGlobalTCPMemRatio: 20, // default: 20% * {host total memory} SetCgroupTCPMemRatio: 100, // default: 100% * {cgroup memory} }, + MemLow: MemLow{ + EnableSettingMemLow: false, + MemLowQoSLevelConfigFile: "", + }, } } @@ -84,6 +94,10 @@ func (o *MemoryOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.SetGlobalTCPMemRatio, "limit global max tcp memory usage") fs.IntVar(&o.SetCgroupTCPMemRatio, "qrm-memory-cgroup-tcpmem-ratio", o.SetCgroupTCPMemRatio, "limit cgroup max tcp memory usage") + fs.BoolVar(&o.EnableSettingMemLow, "enable-setting-mem-low", + o.EnableSettingMemLow, "if set true, we will do memory soft protection in qos level") + fs.StringVar(&o.MemLowQoSLevelConfigFile, "mem-low-qos-config-file", + o.MemLowQoSLevelConfigFile, "the absolute path of mem.low qos level config file") } func (o *MemoryOptions) ApplyTo(conf *qrmconfig.MemoryQRMPluginConfig) error { @@ -98,5 +112,7 @@ func (o *MemoryOptions) ApplyTo(conf *qrmconfig.MemoryQRMPluginConfig) error { conf.EnableSettingSockMem = o.EnableSettingSockMem conf.SetGlobalTCPMemRatio = o.SetGlobalTCPMemRatio conf.SetCgroupTCPMemRatio = o.SetCgroupTCPMemRatio + conf.EnableSettingMemLow = o.EnableSettingMemLow + conf.MemLowQoSLevelConfigFile = o.MemLowQoSLevelConfigFile return nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 44a599c3e..bc37389bc 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -40,6 +40,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/memlow" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/sockmem" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler" @@ -146,6 +147,7 @@ type DynamicPolicy struct { enableSettingMemoryMigrate bool enableSettingSockMem bool + enableSettingMemLow bool enableMemoryAdvisor bool memoryAdvisorSocketAbsPath string memoryPluginSocketAbsPath string @@ -211,6 +213,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers(memoryPluginAsyncWorkersName, defaultAsyncWorkLimit, wrappedEmitter), enableSettingMemoryMigrate: conf.EnableSettingMemoryMigrate, enableSettingSockMem: conf.EnableSettingSockMem, + enableSettingMemLow: conf.EnableSettingMemLow, enableMemoryAdvisor: conf.EnableMemoryAdvisor, memoryAdvisorSocketAbsPath: conf.MemoryAdvisorSocketAbsPath, memoryPluginSocketAbsPath: conf.MemoryPluginSocketAbsPath, @@ -367,6 +370,15 @@ func (p *DynamicPolicy) Start() (err error) { } } + if p.enableSettingMemLow { + general.Infof("setMemLow enabled") + err := periodicalhandler.RegisterPeriodicalHandler(qrm.QRMMemoryPluginPeriodicalHandlerGroupName, + memlow.EnableSetMemLowPeriodicalHandlerName, memlow.MemLowTaskFunc, 90*time.Second) + if err != nil { + general.Infof("setMemLow failed, err=%v", err) + } + } + go wait.Until(func() { periodicalhandler.ReadyToStartHandlersByGroup(qrm.QRMMemoryPluginPeriodicalHandlerGroupName) }, 5*time.Second, p.stopCh) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index d999cdb45..76ae13cc2 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -902,6 +902,7 @@ func TestAllocate(t *testing.T) { } dynamicPolicy.enableMemoryAdvisor = true + dynamicPolicy.enableSettingMemLow = true dynamicPolicy.advisorClient = advisorsvc.NewStubAdvisorServiceClient() resp, err := dynamicPolicy.Allocate(context.Background(), tc.req) diff --git a/pkg/agent/qrm-plugins/memory/handlers/memlow/const.go b/pkg/agent/qrm-plugins/memory/handlers/memlow/const.go new file mode 100644 index 000000000..1b4d09eca --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/memlow/const.go @@ -0,0 +1,31 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memlow + +const EnableSetMemLowPeriodicalHandlerName = "SetCGMemLow" + +const ( + // Constants for cgroup memory statistics + cgroupMemory32M = 33554432 + cgroupMemoryUnlimited = 9223372036854771712 + + controlKnobKeyMemLow = "mem_low" +) + +const ( + metricNameMemLow = "async_handler_cgroup_memlow" +) diff --git a/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_linux.go b/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_linux.go new file mode 100644 index 000000000..78b99c6ff --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_linux.go @@ -0,0 +1,200 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memlow + +import ( + "context" + "fmt" + "math" + "strconv" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +func calculatedBestLow(memUsage, memFileInactive, userLow uint64) uint64 { + if memFileInactive > memUsage { + return 0 + } + minLow := memUsage - memFileInactive + cgroupMemory32M + low := math.Max(float64(userLow), float64(minLow)) + return uint64(low) +} + +func getUserSpecifiedMemoryLowInBytes(memLimit, memUsage, ratio uint64) uint64 { + if ratio > 100 || ratio <= 0 { + general.Infof("Bad ratio %v", ratio) + return 0 + } + + maxLimit := memLimit + if memLimit >= cgroupMemoryUnlimited { + maxLimit = memUsage + cgroupMemory32M + } + low := uint64(float64(maxLimit) / 100.0 * float64(ratio)) + low = general.AlignToPageSize(low) + return low +} + +func calculateMemLow(absCgPath string, ratio uint64) (uint64, error) { + /* + * I hope to protect cgroup from System-Thrashing(insufficient hot file memory) + * during kswapd reclaiming through cgv2 memory.low. + */ + // Step1, get cgroup memory.limit, memory.usage, inactive-file-memory. + memStat, err := cgroupmgr.GetMemoryWithAbsolutePath(absCgPath) + if err != nil { + general.Warningf("GetMemoryWithAbsolutePath failed with err: %v", err) + return 0, err + } + memDetailedStat, err := cgroupmgr.GetDetailedMemoryWithAbsolutePath(absCgPath) + if err != nil { + general.Warningf("GetDetailedMemoryWithAbsolutePath failed with err: %v", err) + return 0, err + } + + // Step2, Reserve a certain ratio of file memory for high-QoS cgroups. + userLow := getUserSpecifiedMemoryLowInBytes(memStat.Limit, memStat.Usage, ratio) + if userLow == 0 { + general.Warningf("getUserSpecifiedMemoryLowInBytes return 0") + return 0, fmt.Errorf("getUserSpecifiedMemoryLowInBytes return 0") + } + + // Step3, I don't want to hurt existing hot file-memory. + // If the reserve file memory is not sufficient for current hot file-memory, + // then the final memory.low will be based on current hot file-memory. + low := calculatedBestLow(memStat.Usage, memDetailedStat.FileInactive, userLow) + + general.Infof("calculateMemLow: target=%v, usr=%v, ratio=%v, cg=%v, limit=%v, usage=%v, file-inactive=%v", low, userLow, ratio, absCgPath, memStat.Limit, memStat.Usage, memDetailedStat.FileInactive) + return low, nil +} + +func applyMemLowQoSLevelConfig(conf *coreconfig.Configuration, + emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, +) { + if conf.MemLowQoSLevelConfigFile == "" { + general.Infof("no MemLowQoSLevelConfigFile found") + return + } + + var extraControlKnobConfigs commonstate.ExtraControlKnobConfigs + if err := general.LoadJsonConfig(conf.MemLowQoSLevelConfigFile, &extraControlKnobConfigs); err != nil { + general.Errorf("MemLowQoSLevelConfigFile load failed:%v", err) + return + } + ctx := context.Background() + podList, err := metaServer.GetPodList(ctx, native.PodIsActive) + if err != nil { + general.Infof("get pod list failed: %v", err) + return + } + + for _, pod := range podList { + if pod == nil { + general.Warningf("get nil pod from metaServer") + continue + } + if conf.QoSConfiguration == nil { + continue + } + qosConfig := conf.QoSConfiguration + qosLevel, err := qosConfig.GetQoSLevelForPod(pod) + if err != nil { + general.Warningf("GetQoSLevelForPod failed:%v", err) + continue + } + qosLevelDefaultValue, ok := extraControlKnobConfigs[controlKnobKeyMemLow].QoSLevelToDefaultValue[qosLevel] + if !ok { + continue + } + + ratio, err := strconv.ParseFloat(qosLevelDefaultValue, 64) + if err != nil { + general.Infof("Atoi failed with err: %v", err) + continue + } + + absCgPath, err := common.GetPodAbsCgroupPath(common.CgroupSubsysMemory, string(pod.UID)) + if err != nil { + continue + } + low, err := calculateMemLow(absCgPath, uint64(ratio*100)) + if err != nil { + general.Errorf("calculateMemLow for relativeCgPath: %s failed with error: %v", + absCgPath, err) + continue + } + + // OK. Set the value for memory.low. + var data *cgroupcm.MemoryData + data = &cgroupcm.MemoryData{SoftLimitInBytes: int64(low)} + if err := cgroupmgr.ApplyMemoryWithAbsolutePath(absCgPath, data); err != nil { + general.Warningf("ApplyMemoryWithRelativePath failed, cgpath=%v, err=%v", absCgPath, err) + continue + } + _ = emitter.StoreInt64(metricNameMemLow, int64(low), metrics.MetricTypeNameRaw, + metrics.ConvertMapToTags(map[string]string{ + "podUID": string(pod.UID), + })...) + + } +} + +func MemLowTaskFunc(conf *coreconfig.Configuration, + _ interface{}, _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, +) { + general.Infof("called") + + if conf == nil { + general.Errorf("nil extraConf") + return + } else if emitter == nil { + general.Errorf("nil emitter") + return + } else if metaServer == nil { + general.Errorf("nil metaServer") + return + } + + // SettingMemLow featuregate. + if !conf.EnableSettingMemLow { + general.Infof("EnableSettingMemLow disabled") + return + } + + if !cgroupcm.CheckCgroup2UnifiedMode() { + general.Infof("not in cgv2 environment, skip MemLowTaskFunc") + return + } + + // checking qos-level memory.low configuration. + if len(conf.MemLowQoSLevelConfigFile) > 0 { + applyMemLowQoSLevelConfig(conf, emitter, metaServer) + } +} diff --git a/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_linux_test.go b/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_linux_test.go new file mode 100644 index 000000000..08c92707c --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_linux_test.go @@ -0,0 +1,365 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memlow + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + "github.com/kubewharf/katalyst-core/pkg/config" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent" + configagent "github.com/kubewharf/katalyst-core/pkg/config/agent" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + metaagent "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +func generateTestConfiguration(t *testing.T, checkpointDir, stateFileDir string) *config.Configuration { + testConfiguration, err := options.NewOptions().Config() + require.NoError(t, err) + require.NotNil(t, testConfiguration) + + testConfiguration.GenericSysAdvisorConfiguration.StateFileDirectory = stateFileDir + testConfiguration.MetaServerConfiguration.CheckpointManagerDir = checkpointDir + + return testConfiguration +} + +func makeMetaServer() (*metaserver.MetaServer, error) { + server := &metaserver.MetaServer{ + MetaAgent: &metaagent.MetaAgent{}, + } + + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 1, 2) + if err != nil { + return nil, err + } + + server.KatalystMachineInfo = &machine.KatalystMachineInfo{ + CPUTopology: cpuTopology, + } + server.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}) + return server, nil +} + +func TestMemLow(t *testing.T) { + t.Parallel() + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: false, + }, + }, + }, + }, + }, + }, nil, &dynamicconfig.DynamicAgentConfiguration{}, nil, nil) + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + }, + }, + }, + }, + }, + }, nil, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, nil) + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, nil) + + metaServer, err := makeMetaServer() + assert.NoError(t, err) + metaServer.PodFetcher = &pod.PodFetcherStub{PodList: []*v1.Pod{}} + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: false, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + normalPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "normalPod", + Name: "normalPod", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c", + }, + }, + }, + } + + metaServer.PodFetcher = &pod.PodFetcherStub{PodList: []*v1.Pod{normalPod}} + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + MemLowQoSLevelConfigFile: "", + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + MemLowTaskFunc(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + MemLowQoSLevelConfigFile: "fake", + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + applyMemLowQoSLevelConfig(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + MemLowQoSLevelConfigFile: "", + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, metaServer) + + jsonContent := `{ + "mem_low": { + "control_knob_info": { + "cgroup_subsys_name": "memory", + "cgroup_version_to_iface_name": { + "v1": "", + "v2": "memory.low" + }, + "control_knob_value": "0", + "oci_property_name": "" + }, + "pod_explicitly_annotation_key": "MemcgLowValue", + "qos_level_to_default_value": { + "dedicated_cores": "15", + "shared_cores": "15" + } + } + }` + + // Create a temporary file + tempFile, err := ioutil.TempFile("", "test.json") + if err != nil { + fmt.Println("Error creating temporary file:", err) + return + } + defer os.Remove(tempFile.Name()) // Defer removing the temporary file + + // Write the JSON content to the temporary file + if _, err := tempFile.WriteString(jsonContent); err != nil { + fmt.Println("Error writing to temporary file:", err) + return + } + + absPath, err := filepath.Abs(tempFile.Name()) + if err != nil { + fmt.Println("Error obtaining absolute path:", err) + return + } + + applyMemLowQoSLevelConfig(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + MemLowQoSLevelConfigFile: absPath, + }, + }, + }, + }, + }, + GenericConfiguration: &generic.GenericConfiguration{ + QoSConfiguration: nil, + }, + }, metrics.DummyMetrics{}, metaServer) + + checkpointDir, err := ioutil.TempDir("", "checkpoint-FetchModelResult") + require.NoError(t, err) + defer func() { _ = os.RemoveAll(checkpointDir) }() + + stateFileDir, err := ioutil.TempDir("", "statefile-FetchModelResult") + require.NoError(t, err) + defer func() { _ = os.RemoveAll(stateFileDir) }() + + conf := generateTestConfiguration(t, checkpointDir, stateFileDir) + + applyMemLowQoSLevelConfig(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + MemLowQoSLevelConfigFile: absPath, + }, + }, + }, + }, + }, + GenericConfiguration: &generic.GenericConfiguration{ + QoSConfiguration: conf.QoSConfiguration, + }, + }, metrics.DummyMetrics{}, metaServer) + + metaServerNil, err := makeMetaServer() + assert.NoError(t, err) + metaServerNil.PodFetcher = &pod.PodFetcherStub{PodList: []*v1.Pod{}} + + applyMemLowQoSLevelConfig(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + MemLowOptions: qrm.MemLowOptions{ + EnableSettingMemLow: true, + MemLowQoSLevelConfigFile: absPath, + }, + }, + }, + }, + }, + GenericConfiguration: &generic.GenericConfiguration{ + QoSConfiguration: nil, + }, + }, metrics.DummyMetrics{}, metaServerNil) + + calculateMemLow("fake", 10) +} + +func TestGetUserSpecifiedMemoryLowInBytes(t *testing.T) { + t.Parallel() + + result := getUserSpecifiedMemoryLowInBytes(1073741824, 107374182, 10) + var expected uint64 = 107376640 + assert.Equal(t, expected, result, "Test getUserSpecifiedMemLowInBytes failed") + + result = getUserSpecifiedMemoryLowInBytes(1073741824, 107374182, 123) + expected = 0 + assert.Equal(t, expected, result, "Test getUserSpecifiedMemLowInBytes failed") + + result = getUserSpecifiedMemoryLowInBytes(9223372036854771712, 1073741824, 10) + expected = 0x699a000 + assert.Equal(t, expected, result, "Test getUserSpecifiedMemLowInBytes failed") +} + +func TestCalculatedBestLow(t *testing.T) { + t.Parallel() + + result := calculatedBestLow(1073741824.0, 536870912.0, 536870912.0) + var expected uint64 = 0x22000000 + assert.Equal(t, expected, result, "Test alculatedBestLow failed") + + result = calculatedBestLow(1073741824.0, 5368709120.0, 536870912.0) + expected = 0 + assert.Equal(t, expected, result, "Test alculatedBestLow failed") +} diff --git a/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_unsupported.go b/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_unsupported.go new file mode 100644 index 000000000..112b64aaf --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/memlow/memlow_unsupported.go @@ -0,0 +1,31 @@ +//go:build !linux + +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memlow + +import ( + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +func MemLowTaskFunc(conf *coreconfig.Configuration, + _ interface{}, _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer) { +} diff --git a/pkg/config/agent/qrm/memory_plugin.go b/pkg/config/agent/qrm/memory_plugin.go index 02004d773..04714b3ff 100644 --- a/pkg/config/agent/qrm/memory_plugin.go +++ b/pkg/config/agent/qrm/memory_plugin.go @@ -36,6 +36,8 @@ type MemoryQRMPluginConfig struct { // SockMemQRMPluginConfig: the configuration for sockmem limitation in cgroup and host level SockMemQRMPluginConfig + // MemLowOptions: the configuration for cgroup memory.low in qos level + MemLowOptions } type SockMemQRMPluginConfig struct { @@ -47,6 +49,11 @@ type SockMemQRMPluginConfig struct { SetCgroupTCPMemRatio int } +type MemLowOptions struct { + EnableSettingMemLow bool + MemLowQoSLevelConfigFile string +} + func NewMemoryQRMPluginConfig() *MemoryQRMPluginConfig { return &MemoryQRMPluginConfig{} } diff --git a/pkg/util/cgroup/common/types.go b/pkg/util/cgroup/common/types.go index 8062a2271..9c5a57bac 100644 --- a/pkg/util/cgroup/common/types.go +++ b/pkg/util/cgroup/common/types.go @@ -162,6 +162,12 @@ type MemoryStats struct { Usage uint64 } +// MemoryDetailedStats get detailed cgroup memory data +type MemoryDetailedStats struct { + File uint64 + FileInactive uint64 +} + // CPUStats get cgroup cpu data type CPUStats struct { CpuPeriod uint64 diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 794cc443c..5b30678b1 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -43,6 +43,14 @@ func ApplyMemoryWithRelativePath(relCgroupPath string, data *common.MemoryData) return GetManager().ApplyMemory(absCgroupPath, data) } +func ApplyMemoryWithAbsolutePath(absCgroupPath string, data *common.MemoryData) error { + if data == nil { + return fmt.Errorf("ApplyMemoryWithAbsolutePath with nil cgroup data") + } + + return GetManager().ApplyMemory(absCgroupPath, data) +} + func ApplyCPUWithRelativePath(relCgroupPath string, data *common.CPUData) error { if data == nil { return fmt.Errorf("ApplyCPUWithRelativePath with nil cgroup data") @@ -179,6 +187,10 @@ func GetMemoryWithAbsolutePath(absCgroupPath string) (*common.MemoryStats, error return GetManager().GetMemory(absCgroupPath) } +func GetDetailedMemoryWithAbsolutePath(absCgroupPath string) (*common.MemoryDetailedStats, error) { + return GetManager().GetDetailedMemory(absCgroupPath) +} + func GetIOCostQoSWithRelativePath(relCgroupPath string) (map[string]*common.IOCostQoSData, error) { absCgroupPath := common.GetAbsCgroupPath(common.CgroupSubsysIO, relCgroupPath) return GetIOCostQoSWithAbsolutePath(absCgroupPath) diff --git a/pkg/util/cgroup/manager/cgroup_test.go b/pkg/util/cgroup/manager/cgroup_test.go index 702090d4e..6e4fbb57f 100644 --- a/pkg/util/cgroup/manager/cgroup_test.go +++ b/pkg/util/cgroup/manager/cgroup_test.go @@ -58,6 +58,7 @@ func testV2Manager(t *testing.T) { testManager(t, "v2") testSwapMax(t) + testDetailedMem(t) } func testManager(t *testing.T, version string) { @@ -67,6 +68,8 @@ func testManager(t *testing.T, version string) { err = ApplyMemoryWithRelativePath("/test", &common.MemoryData{}) assert.NoError(t, err) + err = ApplyMemoryWithAbsolutePath("/test", &common.MemoryData{}) + assert.NoError(t, err) err = ApplyCPUWithRelativePath("/test", &common.CPUData{}) assert.NoError(t, err) err = ApplyCPUSetWithRelativePath("/test", &common.CPUSetData{}) @@ -80,6 +83,7 @@ func testManager(t *testing.T, version string) { _, _ = GetMemoryWithRelativePath("/") _, _ = GetMemoryWithAbsolutePath("/") + _, _ = GetDetailedMemoryWithAbsolutePath("/") _, _ = GetCPUWithRelativePath("/") _, _ = GetMetricsWithRelativePath("/", map[string]struct{}{"cpu": {}}) _, _ = GetPidsWithRelativePath("/") @@ -167,3 +171,43 @@ func testSwapMax(t *testing.T) { assert.NoError(t, err) assert.Equal(t, fmt.Sprintf("%v", 0), string(s)) } + +func testDetailedMem(t *testing.T) { + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) + monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) + monkey.Patch(cgroups.ReadFile, func(dir, file string) (string, error) { + f := filepath.Join(dir, file) + tmp, err := ioutil.ReadFile(f) + if err != nil { + return "", err + } + return string(tmp), nil + }) + monkey.Patch(cgroups.WriteFile, func(dir, file, data string) error { + f := filepath.Join(dir, file) + return ioutil.WriteFile(f, []byte(data), 0o700) + }) + + rootDir := os.TempDir() + dir := filepath.Join(rootDir, "tmp") + err := os.Mkdir(dir, 0o700) + assert.NoError(t, err) + + tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + monkey.Patch(common.GetCgroupRootPath, func(s string) string { + t.Logf("rootDir=%v", rootDir) + return rootDir + }) + + content := "file 1234\ninactive_file 2222\n" + statFile := filepath.Join(tmpDir, "memory.stat") + err = ioutil.WriteFile(statFile, []byte(content), 0o700) + assert.NoError(t, err) + + _, err = GetManager().GetDetailedMemory(tmpDir) + assert.NoError(t, err) +} diff --git a/pkg/util/cgroup/manager/fake_manager.go b/pkg/util/cgroup/manager/fake_manager.go index 477023fee..a3ea24316 100644 --- a/pkg/util/cgroup/manager/fake_manager.go +++ b/pkg/util/cgroup/manager/fake_manager.go @@ -60,6 +60,10 @@ func (f *FakeCgroupManager) GetNumaMemory(absCgroupPath string) (map[int]*common return nil, nil } +func (f *FakeCgroupManager) GetDetailedMemory(absCgroupPath string) (*common.MemoryDetailedStats, error) { + return nil, nil +} + func (f *FakeCgroupManager) GetCPU(absCgroupPath string) (*common.CPUStats, error) { return nil, nil } diff --git a/pkg/util/cgroup/manager/manager.go b/pkg/util/cgroup/manager/manager.go index d48280212..7035cd08a 100644 --- a/pkg/util/cgroup/manager/manager.go +++ b/pkg/util/cgroup/manager/manager.go @@ -45,6 +45,7 @@ type Manager interface { GetMemory(absCgroupPath string) (*common.MemoryStats, error) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) + GetDetailedMemory(absCgroupPath string) (*common.MemoryDetailedStats, error) GetCPU(absCgroupPath string) (*common.CPUStats, error) GetCPUSet(absCgroupPath string) (*common.CPUSetStats, error) GetIOCostQoS(absCgroupPath string) (map[string]*common.IOCostQoSData, error) diff --git a/pkg/util/cgroup/manager/v1/fs_linux.go b/pkg/util/cgroup/manager/v1/fs_linux.go index 00a6ea632..213e6eaf0 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux.go +++ b/pkg/util/cgroup/manager/v1/fs_linux.go @@ -22,6 +22,7 @@ package v1 import ( "errors" "fmt" + "io/ioutil" "path" "path/filepath" "strconv" @@ -217,6 +218,36 @@ func (m *manager) ApplyUnifiedData(absCgroupPath, cgroupFileName, data string) e return nil } +func GetMemoryStatsFromStatFile(absCgroupPath string) (uint64, uint64, error) { + statFile := filepath.Join(absCgroupPath, "memory.stat") + content, err := ioutil.ReadFile(statFile) + if err != nil { + return 0, 0, fmt.Errorf("failed to read %s: %v", statFile, err) + } + + var cache, fileInactive uint64 + lines := strings.Split(string(content), "\n") + for _, line := range lines { + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + key := fields[0] + value, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse value in %s: %v", statFile, err) + } + switch key { + case "cache": + cache = value + case "total_inactive_file": + fileInactive = value + } + } + + return cache, fileInactive, nil +} + func (m *manager) GetMemory(absCgroupPath string) (*common.MemoryStats, error) { memoryStats := &common.MemoryStats{} moduleName := "memory" @@ -238,6 +269,19 @@ func (m *manager) GetMemory(absCgroupPath string) (*common.MemoryStats, error) { return memoryStats, nil } +func (m *manager) GetDetailedMemory(absCgroupPath string) (*common.MemoryDetailedStats, error) { + memoryStats := &common.MemoryDetailedStats{} + + cache, fileInactive, err := GetMemoryStatsFromStatFile(absCgroupPath) + if err != nil { + return nil, err + } + memoryStats.File = cache + memoryStats.FileInactive = fileInactive + + return memoryStats, nil +} + func (m *manager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { numaStat, err := common.ParseCgroupNumaValue(absCgroupPath, "memory.numa_stat") if err != nil { diff --git a/pkg/util/cgroup/manager/v1/fs_linux_test.go b/pkg/util/cgroup/manager/v1/fs_linux_test.go index dbb25f989..08c808a1b 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux_test.go +++ b/pkg/util/cgroup/manager/v1/fs_linux_test.go @@ -97,3 +97,49 @@ func Test_manager_ApplyMemory(t *testing.T) { }) } } + +func Test_manager_GetDetailedMemory(t *testing.T) { + t.Parallel() + + type args struct { + absCgroupPath string + } + tests := []struct { + name string + m *manager + args args + want *common.MemoryDetailedStats + wantErr bool + }{ + { + name: "test get memory", + m: NewManager(), + args: args{ + absCgroupPath: "test-fake-path", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + m := &manager{} + got, err := m.GetDetailedMemory(tt.args.absCgroupPath) + if (err != nil) != tt.wantErr { + t.Errorf("manager.GetMemory() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("manager.GetMemory() = %v, want %v", got, tt.want) + } + + _, _, err = GetMemoryStatsFromStatFile(tt.args.absCgroupPath) + if (err != nil) != tt.wantErr { + t.Errorf("GetMemoryStatsFromStatFile() error = %v, wantErr %v", err, tt.wantErr) + return + } + }) + } +} diff --git a/pkg/util/cgroup/manager/v2/fs_linux.go b/pkg/util/cgroup/manager/v2/fs_linux.go index 2ced86db6..9a0b5c8d1 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux.go +++ b/pkg/util/cgroup/manager/v2/fs_linux.go @@ -280,6 +280,49 @@ func (m *manager) GetMemory(absCgroupPath string) (*common.MemoryStats, error) { return memoryStats, nil } +func GetMemoryStatsFromStatFile(absCgroupPath string) (uint64, uint64, error) { + statFile := filepath.Join(absCgroupPath, "memory.stat") + content, err := ioutil.ReadFile(statFile) + if err != nil { + return 0, 0, fmt.Errorf("failed to read %s: %v", statFile, err) + } + + var cache, fileInactive uint64 + lines := strings.Split(string(content), "\n") + for _, line := range lines { + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + key := fields[0] + value, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse value in %s: %v", statFile, err) + } + switch key { + case "file": + cache = value + case "inactive_file": + fileInactive = value + } + } + + return cache, fileInactive, nil +} + +func (m *manager) GetDetailedMemory(absCgroupPath string) (*common.MemoryDetailedStats, error) { + memoryStats := &common.MemoryDetailedStats{} + + cache, fileInactive, err := GetMemoryStatsFromStatFile(absCgroupPath) + if err != nil { + return nil, err + } + memoryStats.File = cache + memoryStats.FileInactive = fileInactive + + return memoryStats, nil +} + func (m *manager) GetNumaMemory(absCgroupPath string) (map[int]*common.MemoryNumaMetrics, error) { numaStat, err := common.ParseCgroupNumaValue(absCgroupPath, "memory.numa_stat") general.Infof("get cgroup %+v numa stat %+v", absCgroupPath, numaStat) diff --git a/pkg/util/cgroup/manager/v2/fs_linux_test.go b/pkg/util/cgroup/manager/v2/fs_linux_test.go index 0a9fad9d2..c214c8b56 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux_test.go +++ b/pkg/util/cgroup/manager/v2/fs_linux_test.go @@ -949,3 +949,49 @@ func Test_parseDeviceIOCostModel(t *testing.T) { }) } } + +func Test_manager_GetDetailedMemory(t *testing.T) { + t.Parallel() + + type args struct { + absCgroupPath string + } + tests := []struct { + name string + m *manager + args args + want *common.MemoryDetailedStats + wantErr bool + }{ + { + name: "test get memory", + m: NewManager(), + args: args{ + absCgroupPath: "test-fake-path", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + m := &manager{} + got, err := m.GetDetailedMemory(tt.args.absCgroupPath) + if (err != nil) != tt.wantErr { + t.Errorf("manager.GetMemory() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("manager.GetMemory() = %v, want %v", got, tt.want) + } + + _, _, err = GetMemoryStatsFromStatFile(tt.args.absCgroupPath) + if (err != nil) != tt.wantErr { + t.Errorf("GetMemoryStatsFromStatFile() error = %v, wantErr %v", err, tt.wantErr) + return + } + }) + } +} diff --git a/pkg/util/general/common.go b/pkg/util/general/common.go index 077fbc8ff..a61e073be 100644 --- a/pkg/util/general/common.go +++ b/pkg/util/general/common.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "syscall" "time" "k8s.io/apimachinery/pkg/api/resource" @@ -384,6 +385,13 @@ func Clamp(value, min, max float64) float64 { return math.Max(math.Min(value, max), min) } +// AlignToPageSize returns the value aligned with page size +func AlignToPageSize(number uint64) uint64 { + pageSize := uint64(syscall.Getpagesize()) + alignedNumber := (number + pageSize - 1) &^ (pageSize - 1) + return alignedNumber +} + // FormatMemoryQuantity aligned to Gi Mi Ki func FormatMemoryQuantity(q float64) string { value := int64(q) diff --git a/pkg/util/general/common_test.go b/pkg/util/general/common_test.go index 8cbdb1faf..62c2fbe38 100644 --- a/pkg/util/general/common_test.go +++ b/pkg/util/general/common_test.go @@ -17,8 +17,10 @@ limitations under the License. package general import ( + "syscall" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/sets" ) @@ -149,3 +151,16 @@ func TestFormatMemoryQutantity(t *testing.T) { as.Equal("1.048576e+06[1Mi]", FormatMemoryQuantity(1<<20)) as.Equal("1.073741824e+09[1Gi]", FormatMemoryQuantity(1<<30)) } + +func TestAlignToPageSize(t *testing.T) { + t.Parallel() + pageSize := uint64(syscall.Getpagesize()) + + // Test case 1: Number already aligned to page size + result := AlignToPageSize(pageSize * 2) + assert.Equal(t, pageSize*2, result, "Unexpected result for aligned number") + + // Test case 2: Number smaller than page size + result = AlignToPageSize(pageSize - 1) + assert.Equal(t, pageSize, result, "Unexpected result for number smaller than page size") +}