Merge pull request #90419 from klueska/upstream-update-reusable-cpus-strategy
Update strategy used to reuse CPUs from init containers in CPUManager
Commit 561b2aba94
@@ -503,6 +503,8 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
             testCase.expInitCSets,
             testCase.expCSets...)
 
+        cumCSet := cpuset.NewCPUSet()
+
         for i := range containers {
             err := mgr.Allocate(testCase.pod, &containers[i])
             if err != nil {
@@ -525,6 +527,13 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
                 t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
                     testCase.description, expCSets[i], containers[i].Name, cset)
             }
+
+            cumCSet = cumCSet.Union(cset)
         }
+
+        if !testCase.stDefaultCPUSet.Difference(cumCSet).Equals(state.defaultCPUSet) {
+            t.Errorf("StaticPolicy error (%v). expected final state for defaultCPUSet %v but got %v",
+                testCase.description, testCase.stDefaultCPUSet.Difference(cumCSet), state.defaultCPUSet)
+        }
     }
 }
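The new cumCSet bookkeeping asserts a simple invariant: once every container in the test case has been allocated, the shared default pool must equal the initial default set minus the union of all exclusively assigned cpusets. A minimal, self-contained sketch of that arithmetic follows; plain map-based sets stand in for the kubelet's cpuset.CPUSet and the CPU IDs are made up for illustration.

// Sketch of the invariant the updated test asserts:
// defaultCPUSet == initialDefault minus union(all exclusive cpusets).
package main

import "fmt"

type cpuSet map[int]bool

func union(a, b cpuSet) cpuSet {
	out := cpuSet{}
	for c := range a {
		out[c] = true
	}
	for c := range b {
		out[c] = true
	}
	return out
}

func difference(a, b cpuSet) cpuSet {
	out := cpuSet{}
	for c := range a {
		if !b[c] {
			out[c] = true
		}
	}
	return out
}

func main() {
	initialDefault := cpuSet{0: true, 1: true, 2: true, 3: true, 4: true, 5: true}

	// Exclusive cpusets handed out while iterating over the pod's containers:
	// an init container took {1,2}, an app container reused them and took {1,2,3}.
	allocated := []cpuSet{{1: true, 2: true}, {1: true, 2: true, 3: true}}

	cumCSet := cpuSet{}
	for _, cset := range allocated {
		cumCSet = union(cumCSet, cset)
	}

	// The shared pool the test expects to see at the end.
	fmt.Println(difference(initialDefault, cumCSet)) // map[0:true 4:true 5:true]
}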
@@ -77,6 +77,8 @@ type staticPolicy struct {
     reserved cpuset.CPUSet
     // topology manager reference to get container Topology affinity
     affinity topologymanager.Store
+    // set of CPUs to reuse across allocations in a pod
+    cpusToReuse map[string]cpuset.CPUSet
 }
 
 // Ensure staticPolicy implements Policy interface
@@ -107,9 +109,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
     klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
 
     return &staticPolicy{
-        topology: topology,
-        reserved: reserved,
-        affinity: affinity,
+        topology:    topology,
+        reserved:    reserved,
+        affinity:    affinity,
+        cpusToReuse: make(map[string]cpuset.CPUSet),
     }, nil
 }
 
@@ -188,12 +191,37 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
     return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
+func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
+    // If pod entries to m.cpusToReuse other than the current pod exist, delete them.
+    for podUID := range p.cpusToReuse {
+        if podUID != string(pod.UID) {
+            delete(p.cpusToReuse, podUID)
+        }
+    }
+    // If no cpuset exists for cpusToReuse by this pod yet, create one.
+    if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
+        p.cpusToReuse[string(pod.UID)] = cpuset.NewCPUSet()
+    }
+    // Check if the container is an init container.
+    // If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
+    for _, initContainer := range pod.Spec.InitContainers {
+        if container.Name == initContainer.Name {
+            p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
+            return
+        }
+    }
+    // Otherwise it is an app container.
+    // Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
+    p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
+}
+
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
     if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
         klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
         // container belongs in an exclusively allocated pool
 
-        if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+        if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+            p.updateCPUsToReuse(pod, container, cpuset)
             klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
             return nil
         }
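Taken together, the cpusToReuse map and updateCPUsToReuse implement a small per-pod state machine: entries for any other pod are pruned, an init container's cpuset is added to the pod's reuse pool, and an app container's cpuset is removed from it. The sketch below mirrors that bookkeeping with a plain map keyed by pod UID; the types, helper name, and CPU IDs are illustrative only, not the kubelet's actual cpuset API.

// Toy version of the per-pod reuse bookkeeping: init containers grow the reuse
// pool, app containers shrink it, and only the most recent pod keeps an entry.
package main

import "fmt"

var cpusToReuse = map[string]map[int]bool{}

func updateCPUsToReuse(podUID string, isInit bool, cset []int) {
	// Drop reuse pools that belong to any other pod.
	for uid := range cpusToReuse {
		if uid != podUID {
			delete(cpusToReuse, uid)
		}
	}
	if cpusToReuse[podUID] == nil {
		cpusToReuse[podUID] = map[int]bool{}
	}
	for _, cpu := range cset {
		if isInit {
			cpusToReuse[podUID][cpu] = true // init container: offer its CPUs for reuse
		} else {
			delete(cpusToReuse[podUID], cpu) // app container: its CPUs are no longer reusable
		}
	}
}

func main() {
	updateCPUsToReuse("pod-1", true, []int{1, 2})     // init container was given CPUs 1,2
	updateCPUsToReuse("pod-1", false, []int{1, 2, 3}) // app container reused 1,2 and took 3
	fmt.Println(cpusToReuse["pod-1"])                 // map[] — nothing left to reuse
}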
@@ -203,23 +231,14 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
         klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
 
         // Allocate CPUs according to the NUMA affinity contained in the hint.
-        cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
+        cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
         if err != nil {
             klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
             return err
         }
         s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+        p.updateCPUsToReuse(pod, container, cpuset)
 
-        // Check if the container that has just been allocated resources is an init container.
-        // If so, release its CPUs back into the shared pool so they can be reallocated.
-        for _, initContainer := range pod.Spec.InitContainers {
-            if container.Name == initContainer.Name {
-                if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-                    // Mutate the shared pool, adding released cpus.
-                    s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
-                }
-            }
-        }
     }
     // container belongs in the shared pool (nothing to do; use default cpuset)
     return nil
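The visible behavioral change in Allocate: under the old code an init container's CPUs were released straight back into the shared default pool, so any later allocation could pick them up before the same pod's app containers were placed. With this change they stay out of the shared pool and are only offered back, via p.cpusToReuse[string(pod.UID)], to subsequent allocations for that same pod; updateCPUsToReuse then subtracts whatever an app container actually takes, so reused CPUs cannot be handed out twice.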
@@ -235,15 +254,17 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
     return nil
 }
 
-func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
+func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
     klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
 
+    assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)
+
     // If there are aligned CPUs in numaAffinity, attempt to take those first.
     result := cpuset.NewCPUSet()
     if numaAffinity != nil {
         alignedCPUs := cpuset.NewCPUSet()
         for _, numaNodeID := range numaAffinity.GetBits() {
-            alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
+            alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
         }
 
         numAlignedToAlloc := alignedCPUs.Size()
@@ -260,7 +281,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
     }
 
     // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-    remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
+    remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
     if err != nil {
         return cpuset.NewCPUSet(), err
     }
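With the extra reusableCPUs parameter, the pool allocateCPUs draws from is the shared default set minus the reserved CPUs, unioned with whatever the pod may reuse; both the NUMA-aligned pass and the takeByTopology fallback now select from that single assignableCPUs set. A rough sketch of the set arithmetic, using plain maps and invented CPU IDs rather than the real cpuset.CPUSet:

// Sketch of the new assignable pool: CPUs offered to allocateCPUs are the
// shared pool minus reserved CPUs, plus any CPUs reusable within this pod.
package main

import "fmt"

func main() {
	defaultSet := map[int]bool{0: true, 1: true, 2: true, 3: true}
	reserved := map[int]bool{0: true}
	reusable := map[int]bool{4: true, 5: true} // freed up by this pod's init container

	assignable := map[int]bool{}
	for cpu := range defaultSet {
		if !reserved[cpu] {
			assignable[cpu] = true // p.assignableCPUs(s): default minus reserved
		}
	}
	for cpu := range reusable {
		assignable[cpu] = true // .Union(reusableCPUs)
	}
	fmt.Println(assignable) // both aligned and remaining CPUs are taken from this set
}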
@@ -639,7 +639,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
             continue
         }
 
-        cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+        cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.NewCPUSet())
         if err != nil {
             t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
                 tc.description, tc.expCSet, err)
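Call sites with no per-pod reuse state to offer, like the updated TestTopologyAwareAllocateCPUs above, simply pass an empty cpuset.NewCPUSet(); unioning with the empty set leaves the assignable pool unchanged, so the new parameter is a no-op there.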