From d0bcbbb4370c8af5e9326a362f0022896258c208 Mon Sep 17 00:00:00 2001 From: Connor Doyle Date: Tue, 22 Aug 2017 23:49:30 -0700 Subject: [PATCH] Added static cpumanager policy. --- cmd/kubelet/app/options/options.go | 2 +- pkg/kubelet/cm/cpumanager/BUILD | 4 + pkg/kubelet/cm/cpumanager/cpu_manager.go | 32 +++- pkg/kubelet/cm/cpumanager/policy_static.go | 172 +++++++++++++++++++++ 4 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 pkg/kubelet/cm/cpumanager/policy_static.go diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 920b8573453..a227a41d800 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -327,7 +327,7 @@ func AddKubeletConfigFlags(fs *pflag.FlagSet, c *kubeletconfig.KubeletConfigurat fs.BoolVar(&c.CgroupsPerQOS, "cgroups-per-qos", c.CgroupsPerQOS, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.") fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'") fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.") - fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, " CPU Manager policy to use. Possible values: 'none'. Default: 'none'") + fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, " CPU Manager policy to use. Possible values: 'none', 'static'. Default: 'none'") fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, " CPU Manager reconciliation period. Examples: '10s', or '1m'. 
If not supplied, defaults to `NodeStatusUpdateFrequency`") fs.StringVar(&c.ContainerRuntime, "container-runtime", c.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'.") fs.DurationVar(&c.RuntimeRequestTimeout.Duration, "runtime-request-timeout", c.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later.") diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index 8065b5231de..d7169c6296f 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -3,15 +3,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "cpu_assignment.go", "cpu_manager.go", "fake_cpu_manager.go", "policy.go", "policy_none.go", + "policy_static.go", ], visibility = ["//visibility:public"], deps = [ + "//pkg/api/v1/helper/qos:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/cm/cpumanager/state:go_default_library", + "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/status:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 0ee042cbe4a..aade9f92667 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -18,6 +18,7 @@ package cpumanager import ( "fmt" + "math" "sync" "time" @@ -28,6 +29,7 @@ import ( runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" 
@@ -105,8 +107,36 @@ func NewManager( case PolicyNone: policy = NewNonePolicy() + case PolicyStatic: + topo, err := topology.Discover(machineInfo) + if err != nil { + return nil, err + } + glog.Infof("[cpumanager] detected CPU topology: %v", topo) + reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU] + if !ok { + // The static policy cannot initialize without this information. Panic! + panic("[cpumanager] unable to determine reserved CPU resources for static policy") + } + if reservedCPUs.IsZero() { + // Panic! + // + // The static policy requires this to be nonzero. Zero CPU reservation + // would allow the shared pool to be completely exhausted. At that point + // either we would violate our guarantee of exclusivity or need to evict + // any pod that has at least one container that requires zero CPUs. + // See the comments in policy_static.go for more details. + panic("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero") + } + + // Take the ceiling of the reservation, since fractional CPUs cannot be + // exclusively allocated. + reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 + numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) + policy = NewStaticPolicy(topo, numReservedCPUs) + default: - glog.Warningf("[cpumanager] Unknown policy (\"%s\"), falling back to \"%s\" policy (\"%s\")", cpuPolicyName, PolicyNone) + glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone) policy = NewNonePolicy() } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go new file mode 100644 index 00000000000..1eca50b91f4 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -0,0 +1,172 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +// PolicyStatic is the name of the static policy +const PolicyStatic policyName = "static" + +var _ Policy = &staticPolicy{} + +// staticPolicy is a CPU manager policy that does not change CPU +// assignments for exclusively pinned guaranteed containers after the main +// container process starts. +// +// This policy allocates CPUs exclusively for a container if all the following +// conditions are met: +// +// - The pod QoS class is Guaranteed. +// - The CPU request is a positive integer. +// +// The static policy maintains the following sets of logical CPUs: +// +// - SHARED: Burstable, BestEffort, and non-integral Guaranteed containers +// run here. Initially this contains all CPU IDs on the system. As +// exclusive allocations are created and destroyed, this CPU set shrinks +// and grows, accordingly. This is stored in the state as the default +// CPU set. +// +// - RESERVED: A subset of the shared pool which is not exclusively +// allocatable. The membership of this pool is static for the lifetime of +// the Kubelet. The size of the reserved pool is +// ceil(systemreserved.cpu + kubereserved.cpu). +// Reserved CPUs are taken topologically starting with lowest-indexed +// physical core, as reported by cAdvisor. +// +// - ASSIGNABLE: Equal to SHARED - RESERVED. 
Exclusive CPUs are allocated +// from this pool. +// +// - EXCLUSIVE ALLOCATIONS: CPU sets assigned exclusively to one container. +// These are stored as explicit assignments in the state. +// +// When an exclusive allocation is made, the static policy also updates the +// default cpuset in the state abstraction. The CPU manager's periodic +// reconcile loop takes care of rewriting the cpuset in cgroupfs for any +// containers that may be running in the shared pool. For this reason, +// applications running within exclusively-allocated containers must tolerate +// potentially sharing their allocated CPUs for up to the CPU manager +// reconcile period. +type staticPolicy struct { + // cpu socket topology + topology *topology.CPUTopology + // set of CPUs that is not available for exclusive assignment + reserved cpuset.CPUSet +} + +// Ensure staticPolicy implements Policy interface +var _ Policy = &staticPolicy{} + +// NewStaticPolicy returns a CPU manager policy that does not change CPU +// assignments for exclusively pinned guaranteed containers after the main +// container process starts. +func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy { + allCPUs := topology.CPUDetails.CPUs() + // takeByTopology allocates CPUs associated with low-numbered cores from + // allCPUs. 
+ // + // For example: Given a system with 8 CPUs available and HT enabled, + // if numReservedCPUs=2, then reserved={0,4} + reserved, _ := takeByTopology(topology, allCPUs, numReservedCPUs) + + if reserved.Size() != numReservedCPUs { + panic(fmt.Sprintf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)) + } + + glog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved) + + return &staticPolicy{ + topology: topology, + reserved: reserved, + } +} + +func (p *staticPolicy) Name() string { + return string(PolicyStatic) +} + +func (p *staticPolicy) Start(s state.State) { + // Configure the shared pool to include all detected CPU IDs. + allCPUs := p.topology.CPUDetails.CPUs() + s.SetDefaultCPUSet(allCPUs) +} + +// assignableCPUs returns the set of unassigned CPUs minus the reserved set. +func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { + return s.GetDefaultCPUSet().Difference(p.reserved) +} + +func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { + glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) + if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 { + // container belongs in an exclusively allocated pool + cpuset, err := p.allocateCPUs(s, numCPUs) + if err != nil { + glog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err) + return err + } + s.SetCPUSet(containerID, cpuset) + } + // container belongs in the shared pool (nothing to do; use default cpuset) + return nil +} + +func (p *staticPolicy) RemoveContainer(s state.State, containerID string) error { + glog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID) + if toRelease, ok := s.GetCPUSet(containerID); ok { + 
s.Delete(containerID) + // Mutate the shared pool, adding released cpus. + s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease)) + } + return nil +} + +func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) { + glog.Infof("[cpumanager] allocateCPUs: (numCPUs: %d)", numCPUs) + result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs) + if err != nil { + return cpuset.NewCPUSet(), err + } + // Remove allocated CPUs from the shared CPUSet. + s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result)) + + glog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result) + return result, nil +} + +func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { + if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { + return 0 + } + cpuQuantity := container.Resources.Requests[v1.ResourceCPU] + if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() { + return 0 + } + // Safe downcast to do for all systems with < 2.1 billion CPUs. + // Per the language spec, `int` is guaranteed to be at least 32 bits wide. + // https://golang.org/ref/spec#Numeric_types + return int(cpuQuantity.Value()) +}