diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go index 969cd72c6e6..1a844495450 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go @@ -18,6 +18,7 @@ package cpumanager import ( "fmt" + "math" "sort" "k8s.io/klog/v2" @@ -26,6 +27,49 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) +type mapIntInt map[int]int + +func (m mapIntInt) Clone() mapIntInt { + cp := make(mapIntInt, len(m)) + for k, v := range m { + cp[k] = v + } + return cp +} + +func (m mapIntInt) Keys() []int { + keys := make([]int, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +func (m mapIntInt) Values() []int { + values := make([]int, 0, len(m)) + for _, v := range m { + values = append(values, v) + } + return values +} + +func mean(xs []int) float64 { + var sum float64 + for _, x := range xs { + sum += float64(x) + } + return sum / float64(len(xs)) +} + +func standardDeviation(xs []int) float64 { + m := mean(xs) + var sum float64 + for _, x := range xs { + sum += (float64(x) - m) * (float64(x) - m) + } + return math.Sqrt(sum / float64(len(xs))) +} + type numaOrSocketsFirstFuncs interface { takeFullFirstLevel() takeFullSecondLevel() @@ -318,6 +362,28 @@ func (a *cpuAccumulator) isFailed() bool { return a.numCPUsNeeded > a.details.CPUs().Size() } +// iterateCombinations walks through all n-choose-k subsets of size k in n and +// calls function 'f()' on each subset. For example, if n={0,1,2}, and k=2, +// then f() will be called on the subsets {0,1}, {0,2}, and {1,2}. 
+func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int)) { + if k < 1 { + return + } + + var helper func(n []int, k int, start int, accum []int, f func([]int)) + helper = func(n []int, k int, start int, accum []int, f func([]int)) { + if k == 0 { + f(accum) + return + } + for i := start; i <= len(n)-k; i++ { + helper(n, k-1, i+1, append(accum, n[i]), f) + } + } + + helper(n, k, 0, []int{}, f) +} + func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { acc := newCPUAccumulator(topo, availableCPUs, numCPUs) if acc.isSatisfied() { @@ -359,6 +425,207 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus") } +// takeByTopologyNUMADistributed returns a CPUSet of size 'numCPUs'. +// +// It generates this CPUset by allocating CPUs from 'availableCPUs' according +// to the algorithm outlined in KEP-2902: +// +// https://github.com/kubernetes/enhancements/tree/e7f51ffbe2ee398ffd1fba4a6d854f276bfad9fb/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option +// +// This algorithm evenly distributes CPUs across NUMA nodes in cases where more +// than one NUMA node is required to satisfy the allocation. This is in +// contrast to the takeByTopologyNUMAPacked algorithm, which attempts to 'pack' +// CPUs onto NUMA nodes and fill them up before moving on to the next one. 
+// +// At a high-level this algorithm can be summarized as: +// +// For each single NUMA node: +// * If all requested CPUs can be allocated from this NUMA node; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in that NUMA node and return +// +// Otherwise, for each pair of NUMA nodes: +// * If the set of requested CPUs (modulo 2) can be evenly split across +// the 2 NUMA nodes; AND +// * Any remaining CPUs (after the modulo operation) can be striped across +// some subset of the NUMA nodes; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in both NUMA nodes and return +// +// Otherwise, for each 3-tuple of NUMA nodes: +// * If the set of requested CPUs (modulo 3) can be evenly distributed +// across the 3 NUMA nodes; AND +// * Any remaining CPUs (after the modulo operation) can be striped across +// some subset of the NUMA nodes; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in all three NUMA nodes and return +// +// ... +// +// Otherwise, for the set of all NUMA nodes: +// * If the set of requested CPUs (modulo NUM_NUMA_NODES) can be evenly +// distributed across all NUMA nodes; AND +// * Any remaining CPUs (after the modulo operation) can be striped across +// some subset of the NUMA nodes; +// --> Do the allocation by running takeByTopologyNUMAPacked() over the +// available CPUs in all NUMA nodes and return +// +// If none of the above conditions can be met, then resort back to a +// best-effort fit of packing CPUs into NUMA nodes by calling +// takeByTopologyNUMAPacked() over all available CPUs. +// +// NOTE: A "balance score" will be calculated to help find the best subset of +// NUMA nodes to allocate any 'remainder' CPUs from (in cases where the total +// number of CPUs to allocate cannot be evenly distributed across the chosen +// set of NUMA nodes). 
This "balance score" is calculated as the standard +// deviation of how many CPUs will be available on each NUMA node after all +// evenly distributed and remainder CPUs are allocated. The subset with the +// lowest "balance score" will receive the CPUs in order to keep the overall +// allocation of CPUs as "balanced" as possible. func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { - return cpuset.NewCPUSet(), fmt.Errorf("unimplemented") + acc := newCPUAccumulator(topo, availableCPUs, numCPUs) + if acc.isSatisfied() { + return acc.result, nil + } + if acc.isFailed() { + return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request") + } + + // Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'. + numas := acc.sortAvailableNUMANodes() + + // Try combinations of 1,2,3,... NUMA nodes until we find a combination + // where we can evenly distribute CPUs across them. + for i := range numas { + // Iterate through the various n-choose-k NUMA node combinations (where + // k=i+1 for this iteration of the loop), looking for the combination + // of NUMA nodes that can best have CPUs distributed across them. + var bestBalance float64 = math.MaxFloat64 + var bestRemainder []int = nil + var bestCombo []int = nil + acc.iterateCombinations(numas, i+1, func(combo []int) { + // If we've already found a combo with a balance of 0 in a + // different iteration, then don't bother checking any others. + // TODO: Add a way to just short circuit iterateCombinations() so + // we don't keep looping once such a combo is found. + if bestBalance == 0 { + return + } + + // Check that this combination of NUMA nodes has enough CPUs to + // satisfy the allocation overall. + cpus := acc.details.CPUsInNUMANodes(combo...) 
+ if cpus.Size() < numCPUs { + return + } + + // Check that each NUMA node in this combination can provide + // (at least) numCPUs/len(combo) of the total cpus required. + distribution := numCPUs / len(combo) + for _, numa := range combo { + cpus := acc.details.CPUsInNUMANodes(numa) + if cpus.Size() < distribution { + return + } + } + + // Calculate how many CPUs will be available on each NUMA node + // in 'combo' after allocating an even distribution of CPUs from + // them. This will be used to calculate a "balance" score for the + // combo to help decide which combo should ultimately be chosen. + availableAfterAllocation := make(mapIntInt, len(combo)) + for _, numa := range combo { + availableAfterAllocation[numa] = acc.details.CPUsInNUMANodes(numa).Size() - distribution + } + + // Check if there are any remaining CPUs to distribute across the + // NUMA nodes once CPUs have been evenly distributed. + remainder := numCPUs - (distribution * len(combo)) + + // Declare a set of local variables to help track the "balance + // scores" calculated when using different subsets of 'combo' to + // allocate remainder CPUs from. + var bestLocalBalance float64 = math.MaxFloat64 + var bestLocalRemainder []int = nil + + // If there aren't any remainder CPUs to allocate, then calculate + // the "balance score" of this combo as the standard deviation of + // the values contained in 'availableAfterAllocation'. + if remainder == 0 { + bestLocalBalance = standardDeviation(availableAfterAllocation.Values()) + bestLocalRemainder = nil + } + + // Otherwise, find the best "balance score" when allocating the + // remainder CPUs across different subsets of NUMA nodes in 'combo'. + acc.iterateCombinations(combo, remainder, func(subset []int) { + // Make a local copy of 'availableAfterAllocation'. + availableAfterAllocation := availableAfterAllocation.Clone() + + // For all NUMA nodes in 'subset', remove 1 more CPU (to account + // for any remainder CPUs that will be allocated on them). 
+ for _, numa := range subset { + availableAfterAllocation[numa] -= 1 + } + + // Calculate the "balance score" as the standard deviation of + // the number of CPUs available on all NUMA nodes in 'combo' + // assuming the remainder CPUs are spread across 'subset'. + balance := standardDeviation(availableAfterAllocation.Values()) + if balance < bestLocalBalance { + bestLocalBalance = balance + bestLocalRemainder = subset + } + }) + + // If the best "balance score" for this combo is less than the + // lowest "balance score" of all previous combos, then update this + // combo (and remainder set) to be the best one found so far. + if bestLocalBalance < bestBalance { + bestBalance = bestLocalBalance + bestRemainder = bestLocalRemainder + bestCombo = combo + } + }) + + // If we made it through all of the iterations above without finding a + // combination of NUMA nodes that can properly balance CPU allocations, + // then move on to the next larger set of NUMA node combinations. + if bestCombo == nil { + continue + } + + // Otherwise, start allocating CPUs from the NUMA node combination + // chosen. First allocate numCPUs / len(bestCombo) CPUs from each node. + distribution := numCPUs / len(bestCombo) + for _, numa := range bestCombo { + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution) + acc.take(cpus) + } + + // Then allocate any remaining CPUs from each NUMA node in the remainder set. + for _, numa := range bestRemainder { + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), 1) + acc.take(cpus) + } + + // If we haven't allocated all of our CPUs at this point, then something + // went wrong in our accounting and we should error out. 
+ if acc.numCPUsNeeded > 0 { + return cpuset.NewCPUSet(), fmt.Errorf("accounting error, not enough CPUs allocated, remaining: %v", acc.numCPUsNeeded) + } + + // Likewise, if we have allocated too many CPUs at this point, then something + // went wrong in our accounting and we should error out. + if acc.numCPUsNeeded < 0 { + return cpuset.NewCPUSet(), fmt.Errorf("accounting error, too many CPUs allocated, remaining: %v", acc.numCPUsNeeded) + } + + // Otherwise, return the result + return acc.result, nil + } + + // If we never found a combination of NUMA nodes that we could properly + // distribute CPUs across, fall back to the packing algorithm. + return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs) }