Added static cpumanager policy.

This commit is contained in:
Connor Doyle 2017-08-22 23:49:30 -07:00
parent e03a6435bb
commit d0bcbbb437
4 changed files with 208 additions and 2 deletions

View File

@ -327,7 +327,7 @@ func AddKubeletConfigFlags(fs *pflag.FlagSet, c *kubeletconfig.KubeletConfigurat
fs.BoolVar(&c.CgroupsPerQOS, "cgroups-per-qos", c.CgroupsPerQOS, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.")
fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'")
fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "<Warning: Alpha feature> CPU Manager policy to use. Possible values: 'none'. Default: 'none'")
fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "<Warning: Alpha feature> CPU Manager policy to use. Possible values: 'none', 'static'. Default: 'none'")
fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, "<Warning: Alpha feature> CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to `NodeStatusUpdateFrequency`")
fs.StringVar(&c.ContainerRuntime, "container-runtime", c.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'.")
fs.DurationVar(&c.RuntimeRequestTimeout.Duration, "runtime-request-timeout", c.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later.")

View File

@ -3,15 +3,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cpu_assignment.go",
"cpu_manager.go",
"fake_cpu_manager.go",
"policy.go",
"policy_none.go",
"policy_static.go",
],
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/status:go_default_library",

View File

@ -18,6 +18,7 @@ package cpumanager
import (
"fmt"
"math"
"sync"
"time"
@ -28,6 +29,7 @@ import (
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
@ -105,8 +107,36 @@ func NewManager(
case PolicyNone:
policy = NewNonePolicy()
case PolicyStatic:
topo, err := topology.Discover(machineInfo)
if err != nil {
return nil, err
}
glog.Infof("[cpumanager] detected CPU topology: %v", topo)
reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
if !ok {
// The static policy cannot initialize without this information. Panic!
panic("[cpumanager] unable to determine reserved CPU resources for static policy")
}
if reservedCPUs.IsZero() {
// Panic!
//
// The static policy requires this to be nonzero. Zero CPU reservation
// would allow the shared pool to be completely exhausted. At that point
// either we would violate our guarantee of exclusivity or need to evict
// any pod that has at least one container that requires zero CPUs.
// See the comments in policy_static.go for more details.
panic("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
}
// Take the ceiling of the reservation, since fractional CPUs cannot be
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy = NewStaticPolicy(topo, numReservedCPUs)
default:
glog.Warningf("[cpumanager] Unknown policy (\"%s\"), falling back to \"%s\" policy (\"%s\")", cpuPolicyName, PolicyNone)
glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone)
policy = NewNonePolicy()
}

View File

@ -0,0 +1,172 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
// PolicyStatic is the name of the static policy
const PolicyStatic policyName = "static"
var _ Policy = &staticPolicy{}
// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
//
// This policy allocates CPUs exclusively for a container if all the following
// conditions are met:
//
// - The pod QoS class is Guaranteed.
// - The CPU request is a positive integer.
//
// The static policy maintains the following sets of logical CPUs:
//
// - SHARED: Burstable, BestEffort, and non-integral Guaranteed containers
// run here. Initially this contains all CPU IDs on the system. As
// exclusive allocations are created and destroyed, this CPU set shrinks
// and grows, accordingly. This is stored in the state as the default
// CPU set.
//
// - RESERVED: A subset of the shared pool which is not exclusively
// allocatable. The membership of this pool is static for the lifetime of
// the Kubelet. The size of the reserved pool is
// ceil(systemreserved.cpu + kubereserved.cpu).
// Reserved CPUs are taken topologically starting with lowest-indexed
// physical core, as reported by cAdvisor.
//
// - ASSIGNABLE: Equal to SHARED - RESERVED. Exclusive CPUs are allocated
// from this pool.
//
// - EXCLUSIVE ALLOCATIONS: CPU sets assigned exclusively to one container.
// These are stored as explicit assignments in the state.
//
// When an exclusive allocation is made, the static policy also updates the
// default cpuset in the state abstraction. The CPU manager's periodic
// reconcile loop takes care of rewriting the cpuset in cgroupfs for any
// containers that may be running in the shared pool. For this reason,
// applications running within exclusively-allocated containers must tolerate
// potentially sharing their allocated CPUs for up to the CPU manager
// reconcile period.
type staticPolicy struct {
// cpu socket topology
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reserved cpuset.CPUSet
}
// Ensure staticPolicy implements Policy interface
var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy {
allCPUs := topology.CPUDetails.CPUs()
// takeByTopology allocates CPUs associated with low-numbered cores from
// allCPUs.
//
// For example: Given a system with 8 CPUs available and HT enabled,
// if numReservedCPUs=2, then reserved={0,4}
reserved, _ := takeByTopology(topology, allCPUs, numReservedCPUs)
if reserved.Size() != numReservedCPUs {
panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs))
}
glog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
return &staticPolicy{
topology: topology,
reserved: reserved,
}
}
func (p *staticPolicy) Name() string {
return string(PolicyStatic)
}
func (p *staticPolicy) Start(s state.State) {
// Configure the shared pool to include all detected CPU IDs.
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
}
// assignableCPUs returns the set of unassigned CPUs minus the reserved set.
func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
// container belongs in an exclusively allocated pool
cpuset, err := p.allocateCPUs(s, numCPUs)
if err != nil {
glog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
return err
}
s.SetCPUSet(containerID, cpuset)
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
}
func (p *staticPolicy) RemoveContainer(s state.State, containerID string) error {
glog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID)
if toRelease, ok := s.GetCPUSet(containerID); ok {
s.Delete(containerID)
// Mutate the shared pool, adding released cpus.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
}
return nil
}
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) {
glog.Infof("[cpumanager] allocateCpus: (numCPUs: %d)", numCPUs)
result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs)
if err != nil {
return cpuset.NewCPUSet(), err
}
// Remove allocated CPUs from the shared CPUSet.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
glog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result)
return result, nil
}
func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return 0
}
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() {
return 0
}
// Safe downcast to do for all systems with < 2.1 billion CPUs.
// Per the language spec, `int` is guaranteed to be at least 32 bits wide.
// https://golang.org/ref/spec#Numeric_types
return int(cpuQuantity.Value())
}