From 67c92a5cd4b5f8d85747edb6a6d723619af76563 Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Fri, 14 May 2021 13:32:12 +0000
Subject: [PATCH 1/2] Refactor / simplify logic for CPU assignment algorithm in
 CPUManager

Signed-off-by: Kevin Klues
---
 pkg/kubelet/cm/cpumanager/cpu_assignment.go | 214 +++++++++++---------
 1 file changed, 115 insertions(+), 99 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
index cb4b9807529..b599485151c 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -42,12 +42,6 @@ func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet,
 	}
 }
 
-func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
-	a.result = a.result.Union(cpus)
-	a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
-	a.numCPUsNeeded -= cpus.Size()
-}
-
 // Returns true if the supplied socket is fully available in `topoDetails`.
 func (a *cpuAccumulator) isSocketFree(socketID int) bool {
 	return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
 }
@@ -58,82 +52,125 @@ func (a *cpuAccumulator) isCoreFree(coreID int) bool {
 	return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
 }
 
-// Returns free socket IDs as a slice sorted by:
-// - socket ID, ascending.
+// Returns free socket IDs as a slice sorted by sortAvailableSockets().
 func (a *cpuAccumulator) freeSockets() []int {
-	return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
-}
-
-// Returns core IDs as a slice sorted by:
-// - the number of whole available cores on the socket, ascending
-// - socket ID, ascending
-// - core ID, ascending
-func (a *cpuAccumulator) freeCores() []int {
-	socketIDs := a.details.Sockets().ToSliceNoSort()
-	sort.Slice(socketIDs,
-		func(i, j int) bool {
-			iCores := a.details.CoresInSockets(socketIDs[i]).Filter(a.isCoreFree)
-			jCores := a.details.CoresInSockets(socketIDs[j]).Filter(a.isCoreFree)
-			return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j]
-		})
-
-	coreIDs := []int{}
-	for _, s := range socketIDs {
-		coreIDs = append(coreIDs, a.details.CoresInSockets(s).Filter(a.isCoreFree).ToSlice()...)
+	free := []int{}
+	for _, socket := range a.sortAvailableSockets() {
+		if a.isSocketFree(socket) {
+			free = append(free, socket)
+		}
 	}
-	return coreIDs
+	return free
 }
 
-// Returns CPU IDs as a slice sorted by:
-// - socket affinity with result
-// - number of CPUs available on the same socket
-// - number of CPUs available on the same core
-// - socket ID.
-// - core ID.
+// Returns free core IDs as a slice sorted by sortAvailableCores().
+func (a *cpuAccumulator) freeCores() []int {
+	free := []int{}
+	for _, core := range a.sortAvailableCores() {
+		if a.isCoreFree(core) {
+			free = append(free, core)
+		}
+	}
+	return free
+}
+
+// Returns free CPU IDs as a slice sorted by sortAvailableCPUs().
 func (a *cpuAccumulator) freeCPUs() []int {
-	result := []int{}
-	cores := a.details.Cores().ToSlice()
+	return a.sortAvailableCPUs()
+}
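To see the shape shared by the rewritten free*() helpers, here is a minimal, self-contained sketch: walk the corresponding sortAvailable*() ordering (fewest available CPUs first, ties broken by ID) and keep only the fully free entries. The toy integer counts stand in for cpuset.CPUSet and the accumulator's topology details; none of these names come from the kubelet code.

package main

import (
	"fmt"
	"sort"
)

func main() {
	cpusPerSocket := 6
	// Socket ID -> number of CPUs still available on that socket.
	available := map[int]int{0: 6, 1: 4, 2: 6}

	// Mirror sortAvailableSockets(): fewest available first, ties by ID.
	sockets := []int{0, 1, 2}
	sort.Slice(sockets, func(i, j int) bool {
		if available[sockets[i]] != available[sockets[j]] {
			return available[sockets[i]] < available[sockets[j]]
		}
		return sockets[i] < sockets[j]
	})
	fmt.Println("sorted:", sockets) // sorted: [1 0 2]

	// Mirror freeSockets(): keep only fully free sockets, in that order.
	free := []int{}
	for _, s := range sockets {
		if available[s] == cpusPerSocket { // the isSocketFree() test
			free = append(free, s)
		}
	}
	fmt.Println("free:", free) // free: [0 2]
}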
 
-	sort.Slice(
-		cores,
+// Sorts the provided list of sockets/cores/cpus referenced in 'ids' by the
+// number of available CPUs contained within them (smallest to largest). The
+// 'getCPUs()' parameter defines the function that should be called to retrieve
+// the list of available CPUs for the type of socket/core/cpu being referenced.
+// If two sockets/cores/cpus have the same number of available CPUs, they are
+// sorted in ascending order by their ID.
+func (a *cpuAccumulator) sort(ids []int, getCPUs func(ids ...int) cpuset.CPUSet) {
+	sort.Slice(ids,
 		func(i, j int) bool {
-			iCore := cores[i]
-			jCore := cores[j]
-
-			iCPUs := a.topo.CPUDetails.CPUsInCores(iCore).ToSlice()
-			jCPUs := a.topo.CPUDetails.CPUsInCores(jCore).ToSlice()
-
-			iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
-			jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
-
-			// Compute the number of CPUs in the result reside on the same socket
-			// as each core.
-			iSocketColoScore := a.topo.CPUDetails.CPUsInSockets(iSocket).Intersection(a.result).Size()
-			jSocketColoScore := a.topo.CPUDetails.CPUsInSockets(jSocket).Intersection(a.result).Size()
-
-			// Compute the number of available CPUs available on the same socket
-			// as each core.
-			iSocketFreeScore := a.details.CPUsInSockets(iSocket).Size()
-			jSocketFreeScore := a.details.CPUsInSockets(jSocket).Size()
-
-			// Compute the number of available CPUs on each core.
-			iCoreFreeScore := a.details.CPUsInCores(iCore).Size()
-			jCoreFreeScore := a.details.CPUsInCores(jCore).Size()
-
-			return iSocketColoScore > jSocketColoScore ||
-				iSocketFreeScore < jSocketFreeScore ||
-				iCoreFreeScore < jCoreFreeScore ||
-				iSocket < jSocket ||
-				iCore < jCore
+			iCPUs := getCPUs(ids[i])
+			jCPUs := getCPUs(ids[j])
+			if iCPUs.Size() < jCPUs.Size() {
+				return true
+			}
+			if iCPUs.Size() > jCPUs.Size() {
+				return false
+			}
+			return ids[i] < ids[j]
 		})
+}
 
-	// For each core, append sorted CPU IDs to result.
-	for _, core := range cores {
-		result = append(result, a.details.CPUsInCores(core).ToSlice()...)
+// Sort all sockets with available CPUs using the sort() algorithm defined above.
+func (a *cpuAccumulator) sortAvailableSockets() []int {
+	sockets := a.details.Sockets().ToSliceNoSort()
+	a.sort(sockets, a.details.CPUsInSockets)
+	return sockets
+}
+
+// Sort all cores with available CPUs:
+// - First by socket using sortAvailableSockets().
+// - Then within each socket, using the sort() algorithm defined above.
+func (a *cpuAccumulator) sortAvailableCores() []int {
+	var result []int
+	for _, socket := range a.sortAvailableSockets() {
+		cores := a.details.CoresInSockets(socket).ToSliceNoSort()
+		a.sort(cores, a.details.CPUsInCores)
+		result = append(result, cores...)
 	}
 	return result
 }
 
+// Sort all available CPUs:
+// - First by core using sortAvailableCores().
+// - Then within each core, sorted by CPU ID, ascending.
+func (a *cpuAccumulator) sortAvailableCPUs() []int {
+	var result []int
+	for _, core := range a.sortAvailableCores() {
+		cpus := a.details.CPUsInCores(core).ToSliceNoSort()
+		sort.Ints(cpus)
+		result = append(result, cpus...)
+	}
+	return result
+}
+
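Taken together, sortAvailableCPUs() produces a strict hierarchy: sockets ordered fewest-available-CPUs-first, cores within each socket ordered the same way, and each core contributing its CPUs in ascending ID order. The sketch below reproduces that ordering with a hypothetical core struct, collapsing the two sort passes into a single comparator for brevity (the real code sorts sockets first, then cores within each socket); it arrives at exactly the expectation of the triple-socket test added in the second patch of this series.

package main

import (
	"fmt"
	"sort"
)

// core is a toy stand-in for the accumulator's topology details.
type core struct {
	socket int
	cpus   []int // available CPUs on this core, ascending
}

func main() {
	// Mirrors topoTripleSocketHT with CPUs 0-3 and 6-13 available:
	// socket 0 has 6 available CPUs, socket 1 has 4, socket 2 has 2.
	cores := map[int]core{
		0: {socket: 1, cpus: []int{0, 1}},
		1: {socket: 1, cpus: []int{2, 3}},
		3: {socket: 0, cpus: []int{6, 7}},
		4: {socket: 0, cpus: []int{8, 9}},
		5: {socket: 0, cpus: []int{10, 11}},
		6: {socket: 2, cpus: []int{12, 13}},
	}

	availInSocket := map[int]int{}
	for _, c := range cores {
		availInSocket[c.socket] += len(c.cpus)
	}

	coreIDs := []int{0, 1, 3, 4, 5, 6}
	sort.Slice(coreIDs, func(i, j int) bool {
		ci, cj := cores[coreIDs[i]], cores[coreIDs[j]]
		// Socket with the fewest available CPUs first, ties by socket ID.
		if ci.socket != cj.socket {
			if availInSocket[ci.socket] != availInSocket[cj.socket] {
				return availInSocket[ci.socket] < availInSocket[cj.socket]
			}
			return ci.socket < cj.socket
		}
		// Within a socket: fewest available CPUs first, ties by core ID.
		if len(ci.cpus) != len(cj.cpus) {
			return len(ci.cpus) < len(cj.cpus)
		}
		return coreIDs[i] < coreIDs[j]
	})

	var order []int
	for _, id := range coreIDs {
		order = append(order, cores[id].cpus...)
	}
	fmt.Println(order) // [12 13 0 1 2 3 6 7 8 9 10 11]
}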
+func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
+	a.result = a.result.Union(cpus)
+	a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
+	a.numCPUsNeeded -= cpus.Size()
+}
+
+func (a *cpuAccumulator) takeFullSockets() {
+	for _, socket := range a.freeSockets() {
+		cpusInSocket := a.topo.CPUDetails.CPUsInSockets(socket)
+		if !a.needs(cpusInSocket.Size()) {
+			continue
+		}
+		klog.V(4).InfoS("takeFullSockets: claiming socket", "socket", socket)
+		a.take(cpusInSocket)
+	}
+}
+
+func (a *cpuAccumulator) takeFullCores() {
+	for _, core := range a.freeCores() {
+		cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
+		if !a.needs(cpusInCore.Size()) {
+			continue
+		}
+		klog.V(4).InfoS("takeFullCores: claiming core", "core", core)
+		a.take(cpusInCore)
+	}
+}
+
+func (a *cpuAccumulator) takeRemainingCPUs() {
+	for _, cpu := range a.sortAvailableCPUs() {
+		klog.V(4).InfoS("takeRemainingCPUs: claiming CPU", "cpu", cpu)
+		a.take(cpuset.NewCPUSet(cpu))
+		if a.isSatisfied() {
+			return
+		}
+	}
+}
+
 func (a *cpuAccumulator) needs(n int) bool {
 	return a.numCPUsNeeded >= n
 }
@@ -158,45 +195,24 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
 	// Algorithm: topology-aware best-fit
 	// 1. Acquire whole sockets, if available and the container requires at
 	//    least a socket's-worth of CPUs.
-	if acc.needs(acc.topo.CPUsPerSocket()) {
-		for _, s := range acc.freeSockets() {
-			klog.V(4).InfoS("takeByTopology: claiming socket", "socket", s)
-			acc.take(acc.details.CPUsInSockets(s))
-			if acc.isSatisfied() {
-				return acc.result, nil
-			}
-			if !acc.needs(acc.topo.CPUsPerSocket()) {
-				break
-			}
-		}
+	acc.takeFullSockets()
+	if acc.isSatisfied() {
+		return acc.result, nil
 	}
 
 	// 2. Acquire whole cores, if available and the container requires at least
 	//    a core's-worth of CPUs.
-	if acc.needs(acc.topo.CPUsPerCore()) {
-		for _, c := range acc.freeCores() {
-			klog.V(4).InfoS("takeByTopology: claiming core", "core", c)
-			acc.take(acc.details.CPUsInCores(c))
-			if acc.isSatisfied() {
-				return acc.result, nil
-			}
-			if !acc.needs(acc.topo.CPUsPerCore()) {
-				break
-			}
-		}
+	acc.takeFullCores()
+	if acc.isSatisfied() {
+		return acc.result, nil
 	}
 
 	// 3. Acquire single threads, preferring to fill partially-allocated cores
 	//    on the same sockets as the whole cores we have already taken in this
 	//    allocation.
-	for _, c := range acc.freeCPUs() {
-		klog.V(4).InfoS("takeByTopology: claiming CPU", "cpu", c)
-		if acc.needs(1) {
-			acc.take(cpuset.NewCPUSet(c))
-		}
-		if acc.isSatisfied() {
-			return acc.result, nil
-		}
+	acc.takeRemainingCPUs()
+	if acc.isSatisfied() {
+		return acc.result, nil
 	}
 
 	return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
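With the helpers above in place, the refactored takeByTopology() reduces to three best-fit phases separated by a single isSatisfied() check each, and every take*() helper skips units that no longer fit rather than splitting them. Worked against the triple-socket test topology added in the next patch (CPUs 0-3 and 6-13 available), a request for 3 CPUs would skip fully-free socket 0 in phase 1 (its 6 CPUs overshoot the request), take whole core 6 (CPUs 12 and 13, on the most-constrained socket) in phase 2, and take CPU 0 in phase 3. The following sketch models only that control flow; the names and unit sizes are toy stand-ins for the real accumulator.

package main

import "fmt"

// acc is a toy accumulator: a countdown of CPUs still needed plus the
// names of the units claimed so far.
type acc struct {
	needed int
	taken  []string
}

func (a *acc) needs(n int) bool { return a.needed >= n }
func (a *acc) satisfied() bool  { return a.needed < 1 }
func (a *acc) take(name string, size int) {
	a.taken = append(a.taken, name)
	a.needed -= size
}

func main() {
	a := &acc{needed: 3}

	// Phase 1 (whole sockets): socket 0 is fully free, but needs(6) fails
	// for the 3 CPUs still required, so nothing is claimed.
	for _, socket := range []string{"socket0"} {
		if !a.needs(6) {
			continue
		}
		a.take(socket, 6)
	}

	// Phase 2 (whole cores of 2 CPUs), already in best-fit order: cores
	// that no longer fit are skipped rather than split.
	for _, core := range []string{"core6", "core0", "core1", "core3", "core4", "core5"} {
		if !a.needs(2) {
			continue
		}
		a.take(core, 2)
	}

	// Phase 3 (single CPUs), already in best-fit order: stop once satisfied.
	for _, cpu := range []string{"cpu0", "cpu1", "cpu2", "cpu3"} {
		if a.satisfied() {
			break
		}
		a.take(cpu, 1)
	}

	fmt.Println(a.taken, "still needed:", a.needed) // [core6 cpu0] still needed: 0
}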
From 9469ee70258158cd189f63c30695c9ea9c46663f Mon Sep 17 00:00:00 2001
From: Rancho Chen
Date: Fri, 14 May 2021 19:52:50 +0800
Subject: [PATCH 2/2] Add test case for freeCPUs with three sockets

---
 .../cm/cpumanager/cpu_assignment_test.go      |  6 +++++
 pkg/kubelet/cm/cpumanager/policy_test.go      | 27 +++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
index 2b2b04e7b29..7a25e3887c1 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
@@ -175,6 +175,12 @@ func TestCPUAccumulatorFreeCPUs(t *testing.T) {
 			cpuset.NewCPUSet(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
 			[]int{2, 8, 4, 10, 1, 7, 3, 9, 5, 11},
 		},
+		{
+			"triple socket HT, 12 cpus free",
+			topoTripleSocketHT,
+			cpuset.NewCPUSet(0, 1, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13),
+			[]int{12, 13, 0, 1, 2, 3, 6, 7, 8, 9, 10, 11},
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/pkg/kubelet/cm/cpumanager/policy_test.go b/pkg/kubelet/cm/cpumanager/policy_test.go
index 02e3709c01d..7ce051b2b8f 100644
--- a/pkg/kubelet/cm/cpumanager/policy_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_test.go
@@ -73,6 +73,33 @@ var (
 		},
 	}
 
+	// Fake triple-socket, hyper-threaded topology for testing purposes only.
+	topoTripleSocketHT = &topology.CPUTopology{
+		NumCPUs:    18,
+		NumSockets: 3,
+		NumCores:   9,
+		CPUDetails: map[int]topology.CPUInfo{
+			0:  {CoreID: 0, SocketID: 1, NUMANodeID: 1},
+			1:  {CoreID: 0, SocketID: 1, NUMANodeID: 1},
+			2:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+			3:  {CoreID: 1, SocketID: 1, NUMANodeID: 1},
+			4:  {CoreID: 2, SocketID: 1, NUMANodeID: 1},
+			5:  {CoreID: 2, SocketID: 1, NUMANodeID: 1},
+			6:  {CoreID: 3, SocketID: 0, NUMANodeID: 0},
+			7:  {CoreID: 3, SocketID: 0, NUMANodeID: 0},
+			8:  {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+			9:  {CoreID: 4, SocketID: 0, NUMANodeID: 0},
+			10: {CoreID: 5, SocketID: 0, NUMANodeID: 0},
+			11: {CoreID: 5, SocketID: 0, NUMANodeID: 0},
+			12: {CoreID: 6, SocketID: 2, NUMANodeID: 2},
+			13: {CoreID: 6, SocketID: 2, NUMANodeID: 2},
+			14: {CoreID: 7, SocketID: 2, NUMANodeID: 2},
+			15: {CoreID: 7, SocketID: 2, NUMANodeID: 2},
+			16: {CoreID: 8, SocketID: 2, NUMANodeID: 2},
+			17: {CoreID: 8, SocketID: 2, NUMANodeID: 2},
+		},
+	}
+
 	/* Topology from https://www.open-mpi.org/projects/hwloc/lstopo/images/KNL.SNC4.H50.v1.11.png.
 	Socket0: