From 1c614d757870d3226fe1e5d23a68c3da5470ab42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gary=20Liu=20=28=E5=88=98=E5=B9=BF=E6=BA=90=29?= Date: Tue, 7 Jan 2025 15:46:04 +0800 Subject: [PATCH] feat(qrm-plugins): validate GetAdvice request on response --- .../dynamicpolicy/policy_advisor_handler.go | 15 ++++-- .../cpu/dynamicpolicy/policy_test.go | 2 +- .../validator/validator_cpu_advisor.go | 47 +++++++++++++++++++ 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go index 3270c1cd9..1a2f52318 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go @@ -309,7 +309,7 @@ func (p *DynamicPolicy) getAdviceFromAdvisor(ctx context.Context) (isImplemented return true, fmt.Errorf("GetAdvice failed with error: %w", err) } - err = p.allocateByCPUAdvisor(&advisorapi.ListAndWatchResponse{ + err = p.allocateByCPUAdvisor(request, &advisorapi.ListAndWatchResponse{ Entries: resp.Entries, AllowSharedCoresOverlapReclaimedCores: resp.AllowSharedCoresOverlapReclaimedCores, ExtraEntries: resp.ExtraEntries, @@ -381,7 +381,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error { err, status.Code(err)) } - err = p.allocateByCPUAdvisor(resp) + err = p.allocateByCPUAdvisor(nil, resp) if err != nil { general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err) } @@ -395,7 +395,10 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error { } // allocateByCPUAdvisor perform allocate actions based on allocation response from cpu-advisor. -func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchResponse) (err error) { +func (p *DynamicPolicy) allocateByCPUAdvisor( + req *advisorapi.GetAdviceRequest, + resp *advisorapi.ListAndWatchResponse, +) (err error) { if resp == nil { return fmt.Errorf("allocateByCPUAdvisor got nil qos aware lw response") } @@ -412,6 +415,12 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchRespon general.InfoS("finished", "duration", time.Since(startTime)) }() + if req != nil { + vErr := p.advisorValidator.ValidateRequest(req) + if vErr != nil { + return fmt.Errorf("ValidateCPUAdvisorReq failed with error: %v", vErr) + } + } vErr := p.advisorValidator.Validate(resp) if vErr != nil { return fmt.Errorf("ValidateCPUAdvisorResp failed with error: %v", vErr) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 031c9ca05..831ba9dc8 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -5465,7 +5465,7 @@ func TestAllocateByQoSAwareServerListAndWatchResp(t *testing.T) { dynamicPolicy.state.SetMachineState(machineState) dynamicPolicy.initReservePool() - err = dynamicPolicy.allocateByCPUAdvisor(tc.lwResp) + err = dynamicPolicy.allocateByCPUAdvisor(nil, tc.lwResp) as.Nilf(err, "dynamicPolicy.allocateByCPUAdvisorServerListAndWatchResp got err: %v, case: %s", err, tc.name) getPodEntries := dynamicPolicy.state.GetPodEntries() diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator/validator_cpu_advisor.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator/validator_cpu_advisor.go index 37dba6ebe..7ec696962 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator/validator_cpu_advisor.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator/validator_cpu_advisor.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/util/errors" + "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" @@ -42,6 +43,52 @@ func NewCPUAdvisorValidator(state state.State, machineInfo *machine.KatalystMach } } +// ValidateRequest validates the GetAdvice request. +// We validate the request because we cannot infer the container metadata from sys-advisor response. +func (c *CPUAdvisorValidator) ValidateRequest(req *advisorapi.GetAdviceRequest) error { + if req == nil { + return fmt.Errorf("got nil req") + } + + entries := c.state.GetPodEntries() + + // validate shared_cores with numa_binding entries + sharedNUMABindingAllocationInfos := entries.GetFilteredPodEntries(state.WrapAllocationMetaFilter((*commonstate.AllocationMeta).CheckSharedNUMABinding)) + + for podUID, containerEntries := range sharedNUMABindingAllocationInfos { + for containerName, containerInfo := range containerEntries { + if req.Entries[podUID] == nil || req.Entries[podUID].Entries[containerName] == nil { + return fmt.Errorf("missing request entry for shared_cores with numa_binding pod: %s container: %s", podUID, containerName) + } + requestInfo := req.Entries[podUID].Entries[containerName] + // This container may have been changed from shared_cores without numa_binding to shared_cores with numa_binding. + // Verify if we have included this information in the request. + // If we have, sys-advisor must have observed it. + if requestInfo.Metadata.Annotations[consts.PodAnnotationMemoryEnhancementNumaBinding] != consts.PodAnnotationMemoryEnhancementNumaBindingEnable { + return fmt.Errorf( + "shared_cores with numa_binding pod: %s container: %s has invalid owner pool name: %s in request, expected %s", + podUID, containerName, requestInfo.AllocationInfo.OwnerPoolName, containerInfo.OwnerPoolName) + } + } + } + + for podUID, containerEntries := range req.Entries { + if containerEntries == nil { + continue + } + for containerName, requestInfo := range containerEntries.Entries { + if requestInfo.Metadata.QosLevel == consts.PodAnnotationQoSLevelSharedCores && + requestInfo.Metadata.Annotations[consts.PodAnnotationMemoryEnhancementNumaBinding] == consts.PodAnnotationMemoryEnhancementNumaBindingEnable { + if entries[podUID][containerName] == nil { + return fmt.Errorf("missing state entry for shared_cores with numa_binding pod: %s container: %s", podUID, containerName) + } + } + } + } + + return nil +} + func (c *CPUAdvisorValidator) Validate(resp *advisorapi.ListAndWatchResponse) error { if resp == nil { return fmt.Errorf("got nil cpu advisor resp")