Skip to content

Commit

Permalink
feat(qrm-plugins): validate GetAdvice request on response
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-lgy committed Jan 9, 2025
1 parent 87c8350 commit 1c614d7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
15 changes: 12 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (p *DynamicPolicy) getAdviceFromAdvisor(ctx context.Context) (isImplemented
return true, fmt.Errorf("GetAdvice failed with error: %w", err)
}

err = p.allocateByCPUAdvisor(&advisorapi.ListAndWatchResponse{
err = p.allocateByCPUAdvisor(request, &advisorapi.ListAndWatchResponse{
Entries: resp.Entries,
AllowSharedCoresOverlapReclaimedCores: resp.AllowSharedCoresOverlapReclaimedCores,
ExtraEntries: resp.ExtraEntries,
Expand Down Expand Up @@ -381,7 +381,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
err, status.Code(err))
}

err = p.allocateByCPUAdvisor(resp)
err = p.allocateByCPUAdvisor(nil, resp)
if err != nil {
general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
}
Expand All @@ -395,7 +395,10 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
}

// allocateByCPUAdvisor performs allocation actions based on the allocation response from cpu-advisor.
func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchResponse) (err error) {
func (p *DynamicPolicy) allocateByCPUAdvisor(
req *advisorapi.GetAdviceRequest,
resp *advisorapi.ListAndWatchResponse,
) (err error) {
if resp == nil {
return fmt.Errorf("allocateByCPUAdvisor got nil qos aware lw response")
}
Expand All @@ -412,6 +415,12 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchRespon
general.InfoS("finished", "duration", time.Since(startTime))
}()

if req != nil {
vErr := p.advisorValidator.ValidateRequest(req)
if vErr != nil {
return fmt.Errorf("ValidateCPUAdvisorReq failed with error: %v", vErr)
}
}
vErr := p.advisorValidator.Validate(resp)
if vErr != nil {
return fmt.Errorf("ValidateCPUAdvisorResp failed with error: %v", vErr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5465,7 +5465,7 @@ func TestAllocateByQoSAwareServerListAndWatchResp(t *testing.T) {
dynamicPolicy.state.SetMachineState(machineState)
dynamicPolicy.initReservePool()

err = dynamicPolicy.allocateByCPUAdvisor(tc.lwResp)
err = dynamicPolicy.allocateByCPUAdvisor(nil, tc.lwResp)
as.Nilf(err, "dynamicPolicy.allocateByCPUAdvisorServerListAndWatchResp got err: %v, case: %s", err, tc.name)

getPodEntries := dynamicPolicy.state.GetPodEntries()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"k8s.io/apimachinery/pkg/util/errors"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
Expand All @@ -42,6 +43,52 @@ func NewCPUAdvisorValidator(state state.State, machineInfo *machine.KatalystMach
}
}

// ValidateRequest validates the GetAdvice request against the pod entries held in plugin state.
// We validate the request because we cannot infer the container metadata from sys-advisor response.
//
// It returns an error when:
//   - req is nil;
//   - a container selected from state by the CheckSharedNUMABinding filter has no entry in the
//     request, has no metadata in its request entry, or its request metadata does not carry the
//     numa_binding annotation set to the "enable" value;
//   - the request describes a container as shared_cores with numa_binding but state has no
//     entry for that pod/container.
//
// It returns nil otherwise.
func (c *CPUAdvisorValidator) ValidateRequest(req *advisorapi.GetAdviceRequest) error {
	if req == nil {
		return fmt.Errorf("got nil req")
	}

	entries := c.state.GetPodEntries()

	// validate shared_cores with numa_binding entries
	sharedNUMABindingAllocationInfos := entries.GetFilteredPodEntries(state.WrapAllocationMetaFilter((*commonstate.AllocationMeta).CheckSharedNUMABinding))

	for podUID, containerEntries := range sharedNUMABindingAllocationInfos {
		for containerName := range containerEntries {
			if req.Entries[podUID] == nil || req.Entries[podUID].Entries[containerName] == nil {
				return fmt.Errorf("missing request entry for shared_cores with numa_binding pod: %s container: %s", podUID, containerName)
			}
			requestInfo := req.Entries[podUID].Entries[containerName]
			// Guard against a request entry without metadata; dereferencing it would panic.
			if requestInfo.Metadata == nil {
				return fmt.Errorf("missing metadata in request entry for shared_cores with numa_binding pod: %s container: %s", podUID, containerName)
			}
			// This container may have been changed from shared_cores without numa_binding to shared_cores with numa_binding.
			// Verify if we have included this information in the request.
			// If we have, sys-advisor must have observed it.
			numaBinding := requestInfo.Metadata.Annotations[consts.PodAnnotationMemoryEnhancementNumaBinding]
			if numaBinding != consts.PodAnnotationMemoryEnhancementNumaBindingEnable {
				// Report the annotation that was actually checked, not the owner pool name.
				return fmt.Errorf(
					"shared_cores with numa_binding pod: %s container: %s has invalid annotation %s: %q in request, expected %q",
					podUID, containerName, consts.PodAnnotationMemoryEnhancementNumaBinding,
					numaBinding, consts.PodAnnotationMemoryEnhancementNumaBindingEnable)
			}
		}
	}

	// Reverse direction: every container the request describes as shared_cores with numa_binding
	// must have a corresponding entry in state.
	for podUID, containerEntries := range req.Entries {
		if containerEntries == nil {
			continue
		}
		for containerName, requestInfo := range containerEntries.Entries {
			// A nil entry or an entry without metadata cannot claim numa_binding; skip it
			// instead of dereferencing a nil pointer.
			if requestInfo == nil || requestInfo.Metadata == nil {
				continue
			}
			if requestInfo.Metadata.QosLevel == consts.PodAnnotationQoSLevelSharedCores &&
				requestInfo.Metadata.Annotations[consts.PodAnnotationMemoryEnhancementNumaBinding] == consts.PodAnnotationMemoryEnhancementNumaBindingEnable {
				// Indexing a missing pod in entries yields a nil inner map, so this lookup is safe.
				if entries[podUID][containerName] == nil {
					return fmt.Errorf("missing state entry for shared_cores with numa_binding pod: %s container: %s", podUID, containerName)
				}
			}
		}
	}

	return nil
}

func (c *CPUAdvisorValidator) Validate(resp *advisorapi.ListAndWatchResponse) error {
if resp == nil {
return fmt.Errorf("got nil cpu advisor resp")
Expand Down

0 comments on commit 1c614d7

Please sign in to comment.