From 0e7928edce5de164b8f162d51ebeb849873847d3 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Tue, 12 Oct 2021 09:24:05 +0000 Subject: [PATCH 1/9] Add new CPUManager policy option for "distribute-cpus-across-numa" This commit only adds the option to the policy options framework. A subsequent commit will add the logic to utilize it. The KEP describing this new option can be found here: https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option Signed-off-by: Kevin Klues --- pkg/kubelet/cm/cpumanager/policy_options.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_options.go b/pkg/kubelet/cm/cpumanager/policy_options.go index 84461cf1b94..64b14833e58 100644 --- a/pkg/kubelet/cm/cpumanager/policy_options.go +++ b/pkg/kubelet/cm/cpumanager/policy_options.go @@ -26,13 +26,15 @@ import ( ) const ( - // FullPCPUsOnlyOption is the name of the CPU Manager policy option - FullPCPUsOnlyOption string = "full-pcpus-only" + FullPCPUsOnlyOption string = "full-pcpus-only" + DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa" ) var ( - alphaOptions = sets.NewString() - betaOptions = sets.NewString( + alphaOptions = sets.NewString( + DistributeCPUsAcrossNUMAOption, + ) + betaOptions = sets.NewString( FullPCPUsOnlyOption, ) stableOptions = sets.NewString() @@ -64,6 +66,9 @@ type StaticPolicyOptions struct { // any possible naming scheme will lead to ambiguity to some extent. // We picked "pcpu" because it the established docs hints at vCPU already. FullPhysicalCPUsOnly bool + // Flag to evenly distribute CPUs across NUMA nodes in cases where more + // than one NUMA node is required to satisfy the allocation. + DistributeCPUsAcrossNUMA bool } func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) { @@ -80,6 +85,12 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption return opts, fmt.Errorf("bad value for option %q: %w", name, err) } opts.FullPhysicalCPUsOnly = optValue + case DistributeCPUsAcrossNUMAOption: + optValue, err := strconv.ParseBool(value) + if err != nil { + return opts, fmt.Errorf("bad value for option %q: %w", name, err) + } + opts.DistributeCPUsAcrossNUMA = optValue default: // this should never be reached, we already detect unknown options, // but we keep it as further safety. From 462544d0796853a3105a4f21ec383369d3d08e2c Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Tue, 12 Oct 2021 10:07:54 +0000 Subject: [PATCH 2/9] Split CPUManager takeByTopology() into two different algorithms The first implements the original algorithm which packs CPUs onto NUMA nodes if more than one NUMA node is required to satisfy the allocation. The second disitributes CPUs across NUMA nodes if they can't all fit into one. The "distributing" algorithm is currently a noop and just returns an error of "unimplemented". A subsequent commit will add the logic to implement this algorithm according to KEP 2902: https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option Signed-off-by: Kevin Klues --- pkg/kubelet/cm/cpumanager/cpu_assignment.go | 6 +++- .../cm/cpumanager/cpu_assignment_test.go | 4 +-- pkg/kubelet/cm/cpumanager/policy_static.go | 29 ++++++++++++------- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go index 50011eae887..969cd72c6e6 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go @@ -318,7 +318,7 @@ func (a *cpuAccumulator) isFailed() bool { return a.numCPUsNeeded > a.details.CPUs().Size() } -func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { +func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { acc := newCPUAccumulator(topo, availableCPUs, numCPUs) if acc.isSatisfied() { return acc.result, nil @@ -358,3 +358,7 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus") } + +func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { + return cpuset.NewCPUSet(), fmt.Errorf("unimplemented") +} diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go index 5dfc80b4340..02d5e777aba 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go @@ -506,7 +506,7 @@ func TestCPUAccumulatorTake(t *testing.T) { } } -func TestTakeByTopology(t *testing.T) { +func TestTakeByTopologyNUMAPacked(t *testing.T) { testCases := []struct { description string topo *topology.CPUTopology @@ -631,7 +631,7 @@ func TestTakeByTopology(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - result, err := takeByTopology(tc.topo, tc.availableCPUs, tc.numCPUs) + result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs) if tc.expErr != "" && err.Error() != tc.expErr { t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err) } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index f5d275d8ea8..c44717ff033 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -118,6 +118,13 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv klog.InfoS("Static policy created with configuration", "options", opts) + policy := &staticPolicy{ + topology: topology, + affinity: affinity, + cpusToReuse: make(map[string]cpuset.CPUSet), + options: opts, + } + allCPUs := topology.CPUDetails.CPUs() var reserved cpuset.CPUSet if reservedCPUs.Size() > 0 { @@ -128,7 +135,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv // // For example: Given a system with 8 CPUs available and HT enabled, // if numReservedCPUs=2, then reserved={0,4} - reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs) + reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs) } if reserved.Size() != numReservedCPUs { @@ -137,14 +144,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv } klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved) + policy.reserved = reserved - return &staticPolicy{ - topology: topology, - reserved: reserved, - affinity: affinity, - cpusToReuse: make(map[string]cpuset.CPUSet), - options: opts, - }, nil + return policy, nil } func (p *staticPolicy) Name() string { @@ -318,7 +320,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit numAlignedToAlloc = numCPUs } - alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc) + alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc) if err != nil { return cpuset.NewCPUSet(), err } @@ -327,7 +329,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit } // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. - remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size()) + remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size()) if err != nil { return cpuset.NewCPUSet(), err } @@ -381,6 +383,13 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int { return requestedByAppContainers } +func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { + if p.options.DistributeCPUsAcrossNUMA { + return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs) + } + return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs) +} + func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { // Get a count of how many guaranteed CPUs have been requested. requested := p.guaranteedCPUs(pod, container) From 876dd9b078b6e39b51d75f87a3a8ed5253e99b8f Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Wed, 13 Oct 2021 11:10:40 +0000 Subject: [PATCH 3/9] Added algorithm to CPUManager to distribute CPUs across NUMA nodes Signed-off-by: Kevin Klues --- pkg/kubelet/cm/cpumanager/cpu_assignment.go | 269 +++++++++++++++++++- 1 file changed, 268 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go index 969cd72c6e6..1a844495450 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go @@ -18,6 +18,7 @@ package cpumanager import ( "fmt" + "math" "sort" "k8s.io/klog/v2" @@ -26,6 +27,49 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) +type mapIntInt map[int]int + +func (m mapIntInt) Clone() mapIntInt { + cp := make(mapIntInt, len(m)) + for k, v := range m { + cp[k] = v + } + return cp +} + +func (m mapIntInt) Keys() []int { + keys := make([]int, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +func (m mapIntInt) Values() []int { + values := make([]int, len(m)) + for _, v := range m { + values = append(values, v) + } + return values +} + +func mean(xs []int) float64 { + var sum float64 + for _, x := range xs { + sum += float64(x) + } + return sum / float64(len(xs)) +} + +func standardDeviation(xs []int) float64 { + m := mean(xs) + var sum float64 + for _, x := range xs { + sum += (float64(x) - m) * (float64(x) - m) + } + return math.Sqrt(sum / float64(len(xs))) +} + type numaOrSocketsFirstFuncs interface { takeFullFirstLevel() takeFullSecondLevel() @@ -318,6 +362,28 @@ func (a *cpuAccumulator) isFailed() bool { return a.numCPUsNeeded > a.details.CPUs().Size() } +// iterateCombinations walks through all n-choose-k subsets of size k in n and +// calls function 'f()' on each subset. For example, if n={0,1,2}, and k=2, +// then f() will be called on the subsets {0,1}, {0,2}. and {1,2}. +func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int)) { + if k < 1 { + return + } + + var helper func(n []int, k int, start int, accum []int, f func([]int)) + helper = func(n []int, k int, start int, accum []int, f func([]int)) { + if k == 0 { + f(accum) + return + } + for i := start; i <= len(n)-k; i++ { + helper(n, k-1, i+1, append(accum, n[i]), f) + } + } + + helper(n, k, 0, []int{}, f) +} + func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { acc := newCPUAccumulator(topo, availableCPUs, numCPUs) if acc.isSatisfied() { @@ -359,6 +425,207 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus") } +// takeByTopologyNUMADistributed returns a CPUSet of size 'numCPUs'. +// +// It generates this CPUset by allocating CPUs from 'availableCPUs' according +// to the algorithm outlined in KEP-2902: +// +// https://github.com/kubernetes/enhancements/tree/e7f51ffbe2ee398ffd1fba4a6d854f276bfad9fb/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option +// +// This algorithm evenly distribute CPUs across NUMA nodes in cases where more +// than one NUMA node is required to satisfy the allocation. This is in +// contrast to the takeByTopologyNUMAPacked algorithm, which attempts to 'pack' +// CPUs onto NUMA nodes and fill them up before moving on to the next one. +// +// At a high-level this algorithm can be summarized as: +// +// For each NUMA single node: +// * If all requested CPUs can be allocated from this NUMA node; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in that NUMA node and return +// +// Otherwise, for each pair of NUMA nodes: +// * If the set of requested CPUs (modulo 2) can be evenly split across +// the 2 NUMA nodes; AND +// * Any remaining CPUs (after the modulo operation) can be striped across +// some subset of the NUMA nodes; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in both NUMA nodes and return +// +// Otherwise, for each 3-tuple of NUMA nodes: +// * If the set of requested CPUs (modulo 3) can be evenly distributed +// across the 3 NUMA nodes; AND +// * Any remaining CPUs (after the modulo operation) can be striped across +// some subset of the NUMA nodes; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in all three NUMA nodes and return +// +// ... +// +// Otherwise, for the set of all NUMA nodes: +// * If the set of requested CPUs (modulo NUM_NUMA_NODES) can be evenly +// distributed across all NUMA nodes; AND +// * Any remaining CPUs (after the modulo operation) can be striped across +// some subset of the NUMA nodes; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in all NUMA nodes and return +// +// If none of the above conditions can be met, then resort back to a +// best-effort fit of packing CPUs into NUMA nodes by calling +// takeByTopologyNUMAPacked() over all available CPUs. +// +// NOTE: A "balance score" will be calculated to help find the best subset of +// NUMA nodes to allocate any 'remainder' CPUs from (in cases where the total +// number of CPUs to allocate cannot be evenly distributed across the chosen +// set of NUMA nodes). This "balance score" is calculated as the standard +// deviation of how many CPUs will be available on each NUMA node after all +// evenly distributed and remainder CPUs are allocated. The subset with the +// lowest "balance score" will receive the CPUs in order to keep the overall +// allocation of CPUs as "balanced" as possible. func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { - return cpuset.NewCPUSet(), fmt.Errorf("unimplemented") + acc := newCPUAccumulator(topo, availableCPUs, numCPUs) + if acc.isSatisfied() { + return acc.result, nil + } + if acc.isFailed() { + return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request") + } + + // Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'. + numas := acc.sortAvailableNUMANodes() + + // Try combinations of 1,2,3,... NUMA nodes until we find a combination + // where we can evenly distribute CPUs across them. + for i := range numas { + // Iterate through the various n-choose-k NUMA node combinations (where + // k=i+1 for this iteration of the loop), looking for the combination + // of NUMA nodes that can best have CPUs distributed across them. + var bestBalance float64 = math.MaxFloat64 + var bestRemainder []int = nil + var bestCombo []int = nil + acc.iterateCombinations(numas, i+1, func(combo []int) { + // If we've already found a combo with a balance of 0 in a + // different iteration, then don't bother checking any others. + // TODO: Add a way to just short circuit iterateCombinations() so + // we don't keep looping once such a combo is found. + if bestBalance == 0 { + return + } + + // Check that this combination of NUMA nodes has enough CPUs to + // satisfy the allocation overall. + cpus := acc.details.CPUsInNUMANodes(combo...) + if cpus.Size() < numCPUs { + return + } + + // Check that each NUMA node in this combination can provide + // (at least) numCPUs/len(combo) of the total cpus required. + distribution := numCPUs / len(combo) + for _, numa := range combo { + cpus := acc.details.CPUsInNUMANodes(numa) + if cpus.Size() < distribution { + return + } + } + + // Calculate how many CPUs will be available on each NUMA node + // in 'combo' ater allocating an even distribution of CPUs from + // them. This will be used to calculate a "balance" score for the + // combo to help decide which combo should ultimately be chosen. + availableAfterAllocation := make(mapIntInt, len(combo)) + for _, numa := range combo { + availableAfterAllocation[numa] = acc.details.CPUsInNUMANodes(numa).Size() - distribution + } + + // Check if there are any remaining CPUs to distribute across the + // NUMA nodes once CPUs have been evenly distributed. + remainder := numCPUs - (distribution * len(combo)) + + // Declare a set of local variables to help track the "balance + // scores" calculated when using different subsets of 'combo' to + // allocate remainder CPUs from. + var bestLocalBalance float64 = math.MaxFloat64 + var bestLocalRemainder []int = nil + + // If there aren't any remainder CPUs to allocate, then calculate + // the "balance score" of this combo as the standard deviation of + // the values contained in 'availableAfterAllocation'. + if remainder == 0 { + bestLocalBalance = standardDeviation(availableAfterAllocation.Values()) + bestLocalRemainder = nil + } + + // Otherwise, find the best "balance score" when allocating the + // remainder CPUs across different subsets of NUMA nodes in 'combo'. + acc.iterateCombinations(combo, remainder, func(subset []int) { + // Make a local copy of 'availableAfterAllocation'. + availableAfterAllocation := availableAfterAllocation.Clone() + + // For all NUMA nodes in 'subset', remove 1 more CPU (to account + // for any remainder CPUs that will be allocated on them. + for _, numa := range subset { + availableAfterAllocation[numa] -= 1 + } + + // Calculate the "balance score" as the standard deviation of + // the number of CPUs available on all NUMA nodes in 'combo' + // assuming the remainder CPUs are spread across 'subset'. + balance := standardDeviation(availableAfterAllocation.Values()) + if balance < bestLocalBalance { + bestLocalBalance = balance + bestLocalRemainder = subset + } + }) + + // If the best "balance score" for this combo is less than the + // lowest "balance score" of all previous combos, then update this + // combo (and remainder set) to be the best one found so far. + if bestLocalBalance < bestBalance { + bestBalance = bestLocalBalance + bestRemainder = bestLocalRemainder + bestCombo = combo + } + }) + + // If we made it through all of the iterations above without finding a + // combination of NUMA nodes that can properly balance CPU allocations, + // then move on to the next larger set of NUMA node combinations. + if bestCombo == nil { + continue + } + + // Otherwise, start allocating CPUs from the NUMA node combination + // chosen. First allocate numCPUs / len(bestCombo) CPUs from each node. + distribution := numCPUs / len(bestCombo) + for _, numa := range bestCombo { + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution) + acc.take(cpus) + } + + // Then allocate any remaining CPUs from each NUMA node in the remainder set. + for _, numa := range bestRemainder { + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), 1) + acc.take(cpus) + } + + // If we haven't allocated all of our CPUs at this point, then something + // went wrong in our accounting and we should error out. + if acc.numCPUsNeeded > 0 { + return cpuset.NewCPUSet(), fmt.Errorf("accounting error, not enough CPUs allocated, remaining: %v", acc.numCPUsNeeded) + } + + // Likewise, if we have allocated too many CPUs at this point, then something + // went wrong in our accounting and we should error out. + if acc.numCPUsNeeded < 0 { + return cpuset.NewCPUSet(), fmt.Errorf("accounting error, too many CPUs allocated, remaining: %v", acc.numCPUsNeeded) + } + + // Otherwise, return the result + return acc.result, nil + } + + // If we never found a combination of NUMA nodes that we could properly + // distribute CPUs across, fall back to the packing algorithm. + return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs) } From eb78e2406b1c5450d87f79ffa94709c2e6ba2616 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Wed, 13 Oct 2021 14:07:49 +0000 Subject: [PATCH 4/9] Add a new TestTakeByTopologyNUMADistributed() test to the CPUManager As part of this, pull out all of the existing "TakeByTopology" tests and have them be called by the original TestTakeByTopologyNUMAPacked() as well as the new TestTakeByTopologyNUMADistributed() test. In a subsequent commit, we will add some tests that should differ between these two algorithms. Signed-off-by: Kevin Klues --- .../cm/cpumanager/cpu_assignment_test.go | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go index 02d5e777aba..9f146ad4b75 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go @@ -506,15 +506,17 @@ func TestCPUAccumulatorTake(t *testing.T) { } } -func TestTakeByTopologyNUMAPacked(t *testing.T) { - testCases := []struct { - description string - topo *topology.CPUTopology - availableCPUs cpuset.CPUSet - numCPUs int - expErr string - expResult cpuset.CPUSet - }{ +type takeByTopologyTestCase struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + numCPUs int + expErr string + expResult cpuset.CPUSet +} + +func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { + return []takeByTopologyTestCase{ { "take more cpus than are available from single socket with HT", topoSingleSocketHT, @@ -628,7 +630,10 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) { mustParseCPUSet(t, "10-39,50-79"), }, } +} +func TestTakeByTopologyNUMAPacked(t *testing.T) { + testCases := commonTakeByTopologyTestCases(t) for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs) @@ -642,6 +647,21 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) { } } +func TestTakeByTopologyNUMADistributed(t *testing.T) { + testCases := commonTakeByTopologyTestCases(t) + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs) + if tc.expErr != "" && err.Error() != tc.expErr { + t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err) + } + if !result.Equals(tc.expResult) { + t.Errorf("expected result [%s] to equal [%s]", result, tc.expResult) + } + }) + } +} + func mustParseCPUSet(t *testing.T, s string) cpuset.CPUSet { cpus, err := cpuset.Parse(s) if err != nil { From cf3afb86029eeb88005a1517efa595a69cec9bec Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Wed, 13 Oct 2021 14:34:24 +0000 Subject: [PATCH 5/9] Add 2 distinguishing test cases between the 2 takeByTopology algorithms Signed-off-by: Kevin Klues --- .../cm/cpumanager/cpu_assignment_test.go | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go index 9f146ad4b75..809f0c35128 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go @@ -634,6 +634,25 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { func TestTakeByTopologyNUMAPacked(t *testing.T) { testCases := commonTakeByTopologyTestCases(t) + testCases = append(testCases, []takeByTopologyTestCase{ + { + "allocate 4 full cores with 3 coming from the first NUMA node (filling it up) and 1 coming from the second NUMA node", + topoDualSocketHT, + mustParseCPUSet(t, "0-11"), + 8, + "", + mustParseCPUSet(t, "0,6,2,8,4,10,1,7"), + }, + { + "allocate 32 full cores with 30 coming from the first 3 NUMA nodes (filling them up) and 2 coming from the fourth NUMA node", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 64, + "", + mustParseCPUSet(t, "0-29,40-69,30,31,70,71"), + }, + }...) + for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs) @@ -649,6 +668,25 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) { func TestTakeByTopologyNUMADistributed(t *testing.T) { testCases := commonTakeByTopologyTestCases(t) + testCases = append(testCases, []takeByTopologyTestCase{ + { + "allocate 4 full cores with 2 distributed across each NUMA node", + topoDualSocketHT, + mustParseCPUSet(t, "0-11"), + 8, + "", + mustParseCPUSet(t, "0,6,2,8,1,7,3,9"), + }, + { + "allocate 32 full cores with 8 distributed across each NUMA node", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 64, + "", + mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-67,70-77"), + }, + }...) + for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs) From 1436e336426823a31a5fc5fff4e12c38d1afd523 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Thu, 14 Oct 2021 11:40:09 +0000 Subject: [PATCH 6/9] Add more extensive testing for NUMA distribution algorithm in CPUManager Signed-off-by: Kevin Klues --- .../cm/cpumanager/cpu_assignment_test.go | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go index 809f0c35128..4432be5aa8c 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go @@ -685,6 +685,70 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { "", mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-67,70-77"), }, + { + "allocate 24 full cores with 8 distributed across the first 3 NUMA nodes", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 48, + "", + mustParseCPUSet(t, "0-7,10-17,20-27,40-47,50-57,60-67"), + }, + { + "allocate 24 full cores with 8 distributed across the first 3 NUMA nodes (filling the first NUMA node)", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "2-39,42-79"), + 48, + "", + mustParseCPUSet(t, "2-9,10-17,20-27,42-49,50-57,60-67"), + }, + { + "allocate 24 full cores with 8 distributed across the last 3 NUMA nodes (no room on the first NUMA node to distribute)", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "3-39,43-79"), + 48, + "", + mustParseCPUSet(t, "10-17,20-27,30-37,50-57,60-67,70-77"), + }, + { + "allocate 8 full cores with 2 distributed across each NUMA node", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-2,10-12,20-22,30-32,40-41,50-51,60-61,70-71"), + 16, + "", + mustParseCPUSet(t, "0-1,10-11,20-21,30-31,40-41,50-51,60-61,70-71"), + }, + { + "allocate 8 full cores with 2 distributed across each NUMA node", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-2,10-12,20-22,30-32,40-41,50-51,60-61,70-71"), + 16, + "", + mustParseCPUSet(t, "0-1,10-11,20-21,30-31,40-41,50-51,60-61,70-71"), + }, + { + "allocate 13 full cores distributed across the first 2 NUMA nodes", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 26, + "", + mustParseCPUSet(t, "0-6,10-16,40-45,50-55"), + }, + { + "allocate 31 full cores with 15 CPUs distributed across each NUMA node and 1 CPU spilling over to each of NUMA 0, 1", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 62, + "", + mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-66,70-76"), + }, + { + "allocate 31 full cores with 15 CPUs distributed across each NUMA node and 1 CPU spilling over to each of NUMA 2, 3 (to keep balance)", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-8,10-18,20-39,40-48,50-58,60-79"), + 62, + "", + mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-46,50-56,60-67,70-77"), + }, }...) for _, tc := range testCases { From d54445a84da2791b132ee91f8a12f4833a4de8f7 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Thu, 14 Oct 2021 23:20:05 +0000 Subject: [PATCH 7/9] Generalize the NUMA distribution algorithm to take cpuGroupSize This parameter ensures that CPUs are always allocated in groups of size 'cpuGroupSize'. This is important, for example, to ensure that all CPUs (i.e. hyperthreads) from the same core are handed out together. Signed-off-by: Kevin Klues --- pkg/kubelet/cm/cpumanager/cpu_assignment.go | 60 ++++++++++----- .../cm/cpumanager/cpu_assignment_test.go | 75 ++++++++++++++++++- 2 files changed, 114 insertions(+), 21 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go index 1a844495450..b23d021cfeb 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go @@ -482,7 +482,13 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C // evenly distributed and remainder CPUs are allocated. The subset with the // lowest "balance score" will receive the CPUs in order to keep the overall // allocation of CPUs as "balanced" as possible. -func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { +// +// NOTE: This algorithm has been generalized to take an additional +// 'cpuGroupSize' parameter to ensure that CPUs are always allocated in groups +// of size 'cpuGroupSize' according to the algorithm described above. This is +// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from +// a single core are allocated together. +func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) { acc := newCPUAccumulator(topo, availableCPUs, numCPUs) if acc.isSatisfied() { return acc.result, nil @@ -519,9 +525,20 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu return } - // Check that each NUMA node in this combination can provide - // (at least) numCPUs/len(combo) of the total cpus required. - distribution := numCPUs / len(combo) + // Check that CPUs can be handed out in groups of size + // 'cpuGroupSize' across the NUMA nodes in this combo. + numCPUGroups := 0 + for _, numa := range combo { + numCPUGroups += (acc.details.CPUsInNUMANodes(numa).Size() / cpuGroupSize) + } + if (numCPUGroups * cpuGroupSize) < numCPUs { + return + } + + // Check that each NUMA node in this combination can allocate an + // even distribution of CPUs in groups of size 'cpuGroupSize', + // modulo some remainder. + distribution := (numCPUs / len(combo) / cpuGroupSize) * cpuGroupSize for _, numa := range combo { cpus := acc.details.CPUsInNUMANodes(numa) if cpus.Size() < distribution { @@ -529,17 +546,19 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu } } - // Calculate how many CPUs will be available on each NUMA node - // in 'combo' ater allocating an even distribution of CPUs from - // them. This will be used to calculate a "balance" score for the - // combo to help decide which combo should ultimately be chosen. + // Calculate how many CPUs will be available on each NUMA node in + // 'combo' after allocating an even distribution of CPU groups of + // size 'cpuGroupSize' from them. This will be used in the "balance + // score" calculation to help decide if this combo should + // ultimately be chosen. availableAfterAllocation := make(mapIntInt, len(combo)) for _, numa := range combo { availableAfterAllocation[numa] = acc.details.CPUsInNUMANodes(numa).Size() - distribution } // Check if there are any remaining CPUs to distribute across the - // NUMA nodes once CPUs have been evenly distributed. + // NUMA nodes once CPUs have been evenly distributed in groups of + // size 'cpuGroupSize'. remainder := numCPUs - (distribution * len(combo)) // Declare a set of local variables to help track the "balance @@ -558,19 +577,22 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu // Otherwise, find the best "balance score" when allocating the // remainder CPUs across different subsets of NUMA nodes in 'combo'. - acc.iterateCombinations(combo, remainder, func(subset []int) { + // These remainder CPUs are handed out in groups of size 'cpuGroupSize'. + acc.iterateCombinations(combo, remainder/cpuGroupSize, func(subset []int) { // Make a local copy of 'availableAfterAllocation'. availableAfterAllocation := availableAfterAllocation.Clone() - // For all NUMA nodes in 'subset', remove 1 more CPU (to account - // for any remainder CPUs that will be allocated on them. + // For all NUMA nodes in 'subset', remove another + // 'cpuGroupSize' number of CPUs (to account for any remainder + // CPUs that will be allocated on them). for _, numa := range subset { - availableAfterAllocation[numa] -= 1 + availableAfterAllocation[numa] -= cpuGroupSize } // Calculate the "balance score" as the standard deviation of // the number of CPUs available on all NUMA nodes in 'combo' - // assuming the remainder CPUs are spread across 'subset'. + // after the remainder CPUs have been allocated across 'subset' + // in groups of size 'cpuGroupSize'. balance := standardDeviation(availableAfterAllocation.Values()) if balance < bestLocalBalance { bestLocalBalance = balance @@ -596,16 +618,18 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu } // Otherwise, start allocating CPUs from the NUMA node combination - // chosen. First allocate numCPUs / len(bestCombo) CPUs from each node. - distribution := numCPUs / len(bestCombo) + // chosen. First allocate an even distribution of CPUs in groups of + // size 'cpuGroupSize' from 'bestCombo'. + distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize for _, numa := range bestCombo { cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution) acc.take(cpus) } - // Then allocate any remaining CPUs from each NUMA node in the remainder set. + // Then allocate any remaining CPUs in groups of size 'cpuGroupSize' + // from each NUMA node in the remainder set. for _, numa := range bestRemainder { - cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), 1) + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize) acc.take(cpus) } diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go index 4432be5aa8c..59059feb0cc 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go @@ -666,14 +666,39 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) { } } -func TestTakeByTopologyNUMADistributed(t *testing.T) { +type takeByTopologyExtendedTestCase struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + numCPUs int + cpuGroupSize int + expErr string + expResult cpuset.CPUSet +} + +func commonTakeByTopologyExtendedTestCases(t *testing.T) []takeByTopologyExtendedTestCase { + var extendedTestCases []takeByTopologyExtendedTestCase + testCases := commonTakeByTopologyTestCases(t) - testCases = append(testCases, []takeByTopologyTestCase{ + for _, tc := range testCases { + extendedTestCases = append(extendedTestCases, takeByTopologyExtendedTestCase{ + tc.description, + tc.topo, + tc.availableCPUs, + tc.numCPUs, + 1, + tc.expErr, + tc.expResult, + }) + } + + extendedTestCases = append(extendedTestCases, []takeByTopologyExtendedTestCase{ { "allocate 4 full cores with 2 distributed across each NUMA node", topoDualSocketHT, mustParseCPUSet(t, "0-11"), 8, + 1, "", mustParseCPUSet(t, "0,6,2,8,1,7,3,9"), }, @@ -682,6 +707,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-79"), 64, + 1, "", mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-67,70-77"), }, @@ -690,6 +716,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-79"), 48, + 1, "", mustParseCPUSet(t, "0-7,10-17,20-27,40-47,50-57,60-67"), }, @@ -698,6 +725,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "2-39,42-79"), 48, + 1, "", mustParseCPUSet(t, "2-9,10-17,20-27,42-49,50-57,60-67"), }, @@ -706,6 +734,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "3-39,43-79"), 48, + 1, "", mustParseCPUSet(t, "10-17,20-27,30-37,50-57,60-67,70-77"), }, @@ -714,6 +743,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-2,10-12,20-22,30-32,40-41,50-51,60-61,70-71"), 16, + 1, "", mustParseCPUSet(t, "0-1,10-11,20-21,30-31,40-41,50-51,60-61,70-71"), }, @@ -722,38 +752,77 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) { topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-2,10-12,20-22,30-32,40-41,50-51,60-61,70-71"), 16, + 1, "", mustParseCPUSet(t, "0-1,10-11,20-21,30-31,40-41,50-51,60-61,70-71"), }, + }...) + + return extendedTestCases +} + +func TestTakeByTopologyNUMADistributed(t *testing.T) { + testCases := commonTakeByTopologyExtendedTestCases(t) + testCases = append(testCases, []takeByTopologyExtendedTestCase{ { "allocate 13 full cores distributed across the first 2 NUMA nodes", topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-79"), 26, + 1, "", mustParseCPUSet(t, "0-6,10-16,40-45,50-55"), }, + { + "allocate 13 full cores distributed across the first 2 NUMA nodes (cpuGroupSize 2)", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 26, + 2, + "", + mustParseCPUSet(t, "0-6,10-15,40-46,50-55"), + }, { "allocate 31 full cores with 15 CPUs distributed across each NUMA node and 1 CPU spilling over to each of NUMA 0, 1", topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-79"), 62, + 1, "", mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-66,70-76"), }, + { + "allocate 31 full cores with 14 CPUs distributed across each NUMA node and 2 CPUs spilling over to each of NUMA 0, 1, 2 (cpuGroupSize 2)", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-79"), + 62, + 2, + "", + mustParseCPUSet(t, "0-7,10-17,20-27,30-36,40-47,50-57,60-67,70-76"), + }, { "allocate 31 full cores with 15 CPUs distributed across each NUMA node and 1 CPU spilling over to each of NUMA 2, 3 (to keep balance)", topoDualSocketMultiNumaPerSocketHT, mustParseCPUSet(t, "0-8,10-18,20-39,40-48,50-58,60-79"), 62, + 1, "", mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-46,50-56,60-67,70-77"), }, + { + "allocate 31 full cores with 14 CPUs distributed across each NUMA node and 2 CPUs spilling over to each of NUMA 0, 2, 3 (to keep balance with cpuGroupSize 2)", + topoDualSocketMultiNumaPerSocketHT, + mustParseCPUSet(t, "0-8,10-18,20-39,40-48,50-58,60-79"), + 62, + 2, + "", + mustParseCPUSet(t, "0-7,10-16,20-27,30-37,40-47,50-56,60-67,70-77"), + }, }...) for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs) + result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize) if tc.expErr != "" && err.Error() != tc.expErr { t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err) } From 70e0f47191d6d02450ae2d420fbf4645eecd0d90 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Thu, 14 Oct 2021 23:22:22 +0000 Subject: [PATCH 8/9] Support full-pcpus-only with the new NUMA distribution policy option Signed-off-by: Kevin Klues --- pkg/kubelet/cm/cpumanager/policy_static.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index c44717ff033..f7ff26cd313 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -385,7 +385,11 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int { func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { if p.options.DistributeCPUsAcrossNUMA { - return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs) + cpuGroupSize := 1 + if p.options.FullPhysicalCPUsOnly { + cpuGroupSize = p.topology.CPUsPerCore() + } + return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize) } return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs) } From 86f9c266bc72b56aa6d3ffa2313c99478ca4596e Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Sat, 16 Oct 2021 19:26:56 +0000 Subject: [PATCH 9/9] Add optimizations to reduce iterations in distributed NUMA algorithm Signed-off-by: Kevin Klues --- pkg/kubelet/cm/cpumanager/cpu_assignment.go | 92 ++++++++++++++++----- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go index b23d021cfeb..df6cc3b8265 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go @@ -27,6 +27,13 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) +type LoopControl int + +const ( + Continue LoopControl = iota + Break +) + type mapIntInt map[int]int func (m mapIntInt) Clone() mapIntInt { @@ -70,6 +77,13 @@ func standardDeviation(xs []int) float64 { return math.Sqrt(sum / float64(len(xs))) } +func min(x, y int) int { + if x < y { + return x + } + return y +} + type numaOrSocketsFirstFuncs interface { takeFullFirstLevel() takeFullSecondLevel() @@ -350,6 +364,31 @@ func (a *cpuAccumulator) takeRemainingCPUs() { } } +func (a *cpuAccumulator) rangeNUMANodesNeededToSatisfy(cpuGroupSize int) (int, int) { + // Get the total number of NUMA nodes that have CPUs available on them. + numNUMANodesAvailable := a.details.NUMANodes().Size() + + // Get the total number of CPUs available across all NUMA nodes. + numCPUsAvailable := a.details.CPUs().Size() + + // Calculate the number of available 'cpuGroups' across all NUMA nodes as + // well as the number of 'cpuGroups' that need to be allocated (rounding up). + numCPUGroupsAvailable := (numCPUsAvailable-1)/cpuGroupSize + 1 + numCPUGroupsNeeded := (a.numCPUsNeeded-1)/cpuGroupSize + 1 + + // Calculate the number of available 'cpuGroups' per NUMA Node (rounding up). + numCPUGroupsPerNUMANode := (numCPUGroupsAvailable-1)/numNUMANodesAvailable + 1 + + // Calculate the minimum number of numa nodes required to satisfy the + // allocation (rounding up). + minNUMAs := (numCPUGroupsNeeded-1)/numCPUGroupsPerNUMANode + 1 + + // Calculate the maximum number of numa nodes required to satisfy the allocation. + maxNUMAs := min(numCPUGroupsNeeded, numNUMANodesAvailable) + + return minNUMAs, maxNUMAs +} + func (a *cpuAccumulator) needs(n int) bool { return a.numCPUsNeeded >= n } @@ -364,21 +403,25 @@ func (a *cpuAccumulator) isFailed() bool { // iterateCombinations walks through all n-choose-k subsets of size k in n and // calls function 'f()' on each subset. For example, if n={0,1,2}, and k=2, -// then f() will be called on the subsets {0,1}, {0,2}. and {1,2}. -func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int)) { +// then f() will be called on the subsets {0,1}, {0,2}. and {1,2}. If f() ever +// returns 'Break', we break early and exit the loop. +func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopControl) { if k < 1 { return } - var helper func(n []int, k int, start int, accum []int, f func([]int)) - helper = func(n []int, k int, start int, accum []int, f func([]int)) { + var helper func(n []int, k int, start int, accum []int, f func([]int) LoopControl) LoopControl + helper = func(n []int, k int, start int, accum []int, f func([]int) LoopControl) LoopControl { if k == 0 { - f(accum) - return + return f(accum) } for i := start; i <= len(n)-k; i++ { - helper(n, k-1, i+1, append(accum, n[i]), f) + control := helper(n, k-1, i+1, append(accum, n[i]), f) + if control == Break { + return Break + } } + return Continue } helper(n, k, 0, []int{}, f) @@ -500,29 +543,34 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu // Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'. numas := acc.sortAvailableNUMANodes() + // Calculate the minimum and maximum possible number of NUMA nodes that + // could satisfy this request. This is used to optimize how many iterations + // of the loop we need to go through below. + minNUMAs, maxNUMAs := acc.rangeNUMANodesNeededToSatisfy(cpuGroupSize) + // Try combinations of 1,2,3,... NUMA nodes until we find a combination - // where we can evenly distribute CPUs across them. - for i := range numas { - // Iterate through the various n-choose-k NUMA node combinations (where - // k=i+1 for this iteration of the loop), looking for the combination - // of NUMA nodes that can best have CPUs distributed across them. + // where we can evenly distribute CPUs across them. To optimize things, we + // don't always start at 1 and end at len(numas). Instead, we use the + // values of 'minNUMAs' and 'maxNUMAs' calculated above. + for k := minNUMAs; k <= maxNUMAs; k++ { + // Iterate through the various n-choose-k NUMA node combinations, + // looking for the combination of NUMA nodes that can best have CPUs + // distributed across them. var bestBalance float64 = math.MaxFloat64 var bestRemainder []int = nil var bestCombo []int = nil - acc.iterateCombinations(numas, i+1, func(combo []int) { + acc.iterateCombinations(numas, k, func(combo []int) LoopControl { // If we've already found a combo with a balance of 0 in a // different iteration, then don't bother checking any others. - // TODO: Add a way to just short circuit iterateCombinations() so - // we don't keep looping once such a combo is found. if bestBalance == 0 { - return + return Break } // Check that this combination of NUMA nodes has enough CPUs to // satisfy the allocation overall. cpus := acc.details.CPUsInNUMANodes(combo...) if cpus.Size() < numCPUs { - return + return Continue } // Check that CPUs can be handed out in groups of size @@ -532,7 +580,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu numCPUGroups += (acc.details.CPUsInNUMANodes(numa).Size() / cpuGroupSize) } if (numCPUGroups * cpuGroupSize) < numCPUs { - return + return Continue } // Check that each NUMA node in this combination can allocate an @@ -542,7 +590,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu for _, numa := range combo { cpus := acc.details.CPUsInNUMANodes(numa) if cpus.Size() < distribution { - return + return Continue } } @@ -578,7 +626,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu // Otherwise, find the best "balance score" when allocating the // remainder CPUs across different subsets of NUMA nodes in 'combo'. // These remainder CPUs are handed out in groups of size 'cpuGroupSize'. - acc.iterateCombinations(combo, remainder/cpuGroupSize, func(subset []int) { + acc.iterateCombinations(combo, remainder/cpuGroupSize, func(subset []int) LoopControl { // Make a local copy of 'availableAfterAllocation'. availableAfterAllocation := availableAfterAllocation.Clone() @@ -598,6 +646,8 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu bestLocalBalance = balance bestLocalRemainder = subset } + + return Continue }) // If the best "balance score" for this combo is less than the @@ -608,6 +658,8 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu bestRemainder = bestLocalRemainder bestCombo = combo } + + return Continue }) // If we made it through all of the iterations above without finding a