From 751b9f3e1388698b8e510efb70654014b75145dc Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Thu, 23 Apr 2020 20:27:43 +0000
Subject: [PATCH] Update strategy used to reuse CPUs from init containers in
 CPUManager

With the old strategy, it was possible for an init container to end up
running without some of its CPUs being exclusive if it requested more
guaranteed CPUs than the sum of all guaranteed CPUs requested by app
containers. Unfortunately, this case was not caught by our unit tests
because they didn't validate the state of the defaultCPUSet to ensure
there was no overlap with CPUs assigned to containers.

This patch updates the strategy to carry the CPUs assigned to init
containers over for reuse by app containers, while avoiding this edge
case. It also updates the unit tests so that they catch this type of
error in the future.
---
 pkg/kubelet/cm/cpumanager/cpu_manager_test.go |  9 +++
 pkg/kubelet/cm/cpumanager/policy_static.go    | 57 +++++++++++++------
 .../cm/cpumanager/policy_static_test.go       |  2 +-
 3 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 746a5845a63..86d2063e59b 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -503,6 +503,8 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 			testCase.expInitCSets,
 			testCase.expCSets...)
 
+		cumCSet := cpuset.NewCPUSet()
+
 		for i := range containers {
 			err := mgr.Allocate(testCase.pod, &containers[i])
 			if err != nil {
@@ -525,6 +527,13 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 				t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
 					testCase.description, expCSets[i], containers[i].Name, cset)
 			}
+
+			cumCSet = cumCSet.Union(cset)
+		}
+
+		if !testCase.stDefaultCPUSet.Difference(cumCSet).Equals(state.defaultCPUSet) {
+			t.Errorf("StaticPolicy error (%v). expected final state for defaultCPUSet %v but got %v",
+				testCase.description, testCase.stDefaultCPUSet.Difference(cumCSet), state.defaultCPUSet)
+		}
 	}
 }
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index da68ed808bd..02b88e9b4c0 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -77,6 +77,8 @@ type staticPolicy struct {
 	reserved cpuset.CPUSet
 	// topology manager reference to get container Topology affinity
 	affinity topologymanager.Store
+	// set of CPUs to reuse across allocations in a pod
+	cpusToReuse map[string]cpuset.CPUSet
 }
 
 // Ensure staticPolicy implements Policy interface
@@ -107,9 +109,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
 
 	return &staticPolicy{
-		topology: topology,
-		reserved: reserved,
-		affinity: affinity,
+		topology:    topology,
+		reserved:    reserved,
+		affinity:    affinity,
+		cpusToReuse: make(map[string]cpuset.CPUSet),
 	}, nil
 }
 
@@ -188,12 +191,37 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
 	return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
+func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
+	// If any pod other than the current one has entries in p.cpusToReuse, delete them.
+	for podUID := range p.cpusToReuse {
+		if podUID != string(pod.UID) {
+			delete(p.cpusToReuse, podUID)
+		}
+	}
+	// If no cpuset exists for cpusToReuse by this pod yet, create one.
+	if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
+		p.cpusToReuse[string(pod.UID)] = cpuset.NewCPUSet()
+	}
+	// Check if the container is an init container.
+	// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
+	for _, initContainer := range pod.Spec.InitContainers {
+		if container.Name == initContainer.Name {
+			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
+			return
+		}
+	}
+	// Otherwise it is an app container.
+	// Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
+	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
+}
+
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
 	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
 		klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
 		// container belongs in an exclusively allocated pool
-		if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+			p.updateCPUsToReuse(pod, container, cpuset)
 			klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
 			return nil
 		}
 
@@ -203,23 +231,14 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
 
 		// Allocate CPUs according to the NUMA affinity contained in the hint.
-		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
+		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
 		if err != nil {
 			klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
 			return err
 		}
 		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+		p.updateCPUsToReuse(pod, container, cpuset)
 
-		// Check if the container that has just been allocated resources is an init container.
-		// If so, release its CPUs back into the shared pool so they can be reallocated.
-		for _, initContainer := range pod.Spec.InitContainers {
-			if container.Name == initContainer.Name {
-				if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-					// Mutate the shared pool, adding released cpus.
-					s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
-				}
-			}
-		}
 	}
 	// container belongs in the shared pool (nothing to do; use default cpuset)
 	return nil
@@ -235,15 +254,17 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 	return nil
 }
 
-func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
+func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
 	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
 
+	assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)
+
 	// If there are aligned CPUs in numaAffinity, attempt to take those first.
 	result := cpuset.NewCPUSet()
 	if numaAffinity != nil {
 		alignedCPUs := cpuset.NewCPUSet()
 		for _, numaNodeID := range numaAffinity.GetBits() {
-			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
+			alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
 		}
 
 		numAlignedToAlloc := alignedCPUs.Size()
@@ -260,7 +281,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
 	}
 
 	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-	remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
+	remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
 	if err != nil {
 		return cpuset.NewCPUSet(), err
 	}
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index ea2bcf11333..b4b46c68c17 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -639,7 +639,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 			continue
 		}
 
-		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.NewCPUSet())
 		if err != nil {
 			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
 				tc.description, tc.expCSet, err)
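
To make the new bookkeeping easier to follow, here is a minimal, self-contained Go sketch of the rule that updateCPUsToReuse implements: CPUs handed to an init container are added to the pod's pool of reusable CPUs, and CPUs handed to an app container are removed from it. This is only an illustration, not kubelet code: cpuSet, reusePool, and recordAllocation are made-up names, a plain map stands in for cpuset.CPUSet, and the per-pod keying and eviction of other pods' entries are omitted.

package main

import (
	"fmt"
	"sort"
)

// cpuSet is a stand-in for cpuset.CPUSet; only the operations the sketch needs are modeled.
type cpuSet map[int]struct{}

func newCPUSet(cpus ...int) cpuSet {
	s := cpuSet{}
	for _, c := range cpus {
		s[c] = struct{}{}
	}
	return s
}

// union returns the CPUs present in either set.
func (s cpuSet) union(o cpuSet) cpuSet {
	r := newCPUSet()
	for c := range s {
		r[c] = struct{}{}
	}
	for c := range o {
		r[c] = struct{}{}
	}
	return r
}

// difference returns the CPUs in s that are not in o.
func (s cpuSet) difference(o cpuSet) cpuSet {
	r := newCPUSet()
	for c := range s {
		if _, ok := o[c]; !ok {
			r[c] = struct{}{}
		}
	}
	return r
}

func (s cpuSet) String() string {
	var cpus []int
	for c := range s {
		cpus = append(cpus, c)
	}
	sort.Ints(cpus)
	return fmt.Sprint(cpus)
}

// reusePool mirrors staticPolicy.cpusToReuse for a single pod (hypothetical helper).
type reusePool struct {
	cpus cpuSet
}

// recordAllocation applies the same rule as updateCPUsToReuse: init container CPUs
// become reusable for later containers in the pod, app container CPUs do not.
func (p *reusePool) recordAllocation(isInitContainer bool, allocated cpuSet) {
	if isInitContainer {
		p.cpus = p.cpus.union(allocated)
		return
	}
	p.cpus = p.cpus.difference(allocated)
}

func main() {
	pool := &reusePool{cpus: newCPUSet()}

	// Init container gets CPUs 1-4; they stay assigned but become reusable in this pod.
	pool.recordAllocation(true, newCPUSet(1, 2, 3, 4))
	fmt.Println("after init container:", pool.cpus) // [1 2 3 4]

	// First app container reuses CPUs 1-2; they are no longer reusable.
	pool.recordAllocation(false, newCPUSet(1, 2))
	fmt.Println("after app container 1:", pool.cpus) // [3 4]

	// Second app container takes CPU 3.
	pool.recordAllocation(false, newCPUSet(3))
	fmt.Println("after app container 2:", pool.cpus) // [4]
}

Under this rule the init container keeps its exclusive assignment while it runs; nothing is pushed back into the shared pool, which is what closes the edge case described in the commit message.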
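The new assertion in TestCPUManagerAddWithInitContainers encodes an invariant worth spelling out: once every container in a pod has been allocated, the default (shared) CPU set must equal the initial default set minus the union of all exclusive assignments, so no exclusively assigned CPU can still sit in the shared pool. The rough sketch below checks exactly that; verifyNoOverlap is a hypothetical helper and plain map[int]bool values stand in for cpuset.CPUSet and the kubelet state.

package main

import "fmt"

// verifyNoOverlap mirrors the new test assertion: the final default (shared) set must be
// the initial set minus every CPU that was handed out exclusively.
func verifyNoOverlap(initialDefault map[int]bool, assignments []map[int]bool, finalDefault map[int]bool) error {
	expected := map[int]bool{}
	for c := range initialDefault {
		expected[c] = true
	}
	for _, cset := range assignments {
		for c := range cset {
			delete(expected, c) // every exclusively assigned CPU must leave the shared pool
		}
	}
	if len(expected) != len(finalDefault) {
		return fmt.Errorf("expected default set %v, got %v", expected, finalDefault)
	}
	for c := range expected {
		if !finalDefault[c] {
			return fmt.Errorf("expected default set %v, got %v", expected, finalDefault)
		}
	}
	return nil
}

func main() {
	initial := map[int]bool{0: true, 1: true, 2: true, 3: true}
	assignments := []map[int]bool{{1: true, 2: true}, {3: true}}

	// A correct final state: CPUs 1-3 have left the shared pool.
	fmt.Println(verifyNoOverlap(initial, assignments, map[int]bool{0: true})) // <nil>

	// A buggy final state: CPU 3 is both assigned exclusively and still in the shared pool.
	fmt.Println(verifyNoOverlap(initial, assignments, map[int]bool{0: true, 3: true})) // error
}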