Merge pull request #102014 from klueska/upstream-update-cpu-asssignment-algorithm

Refactor the algorithm used to decide CPU assignments in the CPUManager
Kubernetes Prow Robot committed 2021-05-20 16:10:56 -07:00
commit 823d870725
3 changed files with 148 additions and 99 deletions

View File

@@ -42,12 +42,6 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
     }
 }

-func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
-    a.result = a.result.Union(cpus)
-    a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
-    a.numCPUsNeeded -= cpus.Size()
-}
-
 // Returns true if the supplied socket is fully available in `topoDetails`.
 func (a *cpuAccumulator) isSocketFree(socketID int) bool {
     return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
@@ -58,82 +52,125 @@ func (a *cpuAccumulator) isCoreFree(coreID int) bool {
     return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
 }

-// Returns free socket IDs as a slice sorted by:
-// - socket ID, ascending.
+// Returns free socket IDs as a slice sorted by sortAvailableSockets().
 func (a *cpuAccumulator) freeSockets() []int {
-    return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
+    free := []int{}
+    for _, socket := range a.sortAvailableSockets() {
+        if a.isSocketFree(socket) {
+            free = append(free, socket)
+        }
+    }
+    return free
 }

-// Returns core IDs as a slice sorted by:
-// - the number of whole available cores on the socket, ascending
-// - socket ID, ascending
-// - core ID, ascending
+// Returns free core IDs as a slice sorted by sortAvailableCores().
 func (a *cpuAccumulator) freeCores() []int {
-    socketIDs := a.details.Sockets().ToSliceNoSort()
-    sort.Slice(socketIDs,
-        func(i, j int) bool {
-            iCores := a.details.CoresInSockets(socketIDs[i]).Filter(a.isCoreFree)
-            jCores := a.details.CoresInSockets(socketIDs[j]).Filter(a.isCoreFree)
-            return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j]
-        })
-
-    coreIDs := []int{}
-    for _, s := range socketIDs {
-        coreIDs = append(coreIDs, a.details.CoresInSockets(s).Filter(a.isCoreFree).ToSlice()...)
+    free := []int{}
+    for _, core := range a.sortAvailableCores() {
+        if a.isCoreFree(core) {
+            free = append(free, core)
+        }
     }
-    return coreIDs
+    return free
 }

-// Returns CPU IDs as a slice sorted by:
-// - socket affinity with result
-// - number of CPUs available on the same socket
-// - number of CPUs available on the same core
-// - socket ID.
-// - core ID.
+// Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
 func (a *cpuAccumulator) freeCPUs() []int {
-    result := []int{}
-    cores := a.details.Cores().ToSlice()
-
-    sort.Slice(
-        cores,
-        func(i, j int) bool {
-            iCore := cores[i]
-            jCore := cores[j]
-
-            iCPUs := a.topo.CPUDetails.CPUsInCores(iCore).ToSlice()
-            jCPUs := a.topo.CPUDetails.CPUsInCores(jCore).ToSlice()
-
-            iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
-            jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
-
-            // Compute the number of CPUs in the result that reside on the same
-            // socket as each core.
-            iSocketColoScore := a.topo.CPUDetails.CPUsInSockets(iSocket).Intersection(a.result).Size()
-            jSocketColoScore := a.topo.CPUDetails.CPUsInSockets(jSocket).Intersection(a.result).Size()
-
-            // Compute the number of CPUs available on the same socket as each core.
-            iSocketFreeScore := a.details.CPUsInSockets(iSocket).Size()
-            jSocketFreeScore := a.details.CPUsInSockets(jSocket).Size()
-
-            // Compute the number of available CPUs on each core.
-            iCoreFreeScore := a.details.CPUsInCores(iCore).Size()
-            jCoreFreeScore := a.details.CPUsInCores(jCore).Size()
-
-            return iSocketColoScore > jSocketColoScore ||
-                iSocketFreeScore < jSocketFreeScore ||
-                iCoreFreeScore < jCoreFreeScore ||
-                iSocket < jSocket ||
-                iCore < jCore
-        })
-
-    // For each core, append sorted CPU IDs to result.
-    for _, core := range cores {
-        result = append(result, a.details.CPUsInCores(core).ToSlice()...)
-    }
-    return result
-}
+    return a.sortAvailableCPUs()
+}

+// Sorts the provided list of sockets/cores/cpus referenced in 'ids' by the
+// number of available CPUs contained within them (smallest to largest). The
+// 'getCPUs()' parameter defines the function that should be called to retrieve
+// the list of available CPUs for the type of socket/core/cpu being referenced.
+// If two sockets/cores/cpus have the same number of available CPUs, they are
+// sorted in ascending order by their id.
+func (a *cpuAccumulator) sort(ids []int, getCPUs func(ids ...int) cpuset.CPUSet) {
+    sort.Slice(ids,
+        func(i, j int) bool {
+            iCPUs := getCPUs(ids[i])
+            jCPUs := getCPUs(ids[j])
+            if iCPUs.Size() < jCPUs.Size() {
+                return true
+            }
+            if iCPUs.Size() > jCPUs.Size() {
+                return false
+            }
+            return ids[i] < ids[j]
+        })
+}
+
+// Sort all sockets with free CPUs using the sort() algorithm defined above.
+func (a *cpuAccumulator) sortAvailableSockets() []int {
+    sockets := a.details.Sockets().ToSliceNoSort()
+    a.sort(sockets, a.details.CPUsInSockets)
+    return sockets
+}
+
+// Sort all cores with free CPUs:
+// - First by socket using sortAvailableSockets().
+// - Then within each socket, using the sort() algorithm defined above.
+func (a *cpuAccumulator) sortAvailableCores() []int {
+    var result []int
+    for _, socket := range a.sortAvailableSockets() {
+        cores := a.details.CoresInSockets(socket).ToSliceNoSort()
+        a.sort(cores, a.details.CPUsInCores)
+        result = append(result, cores...)
+    }
+    return result
+}
+
+// Sort all available CPUs:
+// - First by core using sortAvailableCores().
+// - Then within each core, in ascending order by CPU ID.
+func (a *cpuAccumulator) sortAvailableCPUs() []int {
+    var result []int
+    for _, core := range a.sortAvailableCores() {
+        cpus := a.details.CPUsInCores(core).ToSliceNoSort()
+        sort.Ints(cpus)
+        result = append(result, cpus...)
+    }
+    return result
+}
+
+func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
+    a.result = a.result.Union(cpus)
+    a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
+    a.numCPUsNeeded -= cpus.Size()
+}
+
+func (a *cpuAccumulator) takeFullSockets() {
+    for _, socket := range a.freeSockets() {
+        cpusInSocket := a.topo.CPUDetails.CPUsInSockets(socket)
+        if !a.needs(cpusInSocket.Size()) {
+            continue
+        }
+        klog.V(4).InfoS("takeFullSockets: claiming socket", "socket", socket)
+        a.take(cpusInSocket)
+    }
+}
+
+func (a *cpuAccumulator) takeFullCores() {
+    for _, core := range a.freeCores() {
+        cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
+        if !a.needs(cpusInCore.Size()) {
+            continue
+        }
+        klog.V(4).InfoS("takeFullCores: claiming core", "core", core)
+        a.take(cpusInCore)
+    }
+}
+
+func (a *cpuAccumulator) takeRemainingCPUs() {
+    for _, cpu := range a.sortAvailableCPUs() {
+        klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
+        a.take(cpuset.NewCPUSet(cpu))
+        if a.isSatisfied() {
+            return
+        }
+    }
+}
+
 func (a *cpuAccumulator) needs(n int) bool {
     return a.numCPUsNeeded >= n
 }
@@ -158,45 +195,24 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
     // Algorithm: topology-aware best-fit
     // 1. Acquire whole sockets, if available and the container requires at
     //    least a socket's-worth of CPUs.
-    if acc.needs(acc.topo.CPUsPerSocket()) {
-        for _, s := range acc.freeSockets() {
-            klog.V(4).InfoS("takeByTopology: claiming socket", "socket", s)
-            acc.take(acc.details.CPUsInSockets(s))
-            if acc.isSatisfied() {
-                return acc.result, nil
-            }
-            if !acc.needs(acc.topo.CPUsPerSocket()) {
-                break
-            }
-        }
+    acc.takeFullSockets()
+    if acc.isSatisfied() {
+        return acc.result, nil
     }

     // 2. Acquire whole cores, if available and the container requires at least
     //    a core's-worth of CPUs.
-    if acc.needs(acc.topo.CPUsPerCore()) {
-        for _, c := range acc.freeCores() {
-            klog.V(4).InfoS("takeByTopology: claiming core", "core", c)
-            acc.take(acc.details.CPUsInCores(c))
-            if acc.isSatisfied() {
-                return acc.result, nil
-            }
-            if !acc.needs(acc.topo.CPUsPerCore()) {
-                break
-            }
-        }
+    acc.takeFullCores()
+    if acc.isSatisfied() {
+        return acc.result, nil
     }

     // 3. Acquire single threads, preferring to fill partially-allocated cores
     //    on the same sockets as the whole cores we have already taken in this
     //    allocation.
-    for _, c := range acc.freeCPUs() {
-        klog.V(4).InfoS("takeByTopology: claiming CPU", "cpu", c)
-        if acc.needs(1) {
-            acc.take(cpuset.NewCPUSet(c))
-        }
-        if acc.isSatisfied() {
-            return acc.result, nil
-        }
+    acc.takeRemainingCPUs()
+    if acc.isSatisfied() {
+        return acc.result, nil
     }

     return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
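
With the loops moved into takeFullSockets(), takeFullCores(), and takeRemainingCPUs(), takeByTopology() reduces to three straight-line phases with an isSatisfied() check after each. Below is a minimal sketch of that three-phase flow, with the accumulator collapsed to a bare counter and the unit sizes chosen to match the 3-socket, 2-thread-per-core test topology added in this PR; everything here is illustrative, not the real kubelet API.

package main

import "fmt"

// acc is a toy accumulator: it only tracks how many CPUs are still needed.
type acc struct{ needed int }

// takeWhole stands in for takeFullSockets()/takeFullCores(): it claims whole
// units only while a full unit still fits in the remaining request, which is
// exactly the guard that needs() provides in the real code.
func (a *acc) takeWhole(unit string, size, available int) {
    for i := 0; i < available && a.needed >= size; i++ {
        a.needed -= size
        fmt.Printf("claimed a full %s (%d CPUs), still need %d\n", unit, size, a.needed)
    }
}

func main() {
    a := &acc{needed: 8}
    a.takeWhole("socket", 6, 3) // phase 1: whole sockets (6 CPUs each)
    a.takeWhole("core", 2, 6)   // phase 2: whole cores (2 CPUs each)
    for a.needed > 0 {          // phase 3: single CPUs
        a.needed--
        fmt.Printf("claimed one CPU, still need %d\n", a.needed)
    }
}

For a request of 8 CPUs this takes one full socket (6) and one full core (2) and never enters phase 3; because each phase claims whole units only while needs() shows a full unit still fits, the allocation can never overshoot the request.
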

View File

@@ -175,6 +175,12 @@ func TestCPUAccumulatorFreeCPUs(t *testing.T) {
             cpuset.NewCPUSet(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
             []int{2, 8, 4, 10, 1, 7, 3, 9, 5, 11},
         },
+        {
+            "triple socket HT, 12 cpus free",
+            topoTripleSocketHT,
+            cpuset.NewCPUSet(0, 1, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13),
+            []int{12, 13, 0, 1, 2, 3, 6, 7, 8, 9, 10, 11},
+        },
     }

     for _, tc := range testCases {
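
The expected order in the new test case follows directly from the sort rules above: of the 12 free CPUs, socket 2 holds 2 (12, 13), socket 1 holds 4 (0-3), and socket 0 holds 6 (6-11), so sockets are visited fewest-free-first. A quick sketch that recomputes those counts (socketOf hard-codes the SocketID column of topoTripleSocketHT purely for illustration):

package main

import "fmt"

func main() {
    // socketOf mirrors the SocketID column of topoTripleSocketHT.
    socketOf := map[int]int{
        0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1,
        6: 0, 7: 0, 8: 0, 9: 0, 10: 0, 11: 0,
        12: 2, 13: 2, 14: 2, 15: 2, 16: 2, 17: 2,
    }
    free := []int{0, 1, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13}
    counts := map[int]int{}
    for _, cpu := range free {
        counts[socketOf[cpu]]++
    }
    // Socket 2 (2 free) sorts before socket 1 (4 free) and socket 0 (6 free),
    // which is why the expected slice starts 12, 13, 0, 1, ...
    fmt.Println(counts) // map[0:6 1:4 2:2]
}
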

View File

@@ -73,6 +73,33 @@
         },
     }

+    // fake topology for testing purposes only
+    topoTripleSocketHT = &topology.CPUTopology{
+        NumCPUs:    18,
+        NumSockets: 3,
+        NumCores:   9,
+        CPUDetails: map[int]topology.CPUInfo{
+            0:  {CoreID: 0, SocketID: 1, NUMANodeID: 1},
+            1:  {CoreID: 0, SocketID: 1, NUMANodeID: 1},
+            2:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+            3:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+            4:  {CoreID: 2, SocketID: 1, NUMANodeID: 1},
+            5:  {CoreID: 2, SocketID: 1, NUMANodeID: 1},
+            6:  {CoreID: 3, SocketID: 0, NUMANodeID: 0},
+            7:  {CoreID: 3, SocketID: 0, NUMANodeID: 0},
+            8:  {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+            9:  {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+            10: {CoreID: 5, SocketID: 0, NUMANodeID: 0},
+            11: {CoreID: 5, SocketID: 0, NUMANodeID: 0},
+            12: {CoreID: 6, SocketID: 2, NUMANodeID: 2},
+            13: {CoreID: 6, SocketID: 2, NUMANodeID: 2},
+            14: {CoreID: 7, SocketID: 2, NUMANodeID: 2},
+            15: {CoreID: 7, SocketID: 2, NUMANodeID: 2},
+            16: {CoreID: 8, SocketID: 2, NUMANodeID: 2},
+            17: {CoreID: 8, SocketID: 2, NUMANodeID: 2},
+        },
+    }
+
     /*
     Topology from https://www.open-mpi.org/projects/hwloc/lstopo/images/KNL.SNC4.H50.v1.11.png.
     Socket0: