diff --git a/cluster/saltbase/salt/kubelet/default b/cluster/saltbase/salt/kubelet/default index 0962711d3a0..9d228f39620 100644 --- a/cluster/saltbase/salt/kubelet/default +++ b/cluster/saltbase/salt/kubelet/default @@ -97,10 +97,15 @@ {% set pod_cidr = "--pod-cidr=" + grains['cbr-cidr'] %} {% endif %} +{% set cpu_cfs_quota = "" %} +{% if pillar['enable_cpu_cfs_quota'] is defined -%} + {% set cpu_cfs_quota = "--cpu-cfs-quota=" + pillar['enable_cpu_cfs_quota'] -%} +{% endif -%} + {% set test_args = "" -%} {% if pillar['kubelet_test_args'] is defined -%} {% set test_args=pillar['kubelet_test_args'] %} {% endif -%} # test_args has to be kept at the end, so they'll overwrite any prior configuration -DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} {{manifest_url}} --allow-privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}} {{test_args}}" +DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} {{manifest_url}} --allow-privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}} {{cpu_cfs_quota}} {{test_args}}" diff --git a/cluster/vagrant/config-default.sh b/cluster/vagrant/config-default.sh index 0bd26367cd1..598a0556e84 100755 --- a/cluster/vagrant/config-default.sh +++ b/cluster/vagrant/config-default.sh @@ -76,6 +76,9 @@ ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}" #EXTRA_DOCKER_OPTS="-b=cbr0 --selinux-enabled --insecure-registry 10.0.0.0/8" EXTRA_DOCKER_OPTS="-b=cbr0 --insecure-registry 10.0.0.0/8" +# Flag to tell the kubelet to enable CFS quota support +ENABLE_CPU_CFS_QUOTA="${KUBE_ENABLE_CPU_CFS_QUOTA:-true}" + # Optional: Install cluster DNS. ENABLE_CLUSTER_DNS="${KUBE_ENABLE_CLUSTER_DNS:-true}" DNS_SERVER_IP="10.247.0.10" diff --git a/cluster/vagrant/provision-master.sh b/cluster/vagrant/provision-master.sh index ac2cbc69329..d94be740054 100755 --- a/cluster/vagrant/provision-master.sh +++ b/cluster/vagrant/provision-master.sh @@ -126,6 +126,7 @@ cat </srv/salt-overlay/pillar/cluster-params.sls dns_domain: '$(echo "$DNS_DOMAIN" | sed -e "s/'/''/g")' instance_prefix: '$(echo "$INSTANCE_PREFIX" | sed -e "s/'/''/g")' admission_control: '$(echo "$ADMISSION_CONTROL" | sed -e "s/'/''/g")' + enable_cpu_cfs_quota: '$(echo "$ENABLE_CPU_CFS_QUOTA" | sed -e "s/'/''/g")' EOF # Configure the salt-master diff --git a/cluster/vagrant/util.sh b/cluster/vagrant/util.sh index 39cbc883c4c..f0803411d37 100644 --- a/cluster/vagrant/util.sh +++ b/cluster/vagrant/util.sh @@ -153,6 +153,7 @@ function create-provision-scripts { echo "KUBELET_TOKEN='${KUBELET_TOKEN:-}'" echo "KUBE_PROXY_TOKEN='${KUBE_PROXY_TOKEN:-}'" echo "MASTER_EXTRA_SANS='${MASTER_EXTRA_SANS:-}'" + echo "ENABLE_CPU_CFS_QUOTA='${ENABLE_CPU_CFS_QUOTA}'" awk '!/^#/' "${KUBE_ROOT}/cluster/vagrant/provision-network.sh" awk '!/^#/' "${KUBE_ROOT}/cluster/vagrant/provision-master.sh" ) > "${KUBE_TEMP}/master-start.sh" diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index f0a75f556ab..3eb671c148f 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -124,7 +124,7 @@ type KubeletServer struct { MaxPods int DockerExecHandlerName string ResolverConfig string - + CPUCFSQuota bool // Flags intended for testing // Crash immediately, rather than eating panics. @@ -189,6 +189,7 @@ func NewKubeletServer() *KubeletServer { SystemContainer: "", ConfigureCBR0: false, DockerExecHandlerName: "native", + CPUCFSQuota: false, } } @@ -255,6 +256,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.DockerExecHandlerName, "docker-exec-handler", s.DockerExecHandlerName, "Handler to use when executing a command in a container. Valid values are 'native' and 'nsenter'. Defaults to 'native'.") fs.StringVar(&s.PodCIDR, "pod-cidr", "", "The CIDR to use for pod IP addresses, only used in standalone mode. In cluster mode, this is obtained from the master.") fs.StringVar(&s.ResolverConfig, "resolv-conf", kubelet.ResolvConfDefault, "Resolver configuration file used as the basis for the container DNS resolution configuration.") + fs.BoolVar(&s.CPUCFSQuota, "cpu-cfs-quota", s.CPUCFSQuota, "Enable CPU CFS quota enforcement for containers that specify CPU limits") // Flags intended for testing, not recommended used in production environments. fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.") fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]") @@ -362,6 +364,7 @@ func (s *KubeletServer) KubeletConfig() (*KubeletConfig, error) { MaxPods: s.MaxPods, DockerExecHandler: dockerExecHandler, ResolverConfig: s.ResolverConfig, + CPUCFSQuota: s.CPUCFSQuota, }, nil } @@ -604,6 +607,7 @@ func SimpleKubelet(client *client.Client, MaxPods: 32, DockerExecHandler: &dockertools.NativeExecHandler{}, ResolverConfig: kubelet.ResolvConfDefault, + CPUCFSQuota: false, } return &kcfg } @@ -774,6 +778,7 @@ type KubeletConfig struct { MaxPods int DockerExecHandler dockertools.ExecHandler ResolverConfig string + CPUCFSQuota bool } func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) { @@ -833,7 +838,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.PodCIDR, kc.MaxPods, kc.DockerExecHandler, - kc.ResolverConfig) + kc.ResolverConfig, + kc.CPUCFSQuota) if err != nil { return nil, nil, err diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index ce35b3d1a8d..fe5cab5b89a 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -228,6 +228,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { MaxPods: s.MaxPods, DockerExecHandler: dockerExecHandler, ResolverConfig: s.ResolverConfig, + CPUCFSQuota: s.CPUCFSQuota, } kcfg.NodeName = kcfg.Hostname @@ -330,6 +331,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( kc.MaxPods, kc.DockerExecHandler, kc.ResolverConfig, + kc.CPUCFSQuota, ) if err != nil { return nil, nil, err diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index b555c2216b0..a746b633c99 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -263,3 +263,4 @@ whitelist-override-label www-prefix retry_time file_content_in_loop +cpu-cfs-quota \ No newline at end of file diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 15bbde1df76..d96c201641e 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -50,6 +50,9 @@ const ( minShares = 2 sharesPerCPU = 1024 milliCPUToCPU = 1000 + + // 100000 is equivalent to 100ms + quotaPeriod = 100000 ) // DockerInterface is an abstract interface for testability. It abstracts the interface of docker.Client. @@ -306,6 +309,28 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { return client } +// milliCPUToQuota converts milliCPU to CFS quota and period values +func milliCPUToQuota(milliCPU int64) (quota int64, period int64) { + // CFS quota is measured in two values: + // - cfs_period_us=100ms (the amount of time to measure usage across) + // - cfs_quota=20ms (the amount of cpu time allowed to be used across a period) + // so in the above example, you are limited to 20% of a single CPU + // for multi-cpu environments, you just scale equivalent amounts + + if milliCPU == 0 { + // take the default behavior from docker + return + } + + // we set the period to 100ms by default + period = quotaPeriod + + // we then convert your milliCPU to a value normalized over a period + quota = (milliCPU * quotaPeriod) / milliCPUToCPU + + return +} + func milliCPUToShares(milliCPU int64) int64 { if milliCPU == 0 { // Docker converts zero milliCPU to unset, which maps to kernel default diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 22161b13d0c..760b8f63a5c 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -737,3 +737,43 @@ func TestMakePortsAndBindings(t *testing.T) { } } } + +func TestMilliCPUToQuota(t *testing.T) { + testCases := []struct { + input int64 + quota int64 + period int64 + }{ + { + input: int64(0), + quota: int64(0), + period: int64(0), + }, + { + input: int64(200), + quota: int64(20000), + period: int64(100000), + }, + { + input: int64(500), + quota: int64(50000), + period: int64(100000), + }, + { + input: int64(1000), + quota: int64(100000), + period: int64(100000), + }, + { + input: int64(1500), + quota: int64(150000), + period: int64(100000), + }, + } + for _, testCase := range testCases { + quota, period := milliCPUToQuota(testCase.input) + if quota != testCase.quota || period != testCase.period { + t.Errorf("Input %v, expected quota %v period %v, but got quota %v period %v", testCase.input, testCase.quota, testCase.period, quota, period) + } + } +} diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 3bb0a3cd961..03bf8a6aec3 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -46,7 +46,7 @@ func NewFakeDockerManager( fakeProcFs := procfs.NewFakeProcFs() dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, - fakeOomAdjuster, fakeProcFs) + fakeOomAdjuster, fakeProcFs, false) dm.dockerPuller = &FakeDockerPuller{} dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder) return dm diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index ef7ea29beb9..feb42fc8786 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -132,6 +132,9 @@ type DockerManager struct { // Get information from /proc mount. procFs procfs.ProcFsInterface + + // If true, enforce container cpu limits with CFS quota support + cpuCFSQuota bool } func NewDockerManager( @@ -150,7 +153,8 @@ func NewDockerManager( httpClient kubeletTypes.HttpGetter, execHandler ExecHandler, oomAdjuster *oom.OomAdjuster, - procFs procfs.ProcFsInterface) *DockerManager { + procFs procfs.ProcFsInterface, + cpuCFSQuota bool) *DockerManager { // Work out the location of the Docker runtime, defaulting to /var/lib/docker // if there are any problems. dockerRoot := "/var/lib/docker" @@ -201,6 +205,7 @@ func NewDockerManager( execHandler: execHandler, oomAdjuster: oomAdjuster, procFs: procFs, + cpuCFSQuota: cpuCFSQuota, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder) @@ -673,6 +678,7 @@ func (dm *DockerManager) runContainer( // of CPU shares. cpuShares = milliCPUToShares(cpuRequest.MilliValue()) } + _, containerName := BuildDockerName(dockerName, container) dockerOpts := docker.CreateContainerOptions{ Name: containerName, @@ -742,6 +748,15 @@ func (dm *DockerManager) runContainer( MemorySwap: -1, CPUShares: cpuShares, } + + if dm.cpuCFSQuota { + // if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource. + cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue()) + + hc.CPUQuota = cpuQuota + hc.CPUPeriod = cpuPeriod + } + if len(opts.DNS) > 0 { hc.DNS = opts.DNS } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c4d3c13a3cb..6eb6a3222c3 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -168,7 +168,8 @@ func NewMainKubelet( podCIDR string, pods int, dockerExecHandler dockertools.ExecHandler, - resolverConfig string) (*Kubelet, error) { + resolverConfig string, + cpuCFSQuota bool) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -285,6 +286,7 @@ func NewMainKubelet( pods: pods, syncLoopMonitor: util.AtomicValue{}, resolverConfig: resolverConfig, + cpuCFSQuota: cpuCFSQuota, } if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { @@ -321,7 +323,8 @@ func NewMainKubelet( klet.httpClient, dockerExecHandler, oomAdjuster, - procFs) + procFs, + klet.cpuCFSQuota) case "rkt": conf := &rkt.Config{ Path: rktPath, @@ -560,6 +563,9 @@ type Kubelet struct { // Optionally shape the bandwidth of a pod shaper bandwidth.BandwidthShaper + + // True if container cpu limits should be enforced via cgroup CFS quota + cpuCFSQuota bool } // getRootDir returns the full path to the directory under which kubelet can