diff --git a/pkg/kubelet/cm/cpumanager/cpu_assignment.go b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
new file mode 100644
index 00000000000..a4de3b5ec89
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -0,0 +1,209 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cpumanager
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/golang/glog"
+
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+)
+
+type cpuAccumulator struct {
+	topo          *topology.CPUTopology
+	details       topology.CPUDetails
+	numCPUsNeeded int
+	result        cpuset.CPUSet
+}
+
+func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
+	return &cpuAccumulator{
+		topo:          topo,
+		details:       topo.CPUDetails.KeepOnly(availableCPUs),
+		numCPUsNeeded: numCPUs,
+		result:        cpuset.NewCPUSet(),
+	}
+}
+
+func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
+	a.result = a.result.Union(cpus)
+	a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
+	a.numCPUsNeeded -= cpus.Size()
+}
+
+// Returns true if the supplied socket is fully available in `a.details`.
+func (a *cpuAccumulator) isSocketFree(socketID int) bool {
+	return a.details.CPUsInSocket(socketID).Size() == a.topo.CPUsPerSocket()
+}
+
+// Returns true if the supplied core is fully available in `a.details`.
+func (a *cpuAccumulator) isCoreFree(coreID int) bool {
+	return a.details.CPUsInCore(coreID).Size() == a.topo.CPUsPerCore()
+}
+
+// Returns free socket IDs as a slice sorted by:
+// - socket ID, ascending
+func (a *cpuAccumulator) freeSockets() []int {
+	return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
+}
+
+// Returns free core IDs as a slice sorted by:
+// - the number of whole available cores on the socket, ascending
+// - socket ID, ascending
+// - core ID, ascending
+func (a *cpuAccumulator) freeCores() []int {
+	socketIDs := a.details.Sockets().ToSlice()
+	sort.Slice(socketIDs,
+		func(i, j int) bool {
+			iCores := a.details.CoresInSocket(socketIDs[i]).Filter(a.isCoreFree)
+			jCores := a.details.CoresInSocket(socketIDs[j]).Filter(a.isCoreFree)
+			if iCores.Size() != jCores.Size() {
+				return iCores.Size() < jCores.Size()
+			}
+			return socketIDs[i] < socketIDs[j]
+		})
+
+	coreIDs := []int{}
+	for _, s := range socketIDs {
+		coreIDs = append(coreIDs, a.details.CoresInSocket(s).Filter(a.isCoreFree).ToSlice()...)
+	}
+	return coreIDs
+}
+
+// Returns CPU IDs as a slice sorted by:
+// - socket affinity with result, descending
+// - number of CPUs available on the same socket, ascending
+// - number of CPUs available on the same core, ascending
+// - socket ID, ascending
+// - core ID, ascending
+func (a *cpuAccumulator) freeCPUs() []int {
+	result := []int{}
+	cores := a.details.Cores().ToSlice()
+
+	sort.Slice(
+		cores,
+		func(i, j int) bool {
+			iCore := cores[i]
+			jCore := cores[j]
+
+			iCPUs := a.topo.CPUDetails.CPUsInCore(iCore).ToSlice()
+			jCPUs := a.topo.CPUDetails.CPUsInCore(jCore).ToSlice()
+
+			iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
+			jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
+
+			// Compute the number of CPUs in the result that reside on the
+			// same socket as each core.
+			iSocketColoScore := a.topo.CPUDetails.CPUsInSocket(iSocket).Intersection(a.result).Size()
+			jSocketColoScore := a.topo.CPUDetails.CPUsInSocket(jSocket).Intersection(a.result).Size()
+
+			// Compute the number of available CPUs on the same socket as
+			// each core.
+			iSocketFreeScore := a.details.CPUsInSocket(iSocket).Size()
+			jSocketFreeScore := a.details.CPUsInSocket(jSocket).Size()
+
+			// Compute the number of available CPUs on each core.
+			iCoreFreeScore := a.details.CPUsInCore(iCore).Size()
+			jCoreFreeScore := a.details.CPUsInCore(jCore).Size()
+
+			// Compare the sort keys lexicographically so the comparison is
+			// a valid strict weak ordering for sort.Slice.
+			switch {
+			case iSocketColoScore != jSocketColoScore:
+				return iSocketColoScore > jSocketColoScore
+			case iSocketFreeScore != jSocketFreeScore:
+				return iSocketFreeScore < jSocketFreeScore
+			case iCoreFreeScore != jCoreFreeScore:
+				return iCoreFreeScore < jCoreFreeScore
+			case iSocket != jSocket:
+				return iSocket < jSocket
+			default:
+				return iCore < jCore
+			}
+		})
+
+	// For each core, append sorted CPU IDs to result.
+	for _, core := range cores {
+		result = append(result, a.details.CPUsInCore(core).ToSlice()...)
+	}
+	return result
+}
+
+func (a *cpuAccumulator) needs(n int) bool {
+	return a.numCPUsNeeded >= n
+}
+
+func (a *cpuAccumulator) isSatisfied() bool {
+	return a.numCPUsNeeded < 1
+}
+
+func (a *cpuAccumulator) isFailed() bool {
+	return a.numCPUsNeeded > a.details.CPUs().Size()
+}
+
+func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
+	if acc.isSatisfied() {
+		return acc.result, nil
+	}
+	if acc.isFailed() {
+		return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
+	}
+
+	// Algorithm: topology-aware best-fit
+	// 1. Acquire whole sockets, if available and the container requires at
+	//    least a socket's-worth of CPUs.
+	for _, s := range acc.freeSockets() {
+		if acc.needs(acc.topo.CPUsPerSocket()) {
+			glog.V(4).Infof("[cpumanager] takeByTopology: claiming socket [%d]", s)
+			acc.take(acc.details.CPUsInSocket(s))
+			if acc.isSatisfied() {
+				return acc.result, nil
+			}
+		}
+	}
+
+	// 2. Acquire whole cores, if available and the container requires at least
+	//    a core's-worth of CPUs.
+	for _, c := range acc.freeCores() {
+		if acc.needs(acc.topo.CPUsPerCore()) {
+			glog.V(4).Infof("[cpumanager] takeByTopology: claiming core [%d]", c)
+			acc.take(acc.details.CPUsInCore(c))
+			if acc.isSatisfied() {
+				return acc.result, nil
+			}
+		}
+	}
+
+	// 3. Acquire single threads, preferring to fill partially-allocated cores
+	//    on the same sockets as the whole cores we have already taken in this
+	//    allocation.
+	for _, c := range acc.freeCPUs() {
+		if acc.needs(1) {
+			glog.V(4).Infof("[cpumanager] takeByTopology: claiming CPU [%d]", c)
+			acc.take(cpuset.NewCPUSet(c))
+		}
+		if acc.isSatisfied() {
+			return acc.result, nil
+		}
+	}
+
+	return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
+}
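
For reviewers, a minimal usage sketch (illustration only, not part of the diff). It assumes the CPUTopology and CPUInfo field names (NumCPUs, NumSockets, NumCores, CPUDetails, SocketID, CoreID) exposed by the accompanying topology package, and the helper name exampleTakeByTopology is hypothetical. On a dual-socket machine with two cores per socket and two hyperthreads per core, a request for 5 CPUs should claim all of socket 0 in step 1, skip step 2 (only one CPU is still needed), and top up with a single thread from socket 1 in step 3:

package cpumanager

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

// exampleTakeByTopology walks the allocator through a hypothetical
// dual-socket machine: 2 sockets x 2 cores x 2 hyperthreads = 8 CPUs.
// CPUs 0-3 live on socket 0; CPUs 4-7 live on socket 1.
func exampleTakeByTopology() {
	topo := &topology.CPUTopology{
		NumCPUs:    8,
		NumSockets: 2,
		NumCores:   4,
		CPUDetails: topology.CPUDetails{
			0: {CoreID: 0, SocketID: 0},
			1: {CoreID: 0, SocketID: 0},
			2: {CoreID: 1, SocketID: 0},
			3: {CoreID: 1, SocketID: 0},
			4: {CoreID: 2, SocketID: 1},
			5: {CoreID: 2, SocketID: 1},
			6: {CoreID: 3, SocketID: 1},
			7: {CoreID: 3, SocketID: 1},
		},
	}
	available := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7)

	// Requesting 5 CPUs: step 1 takes socket 0 whole (CPUs 0-3), step 2
	// is skipped because fewer than CPUsPerCore CPUs remain needed, and
	// step 3 takes one thread from the lowest-numbered free core on
	// socket 1, so the expected result is CPUs 0-4.
	result, err := takeByTopology(topo, available, 5)
	fmt.Println(result, err)
}

Because every sort key ultimately falls back to socket and core ID, the allocation order is deterministic for a given topology and set of available CPUs.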