Update strategy used to reuse CPUs from init containers in CPUManager

With the old strategy, it was possible for an init container to end up
running without some of its CPUs being exclusive if it requested more
guaranteed CPUs than the sum of all guaranteed CPUs requested by app
containers. Unfortunately, this case was not caught by our unit tests
because they didn't validate the state of the defaultCPUSet to ensure
there was no overlap with CPUs assigned to containers. This patch
updates the strategy to carry the CPUs assigned to init containers
over for reuse by app containers, while avoiding this edge case. It
also updates the unit tests to catch this type of error in the future.
Kevin Klues 2020-04-23 20:27:43 +00:00
parent 775feed217
commit 751b9f3e13
3 changed files with 49 additions and 19 deletions
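
To make the new strategy concrete, here is a minimal, self-contained
sketch of the bookkeeping described above. It is not the kubelet code:
plain map-based sets stand in for cpuset.CPUSet, and the names pool,
reusable, and take are invented for this example.

package main

import (
	"fmt"
	"sort"
)

// cpuSet is a toy stand-in for the kubelet's cpuset.CPUSet.
type cpuSet map[int]bool

func newCPUSet(cpus ...int) cpuSet {
	s := cpuSet{}
	for _, c := range cpus {
		s[c] = true
	}
	return s
}

func (s cpuSet) union(o cpuSet) cpuSet {
	r := cpuSet{}
	for c := range s {
		r[c] = true
	}
	for c := range o {
		r[c] = true
	}
	return r
}

func (s cpuSet) difference(o cpuSet) cpuSet {
	r := cpuSet{}
	for c := range s {
		if !o[c] {
			r[c] = true
		}
	}
	return r
}

// take returns the n lowest-numbered CPUs, for a deterministic example.
func (s cpuSet) take(n int) cpuSet {
	var ids []int
	for c := range s {
		ids = append(ids, c)
	}
	sort.Ints(ids)
	return newCPUSet(ids[:n]...)
}

func (s cpuSet) String() string {
	var ids []int
	for c := range s {
		ids = append(ids, c)
	}
	sort.Ints(ids)
	return fmt.Sprint(ids)
}

func main() {
	pool := newCPUSet(0, 1, 2, 3, 4, 5) // shared (default) pool
	reusable := newCPUSet()             // CPUs reusable within this pod

	// Init container requests 4 exclusive CPUs: carve them out of the
	// shared pool and remember them as reusable for this pod.
	initCPUs := pool.take(4)
	pool = pool.difference(initCPUs)
	reusable = reusable.union(initCPUs)

	// App container requests 2 CPUs: allocate from the reusable CPUs
	// first, then drop whatever it took from the reusable set.
	appCPUs := reusable.union(pool).take(2)
	pool = pool.difference(appCPUs)
	reusable = reusable.difference(appCPUs)

	// CPUs 2 and 3 stay assigned to the init container and never fall
	// back into the shared pool. Under the old strategy they were put
	// straight back into the shared pool after allocation, so they were
	// no longer exclusive while the init container ran.
	fmt.Println("init:", initCPUs, "app:", appCPUs, "pool:", pool, "reusable:", reusable)
}

Running the sketch prints "init: [0 1 2 3] app: [0 1] pool: [4 5]
reusable: [2 3]": the app container reuses two of the init container's
CPUs, and the two leftover CPUs stay out of the shared pool rather than
being handed to another pod while still assigned to this one.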

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

@@ -503,6 +503,8 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 			testCase.expInitCSets,
 			testCase.expCSets...)
 
+		cumCSet := cpuset.NewCPUSet()
+
 		for i := range containers {
 			err := mgr.Allocate(testCase.pod, &containers[i])
 			if err != nil {
@@ -525,6 +527,13 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 				t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
 					testCase.description, expCSets[i], containers[i].Name, cset)
 			}
+
+			cumCSet = cumCSet.Union(cset)
 		}
+
+		if !testCase.stDefaultCPUSet.Difference(cumCSet).Equals(state.defaultCPUSet) {
+			t.Errorf("StaticPolicy error (%v). expected final state for defaultCPUSet %v but got %v",
+				testCase.description, testCase.stDefaultCPUSet.Difference(cumCSet), state.defaultCPUSet)
+		}
 	}
 }

pkg/kubelet/cm/cpumanager/policy_static.go

@@ -77,6 +77,8 @@ type staticPolicy struct {
 	reserved cpuset.CPUSet
 	// topology manager reference to get container Topology affinity
 	affinity topologymanager.Store
+	// set of CPUs to reuse across allocations in a pod
+	cpusToReuse map[string]cpuset.CPUSet
 }
 
 // Ensure staticPolicy implements Policy interface
@@ -107,9 +109,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
 
 	return &staticPolicy{
-		topology: topology,
-		reserved: reserved,
-		affinity: affinity,
+		topology:    topology,
+		reserved:    reserved,
+		affinity:    affinity,
+		cpusToReuse: make(map[string]cpuset.CPUSet),
 	}, nil
 }
@@ -188,12 +191,37 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
 	return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
+func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
+	// If pod entries to m.cpusToReuse other than the current pod exist, delete them.
+	for podUID := range p.cpusToReuse {
+		if podUID != string(pod.UID) {
+			delete(p.cpusToReuse, podUID)
+		}
+	}
+	// If no cpuset exists for cpusToReuse by this pod yet, create one.
+	if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
+		p.cpusToReuse[string(pod.UID)] = cpuset.NewCPUSet()
+	}
+	// Check if the container is an init container.
+	// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
+	for _, initContainer := range pod.Spec.InitContainers {
+		if container.Name == initContainer.Name {
+			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
+			return
+		}
+	}
+	// Otherwise it is an app container.
+	// Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
+	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
+}
+
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
 	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
 		klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
 		// container belongs in an exclusively allocated pool
-		if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+			p.updateCPUsToReuse(pod, container, cpuset)
 			klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
 			return nil
 		}
@@ -203,23 +231,14 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
 
 		// Allocate CPUs according to the NUMA affinity contained in the hint.
-		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
+		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
 		if err != nil {
 			klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
 			return err
 		}
 		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+		p.updateCPUsToReuse(pod, container, cpuset)
 
-		// Check if the container that has just been allocated resources is an init container.
-		// If so, release its CPUs back into the shared pool so they can be reallocated.
-		for _, initContainer := range pod.Spec.InitContainers {
-			if container.Name == initContainer.Name {
-				if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-					// Mutate the shared pool, adding released cpus.
-					s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
-				}
-			}
-		}
 	}
 	// container belongs in the shared pool (nothing to do; use default cpuset)
 	return nil
@@ -235,15 +254,17 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 	return nil
 }
 
-func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
+func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
 	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
 
+	assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)
+
 	// If there are aligned CPUs in numaAffinity, attempt to take those first.
 	result := cpuset.NewCPUSet()
 	if numaAffinity != nil {
 		alignedCPUs := cpuset.NewCPUSet()
 		for _, numaNodeID := range numaAffinity.GetBits() {
-			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
+			alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
 		}
 
 		numAlignedToAlloc := alignedCPUs.Size()
@@ -260,7 +281,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
 	}
 
 	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-	remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
+	remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
 	if err != nil {
 		return cpuset.NewCPUSet(), err
 	}

pkg/kubelet/cm/cpumanager/policy_static_test.go

@@ -639,7 +639,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 			continue
 		}
-		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.NewCPUSet())
 		if err != nil {
 			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
 				tc.description, tc.expCSet, err)
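
The test changes above guard the invariant that broke under the old
strategy: after all allocations, no CPU may be both assigned to a
container and still present in the defaultCPUSet. Phrased with the toy
cpuSet type from the earlier sketch (illustrative only, not the actual
test code), the assertion looks like this:

// equals reports whether two toy cpuSets contain the same CPUs.
func (s cpuSet) equals(o cpuSet) bool {
	if len(s) != len(o) {
		return false
	}
	for c := range s {
		if !o[c] {
			return false
		}
	}
	return true
}

// checkNoOverlap mirrors the assertion added to the unit tests: the
// final shared pool must equal the initial pool minus the union of
// every container's exclusive assignment.
func checkNoOverlap(initial, defaultPool cpuSet, assigned []cpuSet) bool {
	cum := newCPUSet()
	for _, cset := range assigned {
		cum = cum.union(cset)
	}
	return initial.difference(cum).equals(defaultPool)
}

This is exactly the shape of the new check in
TestCPUManagerAddWithInitContainers, where cumCSet accumulates every
container's cpuset and is compared against the expected defaultCPUSet.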