diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
index 50011eae887..df6cc3b8265 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -18,6 +18,7 @@ package cpumanager
 
 import (
     "fmt"
+    "math"
     "sort"
 
     "k8s.io/klog/v2"
@@ -26,6 +27,63 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 )
 
+type LoopControl int
+
+const (
+    Continue LoopControl = iota
+    Break
+)
+
+type mapIntInt map[int]int
+
+func (m mapIntInt) Clone() mapIntInt {
+    cp := make(mapIntInt, len(m))
+    for k, v := range m {
+        cp[k] = v
+    }
+    return cp
+}
+
+func (m mapIntInt) Keys() []int {
+    keys := make([]int, 0, len(m))
+    for k := range m {
+        keys = append(keys, k)
+    }
+    return keys
+}
+
+func (m mapIntInt) Values() []int {
+    values := make([]int, 0, len(m))
+    for _, v := range m {
+        values = append(values, v)
+    }
+    return values
+}
+
+func mean(xs []int) float64 {
+    var sum float64
+    for _, x := range xs {
+        sum += float64(x)
+    }
+    return sum / float64(len(xs))
+}
+
+func standardDeviation(xs []int) float64 {
+    m := mean(xs)
+    var sum float64
+    for _, x := range xs {
+        sum += (float64(x) - m) * (float64(x) - m)
+    }
+    return math.Sqrt(sum / float64(len(xs)))
+}
+
+func min(x, y int) int {
+    if x < y {
+        return x
+    }
+    return y
+}
+
 type numaOrSocketsFirstFuncs interface {
     takeFullFirstLevel()
     takeFullSecondLevel()
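The mean and standardDeviation helpers above are the basis of the "balance score" that the distributed allocator further down in this file uses to compare candidate NUMA node combinations. As a standalone sketch (illustrative only, not part of the patch; the inputs are hypothetical per-NUMA free-CPU counts), lower scores mean the CPUs left over after an allocation are spread more evenly:

package main

import (
    "fmt"
    "math"
)

// mean and standardDeviation mirror the helpers added in the patch above.
func mean(xs []int) float64 {
    var sum float64
    for _, x := range xs {
        sum += float64(x)
    }
    return sum / float64(len(xs))
}

func standardDeviation(xs []int) float64 {
    m := mean(xs)
    var sum float64
    for _, x := range xs {
        sum += (float64(x) - m) * (float64(x) - m)
    }
    return math.Sqrt(sum / float64(len(xs)))
}

func main() {
    fmt.Println(standardDeviation([]int{8, 8, 8, 8}))  // 0: perfectly balanced
    fmt.Println(standardDeviation([]int{16, 0, 8, 8})) // ~5.66: heavily skewed
}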
@@ -306,6 +364,31 @@ func (a *cpuAccumulator) takeRemainingCPUs() {
     }
 }
 
+func (a *cpuAccumulator) rangeNUMANodesNeededToSatisfy(cpuGroupSize int) (int, int) {
+    // Get the total number of NUMA nodes that have CPUs available on them.
+    numNUMANodesAvailable := a.details.NUMANodes().Size()
+
+    // Get the total number of CPUs available across all NUMA nodes.
+    numCPUsAvailable := a.details.CPUs().Size()
+
+    // Calculate the number of available 'cpuGroups' across all NUMA nodes as
+    // well as the number of 'cpuGroups' that need to be allocated (rounding up).
+    numCPUGroupsAvailable := (numCPUsAvailable-1)/cpuGroupSize + 1
+    numCPUGroupsNeeded := (a.numCPUsNeeded-1)/cpuGroupSize + 1
+
+    // Calculate the number of available 'cpuGroups' per NUMA node (rounding up).
+    numCPUGroupsPerNUMANode := (numCPUGroupsAvailable-1)/numNUMANodesAvailable + 1
+
+    // Calculate the minimum number of NUMA nodes required to satisfy the
+    // allocation (rounding up).
+    minNUMAs := (numCPUGroupsNeeded-1)/numCPUGroupsPerNUMANode + 1
+
+    // Calculate the maximum number of NUMA nodes required to satisfy the allocation.
+    maxNUMAs := min(numCPUGroupsNeeded, numNUMANodesAvailable)
+
+    return minNUMAs, maxNUMAs
+}
+
 func (a *cpuAccumulator) needs(n int) bool {
     return a.numCPUsNeeded >= n
 }
@@ -318,7 +401,33 @@ func (a *cpuAccumulator) isFailed() bool {
     return a.numCPUsNeeded > a.details.CPUs().Size()
 }
 
-func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+// iterateCombinations walks through all n-choose-k subsets of size k in n and
+// calls function 'f()' on each subset. For example, if n={0,1,2} and k=2,
+// then f() will be called on the subsets {0,1}, {0,2}, and {1,2}. If f() ever
+// returns 'Break', we break early and exit the loop.
+func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopControl) {
+    if k < 1 {
+        return
+    }
+
+    var helper func(n []int, k int, start int, accum []int, f func([]int) LoopControl) LoopControl
+    helper = func(n []int, k int, start int, accum []int, f func([]int) LoopControl) LoopControl {
+        if k == 0 {
+            return f(accum)
+        }
+        for i := start; i <= len(n)-k; i++ {
+            control := helper(n, k-1, i+1, append(accum, n[i]), f)
+            if control == Break {
+                return Break
+            }
+        }
+        return Continue
+    }
+
+    helper(n, k, 0, []int{}, f)
+}
+
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
     acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
     if acc.isSatisfied() {
         return acc.result, nil
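For a concrete view of what iterateCombinations enumerates, here is a standalone restatement (illustrative only, not part of the patch; the method above does not use its receiver, so it is written as a free function here):

package main

import "fmt"

type LoopControl int

const (
    Continue LoopControl = iota
    Break
)

// iterateCombinations restates the recursion from the patch: it visits every
// n-choose-k subset of 'n' in lexicographic order and stops early on Break.
func iterateCombinations(n []int, k int, f func([]int) LoopControl) {
    if k < 1 {
        return
    }
    var helper func(start, k int, accum []int) LoopControl
    helper = func(start, k int, accum []int) LoopControl {
        if k == 0 {
            return f(accum)
        }
        for i := start; i <= len(n)-k; i++ {
            if helper(i+1, k-1, append(accum, n[i])) == Break {
                return Break
            }
        }
        return Continue
    }
    helper(0, k, []int{})
}

func main() {
    // Prints [0 1] [0 2] [0 3] [1 2] [1 3] [2 3], one per line.
    iterateCombinations([]int{0, 1, 2, 3}, 2, func(combo []int) LoopControl {
        fmt.Println(combo)
        return Continue
    })
}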
This "balance score" is calculated as the standard +// deviation of how many CPUs will be available on each NUMA node after all +// evenly distributed and remainder CPUs are allocated. The subset with the +// lowest "balance score" will receive the CPUs in order to keep the overall +// allocation of CPUs as "balanced" as possible. +// +// NOTE: This algorithm has been generalized to take an additional +// 'cpuGroupSize' parameter to ensure that CPUs are always allocated in groups +// of size 'cpuGroupSize' according to the algorithm described above. This is +// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from +// a single core are allocated together. +func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) { + acc := newCPUAccumulator(topo, availableCPUs, numCPUs) + if acc.isSatisfied() { + return acc.result, nil + } + if acc.isFailed() { + return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request") + } + + // Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'. + numas := acc.sortAvailableNUMANodes() + + // Calculate the minimum and maximum possible number of NUMA nodes that + // could satisfy this request. This is used to optimize how many iterations + // of the loop we need to go through below. + minNUMAs, maxNUMAs := acc.rangeNUMANodesNeededToSatisfy(cpuGroupSize) + + // Try combinations of 1,2,3,... NUMA nodes until we find a combination + // where we can evenly distribute CPUs across them. To optimize things, we + // don't always start at 1 and end at len(numas). Instead, we use the + // values of 'minNUMAs' and 'maxNUMAs' calculated above. + for k := minNUMAs; k <= maxNUMAs; k++ { + // Iterate through the various n-choose-k NUMA node combinations, + // looking for the combination of NUMA nodes that can best have CPUs + // distributed across them. + var bestBalance float64 = math.MaxFloat64 + var bestRemainder []int = nil + var bestCombo []int = nil + acc.iterateCombinations(numas, k, func(combo []int) LoopControl { + // If we've already found a combo with a balance of 0 in a + // different iteration, then don't bother checking any others. + if bestBalance == 0 { + return Break + } + + // Check that this combination of NUMA nodes has enough CPUs to + // satisfy the allocation overall. + cpus := acc.details.CPUsInNUMANodes(combo...) + if cpus.Size() < numCPUs { + return Continue + } + + // Check that CPUs can be handed out in groups of size + // 'cpuGroupSize' across the NUMA nodes in this combo. + numCPUGroups := 0 + for _, numa := range combo { + numCPUGroups += (acc.details.CPUsInNUMANodes(numa).Size() / cpuGroupSize) + } + if (numCPUGroups * cpuGroupSize) < numCPUs { + return Continue + } + + // Check that each NUMA node in this combination can allocate an + // even distribution of CPUs in groups of size 'cpuGroupSize', + // modulo some remainder. + distribution := (numCPUs / len(combo) / cpuGroupSize) * cpuGroupSize + for _, numa := range combo { + cpus := acc.details.CPUsInNUMANodes(numa) + if cpus.Size() < distribution { + return Continue + } + } + + // Calculate how many CPUs will be available on each NUMA node in + // 'combo' after allocating an even distribution of CPU groups of + // size 'cpuGroupSize' from them. This will be used in the "balance + // score" calculation to help decide if this combo should + // ultimately be chosen. 
+            availableAfterAllocation := make(mapIntInt, len(combo))
+            for _, numa := range combo {
+                availableAfterAllocation[numa] = acc.details.CPUsInNUMANodes(numa).Size() - distribution
+            }
+
+            // Check if there are any remaining CPUs to distribute across the
+            // NUMA nodes once CPUs have been evenly distributed in groups of
+            // size 'cpuGroupSize'.
+            remainder := numCPUs - (distribution * len(combo))
+
+            // Declare a set of local variables to help track the "balance
+            // scores" calculated when using different subsets of 'combo' to
+            // allocate remainder CPUs from.
+            var bestLocalBalance float64 = math.MaxFloat64
+            var bestLocalRemainder []int = nil
+
+            // If there aren't any remainder CPUs to allocate, then calculate
+            // the "balance score" of this combo as the standard deviation of
+            // the values contained in 'availableAfterAllocation'.
+            if remainder == 0 {
+                bestLocalBalance = standardDeviation(availableAfterAllocation.Values())
+                bestLocalRemainder = nil
+            }
+
+            // Otherwise, find the best "balance score" when allocating the
+            // remainder CPUs across different subsets of NUMA nodes in 'combo'.
+            // These remainder CPUs are handed out in groups of size 'cpuGroupSize'.
+            acc.iterateCombinations(combo, remainder/cpuGroupSize, func(subset []int) LoopControl {
+                // Make a local copy of 'availableAfterAllocation'.
+                availableAfterAllocation := availableAfterAllocation.Clone()
+
+                // For all NUMA nodes in 'subset', remove another
+                // 'cpuGroupSize' number of CPUs (to account for any remainder
+                // CPUs that will be allocated on them).
+                for _, numa := range subset {
+                    availableAfterAllocation[numa] -= cpuGroupSize
+                }
+
+                // Calculate the "balance score" as the standard deviation of
+                // the number of CPUs available on all NUMA nodes in 'combo'
+                // after the remainder CPUs have been allocated across 'subset'
+                // in groups of size 'cpuGroupSize'.
+                balance := standardDeviation(availableAfterAllocation.Values())
+                if balance < bestLocalBalance {
+                    bestLocalBalance = balance
+                    bestLocalRemainder = subset
+                }
+
+                return Continue
+            })
+
+            // If the best "balance score" for this combo is less than the
+            // lowest "balance score" of all previous combos, then update this
+            // combo (and remainder set) to be the best one found so far.
+            if bestLocalBalance < bestBalance {
+                bestBalance = bestLocalBalance
+                bestRemainder = bestLocalRemainder
+                bestCombo = combo
+            }
+
+            return Continue
+        })
+
+        // If we made it through all of the iterations above without finding a
+        // combination of NUMA nodes that can properly balance CPU allocations,
+        // then move on to the next larger set of NUMA node combinations.
+        if bestCombo == nil {
+            continue
+        }
+
+        // Otherwise, start allocating CPUs from the NUMA node combination
+        // chosen. First allocate an even distribution of CPUs in groups of
+        // size 'cpuGroupSize' from 'bestCombo'.
+        distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
+        for _, numa := range bestCombo {
+            cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
+            acc.take(cpus)
+        }
+
+        // Then allocate any remaining CPUs in groups of size 'cpuGroupSize'
+        // from each NUMA node in the remainder set.
+        for _, numa := range bestRemainder {
+            cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
+            acc.take(cpus)
+        }
+
+        // If we haven't allocated all of our CPUs at this point, then something
+        // went wrong in our accounting and we should error out.
+        if acc.numCPUsNeeded > 0 {
+            return cpuset.NewCPUSet(), fmt.Errorf("accounting error, not enough CPUs allocated, remaining: %v", acc.numCPUsNeeded)
+        }
+
+        // Likewise, if we have allocated too many CPUs at this point, then
+        // something went wrong in our accounting and we should error out.
+        if acc.numCPUsNeeded < 0 {
+            return cpuset.NewCPUSet(), fmt.Errorf("accounting error, too many CPUs allocated, remaining: %v", acc.numCPUsNeeded)
+        }
+
+        // Otherwise, return the result.
+        return acc.result, nil
+    }
+
+    // If we never found a combination of NUMA nodes that we could properly
+    // distribute CPUs across, fall back to the packing algorithm.
+    return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+}
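To make the loop bounds concrete, here is the arithmetic of rangeNUMANodesNeededToSatisfy applied to hypothetical numbers (a standalone sketch, not part of the patch): with 80 CPUs free across 4 NUMA nodes and a request for 62 CPUs in groups of 2, the combination loop in takeByTopologyNUMADistributed only ever needs to try k=4:

package main

import "fmt"

func main() {
    numCPUsAvailable, numNUMANodesAvailable := 80, 4 // hypothetical machine
    numCPUsNeeded, cpuGroupSize := 62, 2             // hypothetical request

    // The same ceiling-division steps as rangeNUMANodesNeededToSatisfy.
    numCPUGroupsAvailable := (numCPUsAvailable-1)/cpuGroupSize + 1                 // 40
    numCPUGroupsNeeded := (numCPUsNeeded-1)/cpuGroupSize + 1                       // 31
    numCPUGroupsPerNUMANode := (numCPUGroupsAvailable-1)/numNUMANodesAvailable + 1 // 10

    minNUMAs := (numCPUGroupsNeeded-1)/numCPUGroupsPerNUMANode + 1 // 4
    maxNUMAs := numCPUGroupsNeeded                                 // min(31, 4) = 4
    if numNUMANodesAvailable < maxNUMAs {
        maxNUMAs = numNUMANodesAvailable
    }

    fmt.Println(minNUMAs, maxNUMAs) // 4 4
}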
diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
index 5dfc80b4340..59059feb0cc 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
@@ -506,15 +506,17 @@ func TestCPUAccumulatorTake(t *testing.T) {
     }
 }
 
-func TestTakeByTopology(t *testing.T) {
-    testCases := []struct {
-        description   string
-        topo          *topology.CPUTopology
-        availableCPUs cpuset.CPUSet
-        numCPUs       int
-        expErr        string
-        expResult     cpuset.CPUSet
-    }{
+type takeByTopologyTestCase struct {
+    description   string
+    topo          *topology.CPUTopology
+    availableCPUs cpuset.CPUSet
+    numCPUs       int
+    expErr        string
+    expResult     cpuset.CPUSet
+}
+
+func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
+    return []takeByTopologyTestCase{
         {
             "take more cpus than are available from single socket with HT",
             topoSingleSocketHT,
@@ -628,10 +630,190 @@ func TestTakeByTopology(t *testing.T) {
             mustParseCPUSet(t, "10-39,50-79"),
         },
     }
+}
+
+func TestTakeByTopologyNUMAPacked(t *testing.T) {
+    testCases := commonTakeByTopologyTestCases(t)
+    testCases = append(testCases, []takeByTopologyTestCase{
+        {
+            "allocate 4 full cores with 3 coming from the first NUMA node (filling it up) and 1 coming from the second NUMA node",
+            topoDualSocketHT,
+            mustParseCPUSet(t, "0-11"),
+            8,
+            "",
+            mustParseCPUSet(t, "0,6,2,8,4,10,1,7"),
+        },
+        {
+            "allocate 32 full cores with 30 coming from the first 3 NUMA nodes (filling them up) and 2 coming from the fourth NUMA node",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            64,
+            "",
+            mustParseCPUSet(t, "0-29,40-69,30,31,70,71"),
+        },
+    }...)
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            result, err := takeByTopology(tc.topo, tc.availableCPUs, tc.numCPUs)
+            result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
+            if tc.expErr != "" && err.Error() != tc.expErr {
+                t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
+            }
+            if !result.Equals(tc.expResult) {
+                t.Errorf("expected result [%s] to equal [%s]", result, tc.expResult)
+            }
+        })
+    }
+}
+
+type takeByTopologyExtendedTestCase struct {
+    description   string
+    topo          *topology.CPUTopology
+    availableCPUs cpuset.CPUSet
+    numCPUs       int
+    cpuGroupSize  int
+    expErr        string
+    expResult     cpuset.CPUSet
+}
+
+func commonTakeByTopologyExtendedTestCases(t *testing.T) []takeByTopologyExtendedTestCase {
+    var extendedTestCases []takeByTopologyExtendedTestCase
+
+    testCases := commonTakeByTopologyTestCases(t)
+    for _, tc := range testCases {
+        extendedTestCases = append(extendedTestCases, takeByTopologyExtendedTestCase{
+            tc.description,
+            tc.topo,
+            tc.availableCPUs,
+            tc.numCPUs,
+            1,
+            tc.expErr,
+            tc.expResult,
+        })
+    }
+
+    extendedTestCases = append(extendedTestCases, []takeByTopologyExtendedTestCase{
+        {
+            "allocate 4 full cores with 2 distributed across each NUMA node",
+            topoDualSocketHT,
+            mustParseCPUSet(t, "0-11"),
+            8,
+            1,
+            "",
+            mustParseCPUSet(t, "0,6,2,8,1,7,3,9"),
+        },
+        {
+            "allocate 32 full cores with 8 distributed across each NUMA node",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            64,
+            1,
+            "",
+            mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-67,70-77"),
+        },
+        {
+            "allocate 24 full cores with 8 distributed across the first 3 NUMA nodes",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            48,
+            1,
+            "",
+            mustParseCPUSet(t, "0-7,10-17,20-27,40-47,50-57,60-67"),
+        },
+        {
+            "allocate 24 full cores with 8 distributed across the first 3 NUMA nodes (filling the first NUMA node)",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "2-39,42-79"),
+            48,
+            1,
+            "",
+            mustParseCPUSet(t, "2-9,10-17,20-27,42-49,50-57,60-67"),
+        },
+        {
+            "allocate 24 full cores with 8 distributed across the last 3 NUMA nodes (no room on the first NUMA node to distribute)",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "3-39,43-79"),
+            48,
+            1,
+            "",
+            mustParseCPUSet(t, "10-17,20-27,30-37,50-57,60-67,70-77"),
+        },
+        {
+            "allocate 8 full cores with 2 distributed across each NUMA node",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-2,10-12,20-22,30-32,40-41,50-51,60-61,70-71"),
+            16,
+            1,
+            "",
+            mustParseCPUSet(t, "0-1,10-11,20-21,30-31,40-41,50-51,60-61,70-71"),
+        },
+    }...)
+
+    return extendedTestCases
+}
+
+func TestTakeByTopologyNUMADistributed(t *testing.T) {
+    testCases := commonTakeByTopologyExtendedTestCases(t)
+    testCases = append(testCases, []takeByTopologyExtendedTestCase{
+        {
+            "allocate 13 full cores distributed across the first 2 NUMA nodes",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            26,
+            1,
+            "",
+            mustParseCPUSet(t, "0-6,10-16,40-45,50-55"),
+        },
+        {
+            "allocate 13 full cores distributed across the first 2 NUMA nodes (cpuGroupSize 2)",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            26,
+            2,
+            "",
+            mustParseCPUSet(t, "0-6,10-15,40-46,50-55"),
+        },
+        {
+            "allocate 31 full cores with 15 CPUs distributed across each NUMA node and 1 CPU spilling over to each of NUMA 0, 1",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            62,
+            1,
+            "",
+            mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-47,50-57,60-66,70-76"),
+        },
+        {
+            "allocate 31 full cores with 14 CPUs distributed across each NUMA node and 2 CPUs spilling over to each of NUMA 0, 1, 2 (cpuGroupSize 2)",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-79"),
+            62,
+            2,
+            "",
+            mustParseCPUSet(t, "0-7,10-17,20-27,30-36,40-47,50-57,60-67,70-76"),
+        },
+        {
+            "allocate 31 full cores with 15 CPUs distributed across each NUMA node and 1 CPU spilling over to each of NUMA 2, 3 (to keep balance)",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-8,10-18,20-39,40-48,50-58,60-79"),
+            62,
+            1,
+            "",
+            mustParseCPUSet(t, "0-7,10-17,20-27,30-37,40-46,50-56,60-67,70-77"),
+        },
+        {
+            "allocate 31 full cores with 14 CPUs distributed across each NUMA node and 2 CPUs spilling over to each of NUMA 0, 2, 3 (to keep balance with cpuGroupSize 2)",
+            topoDualSocketMultiNumaPerSocketHT,
+            mustParseCPUSet(t, "0-8,10-18,20-39,40-48,50-58,60-79"),
+            62,
+            2,
+            "",
+            mustParseCPUSet(t, "0-7,10-16,20-27,30-37,40-47,50-56,60-67,70-77"),
+        },
+    }...)
+
+    for _, tc := range testCases {
+        t.Run(tc.description, func(t *testing.T) {
+            result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize)
             if tc.expErr != "" && err.Error() != tc.expErr {
                 t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
             }
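As a worked example of one expectation above (a standalone sketch, not part of the patch): in the "allocate 13 full cores ... (cpuGroupSize 2)" case, 26 CPUs land on a two-node combo with 20 CPUs free per node. The even part is 12 CPUs per node and the remaining group of 2 goes to a single node; both choices tie on the balance score, and the strictly-less-than comparison keeps the first subset enumerated, which is why NUMA node 0 ends up with 14 CPUs (0-6,40-46) in the expected result:

package main

import "fmt"

func main() {
    numCPUs, comboSize, cpuGroupSize := 26, 2, 2

    // Even distribution in groups of 'cpuGroupSize', as in the patch.
    distribution := (numCPUs / comboSize / cpuGroupSize) * cpuGroupSize // 12
    // Remainder handed out as whole groups on a subset of the combo.
    remainder := numCPUs - distribution*comboSize // 2

    fmt.Println(distribution, remainder) // 12 2 -> 14 CPUs on one node, 12 on the other
}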
diff --git a/pkg/kubelet/cm/cpumanager/policy_options.go b/pkg/kubelet/cm/cpumanager/policy_options.go
index 84461cf1b94..64b14833e58 100644
--- a/pkg/kubelet/cm/cpumanager/policy_options.go
+++ b/pkg/kubelet/cm/cpumanager/policy_options.go
@@ -26,13 +26,15 @@ import (
 )
 
 const (
-    // FullPCPUsOnlyOption is the name of the CPU Manager policy option
-    FullPCPUsOnlyOption string = "full-pcpus-only"
+    FullPCPUsOnlyOption            string = "full-pcpus-only"
+    DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
 )
 
 var (
-    alphaOptions = sets.NewString()
-    betaOptions  = sets.NewString(
+    alphaOptions = sets.NewString(
+        DistributeCPUsAcrossNUMAOption,
+    )
+    betaOptions = sets.NewString(
         FullPCPUsOnlyOption,
     )
     stableOptions = sets.NewString()
@@ -64,6 +66,9 @@ type StaticPolicyOptions struct {
     // any possible naming scheme will lead to ambiguity to some extent.
     // We picked "pcpu" because it the established docs hints at vCPU already.
     FullPhysicalCPUsOnly bool
+    // Flag to evenly distribute CPUs across NUMA nodes in cases where more
+    // than one NUMA node is required to satisfy the allocation.
+    DistributeCPUsAcrossNUMA bool
 }
 
 func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) {
@@ -80,6 +85,12 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
             return opts, fmt.Errorf("bad value for option %q: %w", name, err)
         }
         opts.FullPhysicalCPUsOnly = optValue
+    case DistributeCPUsAcrossNUMAOption:
+        optValue, err := strconv.ParseBool(value)
+        if err != nil {
+            return opts, fmt.Errorf("bad value for option %q: %w", name, err)
+        }
+        opts.DistributeCPUsAcrossNUMA = optValue
     default:
         // this should never be reached, we already detect unknown options,
         // but we keep it as further safety.
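A sketch of how the new option is parsed (hypothetical in-package example, not part of the patch; assumes a test file in package cpumanager that imports "fmt"):

func ExampleNewStaticPolicyOptions() {
    opts, err := NewStaticPolicyOptions(map[string]string{
        FullPCPUsOnlyOption:            "true",
        DistributeCPUsAcrossNUMAOption: "true",
    })
    if err != nil {
        // e.g. bad value for option "distribute-cpus-across-numa": ...
        panic(err)
    }
    fmt.Println(opts.FullPhysicalCPUsOnly, opts.DistributeCPUsAcrossNUMA)
    // Output: true true
}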
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index f5d275d8ea8..f7ff26cd313 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -118,6 +118,13 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 
     klog.InfoS("Static policy created with configuration", "options", opts)
 
+    policy := &staticPolicy{
+        topology:    topology,
+        affinity:    affinity,
+        cpusToReuse: make(map[string]cpuset.CPUSet),
+        options:     opts,
+    }
+
     allCPUs := topology.CPUDetails.CPUs()
     var reserved cpuset.CPUSet
     if reservedCPUs.Size() > 0 {
@@ -128,7 +135,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
         //
         // For example: Given a system with 8 CPUs available and HT enabled,
         // if numReservedCPUs=2, then reserved={0,4}
-        reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs)
+        reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs)
     }
 
     if reserved.Size() != numReservedCPUs {
@@ -137,14 +144,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
     }
 
     klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
+    policy.reserved = reserved
 
-    return &staticPolicy{
-        topology:    topology,
-        reserved:    reserved,
-        affinity:    affinity,
-        cpusToReuse: make(map[string]cpuset.CPUSet),
-        options:     opts,
-    }, nil
+    return policy, nil
 }
 
 func (p *staticPolicy) Name() string {
@@ -318,7 +320,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
             numAlignedToAlloc = numCPUs
         }
 
-        alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc)
+        alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
         if err != nil {
             return cpuset.NewCPUSet(), err
         }
@@ -327,7 +329,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
     }
 
     // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-    remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size())
+    remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
     if err != nil {
         return cpuset.NewCPUSet(), err
     }
@@ -381,6 +383,17 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
     return requestedByAppContainers
 }
 
+func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+    if p.options.DistributeCPUsAcrossNUMA {
+        cpuGroupSize := 1
+        if p.options.FullPhysicalCPUsOnly {
+            cpuGroupSize = p.topology.CPUsPerCore()
+        }
+        return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
+    }
+    return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
+}
+
 func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
     // Get a count of how many guaranteed CPUs have been requested.
     requested := p.guaranteedCPUs(pod, container)
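The dispatch that the new p.takeByTopology() helper implements can be restated as a standalone sketch (illustrative only, not part of the patch; the policy options and topology are reduced to plain values):

package main

import "fmt"

// chooseAllocator mirrors the branch structure of p.takeByTopology() above.
func chooseAllocator(distributeAcrossNUMA, fullPCPUsOnly bool, cpusPerCore int) string {
    if distributeAcrossNUMA {
        cpuGroupSize := 1
        if fullPCPUsOnly {
            cpuGroupSize = cpusPerCore // keep hyperthread siblings together
        }
        return fmt.Sprintf("takeByTopologyNUMADistributed(cpuGroupSize=%d)", cpuGroupSize)
    }
    return "takeByTopologyNUMAPacked"
}

func main() {
    fmt.Println(chooseAllocator(false, false, 2)) // takeByTopologyNUMAPacked
    fmt.Println(chooseAllocator(true, false, 2))  // takeByTopologyNUMADistributed(cpuGroupSize=1)
    fmt.Println(chooseAllocator(true, true, 2))   // takeByTopologyNUMADistributed(cpuGroupSize=2)
}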