mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-31 08:36:16 +00:00
Refactor / simplify logic for CPU assignment algorithm in CPUManager
Signed-off-by: Kevin Klues <kklues@nvidia.com>
This commit is contained in:
@@ -42,12 +42,6 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
|
||||
a.result = a.result.Union(cpus)
|
||||
a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
|
||||
a.numCPUsNeeded -= cpus.Size()
|
||||
}
|
||||
|
||||
// Returns true if the supplied socket is fully available in `topoDetails`.
|
||||
func (a *cpuAccumulator) isSocketFree(socketID int) bool {
|
||||
return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
|
||||
@@ -58,82 +52,125 @@ func (a *cpuAccumulator) isCoreFree(coreID int) bool {
|
||||
return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
|
||||
}
|
||||
|
||||
// Returns free socket IDs as a slice sorted by:
|
||||
// - socket ID, ascending.
|
||||
// Returns free socket IDs as a slice sorted by sortAvailableSockets().
|
||||
func (a *cpuAccumulator) freeSockets() []int {
|
||||
return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
|
||||
}
|
||||
|
||||
// Returns core IDs as a slice sorted by:
|
||||
// - the number of whole available cores on the socket, ascending
|
||||
// - socket ID, ascending
|
||||
// - core ID, ascending
|
||||
func (a *cpuAccumulator) freeCores() []int {
|
||||
socketIDs := a.details.Sockets().ToSliceNoSort()
|
||||
sort.Slice(socketIDs,
|
||||
func(i, j int) bool {
|
||||
iCores := a.details.CoresInSockets(socketIDs[i]).Filter(a.isCoreFree)
|
||||
jCores := a.details.CoresInSockets(socketIDs[j]).Filter(a.isCoreFree)
|
||||
return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j]
|
||||
})
|
||||
|
||||
coreIDs := []int{}
|
||||
for _, s := range socketIDs {
|
||||
coreIDs = append(coreIDs, a.details.CoresInSockets(s).Filter(a.isCoreFree).ToSlice()...)
|
||||
free := []int{}
|
||||
for _, socket := range a.sortAvailableSockets() {
|
||||
if a.isSocketFree(socket) {
|
||||
free = append(free, socket)
|
||||
}
|
||||
}
|
||||
return coreIDs
|
||||
return free
|
||||
}
|
||||
|
||||
// Returns CPU IDs as a slice sorted by:
|
||||
// - socket affinity with result
|
||||
// - number of CPUs available on the same socket
|
||||
// - number of CPUs available on the same core
|
||||
// - socket ID.
|
||||
// - core ID.
|
||||
// Returns free core IDs as a slice sorted by sortAvailableCores().
|
||||
func (a *cpuAccumulator) freeCores() []int {
|
||||
free := []int{}
|
||||
for _, core := range a.sortAvailableCores() {
|
||||
if a.isCoreFree(core) {
|
||||
free = append(free, core)
|
||||
}
|
||||
}
|
||||
return free
|
||||
}
|
||||
|
||||
// Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
|
||||
func (a *cpuAccumulator) freeCPUs() []int {
|
||||
result := []int{}
|
||||
cores := a.details.Cores().ToSlice()
|
||||
return a.sortAvailableCPUs()
|
||||
}
|
||||
|
||||
sort.Slice(
|
||||
cores,
|
||||
// Sorts the provided list of sockets/cores/cpus referenced in 'ids' by the
|
||||
// number of available CPUs contained within them (smallest to largest). The
|
||||
// 'getCPU()' paramater defines the function that should be called to retrieve
|
||||
// the list of available CPUs for the type of socket/core/cpu being referenced.
|
||||
// If two sockets/cores/cpus have the same number of available CPUs, they are
|
||||
// sorted in ascending order by their id.
|
||||
func (a *cpuAccumulator) sort(ids []int, getCPUs func(ids ...int) cpuset.CPUSet) {
|
||||
sort.Slice(ids,
|
||||
func(i, j int) bool {
|
||||
iCore := cores[i]
|
||||
jCore := cores[j]
|
||||
|
||||
iCPUs := a.topo.CPUDetails.CPUsInCores(iCore).ToSlice()
|
||||
jCPUs := a.topo.CPUDetails.CPUsInCores(jCore).ToSlice()
|
||||
|
||||
iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
|
||||
jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
|
||||
|
||||
// Compute the number of CPUs in the result reside on the same socket
|
||||
// as each core.
|
||||
iSocketColoScore := a.topo.CPUDetails.CPUsInSockets(iSocket).Intersection(a.result).Size()
|
||||
jSocketColoScore := a.topo.CPUDetails.CPUsInSockets(jSocket).Intersection(a.result).Size()
|
||||
|
||||
// Compute the number of available CPUs available on the same socket
|
||||
// as each core.
|
||||
iSocketFreeScore := a.details.CPUsInSockets(iSocket).Size()
|
||||
jSocketFreeScore := a.details.CPUsInSockets(jSocket).Size()
|
||||
|
||||
// Compute the number of available CPUs on each core.
|
||||
iCoreFreeScore := a.details.CPUsInCores(iCore).Size()
|
||||
jCoreFreeScore := a.details.CPUsInCores(jCore).Size()
|
||||
|
||||
return iSocketColoScore > jSocketColoScore ||
|
||||
iSocketFreeScore < jSocketFreeScore ||
|
||||
iCoreFreeScore < jCoreFreeScore ||
|
||||
iSocket < jSocket ||
|
||||
iCore < jCore
|
||||
iCPUs := getCPUs(ids[i])
|
||||
jCPUs := getCPUs(ids[j])
|
||||
if iCPUs.Size() < jCPUs.Size() {
|
||||
return true
|
||||
}
|
||||
if iCPUs.Size() > jCPUs.Size() {
|
||||
return false
|
||||
}
|
||||
return ids[i] < ids[j]
|
||||
})
|
||||
}
|
||||
|
||||
// For each core, append sorted CPU IDs to result.
|
||||
for _, core := range cores {
|
||||
result = append(result, a.details.CPUsInCores(core).ToSlice()...)
|
||||
// Sort all sockets with free CPUs using the sort() algorithm defined above.
|
||||
func (a *cpuAccumulator) sortAvailableSockets() []int {
|
||||
sockets := a.details.Sockets().ToSliceNoSort()
|
||||
a.sort(sockets, a.details.CPUsInSockets)
|
||||
return sockets
|
||||
}
|
||||
|
||||
// Sort all cores with free CPUs:
|
||||
// - First by socket using sortAvailableSockets().
|
||||
// - Then within each socket, using the sort() algorithm defined above.
|
||||
func (a *cpuAccumulator) sortAvailableCores() []int {
|
||||
var result []int
|
||||
for _, socket := range a.sortAvailableSockets() {
|
||||
cores := a.details.CoresInSockets(socket).ToSliceNoSort()
|
||||
a.sort(cores, a.details.CPUsInCores)
|
||||
result = append(result, cores...)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Sort all available CPUs:
|
||||
// - First by core using sortAvailableCores().
|
||||
// - Then within each core, using the sort() algorithm defined above.
|
||||
func (a *cpuAccumulator) sortAvailableCPUs() []int {
|
||||
var result []int
|
||||
for _, core := range a.sortAvailableCores() {
|
||||
cpus := a.details.CPUsInCores(core).ToSliceNoSort()
|
||||
sort.Ints(cpus)
|
||||
result = append(result, cpus...)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
|
||||
a.result = a.result.Union(cpus)
|
||||
a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
|
||||
a.numCPUsNeeded -= cpus.Size()
|
||||
}
|
||||
|
||||
func (a *cpuAccumulator) takeFullSockets() {
|
||||
for _, socket := range a.freeSockets() {
|
||||
cpusInSocket := a.topo.CPUDetails.CPUsInSockets(socket)
|
||||
if !a.needs(cpusInSocket.Size()) {
|
||||
continue
|
||||
}
|
||||
klog.V(4).InfoS("takeFullSockets: claiming socket", "socket", socket)
|
||||
a.take(cpusInSocket)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *cpuAccumulator) takeFullCores() {
|
||||
for _, core := range a.freeCores() {
|
||||
cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
|
||||
if !a.needs(cpusInCore.Size()) {
|
||||
continue
|
||||
}
|
||||
klog.V(4).InfoS("takeFullCores: claiming core", "core", core)
|
||||
a.take(cpusInCore)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *cpuAccumulator) takeRemainingCPUs() {
|
||||
for _, cpu := range a.sortAvailableCPUs() {
|
||||
klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
|
||||
a.take(cpuset.NewCPUSet(cpu))
|
||||
if a.isSatisfied() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *cpuAccumulator) needs(n int) bool {
|
||||
return a.numCPUsNeeded >= n
|
||||
}
|
||||
@@ -158,45 +195,24 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
|
||||
// Algorithm: topology-aware best-fit
|
||||
// 1. Acquire whole sockets, if available and the container requires at
|
||||
// least a socket's-worth of CPUs.
|
||||
if acc.needs(acc.topo.CPUsPerSocket()) {
|
||||
for _, s := range acc.freeSockets() {
|
||||
klog.V(4).InfoS("takeByTopology: claiming socket", "socket", s)
|
||||
acc.take(acc.details.CPUsInSockets(s))
|
||||
if acc.isSatisfied() {
|
||||
return acc.result, nil
|
||||
}
|
||||
if !acc.needs(acc.topo.CPUsPerSocket()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
acc.takeFullSockets()
|
||||
if acc.isSatisfied() {
|
||||
return acc.result, nil
|
||||
}
|
||||
|
||||
// 2. Acquire whole cores, if available and the container requires at least
|
||||
// a core's-worth of CPUs.
|
||||
if acc.needs(acc.topo.CPUsPerCore()) {
|
||||
for _, c := range acc.freeCores() {
|
||||
klog.V(4).InfoS("takeByTopology: claiming core", "core", c)
|
||||
acc.take(acc.details.CPUsInCores(c))
|
||||
if acc.isSatisfied() {
|
||||
return acc.result, nil
|
||||
}
|
||||
if !acc.needs(acc.topo.CPUsPerCore()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
acc.takeFullCores()
|
||||
if acc.isSatisfied() {
|
||||
return acc.result, nil
|
||||
}
|
||||
|
||||
// 3. Acquire single threads, preferring to fill partially-allocated cores
|
||||
// on the same sockets as the whole cores we have already taken in this
|
||||
// allocation.
|
||||
for _, c := range acc.freeCPUs() {
|
||||
klog.V(4).InfoS("takeByTopology: claiming CPU", "cpu", c)
|
||||
if acc.needs(1) {
|
||||
acc.take(cpuset.NewCPUSet(c))
|
||||
}
|
||||
if acc.isSatisfied() {
|
||||
return acc.result, nil
|
||||
}
|
||||
acc.takeRemainingCPUs()
|
||||
if acc.isSatisfied() {
|
||||
return acc.result, nil
|
||||
}
|
||||
|
||||
return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
|
||||
|
Reference in New Issue
Block a user