Merge pull request #123733 from Jeffwan/jiaxin/kep-4176-240305
KEP-4176: Add a new static policy SpreadPhysicalCPUsPreferredOption
commit 3361895612
@@ -196,6 +196,58 @@ func (s *socketsFirst) sortAvailableCores() []int {
 	return result
 }
 
+type availableCPUSorter interface {
+	sort() []int
+}
+
+type sortCPUsPacked struct{ acc *cpuAccumulator }
+type sortCPUsSpread struct{ acc *cpuAccumulator }
+
+var _ availableCPUSorter = (*sortCPUsPacked)(nil)
+var _ availableCPUSorter = (*sortCPUsSpread)(nil)
+
+func (s sortCPUsPacked) sort() []int {
+	return s.acc.sortAvailableCPUsPacked()
+}
+
+func (s sortCPUsSpread) sort() []int {
+	return s.acc.sortAvailableCPUsSpread()
+}
+
+// CPUSortingStrategy describes the CPU sorting solution within the socket scope.
+// Using topoDualSocketHT (12 CPUs, 2 sockets, 6 cores) as an example:
+//
+//	CPUDetails: map[int]topology.CPUInfo{
+//		0:  {CoreID: 0, SocketID: 0, NUMANodeID: 0},
+//		1:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+//		2:  {CoreID: 2, SocketID: 0, NUMANodeID: 0},
+//		3:  {CoreID: 3, SocketID: 1, NUMANodeID: 1},
+//		4:  {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+//		5:  {CoreID: 5, SocketID: 1, NUMANodeID: 1},
+//		6:  {CoreID: 0, SocketID: 0, NUMANodeID: 0},
+//		7:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+//		8:  {CoreID: 2, SocketID: 0, NUMANodeID: 0},
+//		9:  {CoreID: 3, SocketID: 1, NUMANodeID: 1},
+//		10: {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+//		11: {CoreID: 5, SocketID: 1, NUMANodeID: 1},
+//	},
+//
+//   - CPUSortingStrategyPacked sorts CPUs in a packed manner, where CPUs are grouped by core
+//     before moving to the next core, resulting in packed cores, like:
+//     0, 6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11
+//   - CPUSortingStrategySpread sorts CPUs in a spread manner, where CPUs are spread across cores
+//     before moving to the next CPU, resulting in spread-out cores, like:
+//     0, 2, 4, 6, 8, 10, 1, 3, 5, 7, 9, 11
+//
+// By default, CPUSortingStrategyPacked will be used, and CPUSortingStrategySpread will only be activated
+// when the user specifies the `DistributeCPUsAcrossCoresOption` static policy option.
+type CPUSortingStrategy string
+
+const (
+	CPUSortingStrategyPacked CPUSortingStrategy = "packed"
+	CPUSortingStrategySpread CPUSortingStrategy = "spread"
+)
+
 type cpuAccumulator struct {
 	// `topo` describes the layout of CPUs (i.e. hyper-threads if hyperthreading is on) between
 	// cores (i.e. physical CPUs if hyper-threading is on), NUMA nodes, and sockets on the K8s
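To make the two orderings concrete, here is a minimal standalone Go sketch, assuming the 2-socket, 6-core, 12-CPU layout described in the comment above. It is an illustration only, not part of this commit; the struct and variable names are invented and do not exist in this package.

    package main

    import (
    	"fmt"
    	"sort"
    )

    // cpuInfo mirrors only the CPUInfo fields that matter for sorting here.
    type cpuInfo struct{ coreID, socketID int }

    func main() {
    	// topoDualSocketHT from the comment: 12 CPUs, 2 sockets, 6 cores, 2 threads per core.
    	cpus := map[int]cpuInfo{
    		0: {0, 0}, 1: {1, 1}, 2: {2, 0}, 3: {3, 1}, 4: {4, 0}, 5: {5, 1},
    		6: {0, 0}, 7: {1, 1}, 8: {2, 0}, 9: {3, 1}, 10: {4, 0}, 11: {5, 1},
    	}

    	// Packed: walk the cores of each socket, emitting every hyper-thread of a core
    	// before moving on, so sibling threads stay adjacent in the result.
    	var packed []int
    	for _, socket := range []int{0, 1} {
    		for core := 0; core < 6; core++ {
    			for id := 0; id < 12; id++ {
    				if cpus[id].socketID == socket && cpus[id].coreID == core {
    					packed = append(packed, id)
    				}
    			}
    		}
    	}

    	// Spread: walk the sockets, emitting each socket's CPU IDs in plain numeric order,
    	// so consecutive picks land on different physical cores.
    	var spread []int
    	for _, socket := range []int{0, 1} {
    		var ids []int
    		for id := 0; id < 12; id++ {
    			if cpus[id].socketID == socket {
    				ids = append(ids, id)
    			}
    		}
    		sort.Ints(ids)
    		spread = append(spread, ids...)
    	}

    	fmt.Println("packed:", packed) // [0 6 2 8 4 10 1 7 3 9 5 11]
    	fmt.Println("spread:", spread) // [0 2 4 6 8 10 1 3 5 7 9 11]
    }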
@@ -223,9 +275,15 @@ type cpuAccumulator struct {
 	result cpuset.CPUSet
 
 	numaOrSocketsFirst numaOrSocketsFirstFuncs
+
+	// availableCPUSorter is used to control the cpu sorting result.
+	// The sequence of returned CPU IDs depends on the policy.
+	// By default, cpus is sorted by sortAvailableCPUsPacked()
+	// If packed is false, cpu is sorted by sortAvailableCPUsSpread()
+	availableCPUSorter availableCPUSorter
 }
 
-func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
+func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) *cpuAccumulator {
 	acc := &cpuAccumulator{
 		topo:    topo,
 		details: topo.CPUDetails.KeepOnly(availableCPUs),
@@ -239,6 +297,12 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
 		acc.numaOrSocketsFirst = &socketsFirst{acc}
 	}
 
+	if cpuSortingStrategy == CPUSortingStrategyPacked {
+		acc.availableCPUSorter = &sortCPUsPacked{acc}
+	} else {
+		acc.availableCPUSorter = &sortCPUsSpread{acc}
+	}
+
 	return acc
 }
 
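A side note on the `var _ availableCPUSorter = (*sortCPUsPacked)(nil)` declarations introduced earlier in this change: they are compile-time interface assertions, so the package stops building if either sorter ever stops implementing sort(). A minimal standalone sketch of the idiom, with invented names that are not part of this code base:

    package main

    // greeter plays the role of availableCPUSorter in this toy example.
    type greeter interface{ greet() string }

    type english struct{}

    func (english) greet() string { return "hello" }

    // Compile-time assertion: if english ever loses greet(), this line breaks the build.
    var _ greeter = (*english)(nil)

    func main() {
    	println(english{}.greet())
    }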
@@ -293,9 +357,9 @@ func (a *cpuAccumulator) freeCores() []int {
 	return free
 }
 
-// Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
+// Returns free CPU IDs as a slice sorted by sortAvailableCPUsPacked().
 func (a *cpuAccumulator) freeCPUs() []int {
-	return a.sortAvailableCPUs()
+	return a.availableCPUSorter.sort()
 }
 
 // Sorts the provided list of NUMA nodes/sockets/cores/cpus referenced in 'ids'
@@ -404,7 +468,7 @@ func (a *cpuAccumulator) sortAvailableCores() []int {
 // same way as described in the previous paragraph, except that the priority of NUMA nodes and
 // sockets is inverted (e.g. first sort the CPUs by number of free CPUs in their NUMA nodes, then,
 // for each NUMA node, sort the CPUs by number of free CPUs in their sockets, etc...).
-func (a *cpuAccumulator) sortAvailableCPUs() []int {
+func (a *cpuAccumulator) sortAvailableCPUsPacked() []int {
 	var result []int
 	for _, core := range a.sortAvailableCores() {
 		cpus := a.details.CPUsInCores(core).UnsortedList()
@@ -414,6 +478,19 @@ func (a *cpuAccumulator) sortAvailableCPUs() []int {
 	return result
 }
 
+// Sort all available CPUs:
+// - First by socket using sortAvailableSockets().
+// - Then within each socket, sort cpus directly using the sort() algorithm defined above.
+func (a *cpuAccumulator) sortAvailableCPUsSpread() []int {
+	var result []int
+	for _, socket := range a.sortAvailableSockets() {
+		cpus := a.details.CPUsInSockets(socket).UnsortedList()
+		sort.Ints(cpus)
+		result = append(result, cpus...)
+	}
+	return result
+}
+
 func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
 	a.result = a.result.Union(cpus)
 	a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
@@ -454,7 +531,7 @@ func (a *cpuAccumulator) takeFullCores() {
 }
 
 func (a *cpuAccumulator) takeRemainingCPUs() {
-	for _, cpu := range a.sortAvailableCPUs() {
+	for _, cpu := range a.availableCPUSorter.sort() {
 		klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
 		a.take(cpuset.New(cpu))
 		if a.isSatisfied() {
@@ -581,8 +658,8 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
 // the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
 // order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
 // the least amount of free CPUs to the one with the highest amount of free CPUs.
-func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
-	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
+	acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
 	if acc.isSatisfied() {
 		return acc.result, nil
 	}
@@ -606,9 +683,12 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
 
 	// 2. Acquire whole cores, if available and the container requires at least
 	// a core's-worth of CPUs.
-	acc.takeFullCores()
-	if acc.isSatisfied() {
-		return acc.result, nil
+	// If `CPUSortingStrategySpread` is specified, skip taking the whole core.
+	if cpuSortingStrategy != CPUSortingStrategySpread {
+		acc.takeFullCores()
+		if acc.isSatisfied() {
+			return acc.result, nil
+		}
 	}
 
 	// 3. Acquire single threads, preferring to fill partially-allocated cores
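The effect of skipping step 2 under the spread strategy shows up with the topoDualSocketHT fixture used by the tests later in this change: for a 2-CPU request the packed strategy grabs both hyper-threads of one free core, while the spread strategy falls through to single-thread allocation and takes one thread from each of two different physical cores. Below is a sketch of a test one could add in this package; the fixture and helper names are the ones already used in the test file, but the exact packed result is inferred from the packed algorithm rather than stated in this diff.

    func TestPackedVersusSpreadTwoCPUs(t *testing.T) {
    	// topoDualSocketHT: cores are (0,6), (2,8), (4,10) on socket 0 and (1,7), (3,9), (5,11) on socket 1.
    	available := mustParseCPUSet(t, "0-11")

    	// Packed keeps step 2, so a 2-CPU request is satisfied by a whole free core
    	// (both hyper-threads of one core, likely 0 and 6).
    	packed, err := takeByTopologyNUMAPacked(topoDualSocketHT, available, 2, CPUSortingStrategyPacked)
    	if err != nil {
    		t.Fatalf("packed allocation failed: %v", err)
    	}

    	// Spread skips step 2 and allocates single threads in spread order, so the two
    	// CPUs land on two different physical cores (0 and 2, per the test cases below).
    	spread, err := takeByTopologyNUMAPacked(topoDualSocketHT, available, 2, CPUSortingStrategySpread)
    	if err != nil {
    		t.Fatalf("spread allocation failed: %v", err)
    	}

    	t.Logf("packed=%s spread=%s", packed, spread)
    }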
@@ -685,16 +765,16 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
 // of size 'cpuGroupSize' according to the algorithm described above. This is
 // important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
 // a single core are allocated together.
-func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
+func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
 	// If the number of CPUs requested cannot be handed out in chunks of
 	// 'cpuGroupSize', then we just call out the packing algorithm since we
 	// can't distribute CPUs in this chunk size.
 	if (numCPUs % cpuGroupSize) != 0 {
-		return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+		return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
 	}
 
 	// Otherwise build an accumulator to start allocating CPUs from.
-	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
+	acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
 	if acc.isSatisfied() {
 		return acc.result, nil
 	}
@@ -873,7 +953,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
 	// size 'cpuGroupSize' from 'bestCombo'.
 	distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
 	for _, numa := range bestCombo {
-		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
+		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
 		acc.take(cpus)
 	}
 
@@ -888,7 +968,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
 		if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
 			continue
 		}
-		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
+		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
 		acc.take(cpus)
 		remainder -= cpuGroupSize
 	}
@@ -912,5 +992,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
 
 	// If we never found a combination of NUMA nodes that we could properly
 	// distribute CPUs across, fall back to the packing algorithm.
-	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
 }
@@ -114,7 +114,7 @@ func TestCPUAccumulatorFreeSockets(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
 			result := acc.freeSockets()
 			sort.Ints(result)
 			if !reflect.DeepEqual(result, tc.expect) {
@@ -214,7 +214,7 @@ func TestCPUAccumulatorFreeNUMANodes(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
 			result := acc.freeNUMANodes()
 			if !reflect.DeepEqual(result, tc.expect) {
 				t.Errorf("expected %v to equal %v", result, tc.expect)
@@ -263,7 +263,7 @@ func TestCPUAccumulatorFreeSocketsAndNUMANodes(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
 			resultNUMANodes := acc.freeNUMANodes()
 			if !reflect.DeepEqual(resultNUMANodes, tc.expectNUMANodes) {
 				t.Errorf("expected NUMA Nodes %v to equal %v", resultNUMANodes, tc.expectNUMANodes)
@@ -335,7 +335,7 @@ func TestCPUAccumulatorFreeCores(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
 			result := acc.freeCores()
 			if !reflect.DeepEqual(result, tc.expect) {
 				t.Errorf("expected %v to equal %v", result, tc.expect)
@@ -391,7 +391,7 @@ func TestCPUAccumulatorFreeCPUs(t *testing.T) {
 
	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0)
+			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
 			result := acc.freeCPUs()
 			if !reflect.DeepEqual(result, tc.expect) {
 				t.Errorf("expected %v to equal %v", result, tc.expect)
@@ -477,7 +477,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs)
+			acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs, CPUSortingStrategyPacked)
 			totalTaken := 0
 			for _, cpus := range tc.takeCPUs {
 				acc.take(cpus)
@@ -509,6 +509,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
 type takeByTopologyTestCase struct {
 	description   string
 	topo          *topology.CPUTopology
+	opts          StaticPolicyOptions
 	availableCPUs cpuset.CPUSet
 	numCPUs       int
 	expErr        string
@@ -520,6 +521,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take more cpus than are available from single socket with HT",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 2, 4, 6),
 			5,
 			"not enough cpus available to satisfy request: requested=5, available=4",
@@ -528,6 +530,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take zero cpus from single socket with HT",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
 			0,
 			"",
@@ -536,6 +539,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take one cpu from single socket with HT",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
 			1,
 			"",
@@ -544,6 +548,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take one cpu from single socket with HT, some cpus are taken",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(1, 3, 5, 6, 7),
 			1,
 			"",
@@ -552,6 +557,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take two cpus from single socket with HT",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
 			2,
 			"",
@@ -560,6 +566,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take all cpus from single socket with HT",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
 			8,
 			"",
@@ -568,6 +575,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take two cpus from single socket with HT, only one core totally free",
 			topoSingleSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 1, 2, 3, 6),
 			2,
 			"",
@@ -576,6 +584,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take a socket of cpus from dual socket with HT",
 			topoDualSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
 			6,
 			"",
@@ -584,6 +593,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take a socket of cpus from dual socket with multi-numa-per-socket with HT",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "0-79"),
 			40,
 			"",
@@ -592,6 +602,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take a NUMA node of cpus from dual socket with multi-numa-per-socket with HT",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "0-79"),
 			20,
 			"",
@@ -600,6 +611,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take a NUMA node of cpus from dual socket with multi-numa-per-socket with HT, with 1 NUMA node already taken",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "10-39,50-79"),
 			20,
 			"",
@@ -608,6 +620,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take a socket and a NUMA node of cpus from dual socket with multi-numa-per-socket with HT",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "0-79"),
 			60,
 			"",
@@ -616,6 +629,7 @@ func commonTakeByTopologyTestCases(t *testing.T) []takeByTopologyTestCase {
 		{
 			"take a socket and a NUMA node of cpus from dual socket with multi-numa-per-socket with HT, a core taken",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "1-39,41-79"), // reserve the first (phys) core (0,40)
 			60,
 			"",
@@ -630,6 +644,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 		{
 			"take one cpu from dual socket with HT - core from Socket 0",
 			topoDualSocketHT,
+			StaticPolicyOptions{},
 			cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
 			1,
 			"",
@@ -638,6 +653,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 		{
 			"allocate 4 full cores with 3 coming from the first NUMA node (filling it up) and 1 coming from the second NUMA node",
 			topoDualSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "0-11"),
 			8,
 			"",
@@ -646,6 +662,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 		{
 			"allocate 32 full cores with 30 coming from the first 3 NUMA nodes (filling them up) and 2 coming from the fourth NUMA node",
 			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{},
 			mustParseCPUSet(t, "0-79"),
 			64,
 			"",
@@ -655,7 +672,12 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
+			strategy := CPUSortingStrategyPacked
+			if tc.opts.DistributeCPUsAcrossCores {
+				strategy = CPUSortingStrategySpread
+			}
+
+			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
 			if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
 				t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
 			}
@@ -666,6 +688,106 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 	}
 }
 
+func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
+	testCases := []struct {
+		description   string
+		topo          *topology.CPUTopology
+		opts          StaticPolicyOptions
+		availableCPUs cpuset.CPUSet
+		numCPUs       int
+		expErr        string
+		expResult     cpuset.CPUSet
+	}{
+		{
+			"take a socket of cpus from single socket with HT, 3 cpus",
+			topoSingleSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			3,
+			"",
+			cpuset.New(0, 1, 2),
+		},
+		{
+			"take a socket of cpus from dual socket with HT, 2 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			2,
+			"",
+			cpuset.New(0, 2),
+		},
+		{
+			"take a socket of cpus from dual socket with HT, 3 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			3,
+			"",
+			cpuset.New(0, 2, 4),
+		},
+		{
+			"take a socket of cpus from dual socket with HT, 6 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			6,
+			"",
+			cpuset.New(0, 2, 4, 6, 8, 10),
+		},
+		{
+			"take cpus from dual socket with HT, 8 cpus",
+			topoDualSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			8,
+			"",
+			cpuset.New(0, 2, 4, 6, 8, 10, 1, 3),
+		},
+		{
+			"take a socket of cpus from dual socket without HT, 2 cpus",
+			topoDualSocketNoHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+			2,
+			"",
+			cpuset.New(0, 1),
+		},
+		{
+			// DistributeCPUsAcrossCores doesn't care socket and numa ranking. such setting in test is transparent.
+			"take a socket of cpus from dual socket with multi numa per socket and HT, 12 cpus",
+			topoDualSocketMultiNumaPerSocketHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			mustParseCPUSet(t, "0-79"),
+			8,
+			"",
+			mustParseCPUSet(t, "0-7"),
+		},
+		{
+			"take a socket of cpus from quad socket four way with HT, 12 cpus",
+			topoQuadSocketFourWayHT,
+			StaticPolicyOptions{DistributeCPUsAcrossCores: true},
+			mustParseCPUSet(t, "0-287"),
+			12,
+			"",
+			mustParseCPUSet(t, "0-2,9-10,13-14,21-22,25-26,33"),
+		},
+	}
+
+	for _, tc := range testCases {
+		strategy := CPUSortingStrategyPacked
+		if tc.opts.DistributeCPUsAcrossCores {
+			strategy = CPUSortingStrategySpread
+		}
+		result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
+		if tc.expErr != "" && err.Error() != tc.expErr {
+			t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
+		}
+		if !result.Equals(tc.expResult) {
+			t.Errorf("testCase %q failed, expected result [%s] to equal [%s]", tc.description, result, tc.expResult)
+		}
+	}
+}
+
 type takeByTopologyExtendedTestCase struct {
 	description string
 	topo        *topology.CPUTopology
@@ -858,7 +980,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
-			result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize)
+			result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize, CPUSortingStrategyPacked)
 			if err != nil {
 				if tc.expErr == "" {
 					t.Errorf("unexpected error [%v]", err)
@@ -29,15 +29,17 @@ import (
 
 // Names of the options, as part of the user interface.
 const (
-	FullPCPUsOnlyOption            string = "full-pcpus-only"
-	DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
-	AlignBySocketOption            string = "align-by-socket"
+	FullPCPUsOnlyOption             string = "full-pcpus-only"
+	DistributeCPUsAcrossNUMAOption  string = "distribute-cpus-across-numa"
+	AlignBySocketOption             string = "align-by-socket"
+	DistributeCPUsAcrossCoresOption string = "distribute-cpus-across-cores"
 )
 
 var (
 	alphaOptions = sets.New[string](
 		DistributeCPUsAcrossNUMAOption,
 		AlignBySocketOption,
+		DistributeCPUsAcrossCoresOption,
 	)
 	betaOptions = sets.New[string](
 		FullPCPUsOnlyOption,
@@ -80,6 +82,10 @@ type StaticPolicyOptions struct {
 	// Flag to ensure CPUs are considered aligned at socket boundary rather than
 	// NUMA boundary
 	AlignBySocket bool
+	// flag to enable extra allocation restrictions to spread
+	// cpus (HT) on different physical core.
+	// This is a preferred policy so do not throw error if they have to packed in one physical core.
+	DistributeCPUsAcrossCores bool
 }
 
 // NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
@@ -109,12 +115,29 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
 				return opts, fmt.Errorf("bad value for option %q: %w", name, err)
 			}
 			opts.AlignBySocket = optValue
+		case DistributeCPUsAcrossCoresOption:
+			optValue, err := strconv.ParseBool(value)
+			if err != nil {
+				return opts, fmt.Errorf("bad value for option %q: %w", name, err)
+			}
+			opts.DistributeCPUsAcrossCores = optValue
+
 		default:
 			// this should never be reached, we already detect unknown options,
 			// but we keep it as further safety.
 			return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value)
 		}
 	}
 
+	if opts.FullPhysicalCPUsOnly && opts.DistributeCPUsAcrossCores {
+		return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", FullPCPUsOnlyOption, DistributeCPUsAcrossCoresOption)
+	}
+
+	// TODO(@Jeffwan): Remove this check after more compatibility tests are done.
+	if opts.DistributeCPUsAcrossNUMA && opts.DistributeCPUsAcrossCores {
+		return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", DistributeCPUsAcrossNUMAOption, DistributeCPUsAcrossCoresOption)
+	}
+
 	return opts, nil
 }
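For operators, the new option is registered under alphaOptions above, so it only takes effect when the static CPU manager policy is in use and the CPUManagerPolicyAlphaOptions feature gate is enabled. The snippet below is a rough configuration sketch, not part of this diff, assuming the standard cpuManagerPolicy and cpuManagerPolicyOptions KubeletConfiguration fields:

    # Hedged sketch of a kubelet configuration enabling the new option.
    apiVersion: kubelet.config.k8s.io/v1beta1
    kind: KubeletConfiguration
    featureGates:
      CPUManagerPolicyAlphaOptions: true
    cpuManagerPolicy: static
    cpuManagerPolicyOptions:
      distribute-cpus-across-cores: "true"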
@@ -94,6 +94,30 @@ func TestPolicyOptionsAvailable(t *testing.T) {
 			featureGateEnable: true,
 			expectedAvailable: false,
 		},
+		{
+			option:            DistributeCPUsAcrossNUMAOption,
+			featureGate:       pkgfeatures.CPUManagerPolicyAlphaOptions,
+			featureGateEnable: true,
+			expectedAvailable: true,
+		},
+		{
+			option:            DistributeCPUsAcrossNUMAOption,
+			featureGate:       pkgfeatures.CPUManagerPolicyBetaOptions,
+			featureGateEnable: true,
+			expectedAvailable: false,
+		},
+		{
+			option:            DistributeCPUsAcrossCoresOption,
+			featureGate:       pkgfeatures.CPUManagerPolicyAlphaOptions,
+			featureGateEnable: true,
+			expectedAvailable: true,
+		},
+		{
+			option:            DistributeCPUsAcrossCoresOption,
+			featureGate:       pkgfeatures.CPUManagerPolicyBetaOptions,
+			featureGateEnable: true,
+			expectedAvailable: false,
+		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.option, func(t *testing.T) {
@@ -163,7 +187,6 @@ func TestValidateStaticPolicyOptions(t *testing.T) {
 			topoMgrPolicy := topologymanager.NewNonePolicy()
 			if testCase.topoMgrPolicy == topologymanager.PolicySingleNumaNode {
 				topoMgrPolicy = topologymanager.NewSingleNumaNodePolicy(&topologymanager.NUMAInfo{}, topologymanager.PolicyOptions{})
-
 			}
 			topoMgrStore := topologymanager.NewFakeManagerWithPolicy(topoMgrPolicy)
 
@@ -177,3 +200,49 @@ func TestValidateStaticPolicyOptions(t *testing.T) {
 		})
 	}
 }
+
+func TestPolicyOptionsCompatibility(t *testing.T) {
+	// take feature gate into the consideration
+	testCases := []struct {
+		description   string
+		featureGate   featuregate.Feature
+		policyOptions map[string]string
+		expectedErr   bool
+	}{
+		{
+			description: "FullPhysicalCPUsOnly set to true only",
+			featureGate: pkgfeatures.CPUManagerPolicyBetaOptions,
+			policyOptions: map[string]string{
+				FullPCPUsOnlyOption: "true",
+			},
+			expectedErr: false,
+		},
+		{
+			description: "DistributeCPUsAcrossCores set to true only",
+			featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
+			policyOptions: map[string]string{
+				DistributeCPUsAcrossCoresOption: "true",
+			},
+			expectedErr: false,
+		},
+		{
+			description: "FullPhysicalCPUsOnly and DistributeCPUsAcrossCores options can not coexist",
+			featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
+			policyOptions: map[string]string{
+				FullPCPUsOnlyOption:             "true",
+				DistributeCPUsAcrossCoresOption: "true",
+			},
+			expectedErr: true,
+		},
+	}
+	for _, testCase := range testCases {
+		t.Run(testCase.description, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, testCase.featureGate, true)
+			_, err := NewStaticPolicyOptions(testCase.policyOptions)
+			gotError := err != nil
+			if gotError != testCase.expectedErr {
+				t.Errorf("testCase %q failed, got %v expected %v", testCase.description, gotError, testCase.expectedErr)
+			}
+		})
+	}
+}
@@ -483,14 +483,19 @@ func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
 }
 
 func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+	cpuSortingStrategy := CPUSortingStrategyPacked
+	if p.options.DistributeCPUsAcrossCores {
+		cpuSortingStrategy = CPUSortingStrategySpread
+	}
+
 	if p.options.DistributeCPUsAcrossNUMA {
 		cpuGroupSize := 1
 		if p.options.FullPhysicalCPUsOnly {
 			cpuGroupSize = p.topology.CPUsPerCore()
 		}
-		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
+		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize, cpuSortingStrategy)
 	}
-	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
+	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
 }
 
 func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {