From 462544d0796853a3105a4f21ec383369d3d08e2c Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Tue, 12 Oct 2021 10:07:54 +0000
Subject: [PATCH] Split CPUManager takeByTopology() into two different algorithms

The first implements the original algorithm, which packs CPUs onto NUMA
nodes if more than one NUMA node is required to satisfy the allocation.
The second distributes CPUs across NUMA nodes if they can't all fit into
one.

The "distributing" algorithm is currently a noop and just returns an
error of "unimplemented". A subsequent commit will add the logic to
implement this algorithm according to KEP 2902:

https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option

Signed-off-by: Kevin Klues
---
 pkg/kubelet/cm/cpumanager/cpu_assignment.go      |  6 +++-
 .../cm/cpumanager/cpu_assignment_test.go         |  4 +--
 pkg/kubelet/cm/cpumanager/policy_static.go       | 29 ++++++++++++-------
 3 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
index 50011eae887..969cd72c6e6 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -318,7 +318,7 @@ func (a *cpuAccumulator) isFailed() bool {
     return a.numCPUsNeeded > a.details.CPUs().Size()
 }
 
-func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
     acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
     if acc.isSatisfied() {
         return acc.result, nil
@@ -358,3 +358,7 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
 
     return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
 }
+
+func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+    return cpuset.NewCPUSet(), fmt.Errorf("unimplemented")
+}
diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
index 5dfc80b4340..02d5e777aba 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
@@ -506,7 +506,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
     }
 }
 
-func TestTakeByTopology(t *testing.T) {
+func TestTakeByTopologyNUMAPacked(t *testing.T) {
     testCases := []struct {
         description   string
         topo          *topology.CPUTopology
@@ -631,7 +631,7 @@ func TestTakeByTopology(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            result, err := takeByTopology(tc.topo, tc.availableCPUs, tc.numCPUs)
+            result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
             if tc.expErr != "" && err.Error() != tc.expErr {
                 t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
             }
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index f5d275d8ea8..c44717ff033 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -118,6 +118,13 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 
     klog.InfoS("Static policy created with configuration", "options", opts)
 
+    policy := &staticPolicy{
+        topology:    topology,
+        affinity:    affinity,
+        cpusToReuse: make(map[string]cpuset.CPUSet),
+        options:     opts,
+    }
+
     allCPUs := topology.CPUDetails.CPUs()
     var reserved cpuset.CPUSet
     if reservedCPUs.Size() > 0 {
@@ -128,7 +135,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
         //
         // For example: Given a system with 8 CPUs available and HT enabled,
         // if numReservedCPUs=2, then reserved={0,4}
-        reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs)
+        reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs)
     }
 
     if reserved.Size() != numReservedCPUs {
@@ -137,14 +144,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
     }
 
     klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
+    policy.reserved = reserved
 
-    return &staticPolicy{
-        topology:    topology,
-        reserved:    reserved,
-        affinity:    affinity,
-        cpusToReuse: make(map[string]cpuset.CPUSet),
-        options:     opts,
-    }, nil
+    return policy, nil
 }
 
 func (p *staticPolicy) Name() string {
@@ -318,7 +320,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
             numAlignedToAlloc = numCPUs
         }
 
-        alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc)
+        alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
         if err != nil {
             return cpuset.NewCPUSet(), err
         }
@@ -327,7 +329,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
     }
 
     // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-    remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size())
+    remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
     if err != nil {
         return cpuset.NewCPUSet(), err
     }
@@ -381,6 +383,13 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
     return requestedByAppContainers
 }
 
+func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+    if p.options.DistributeCPUsAcrossNUMA {
+        return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs)
+    }
+    return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
+}
+
 func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
     // Get a count of how many guaranteed CPUs have been requested.
     requested := p.guaranteedCPUs(pod, container)
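
Note: as a rough standalone sketch of the behavioral difference between the two strategies named in the commit message, the snippet below packs CPUs onto one NUMA node before spilling into the next, versus handing them out round-robin across nodes. This is toy code with made-up names (freeCPUs, packCPUs, distributeCPUs); it is not the kubelet implementation and not the KEP 2902 algorithm, which a later commit adds. In the patch itself, the choice between the two real functions is made by the new staticPolicy.takeByTopology() wrapper based on the DistributeCPUsAcrossNUMA policy option.

// Toy illustration only; names and layout are hypothetical, not kubelet code.
package main

import (
	"fmt"
	"sort"
)

// freeCPUs maps a NUMA node ID to the CPU IDs still available on it.
type freeCPUs map[int][]int

// sortedNodes returns node IDs in ascending order so results are deterministic.
func sortedNodes(f freeCPUs) []int {
	nodes := make([]int, 0, len(f))
	for n := range f {
		nodes = append(nodes, n)
	}
	sort.Ints(nodes)
	return nodes
}

// packCPUs mimics the "packed" idea: exhaust one NUMA node before
// spilling over to the next one.
func packCPUs(f freeCPUs, numCPUs int) []int {
	var result []int
	for _, n := range sortedNodes(f) {
		for _, cpu := range f[n] {
			if len(result) == numCPUs {
				return result
			}
			result = append(result, cpu)
		}
	}
	return result // may be shorter than numCPUs if not enough CPUs are free
}

// distributeCPUs mimics the "distributed" idea: take CPUs from the NUMA
// nodes in round-robin order so the allocation is spread across nodes.
func distributeCPUs(f freeCPUs, numCPUs int) []int {
	var result []int
	nodes := sortedNodes(f)
	cursor := make(map[int]int) // per-node position into its free-CPU list
	for len(result) < numCPUs {
		progress := false
		for _, n := range nodes {
			if len(result) == numCPUs {
				break
			}
			if cursor[n] < len(f[n]) {
				result = append(result, f[n][cursor[n]])
				cursor[n]++
				progress = true
			}
		}
		if !progress {
			break // not enough free CPUs to satisfy the request
		}
	}
	return result
}

func main() {
	free := freeCPUs{
		0: {0, 1, 2, 3},
		1: {4, 5, 6, 7},
	}
	fmt.Println("packed:     ", packCPUs(free, 6))       // [0 1 2 3 4 5]
	fmt.Println("distributed:", distributeCPUs(free, 6)) // [0 4 1 5 2 6]
}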