Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
grpc: honour CPU constraints in Kubernetes
Browse files Browse the repository at this point in the history
Once all vCPUs have been connected, the cpuset cgroup MUST BE updated.
To achieve that, each parent cpuset cgroup of the container
MUST BE updated with the actual range of vCPUs.

fixes #239
fixes #232

Signed-off-by: Julio Montes <[email protected]>
  • Loading branch information
Julio Montes committed May 17, 2018
1 parent d4580c7 commit 4bad43e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
57 changes: 39 additions & 18 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -57,7 +56,7 @@ var (
sysfsCPUOnlinePath = "/sys/devices/system/cpu"
sysfsMemOnlinePath = "/sys/devices/system/memory"
sysfsConnectedCPUsPath = filepath.Join(sysfsCPUOnlinePath, "online")
sysfsDockerCpusetPath = "/sys/fs/cgroup/cpuset/docker/cpuset.cpus"
sysfsCpusetPath = "/sys/fs/cgroup/cpuset"
)

type onlineResource struct {
Expand All @@ -67,13 +66,11 @@ type onlineResource struct {

var emptyResp = &gpb.Empty{}

var onlineCPUMemLock sync.Mutex

const onlineCPUMemWaitTime = 100 * time.Millisecond

const onlineCPUMaxTries = 10

const dockerCpusetMode = 0644
const cpusetMode = 0644

// handleError will log the specified error if wait is true
func handleError(wait bool, err error) error {
Expand Down Expand Up @@ -163,9 +160,11 @@ func (a *agentGRPC) onlineCPUMem(req *pb.OnlineCPUMemRequest) error {
return handleError(req.Wait, fmt.Errorf("requested number of CPUs '%d' must be greater than 0", req.NbCpus))
}

onlineCPUMemLock.Lock()
defer onlineCPUMemLock.Unlock()
// we are going to update the containers of the sandbox, we have to lock it
a.sandbox.Lock()
defer a.sandbox.Unlock()

agentLog.WithField("vcpus-to-connect", req.NbCpus).Debug("connecting vCPUs")
if err := onlineCPUResources(req.NbCpus); err != nil {
return handleError(req.Wait, err)
}
Expand All @@ -181,29 +180,51 @@ func (a *agentGRPC) onlineCPUMem(req *pb.OnlineCPUMemRequest) error {
return handleError(req.Wait, fmt.Errorf("Could not get the actual range of connected CPUs: %v", err))
}
connectedCpus := strings.Trim(string(cpus), "\t\n ")
agentLog.WithField("range-of-vcpus", connectedCpus).Debug("connecting vCPUs")

// In order to update container's cpuset cgroups, docker's cpuset cgroup MUST BE updated with
// the actual number of connected CPUs
if err := ioutil.WriteFile(sysfsDockerCpusetPath, []byte(connectedCpus), dockerCpusetMode); err != nil {
return handleError(req.Wait, fmt.Errorf("Could not update docker cpuset cgroup '%s': %v", connectedCpus, err))
}
mapCgroupPaths := make(map[string]bool)

// Now that we know the actual range of connected CPUs, we need to iterate over
// all containers and update each cpuset cgroup. This is not required in docker
// containers since they don't hot add/remove CPUs.
for _, c := range a.sandbox.containers {
agentLog.WithField("container", c.container.ID()).Debug("updating cpuset cgroup")
contConfig := c.container.Config()

// Don't update cpuset cgroup if one was already defined.
if contConfig.Cgroups.Resources.CpusetCpus != "" {
agentLog.WithField("cpuset", contConfig.Cgroups.Resources.CpusetCpus).Debug("cpuset value is not empty")
continue
}
contConfig.Cgroups.Resources = &configs.Resources{
CpusetCpus: connectedCpus,
}
if err := c.container.Set(contConfig); err != nil {
return handleError(req.Wait, err)
}

// Each cpuset cgroup MUST BE updated with the actual number of vCPUs.
cpusetPath := sysfsCpusetPath
cgroupsPaths := strings.Split(contConfig.Cgroups.Path, "/")
for _, path := range cgroupsPaths {
// Skip if empty.
if path == "" {
continue
}

cpusetPath = filepath.Join(cpusetPath, path)

// check if the cgroup was already updated.
if mapCgroupPaths[cpusetPath] == true {
agentLog.WithField("path", cpusetPath).Debug("cpuset cgroup already updated")
continue
}

// Don't use c.container.Set because it will modify the container's config.
// c.container.Set MUST BE used only on update.
cpusetCpusPath := filepath.Join(cpusetPath, "cpuset.cpus")
agentLog.WithField("path", cpusetPath).Debug("updating cpuset cgroup")
if err := ioutil.WriteFile(cpusetCpusPath, []byte(connectedCpus), cpusetMode); err != nil {
return handleError(req.Wait, fmt.Errorf("Could not update cpuset cgroup '%s': %v", connectedCpus, err))
}

// add cgroup path to the map.
mapCgroupPaths[cpusetPath] = true
}
}

return nil
Expand Down
20 changes: 17 additions & 3 deletions grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ func TestOnlineCPUMem(t *testing.T) {
},
}

containerID := "1"
containerID2 := "2"
container := &container{
container: &mockContainer{
id: containerID,
processes: []int{1},
},
}
a.sandbox.containers[containerID] = container
a.sandbox.containers[containerID2] = container

req := &pb.OnlineCPUMemRequest{
NbCpus: 1,
Wait: true,
Expand Down Expand Up @@ -165,10 +176,13 @@ func TestOnlineCPUMem(t *testing.T) {
_, err = a.OnlineCPUMem(context.TODO(), req)
assert.Error(err, "docker cgroup path does not exist")

dockerCpusetPath, err := ioutil.TempDir("", "docker")
sysfsCpusetPath, err = ioutil.TempDir("", "cgroup")
assert.NoError(err)
cfg := container.container.Config()
cgroupPath := filepath.Join(sysfsCpusetPath, cfg.Cgroups.Path)
err = os.MkdirAll(cgroupPath, 0777)
assert.NoError(err)
defer os.RemoveAll(dockerCpusetPath)
sysfsDockerCpusetPath = filepath.Join(dockerCpusetPath, "cpuset.cpus")
defer os.RemoveAll(sysfsCpusetPath)

err = ioutil.WriteFile(memory0Online, []byte("0"), 0755)
assert.NoError(err)
Expand Down
2 changes: 2 additions & 0 deletions mockcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package main

import (
"fmt"
"os"

"github.com/opencontainers/runc/libcontainer"
Expand Down Expand Up @@ -37,6 +38,7 @@ func (m *mockContainer) Config() configs.Config {
Capabilities: &configs.Capabilities{},
Cgroups: &configs.Cgroup{
Resources: &configs.Resources{},
Path: fmt.Sprintf("/cgroup/%s", m.id),
},
Seccomp: &configs.Seccomp{},
}
Expand Down

0 comments on commit 4bad43e

Please sign in to comment.