KEP-4176: Add static policy option to distribute cpus across cores

Jiaxin Shan 2024-06-25 01:12:51 -07:00
parent 535e833aef
commit 6c85fd4ddd
5 changed files with 329 additions and 30 deletions

pkg/kubelet/cm/cpumanager/cpu_assignment.go

@ -196,6 +196,58 @@ func (s *socketsFirst) sortAvailableCores() []int {
return result
}
type availableCPUSorter interface {
sort() []int
}
type sortCPUsPacked struct{ acc *cpuAccumulator }
type sortCPUsSpread struct{ acc *cpuAccumulator }
var _ availableCPUSorter = (*sortCPUsPacked)(nil)
var _ availableCPUSorter = (*sortCPUsSpread)(nil)
func (s sortCPUsPacked) sort() []int {
return s.acc.sortAvailableCPUsPacked()
}
func (s sortCPUsSpread) sort() []int {
return s.acc.sortAvailableCPUsSpread()
}
// CPUSortingStrategy describes the CPU sorting solution within the socket scope.
// Using topoDualSocketHT (12 CPUs, 2 sockets, 6 cores) as an example:
//
// CPUDetails: map[int]topology.CPUInfo{
// 0: {CoreID: 0, SocketID: 0, NUMANodeID: 0},
// 1: {CoreID: 1, SocketID: 1, NUMANodeID: 1},
// 2: {CoreID: 2, SocketID: 0, NUMANodeID: 0},
// 3: {CoreID: 3, SocketID: 1, NUMANodeID: 1},
// 4: {CoreID: 4, SocketID: 0, NUMANodeID: 0},
// 5: {CoreID: 5, SocketID: 1, NUMANodeID: 1},
// 6: {CoreID: 0, SocketID: 0, NUMANodeID: 0},
// 7: {CoreID: 1, SocketID: 1, NUMANodeID: 1},
// 8: {CoreID: 2, SocketID: 0, NUMANodeID: 0},
// 9: {CoreID: 3, SocketID: 1, NUMANodeID: 1},
// 10: {CoreID: 4, SocketID: 0, NUMANodeID: 0},
// 11: {CoreID: 5, SocketID: 1, NUMANodeID: 1},
// },
//
// - CPUSortingStrategyPacked sorts CPUs in a packed manner: both hyper-threads of a core are
// taken before moving to the next core, resulting in packed cores, like:
// 0, 6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11
// - CPUSortingStrategySpread sorts CPUs in a spread manner: one CPU is taken from each core
// before a core's sibling thread is reused, resulting in spread-out cores, like:
// 0, 2, 4, 6, 8, 10, 1, 3, 5, 7, 9, 11
//
// By default, CPUSortingStrategyPacked is used; CPUSortingStrategySpread is only activated
// when the user specifies the `DistributeCPUsAcrossCoresOption` static policy option.
type CPUSortingStrategy string
const (
CPUSortingStrategyPacked CPUSortingStrategy = "packed"
CPUSortingStrategySpread CPUSortingStrategy = "spread"
)
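// Illustrative sketch (not part of this commit): given the topoDualSocketHT layout from the
// comment above, this reproduces the two orderings with a local stand-in for topology.CPUInfo
// and a simplified core walk (the real accumulator also ranks cores by free-CPU counts, which
// makes no difference here because every CPU is still free).
func exampleCPUSortOrders() (packed, spread []int) {
	type cpuInfo struct{ coreID, socketID int }
	details := map[int]cpuInfo{
		0: {0, 0}, 1: {1, 1}, 2: {2, 0}, 3: {3, 1}, 4: {4, 0}, 5: {5, 1},
		6: {0, 0}, 7: {1, 1}, 8: {2, 0}, 9: {3, 1}, 10: {4, 0}, 11: {5, 1},
	}
	for _, socket := range []int{0, 1} {
		// Packed: core by core within the socket, keeping sibling hyper-threads adjacent.
		for core := 0; core < 6; core++ {
			for cpu := 0; cpu < 12; cpu++ {
				if details[cpu].socketID == socket && details[cpu].coreID == core {
					packed = append(packed, cpu)
				}
			}
		}
		// Spread: all CPU IDs of the socket in ascending order, touching every physical
		// core once before coming back for its sibling hyper-thread.
		var cpus []int
		for cpu, info := range details {
			if info.socketID == socket {
				cpus = append(cpus, cpu)
			}
		}
		sort.Ints(cpus)
		spread = append(spread, cpus...)
	}
	// packed: [0 6 2 8 4 10 1 7 3 9 5 11]
	// spread: [0 2 4 6 8 10 1 3 5 7 9 11]
	return packed, spread
}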
type cpuAccumulator struct {
// `topo` describes the layout of CPUs (i.e. hyper-threads if hyperthreading is on) between
// cores (i.e. physical CPUs if hyper-threading is on), NUMA nodes, and sockets on the K8s
@ -223,9 +275,15 @@ type cpuAccumulator struct {
result cpuset.CPUSet
numaOrSocketsFirst numaOrSocketsFirstFuncs
// availableCPUSorter controls how the available CPUs are sorted.
// The sequence of returned CPU IDs depends on the chosen strategy:
// by default CPUs are sorted by sortAvailableCPUsPacked();
// with the spread strategy they are sorted by sortAvailableCPUsSpread().
availableCPUSorter availableCPUSorter
}
- func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
+ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) *cpuAccumulator {
acc := &cpuAccumulator{
topo: topo,
details: topo.CPUDetails.KeepOnly(availableCPUs),
@ -239,6 +297,12 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
acc.numaOrSocketsFirst = &socketsFirst{acc}
}
if cpuSortingStrategy == CPUSortingStrategyPacked {
acc.availableCPUSorter = &sortCPUsPacked{acc}
} else {
acc.availableCPUSorter = &sortCPUsSpread{acc}
}
return acc
}
@ -293,9 +357,9 @@ func (a *cpuAccumulator) freeCores() []int {
return free
}
- // Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
+ // Returns free CPU IDs as a slice sorted by sortAvailableCPUsPacked().
func (a *cpuAccumulator) freeCPUs() []int {
- return a.sortAvailableCPUs()
+ return a.availableCPUSorter.sort()
}
// Sorts the provided list of NUMA nodes/sockets/cores/cpus referenced in 'ids'
@ -404,7 +468,7 @@ func (a *cpuAccumulator) sortAvailableCores() []int {
// same way as described in the previous paragraph, except that the priority of NUMA nodes and
// sockets is inverted (e.g. first sort the CPUs by number of free CPUs in their NUMA nodes, then,
// for each NUMA node, sort the CPUs by number of free CPUs in their sockets, etc...).
- func (a *cpuAccumulator) sortAvailableCPUs() []int {
+ func (a *cpuAccumulator) sortAvailableCPUsPacked() []int {
var result []int
for _, core := range a.sortAvailableCores() {
cpus := a.details.CPUsInCores(core).UnsortedList()
@ -414,6 +478,19 @@ func (a *cpuAccumulator) sortAvailableCPUs() []int {
return result
}
// Sort all available CPUs:
// - First by socket using sortAvailableSockets().
// - Then, within each socket, sort the CPU IDs in ascending numerical order.
func (a *cpuAccumulator) sortAvailableCPUsSpread() []int {
var result []int
for _, socket := range a.sortAvailableSockets() {
cpus := a.details.CPUsInSockets(socket).UnsortedList()
sort.Ints(cpus)
result = append(result, cpus...)
}
return result
}
func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
a.result = a.result.Union(cpus)
a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
@ -454,7 +531,7 @@ func (a *cpuAccumulator) takeFullCores() {
}
func (a *cpuAccumulator) takeRemainingCPUs() {
- for _, cpu := range a.sortAvailableCPUs() {
+ for _, cpu := range a.availableCPUSorter.sort() {
klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
a.take(cpuset.New(cpu))
if a.isSatisfied() {
@ -581,8 +658,8 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
// the least amount of free CPUs to the one with the highest amount of free CPUs.
- func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
- acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
+ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
+ acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
if acc.isSatisfied() {
return acc.result, nil
}
@ -606,9 +683,12 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
// 2. Acquire whole cores, if available and the container requires at least
// a core's-worth of CPUs.
- acc.takeFullCores()
- if acc.isSatisfied() {
- return acc.result, nil
+ // If `CPUSortingStrategySpread` is specified, skip taking the whole core.
+ if cpuSortingStrategy != CPUSortingStrategySpread {
+ acc.takeFullCores()
+ if acc.isSatisfied() {
+ return acc.result, nil
+ }
}
// 3. Acquire single threads, preferring to fill partially-allocated cores
@ -685,16 +765,16 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
// of size 'cpuGroupSize' according to the algorithm described above. This is
// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
// a single core are allocated together.
- func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
+ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
// If the number of CPUs requested cannot be handed out in chunks of
// 'cpuGroupSize', then we just call out the packing algorithm since we
// can't distribute CPUs in this chunk size.
if (numCPUs % cpuGroupSize) != 0 {
- return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+ return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
}
// Otherwise build an accumulator to start allocating CPUs from.
- acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
+ acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
if acc.isSatisfied() {
return acc.result, nil
}
@ -873,7 +953,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// size 'cpuGroupSize' from 'bestCombo'.
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
for _, numa := range bestCombo {
- cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
+ cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
acc.take(cpus)
}
@ -888,7 +968,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
continue
}
- cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
+ cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
acc.take(cpus)
remainder -= cpuGroupSize
}
@ -912,5 +992,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// If we never found a combination of NUMA nodes that we could properly
// distribute CPUs across, fall back to the packing algorithm.
- return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+ return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
}

pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

@ -114,7 +114,7 @@ func TestCPUAccumulatorFreeSockets(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+ acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
result := acc.freeSockets()
sort.Ints(result)
if !reflect.DeepEqual(result, tc.expect) {
@ -214,7 +214,7 @@ func TestCPUAccumulatorFreeNUMANodes(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+ acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
result := acc.freeNUMANodes()
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("expected %v to equal %v", result, tc.expect)
@ -263,7 +263,7 @@ func TestCPUAccumulatorFreeSocketsAndNUMANodes(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+ acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
resultNUMANodes := acc.freeNUMANodes()
if !reflect.DeepEqual(resultNUMANodes, tc.expectNUMANodes) {
t.Errorf("expected NUMA Nodes %v to equal %v", resultNUMANodes, tc.expectNUMANodes)
@ -335,7 +335,7 @@ func TestCPUAccumulatorFreeCores(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+ acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
result := acc.freeCores()
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("expected %v to equal %v", result, tc.expect)
@ -391,7 +391,7 @@ func TestCPUAccumulatorFreeCPUs(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+ acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
result := acc.freeCPUs()
if !reflect.DeepEqual(result, tc.expect) {
t.Errorf("expected %v to equal %v", result, tc.expect)
@ -477,7 +477,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs)
+ acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs, CPUSortingStrategyPacked)
totalTaken := 0
for _, cpus := range tc.takeCPUs {
acc.take(cpus)
@ -509,6 +509,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
type takeByTopologyTestCase struct {
description string
topo *topology.CPUTopology
opts StaticPolicyOptions
availableCPUs cpuset.CPUSet
numCPUs int
expErr string
@ -520,6 +521,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take more cpus than are available from single socket with HT",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 2, 4, 6),
5,
"not enough cpus available to satisfy request: requested=5, available=4",
@ -528,6 +530,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take zero cpus from single socket with HT",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
0,
"",
@ -536,6 +539,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take one cpu from single socket with HT",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
1,
"",
@ -544,6 +548,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take one cpu from single socket with HT, some cpus are taken",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(1, 3, 5, 6, 7),
1,
"",
@ -552,6 +557,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take two cpus from single socket with HT",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
2,
"",
@ -560,6 +566,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take all cpus from single socket with HT",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
8,
"",
@ -568,6 +575,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take two cpus from single socket with HT, only one core totally free",
topoSingleSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 1, 2, 3, 6),
2,
"",
@ -576,6 +584,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take a socket of cpus from dual socket with HT",
topoDualSocketHT,
StaticPolicyOptions{},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
6,
"",
@ -584,6 +593,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take a socket of cpus from dual socket with multi-numa-per-socket with HT",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "0-79"),
40,
"",
@ -592,6 +602,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take a NUMA node of cpus from dual socket with multi-numa-per-socket with HT",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "0-79"),
20,
"",
@ -600,6 +611,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take a NUMA node of cpus from dual socket with multi-numa-per-socket with HT, with 1 NUMA node already taken",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "10-39,50-79"),
20,
"",
@ -608,6 +620,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take a socket and a NUMA node of cpus from dual socket with multi-numa-per-socket with HT",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "0-79"),
60,
"",
@ -616,6 +629,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
{
"take a socket and a NUMA node of cpus from dual socket with multi-numa-per-socket with HT, a core taken",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "1-39,41-79"), // reserve the first (phys) core (0,40)
60,
"",
@ -630,6 +644,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
{
"take one cpu from dual socket with HT - core from Socket 0",
topoDualSocketHT,
StaticPolicyOptions{},
cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
1,
"",
@ -638,6 +653,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
{
"allocate 4 full cores with 3 coming from the first NUMA node (filling it up) and 1 coming from the second NUMA node",
topoDualSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "0-11"),
8,
"",
@ -646,6 +662,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
{
"allocate 32 full cores with 30 coming from the first 3 NUMA nodes (filling them up) and 2 coming from the fourth NUMA node",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{},
mustParseCPUSet(t, "0-79"),
64,
"",
@ -655,7 +672,12 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
+ strategy := CPUSortingStrategyPacked
+ if tc.opts.DistributeCPUsAcrossCores {
+ strategy = CPUSortingStrategySpread
+ }
+ result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
}
@ -666,6 +688,106 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
}
}
func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
testCases := []struct {
description string
topo *topology.CPUTopology
opts StaticPolicyOptions
availableCPUs cpuset.CPUSet
numCPUs int
expErr string
expResult cpuset.CPUSet
}{
{
"take a socket of cpus from single socket with HT, 3 cpus",
topoSingleSocketHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
3,
"",
cpuset.New(0, 1, 2),
},
{
"take a socket of cpus from dual socket with HT, 2 cpus",
topoDualSocketHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
2,
"",
cpuset.New(0, 2),
},
{
"take a socket of cpus from dual socket with HT, 3 cpus",
topoDualSocketHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
3,
"",
cpuset.New(0, 2, 4),
},
{
"take a socket of cpus from dual socket with HT, 6 cpus",
topoDualSocketHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
6,
"",
cpuset.New(0, 2, 4, 6, 8, 10),
},
{
"take cpus from dual socket with HT, 8 cpus",
topoDualSocketHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
8,
"",
cpuset.New(0, 2, 4, 6, 8, 10, 1, 3),
},
{
"take a socket of cpus from dual socket without HT, 2 cpus",
topoDualSocketNoHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
2,
"",
cpuset.New(0, 1),
},
{
// DistributeCPUsAcrossCores does not take socket or NUMA ranking into account,
// so that aspect of this topology is transparent to the test.
"take cpus from dual socket with multi numa per socket and HT, 8 cpus",
topoDualSocketMultiNumaPerSocketHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
mustParseCPUSet(t, "0-79"),
8,
"",
mustParseCPUSet(t, "0-7"),
},
{
"take a socket of cpus from quad socket four way with HT, 12 cpus",
topoQuadSocketFourWayHT,
StaticPolicyOptions{DistributeCPUsAcrossCores: true},
mustParseCPUSet(t, "0-287"),
12,
"",
mustParseCPUSet(t, "0-2,9-10,13-14,21-22,25-26,33"),
},
}
for _, tc := range testCases {
strategy := CPUSortingStrategyPacked
if tc.opts.DistributeCPUsAcrossCores {
strategy = CPUSortingStrategySpread
}
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
if tc.expErr != "" && err.Error() != tc.expErr {
t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
}
if !result.Equals(tc.expResult) {
t.Errorf("testCase %q failed, expected result [%s] to equal [%s]", tc.description, result, tc.expResult)
}
}
}
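// Illustrative sketch (not part of this commit): contrast the two strategies on the same
// 2-CPU request against topoDualSocketHT. With the packed strategy the allocator takes a
// whole core first, so it returns both hyper-threads of core 0 (CPUs 0 and 6); with the
// spread strategy whole-core allocation is skipped and the CPUs land on two different
// physical cores (CPUs 0 and 2), matching the spread test case above.
func TestPackedVersusSpreadSketch(t *testing.T) {
	available := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
	packed, err := takeByTopologyNUMAPacked(topoDualSocketHT, available, 2, CPUSortingStrategyPacked)
	if err != nil || !packed.Equals(cpuset.New(0, 6)) {
		t.Errorf("packed strategy: got %v (err %v), want 0,6", packed, err)
	}
	spread, err := takeByTopologyNUMAPacked(topoDualSocketHT, available, 2, CPUSortingStrategySpread)
	if err != nil || !spread.Equals(cpuset.New(0, 2)) {
		t.Errorf("spread strategy: got %v (err %v), want 0,2", spread, err)
	}
}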
type takeByTopologyExtendedTestCase struct {
description string
topo *topology.CPUTopology
@ -858,7 +980,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
- result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize)
+ result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize, CPUSortingStrategyPacked)
if err != nil {
if tc.expErr == "" {
t.Errorf("unexpected error [%v]", err)

pkg/kubelet/cm/cpumanager/policy_options.go

@ -29,15 +29,17 @@ import (
// Names of the options, as part of the user interface.
const (
FullPCPUsOnlyOption string = "full-pcpus-only"
DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
AlignBySocketOption string = "align-by-socket"
DistributeCPUsAcrossCoresOption string = "distribute-cpus-across-cores"
)
var (
alphaOptions = sets.New[string](
DistributeCPUsAcrossNUMAOption,
AlignBySocketOption,
DistributeCPUsAcrossCoresOption,
)
betaOptions = sets.New[string](
FullPCPUsOnlyOption,
@ -80,6 +82,10 @@ type StaticPolicyOptions struct {
// Flag to ensure CPUs are considered aligned at socket boundary rather than
// NUMA boundary
AlignBySocket bool
// Flag to enable extra allocation restrictions that spread
// CPUs (hyper-threads) across different physical cores.
// This is a preferred (best-effort) policy, so no error is raised if the CPUs end up packed onto one physical core.
DistributeCPUsAcrossCores bool
}
// NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
@ -109,12 +115,29 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.AlignBySocket = optValue
case DistributeCPUsAcrossCoresOption:
optValue, err := strconv.ParseBool(value)
if err != nil {
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.DistributeCPUsAcrossCores = optValue
default:
// this should never be reached, we already detect unknown options,
// but we keep it as further safety.
return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value)
}
}
if opts.FullPhysicalCPUsOnly && opts.DistributeCPUsAcrossCores {
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", FullPCPUsOnlyOption, DistributeCPUsAcrossCoresOption)
}
// TODO(@Jeffwan): Remove this check after more compatibility tests are done.
if opts.DistributeCPUsAcrossNUMA && opts.DistributeCPUsAcrossCores {
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", DistributeCPUsAcrossNUMAOption, DistributeCPUsAcrossCoresOption)
}
return opts, nil
}

pkg/kubelet/cm/cpumanager/policy_options_test.go

@ -94,6 +94,30 @@ func TestPolicyOptionsAvailable(t *testing.T) {
featureGateEnable: true,
expectedAvailable: false,
},
{
option: DistributeCPUsAcrossNUMAOption,
featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
featureGateEnable: true,
expectedAvailable: true,
},
{
option: DistributeCPUsAcrossNUMAOption,
featureGate: pkgfeatures.CPUManagerPolicyBetaOptions,
featureGateEnable: true,
expectedAvailable: false,
},
{
option: DistributeCPUsAcrossCoresOption,
featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
featureGateEnable: true,
expectedAvailable: true,
},
{
option: DistributeCPUsAcrossCoresOption,
featureGate: pkgfeatures.CPUManagerPolicyBetaOptions,
featureGateEnable: true,
expectedAvailable: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.option, func(t *testing.T) {
@ -163,7 +187,6 @@ func TestValidateStaticPolicyOptions(t *testing.T) {
topoMgrPolicy := topologymanager.NewNonePolicy()
if testCase.topoMgrPolicy == topologymanager.PolicySingleNumaNode {
topoMgrPolicy = topologymanager.NewSingleNumaNodePolicy(&topologymanager.NUMAInfo{}, topologymanager.PolicyOptions{})
}
topoMgrStore := topologymanager.NewFakeManagerWithPolicy(topoMgrPolicy)
@ -177,3 +200,49 @@ func TestValidateStaticPolicyOptions(t *testing.T) {
})
}
}
func TestPolicyOptionsCompatibility(t *testing.T) {
// take the feature gate into consideration
testCases := []struct {
description string
featureGate featuregate.Feature
policyOptions map[string]string
expectedErr bool
}{
{
description: "FullPhysicalCPUsOnly set to true only",
featureGate: pkgfeatures.CPUManagerPolicyBetaOptions,
policyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
},
expectedErr: false,
},
{
description: "DistributeCPUsAcrossCores set to true only",
featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
policyOptions: map[string]string{
DistributeCPUsAcrossCoresOption: "true",
},
expectedErr: false,
},
{
description: "FullPhysicalCPUsOnly and DistributeCPUsAcrossCores options can not coexist",
featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
policyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
DistributeCPUsAcrossCoresOption: "true",
},
expectedErr: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, testCase.featureGate, true)
_, err := NewStaticPolicyOptions(testCase.policyOptions)
gotError := err != nil
if gotError != testCase.expectedErr {
t.Errorf("testCase %q failed, got %v expected %v", testCase.description, gotError, testCase.expectedErr)
}
})
}
}
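// Illustrative sketch (not part of this commit): the compatibility cases above cover the
// full-pcpus-only conflict; this exercises the second mutual-exclusion check, which rejects
// distribute-cpus-across-numa combined with distribute-cpus-across-cores.
func TestDistributeAcrossCoresConflictsWithDistributeAcrossNUMA(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
	_, err := NewStaticPolicyOptions(map[string]string{
		DistributeCPUsAcrossNUMAOption:  "true",
		DistributeCPUsAcrossCoresOption: "true",
	})
	if err == nil {
		t.Errorf("expected %s and %s to be rejected when combined", DistributeCPUsAcrossNUMAOption, DistributeCPUsAcrossCoresOption)
	}
}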

pkg/kubelet/cm/cpumanager/policy_static.go

@ -483,14 +483,19 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
}
func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
cpuSortingStrategy := CPUSortingStrategyPacked
if p.options.DistributeCPUsAcrossCores {
cpuSortingStrategy = CPUSortingStrategySpread
}
if p.options.DistributeCPUsAcrossNUMA {
cpuGroupSize := 1
if p.options.FullPhysicalCPUsOnly {
cpuGroupSize = p.topology.CPUsPerCore()
}
- return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
+ return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize, cpuSortingStrategy)
}
- return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
+ return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
}
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {