From 4bad43e49774c1e55b022990be1c333bbd1da432 Mon Sep 17 00:00:00 2001
From: Julio Montes
Date: Tue, 15 May 2018 15:07:18 -0500
Subject: [PATCH] grpc: honour CPU constraints in Kubernetes

Once all vCPUs have been connected, the cpuset cgroup MUST BE updated.
To achieve that, each parent cpuset cgroup of the container MUST BE
updated with the actual range of connected vCPUs.

fixes #239
fixes #232

Signed-off-by: Julio Montes
---
 grpc.go          | 57 +++++++++++++++++++++++++++++++++---------------
 grpc_test.go     | 20 ++++++++++++++---
 mockcontainer.go |  2 ++
 3 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/grpc.go b/grpc.go
index 155d7f2976..fc251d6785 100644
--- a/grpc.go
+++ b/grpc.go
@@ -17,7 +17,6 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
-	"sync"
 	"syscall"
 	"time"
 
@@ -57,7 +56,7 @@ var (
 	sysfsCPUOnlinePath     = "/sys/devices/system/cpu"
 	sysfsMemOnlinePath     = "/sys/devices/system/memory"
 	sysfsConnectedCPUsPath = filepath.Join(sysfsCPUOnlinePath, "online")
-	sysfsDockerCpusetPath  = "/sys/fs/cgroup/cpuset/docker/cpuset.cpus"
+	sysfsCpusetPath        = "/sys/fs/cgroup/cpuset"
 )
 
 type onlineResource struct {
@@ -67,13 +66,11 @@ type onlineResource struct {
 
 var emptyResp = &gpb.Empty{}
 
-var onlineCPUMemLock sync.Mutex
-
 const onlineCPUMemWaitTime = 100 * time.Millisecond
 
 const onlineCPUMaxTries = 10
 
-const dockerCpusetMode = 0644
+const cpusetMode = 0644
 
 // handleError will log the specified error if wait is true
 func handleError(wait bool, err error) error {
@@ -163,9 +160,11 @@ func (a *agentGRPC) onlineCPUMem(req *pb.OnlineCPUMemRequest) error {
 		return handleError(req.Wait, fmt.Errorf("requested number of CPUs '%d' must be greater than 0", req.NbCpus))
 	}
 
-	onlineCPUMemLock.Lock()
-	defer onlineCPUMemLock.Unlock()
+	// We are going to update the sandbox's containers, so the sandbox must be locked.
+	a.sandbox.Lock()
+	defer a.sandbox.Unlock()
 
+	agentLog.WithField("vcpus-to-connect", req.NbCpus).Debug("connecting vCPUs")
 	if err := onlineCPUResources(req.NbCpus); err != nil {
 		return handleError(req.Wait, err)
 	}
@@ -181,29 +180,51 @@ func (a *agentGRPC) onlineCPUMem(req *pb.OnlineCPUMemRequest) error {
 		return handleError(req.Wait, fmt.Errorf("Could not get the actual range of connected CPUs: %v", err))
 	}
 	connectedCpus := strings.Trim(string(cpus), "\t\n ")
+	agentLog.WithField("range-of-vcpus", connectedCpus).Debug("connecting vCPUs")
 
-	// In order to update container's cpuset cgroups, docker's cpuset cgroup MUST BE updated with
-	// the actual number of connected CPUs
-	if err := ioutil.WriteFile(sysfsDockerCpusetPath, []byte(connectedCpus), dockerCpusetMode); err != nil {
-		return handleError(req.Wait, fmt.Errorf("Could not update docker cpuset cgroup '%s': %v", connectedCpus, err))
-	}
+	mapCgroupPaths := make(map[string]bool)
 
 	// Now that we know the actual range of connected CPUs, we need to iterate over
 	// all containers an update each cpuset cgroup. This is not required in docker
 	// containers since they don't hot add/remove CPUs.
 	for _, c := range a.sandbox.containers {
+		agentLog.WithField("container", c.container.ID()).Debug("updating cpuset cgroup")
 		contConfig := c.container.Config()
+		// Don't update the cpuset cgroup if one was already defined.
 		if contConfig.Cgroups.Resources.CpusetCpus != "" {
+			agentLog.WithField("cpuset", contConfig.Cgroups.Resources.CpusetCpus).Debug("cpuset value is not empty")
 			continue
 		}
 
-		contConfig.Cgroups.Resources = &configs.Resources{
-			CpusetCpus: connectedCpus,
-		}
-
-		if err := c.container.Set(contConfig); err != nil {
-			return handleError(req.Wait, err)
-		}
+		// Each parent cpuset cgroup MUST BE updated with the actual range of vCPUs.
+		cpusetPath := sysfsCpusetPath
+		cgroupsPaths := strings.Split(contConfig.Cgroups.Path, "/")
+		for _, path := range cgroupsPaths {
+			// Skip if empty.
+			if path == "" {
+				continue
+			}
+
+			cpusetPath = filepath.Join(cpusetPath, path)
+
+			// Check if the cgroup was already updated.
+			if mapCgroupPaths[cpusetPath] {
+				agentLog.WithField("path", cpusetPath).Debug("cpuset cgroup already updated")
+				continue
+			}
+
+			// Don't use c.container.Set because it would modify the container's config;
+			// c.container.Set MUST BE used only on update.
+			cpusetCpusPath := filepath.Join(cpusetPath, "cpuset.cpus")
+			agentLog.WithField("path", cpusetPath).Debug("updating cpuset cgroup")
+			if err := ioutil.WriteFile(cpusetCpusPath, []byte(connectedCpus), cpusetMode); err != nil {
+				return handleError(req.Wait, fmt.Errorf("Could not update cpuset cgroup '%s': %v", connectedCpus, err))
+			}
+
+			// Add the cgroup path to the map.
+			mapCgroupPaths[cpusetPath] = true
+		}
 	}
 
 	return nil
diff --git a/grpc_test.go b/grpc_test.go
index d7078224da..41a39bbf40 100644
--- a/grpc_test.go
+++ b/grpc_test.go
@@ -116,6 +116,17 @@ func TestOnlineCPUMem(t *testing.T) {
 		},
 	}
 
+	containerID := "1"
+	containerID2 := "2"
+	container := &container{
+		container: &mockContainer{
+			id:        containerID,
+			processes: []int{1},
+		},
+	}
+	a.sandbox.containers[containerID] = container
+	a.sandbox.containers[containerID2] = container
+
 	req := &pb.OnlineCPUMemRequest{
 		NbCpus: 1,
 		Wait:   true,
@@ -165,10 +176,13 @@ func TestOnlineCPUMem(t *testing.T) {
 	_, err = a.OnlineCPUMem(context.TODO(), req)
 	assert.Error(err, "docker cgroup path does not exist")
 
-	dockerCpusetPath, err := ioutil.TempDir("", "docker")
+	sysfsCpusetPath, err = ioutil.TempDir("", "cgroup")
+	assert.NoError(err)
+	cfg := container.container.Config()
+	cgroupPath := filepath.Join(sysfsCpusetPath, cfg.Cgroups.Path)
+	err = os.MkdirAll(cgroupPath, 0777)
 	assert.NoError(err)
-	defer os.RemoveAll(dockerCpusetPath)
-	sysfsDockerCpusetPath = filepath.Join(dockerCpusetPath, "cpuset.cpus")
+	defer os.RemoveAll(sysfsCpusetPath)
 
 	err = ioutil.WriteFile(memory0Online, []byte("0"), 0755)
 	assert.NoError(err)
diff --git a/mockcontainer.go b/mockcontainer.go
index bcc5fb4212..cbda967915 100644
--- a/mockcontainer.go
+++ b/mockcontainer.go
@@ -7,6 +7,7 @@ package main
 
 import (
+	"fmt"
 	"os"
 
 	"github.com/opencontainers/runc/libcontainer"
@@ -37,6 +38,7 @@ func (m *mockContainer) Config() configs.Config {
 		Capabilities: &configs.Capabilities{},
 		Cgroups: &configs.Cgroup{
 			Resources: &configs.Resources{},
+			Path:      fmt.Sprintf("/cgroup/%s", m.id),
 		},
 		Seccomp: &configs.Seccomp{},
 	}
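
The snippet below is a minimal, standalone sketch of the top-down cpuset walk
this patch performs: in the cpuset controller a child cgroup can only be
assigned CPUs that its parent already exposes, so every ancestor of the
container's cgroup has to receive the connected-CPU range before the container
itself can use it. The cgroup path, CPU range and file mode used here are
illustrative assumptions, not values taken from the agent.

    package main

    import (
    	"fmt"
    	"io/ioutil"
    	"path/filepath"
    	"strings"
    )

    // updateCpusetParents writes cpuRange into cpuset.cpus of every level of
    // cgroupPath, starting from cpusetRoot and walking down, so that each
    // child cpuset is allowed to use the newly hot-added CPUs.
    func updateCpusetParents(cpusetRoot, cgroupPath, cpuRange string) error {
    	current := cpusetRoot
    	for _, component := range strings.Split(cgroupPath, "/") {
    		// Skip empty components produced by leading or double slashes.
    		if component == "" {
    			continue
    		}
    		current = filepath.Join(current, component)
    		cpusFile := filepath.Join(current, "cpuset.cpus")
    		if err := ioutil.WriteFile(cpusFile, []byte(cpuRange), 0644); err != nil {
    			return fmt.Errorf("could not update %s: %v", cpusFile, err)
    		}
    	}
    	return nil
    }

    func main() {
    	// Illustrative values only: a kubepods-style cgroup path and the CPU
    	// range as reported by /sys/devices/system/cpu/online.
    	err := updateCpusetParents("/sys/fs/cgroup/cpuset",
    		"/kubepods/besteffort/pod123/container456", "0-3")
    	if err != nil {
    		fmt.Println(err)
    	}
    }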