diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
index 17686d75643..08622568ab3 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -196,6 +196,58 @@ func (s *socketsFirst) sortAvailableCores() []int {
 	return result
 }
 
+type availableCPUSorter interface {
+	sort() []int
+}
+
+type sortCPUsPacked struct{ acc *cpuAccumulator }
+type sortCPUsSpread struct{ acc *cpuAccumulator }
+
+var _ availableCPUSorter = (*sortCPUsPacked)(nil)
+var _ availableCPUSorter = (*sortCPUsSpread)(nil)
+
+func (s sortCPUsPacked) sort() []int {
+	return s.acc.sortAvailableCPUsPacked()
+}
+
+func (s sortCPUsSpread) sort() []int {
+	return s.acc.sortAvailableCPUsSpread()
+}
+
+// CPUSortingStrategy describes the CPU sorting solution within the socket scope.
+// Using topoDualSocketHT (12 CPUs, 2 sockets, 6 cores) as an example:
+//
+//	CPUDetails: map[int]topology.CPUInfo{
+//		0:  {CoreID: 0, SocketID: 0, NUMANodeID: 0},
+//		1:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+//		2:  {CoreID: 2, SocketID: 0, NUMANodeID: 0},
+//		3:  {CoreID: 3, SocketID: 1, NUMANodeID: 1},
+//		4:  {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+//		5:  {CoreID: 5, SocketID: 1, NUMANodeID: 1},
+//		6:  {CoreID: 0, SocketID: 0, NUMANodeID: 0},
+//		7:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+//		8:  {CoreID: 2, SocketID: 0, NUMANodeID: 0},
+//		9:  {CoreID: 3, SocketID: 1, NUMANodeID: 1},
+//		10: {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+//		11: {CoreID: 5, SocketID: 1, NUMANodeID: 1},
+//	},
+//
+// - CPUSortingStrategyPacked sorts CPUs in a packed manner, where CPUs are grouped by core
+//   before moving to the next core, resulting in packed cores, like:
+//   0, 6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11
+// - CPUSortingStrategySpread sorts CPUs in a spread manner, where CPUs are spread across cores
+//   before moving to the next CPU, resulting in spread-out cores, like:
+//   0, 2, 4, 6, 8, 10, 1, 3, 5, 7, 9, 11
+//
+// By default, CPUSortingStrategyPacked will be used, and CPUSortingStrategySpread will only be activated
+// when the user specifies the `DistributeCPUsAcrossCoresOption` static policy option.
+type CPUSortingStrategy string
+
+const (
+	CPUSortingStrategyPacked CPUSortingStrategy = "packed"
+	CPUSortingStrategySpread CPUSortingStrategy = "spread"
+)
+
 type cpuAccumulator struct {
 	// `topo` describes the layout of CPUs (i.e. hyper-threads if hyperthreading is on) between
 	// cores (i.e. physical CPUs if hyper-threading is on), NUMA nodes, and sockets on the K8s
@@ -223,9 +275,15 @@ type cpuAccumulator struct {
 	result cpuset.CPUSet
 
 	numaOrSocketsFirst numaOrSocketsFirstFuncs
+
+	// availableCPUSorter is used to control the CPU sorting result.
+	// The sequence of returned CPU IDs depends on the selected strategy.
+	// By default (CPUSortingStrategyPacked), CPUs are sorted by sortAvailableCPUsPacked();
+	// with CPUSortingStrategySpread, CPUs are sorted by sortAvailableCPUsSpread().
+	availableCPUSorter availableCPUSorter
 }
 
-func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
+func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) *cpuAccumulator {
 	acc := &cpuAccumulator{
 		topo:    topo,
 		details: topo.CPUDetails.KeepOnly(availableCPUs),
@@ -239,6 +297,12 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
 		acc.numaOrSocketsFirst = &socketsFirst{acc}
 	}
 
+	if cpuSortingStrategy == CPUSortingStrategyPacked {
+		acc.availableCPUSorter = &sortCPUsPacked{acc}
+	} else {
+		acc.availableCPUSorter = &sortCPUsSpread{acc}
+	}
+
 	return acc
 }
 
@@ -293,9 +357,9 @@ func (a *cpuAccumulator) freeCores() []int {
 	return free
 }
 
-// Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
+// Returns free CPU IDs as a slice sorted by the configured availableCPUSorter (packed by default).
 func (a *cpuAccumulator) freeCPUs() []int {
-	return a.sortAvailableCPUs()
+	return a.availableCPUSorter.sort()
 }
 
 // Sorts the provided list of NUMA nodes/sockets/cores/cpus referenced in 'ids'
@@ -404,7 +468,7 @@ func (a *cpuAccumulator) sortAvailableCores() []int {
 // same way as described in the previous paragraph, except that the priority of NUMA nodes and
 // sockets is inverted (e.g. first sort the CPUs by number of free CPUs in their NUMA nodes, then,
 // for each NUMA node, sort the CPUs by number of free CPUs in their sockets, etc...).
-func (a *cpuAccumulator) sortAvailableCPUs() []int {
+func (a *cpuAccumulator) sortAvailableCPUsPacked() []int {
 	var result []int
 	for _, core := range a.sortAvailableCores() {
 		cpus := a.details.CPUsInCores(core).UnsortedList()
@@ -414,6 +478,19 @@ func (a *cpuAccumulator) sortAvailableCPUs() []int {
 	return result
 }
 
+// Sort all available CPUs:
+// - First by socket using sortAvailableSockets().
+// - Then within each socket, sort the CPU IDs directly in ascending numerical order.
+func (a *cpuAccumulator) sortAvailableCPUsSpread() []int {
+	var result []int
+	for _, socket := range a.sortAvailableSockets() {
+		cpus := a.details.CPUsInSockets(socket).UnsortedList()
+		sort.Ints(cpus)
+		result = append(result, cpus...)
+	}
+	return result
+}
+
 func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
 	a.result = a.result.Union(cpus)
 	a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
@@ -454,7 +531,7 @@ func (a *cpuAccumulator) takeFullCores() {
 }
 
 func (a *cpuAccumulator) takeRemainingCPUs() {
-	for _, cpu := range a.sortAvailableCPUs() {
+	for _, cpu := range a.availableCPUSorter.sort() {
 		klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
 		a.take(cpuset.New(cpu))
 		if a.isSatisfied() {
@@ -581,8 +658,8 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
 // the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
 // order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
 // the least amount of free CPUs to the one with the highest amount of free CPUs.
-func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { - acc := newCPUAccumulator(topo, availableCPUs, numCPUs) +func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) { + acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy) if acc.isSatisfied() { return acc.result, nil } @@ -606,9 +683,12 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C // 2. Acquire whole cores, if available and the container requires at least // a core's-worth of CPUs. - acc.takeFullCores() - if acc.isSatisfied() { - return acc.result, nil + // If `CPUSortingStrategySpread` is specified, skip taking the whole core. + if cpuSortingStrategy != CPUSortingStrategySpread { + acc.takeFullCores() + if acc.isSatisfied() { + return acc.result, nil + } } // 3. Acquire single threads, preferring to fill partially-allocated cores @@ -685,16 +765,16 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C // of size 'cpuGroupSize' according to the algorithm described above. This is // important, for example, to ensure that all CPUs (i.e. all hyperthreads) from // a single core are allocated together. -func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) { +func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) { // If the number of CPUs requested cannot be handed out in chunks of // 'cpuGroupSize', then we just call out the packing algorithm since we // can't distribute CPUs in this chunk size. if (numCPUs % cpuGroupSize) != 0 { - return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs) + return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy) } // Otherwise build an accumulator to start allocating CPUs from. - acc := newCPUAccumulator(topo, availableCPUs, numCPUs) + acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy) if acc.isSatisfied() { return acc.result, nil } @@ -873,7 +953,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu // size 'cpuGroupSize' from 'bestCombo'. distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize for _, numa := range bestCombo { - cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution) + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy) acc.take(cpus) } @@ -888,7 +968,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize { continue } - cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize) + cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy) acc.take(cpus) remainder -= cpuGroupSize } @@ -912,5 +992,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu // If we never found a combination of NUMA nodes that we could properly // distribute CPUs across, fall back to the packing algorithm. 
- return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs) + return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy) } diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go index 63b026b1979..961f55e465b 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go @@ -114,7 +114,7 @@ func TestCPUAccumulatorFreeSockets(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked) result := acc.freeSockets() sort.Ints(result) if !reflect.DeepEqual(result, tc.expect) { @@ -214,7 +214,7 @@ func TestCPUAccumulatorFreeNUMANodes(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked) result := acc.freeNUMANodes() if !reflect.DeepEqual(result, tc.expect) { t.Errorf("expected %v to equal %v", result, tc.expect) @@ -263,7 +263,7 @@ func TestCPUAccumulatorFreeSocketsAndNUMANodes(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked) resultNUMANodes := acc.freeNUMANodes() if !reflect.DeepEqual(resultNUMANodes, tc.expectNUMANodes) { t.Errorf("expected NUMA Nodes %v to equal %v", resultNUMANodes, tc.expectNUMANodes) @@ -335,7 +335,7 @@ func TestCPUAccumulatorFreeCores(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked) result := acc.freeCores() if !reflect.DeepEqual(result, tc.expect) { t.Errorf("expected %v to equal %v", result, tc.expect) @@ -391,7 +391,7 @@ func TestCPUAccumulatorFreeCPUs(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked) result := acc.freeCPUs() if !reflect.DeepEqual(result, tc.expect) { t.Errorf("expected %v to equal %v", result, tc.expect) @@ -477,7 +477,7 @@ func TestCPUAccumulatorTake(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs) + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs, CPUSortingStrategyPacked) totalTaken := 0 for _, cpus := range tc.takeCPUs { acc.take(cpus) @@ -509,6 +509,7 @@ func TestCPUAccumulatorTake(t *testing.T) { type takeByTopologyTestCase struct { description string topo *topology.CPUTopology + opts StaticPolicyOptions availableCPUs cpuset.CPUSet numCPUs int expErr string @@ -520,6 +521,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take more cpus than are available from single socket with HT", topoSingleSocketHT, + StaticPolicyOptions{}, cpuset.New(0, 2, 4, 6), 5, "not enough cpus available to satisfy request: requested=5, available=4", @@ -528,6 +530,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take zero cpus from single socket with HT", topoSingleSocketHT, 
+ StaticPolicyOptions{}, cpuset.New(0, 1, 2, 3, 4, 5, 6, 7), 0, "", @@ -536,6 +539,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take one cpu from single socket with HT", topoSingleSocketHT, + StaticPolicyOptions{}, cpuset.New(0, 1, 2, 3, 4, 5, 6, 7), 1, "", @@ -544,6 +548,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take one cpu from single socket with HT, some cpus are taken", topoSingleSocketHT, + StaticPolicyOptions{}, cpuset.New(1, 3, 5, 6, 7), 1, "", @@ -552,6 +557,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take two cpus from single socket with HT", topoSingleSocketHT, + StaticPolicyOptions{}, cpuset.New(0, 1, 2, 3, 4, 5, 6, 7), 2, "", @@ -560,6 +566,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take all cpus from single socket with HT", topoSingleSocketHT, + StaticPolicyOptions{}, cpuset.New(0, 1, 2, 3, 4, 5, 6, 7), 8, "", @@ -568,6 +575,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take two cpus from single socket with HT, only one core totally free", topoSingleSocketHT, + StaticPolicyOptions{}, cpuset.New(0, 1, 2, 3, 6), 2, "", @@ -576,6 +584,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take a socket of cpus from dual socket with HT", topoDualSocketHT, + StaticPolicyOptions{}, cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 6, "", @@ -584,6 +593,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take a socket of cpus from dual socket with multi-numa-per-socket with HT", topoDualSocketMultiNumaPerSocketHT, + StaticPolicyOptions{}, mustParseCPUSet(t, "0-79"), 40, "", @@ -592,6 +602,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take a NUMA node of cpus from dual socket with multi-numa-per-socket with HT", topoDualSocketMultiNumaPerSocketHT, + StaticPolicyOptions{}, mustParseCPUSet(t, "0-79"), 20, "", @@ -600,6 +611,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take a NUMA node of cpus from dual socket with multi-numa-per-socket with HT, with 1 NUMA node already taken", topoDualSocketMultiNumaPerSocketHT, + StaticPolicyOptions{}, mustParseCPUSet(t, "10-39,50-79"), 20, "", @@ -608,6 +620,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take a socket and a NUMA node of cpus from dual socket with multi-numa-per-socket with HT", topoDualSocketMultiNumaPerSocketHT, + StaticPolicyOptions{}, mustParseCPUSet(t, "0-79"), 60, "", @@ -616,6 +629,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase { { "take a socket and a NUMA node of cpus from dual socket with multi-numa-per-socket with HT, a core taken", topoDualSocketMultiNumaPerSocketHT, + StaticPolicyOptions{}, mustParseCPUSet(t, "1-39,41-79"), // reserve the first (phys) core (0,40) 60, "", @@ -630,6 +644,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) { { "take one cpu from dual socket with HT - core from Socket 0", topoDualSocketHT, + StaticPolicyOptions{}, cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), 1, "", @@ -638,6 +653,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) { { "allocate 4 full cores with 3 coming from the first NUMA node (filling it up) and 1 coming from the second NUMA node", topoDualSocketHT, + StaticPolicyOptions{}, mustParseCPUSet(t, "0-11"), 8, "", @@ -646,6 +662,7 @@ func TestTakeByTopologyNUMAPacked(t 
*testing.T) {
 		{
 			"allocate 32 full cores with 30 coming from the first 3 NUMA nodes (filling them up) and 2 coming from the fourth NUMA node",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "0-79"),
 			64,
 			"",
@@ -655,7 +672,12 @@
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
+			strategy := CPUSortingStrategyPacked
+			if tc.opts.DistributeCPUsAcrossCores {
+				strategy = CPUSortingStrategySpread
+			}
+
+			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
 			if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
 				t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
 			}
@@ -666,6 +688,106 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 	}
 }
 
+func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
+	testCases := []struct {
+		description   string
+		topo          *topology.CPUTopology
+		opts          StaticPolicyOptions
+		availableCPUs cpuset.CPUSet
+		numCPUs       int
+		expErr        string
+		expResult     cpuset.CPUSet
+	}{
+		{
+			"take a socket of cpus from single socket with HT, 3 cpus",
+			topoSingleSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			3,
+			"",
+			cpuset.New(0, 1, 2),
+		},
+		{
+			"take a socket of cpus from dual socket with HT, 2 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			2,
+			"",
+			cpuset.New(0, 2),
+		},
+		{
+			"take a socket of cpus from dual socket with HT, 3 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			3,
+			"",
+			cpuset.New(0, 2, 4),
+		},
+		{
+			"take a socket of cpus from dual socket with HT, 6 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			6,
+			"",
+			cpuset.New(0, 2, 4, 6, 8, 10),
+		},
+		{
+			"take cpus from dual socket with HT, 8 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			8,
+			"",
+			cpuset.New(0, 2, 4, 6, 8, 10, 1, 3),
+		},
+		{
+			"take a socket of cpus from dual socket without HT, 2 cpus",
+			topoDualSocketNoHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			2,
+			"",
+			cpuset.New(0, 1),
+		},
+		{
+			// DistributeCPUsAcrossCores does not take socket and NUMA ranking into account, so those settings are transparent in this test.
+			"take cpus from dual socket with multi numa per socket and HT, 8 cpus",
+			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			mustParseCPUSet(t, "0-79"),
+			8,
+			"",
+			mustParseCPUSet(t, "0-7"),
+		},
+		{
+			"take a socket of cpus from quad socket four way with HT, 12 cpus",
+			topoQuadSocketFourWayHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			mustParseCPUSet(t, "0-287"),
+			12,
+			"",
+			mustParseCPUSet(t, "0-2,9-10,13-14,21-22,25-26,33"),
+		},
+	}
+
+	for _, tc := range testCases {
+		strategy := CPUSortingStrategyPacked
+		if tc.opts.DistributeCPUsAcrossCores {
+			strategy = CPUSortingStrategySpread
+		}
+		result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
+		if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
+			t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
+		}
+		if !result.Equals(tc.expResult) {
+			t.Errorf("testCase %q failed, expected result [%s] to equal [%s]", tc.description, result, tc.expResult)
+		}
+	}
+}
+
 type takeByTopologyExtendedTestCase struct {
 	description   string
 	topo          *topology.CPUTopology
@@ -858,7 +980,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize)
+			result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize, CPUSortingStrategyPacked)
 			if err != nil {
 				if tc.expErr == "" {
 					t.Errorf("unexpected error [%v]", err)
diff --git a/pkg/kubelet/cm/cpumanager/policy_options.go b/pkg/kubelet/cm/cpumanager/policy_options.go
index 368fc63624f..ac6d15ed0d3 100644
--- a/pkg/kubelet/cm/cpumanager/policy_options.go
+++ b/pkg/kubelet/cm/cpumanager/policy_options.go
@@ -29,15 +29,17 @@ import (
 
 // Names of the options, as part of the user interface.
 const (
-	FullPCPUsOnlyOption            string = "full-pcpus-only"
-	DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
-	AlignBySocketOption            string = "align-by-socket"
+	FullPCPUsOnlyOption             string = "full-pcpus-only"
+	DistributeCPUsAcrossNUMAOption  string = "distribute-cpus-across-numa"
+	AlignBySocketOption             string = "align-by-socket"
+	DistributeCPUsAcrossCoresOption string = "distribute-cpus-across-cores"
 )
 
 var (
 	alphaOptions = sets.New[string](
 		DistributeCPUsAcrossNUMAOption,
 		AlignBySocketOption,
+		DistributeCPUsAcrossCoresOption,
 	)
 	betaOptions = sets.New[string](
 		FullPCPUsOnlyOption,
@@ -80,6 +82,10 @@ type StaticPolicyOptions struct {
 	// Flag to ensure CPUs are considered aligned at socket boundary rather than
 	// NUMA boundary
 	AlignBySocket bool
+	// Flag to enable extra allocation restrictions to spread
+	// cpus (HT) across different physical cores.
+	// This is a preferred policy, so no error is thrown if the CPUs have to be packed onto one physical core.
+	DistributeCPUsAcrossCores bool
 }
 
 // NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
@@ -109,12 +115,29 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption return opts, fmt.Errorf("bad value for option %q: %w", name, err) } opts.AlignBySocket = optValue + case DistributeCPUsAcrossCoresOption: + optValue, err := strconv.ParseBool(value) + if err != nil { + return opts, fmt.Errorf("bad value for option %q: %w", name, err) + } + opts.DistributeCPUsAcrossCores = optValue + default: // this should never be reached, we already detect unknown options, // but we keep it as further safety. return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value) } } + + if opts.FullPhysicalCPUsOnly && opts.DistributeCPUsAcrossCores { + return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", FullPCPUsOnlyOption, DistributeCPUsAcrossCoresOption) + } + + // TODO(@Jeffwan): Remove this check after more compatibility tests are done. + if opts.DistributeCPUsAcrossNUMA && opts.DistributeCPUsAcrossCores { + return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", DistributeCPUsAcrossNUMAOption, DistributeCPUsAcrossCoresOption) + } + return opts, nil } diff --git a/pkg/kubelet/cm/cpumanager/policy_options_test.go b/pkg/kubelet/cm/cpumanager/policy_options_test.go index 98729d62c25..2b14b90429f 100644 --- a/pkg/kubelet/cm/cpumanager/policy_options_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_options_test.go @@ -94,6 +94,30 @@ func TestPolicyOptionsAvailable(t *testing.T) { featureGateEnable: true, expectedAvailable: false, }, + { + option: DistributeCPUsAcrossNUMAOption, + featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions, + featureGateEnable: true, + expectedAvailable: true, + }, + { + option: DistributeCPUsAcrossNUMAOption, + featureGate: pkgfeatures.CPUManagerPolicyBetaOptions, + featureGateEnable: true, + expectedAvailable: false, + }, + { + option: DistributeCPUsAcrossCoresOption, + featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions, + featureGateEnable: true, + expectedAvailable: true, + }, + { + option: DistributeCPUsAcrossCoresOption, + featureGate: pkgfeatures.CPUManagerPolicyBetaOptions, + featureGateEnable: true, + expectedAvailable: false, + }, } for _, testCase := range testCases { t.Run(testCase.option, func(t *testing.T) { @@ -163,7 +187,6 @@ func TestValidateStaticPolicyOptions(t *testing.T) { topoMgrPolicy := topologymanager.NewNonePolicy() if testCase.topoMgrPolicy == topologymanager.PolicySingleNumaNode { topoMgrPolicy = topologymanager.NewSingleNumaNodePolicy(&topologymanager.NUMAInfo{}, topologymanager.PolicyOptions{}) - } topoMgrStore := topologymanager.NewFakeManagerWithPolicy(topoMgrPolicy) @@ -177,3 +200,49 @@ func TestValidateStaticPolicyOptions(t *testing.T) { }) } } + +func TestPolicyOptionsCompatibility(t *testing.T) { + // take feature gate into the consideration + testCases := []struct { + description string + featureGate featuregate.Feature + policyOptions map[string]string + expectedErr bool + }{ + { + description: "FullPhysicalCPUsOnly set to true only", + featureGate: pkgfeatures.CPUManagerPolicyBetaOptions, + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "true", + }, + expectedErr: false, + }, + { + description: "DistributeCPUsAcrossCores set to true only", + featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions, + policyOptions: map[string]string{ + DistributeCPUsAcrossCoresOption: "true", + }, + expectedErr: false, + }, + { + description: "FullPhysicalCPUsOnly and DistributeCPUsAcrossCores options can not coexist", + 
featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions, + policyOptions: map[string]string{ + FullPCPUsOnlyOption: "true", + DistributeCPUsAcrossCoresOption: "true", + }, + expectedErr: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, testCase.featureGate, true) + _, err := NewStaticPolicyOptions(testCase.policyOptions) + gotError := err != nil + if gotError != testCase.expectedErr { + t.Errorf("testCase %q failed, got %v expected %v", testCase.description, gotError, testCase.expectedErr) + } + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index d22a6a64d5e..049e35f9883 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -483,14 +483,19 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int { } func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { + cpuSortingStrategy := CPUSortingStrategyPacked + if p.options.DistributeCPUsAcrossCores { + cpuSortingStrategy = CPUSortingStrategySpread + } + if p.options.DistributeCPUsAcrossNUMA { cpuGroupSize := 1 if p.options.FullPhysicalCPUsOnly { cpuGroupSize = p.topology.CPUsPerCore() } - return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize) + return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize, cpuSortingStrategy) } - return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs) + return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy) } func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
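
Reviewer note (not part of the patch): the standalone Go program below reproduces the two orderings documented on CPUSortingStrategy for the topoDualSocketHT layout (2 sockets, 6 cores, 12 hyper-threads). It is a sketch only: the helper names (cpuInfo, packedOrder, spreadOrder, sortedKeys) are illustrative and do not exist in the kubelet, and the real sortAvailableCPUsPacked()/sortAvailableCPUsSpread() additionally rank NUMA nodes, sockets, and cores by their number of free CPUs, which only degenerates to this plain ID ordering when the whole topology is free.

package main

import (
	"fmt"
	"sort"
)

// cpuInfo mirrors the fields of topology.CPUInfo that matter for this sketch.
type cpuInfo struct {
	coreID   int
	socketID int
}

// Same layout as the topoDualSocketHT fixture quoted in the doc comment:
// CPU i and CPU i+6 are hyper-thread siblings on the same physical core.
var cpus = map[int]cpuInfo{
	0: {0, 0}, 1: {1, 1}, 2: {2, 0}, 3: {3, 1}, 4: {4, 0}, 5: {5, 1},
	6: {0, 0}, 7: {1, 1}, 8: {2, 0}, 9: {3, 1}, 10: {4, 0}, 11: {5, 1},
}

// packedOrder walks sockets, then cores, and emits both siblings of a core
// before moving on -- the ordering sortAvailableCPUsPacked() produces here.
func packedOrder() []int {
	coresBySocket := map[int]map[int][]int{}
	for id, c := range cpus {
		if coresBySocket[c.socketID] == nil {
			coresBySocket[c.socketID] = map[int][]int{}
		}
		coresBySocket[c.socketID][c.coreID] = append(coresBySocket[c.socketID][c.coreID], id)
	}
	var out []int
	for _, socket := range sortedKeys(coresBySocket) {
		for _, core := range sortedKeys(coresBySocket[socket]) {
			ids := coresBySocket[socket][core]
			sort.Ints(ids)
			out = append(out, ids...)
		}
	}
	return out
}

// spreadOrder walks sockets and emits each socket's CPU IDs in ascending
// order, so sibling threads end up far apart -- the ordering
// sortAvailableCPUsSpread() produces here.
func spreadOrder() []int {
	bySocket := map[int][]int{}
	for id, c := range cpus {
		bySocket[c.socketID] = append(bySocket[c.socketID], id)
	}
	var out []int
	for _, socket := range sortedKeys(bySocket) {
		ids := bySocket[socket]
		sort.Ints(ids)
		out = append(out, ids...)
	}
	return out
}

// sortedKeys returns the integer keys of a map in ascending order.
func sortedKeys[V any](m map[int]V) []int {
	keys := make([]int, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Ints(keys)
	return keys
}

func main() {
	fmt.Println("packed:", packedOrder()) // packed: [0 6 2 8 4 10 1 7 3 9 5 11]
	fmt.Println("spread:", spreadOrder()) // spread: [0 2 4 6 8 10 1 3 5 7 9 11]
}

With the spread ordering, the first 8 CPUs taken from this topology are 0, 2, 4, 6, 8, 10, 1, 3, which matches the expectations in TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption above.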