From 156b3f6af8c6eff4318cd6d266e8846d9a3062e9 Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Wed, 7 Aug 2019 13:41:33 +0200 Subject: [PATCH] Generate TopologyHints from the CPUManager --- pkg/kubelet/cm/cpumanager/BUILD | 2 + pkg/kubelet/cm/cpumanager/policy_static.go | 4 +- pkg/kubelet/cm/cpumanager/topology_hints.go | 113 +++++++++++++++++--- 3 files changed, 101 insertions(+), 18 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index ea0b7c2cc48..d894f1c3c4a 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/cm/topologymanager:go_default_library", + "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/status:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -46,6 +47,7 @@ go_test( "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//pkg/kubelet/cm/topologymanager:go_default_library", + "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 7200e0fcec6..0c82c833a30 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -190,7 +190,7 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co } }() - if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 { + if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 { klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) // container belongs in an exclusively allocated pool @@ -257,7 +257,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, return result, nil } -func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { +func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return 0 } diff --git a/pkg/kubelet/cm/cpumanager/topology_hints.go b/pkg/kubelet/cm/cpumanager/topology_hints.go index aafb3b43aef..f4be9d6d3e6 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints.go @@ -21,26 +21,107 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask" ) func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint { - var cpuHints []topologymanager.TopologyHint - if requestedObj, ok := container.Resources.Requests[v1.ResourceCPU]; ok { - // Get a count of how many CPUs have been requested - requested := int(requestedObj.Value()) - klog.Infof("[cpumanager] Guaranteed CPUs detected: %v", requested) - - // Discover topology in order to establish the number - // of available CPUs per socket. - _, err := topology.Discover(m.machineInfo) - if err != nil { - klog.Infof("[cpu manager] error discovering topology") - return nil - } - - // TODO: Fill in cpuHints with proper TopologyHints + // The 'none' policy does not generate topology hints. + if m.policy.Name() == string(PolicyNone) { + return nil } - klog.Infof("[cpumanager] Topology Hints for pod: %v", cpuHints) + + // For all other policies, if there are no CPU resources requested for this + // container, we do not generate any topology hints. + if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok { + return nil + } + + // Otherwise, attempt to generate TopologyHints for the CPUManager. + // For now, this implementation assumes the 'static' CPUManager policy. + // TODO: Generalize this so that its applicable to future CPUManager polices. + + // Get a count of how many guaranteed CPUs have been requested. + requested := m.policy.(*staticPolicy).guaranteedCPUs(&pod, &container) + + // If there are no guaranteed CPUs being requested, we do not generate + // any topology hints. This can happen, for example, because init + // containers don't have to have guaranteed CPUs in order for the pod + // to still be in the Guaranteed QOS tier. + if requested == 0 { + return nil + } + + // Get a list of available CPUs. + available := m.policy.(*staticPolicy).assignableCPUs(m.state) + + // Generate hints. + cpuHints := m.generateCPUTopologyHints(available, requested) + klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", pod.Name, container.Name, cpuHints) + return cpuHints } + +// generateCPUtopologyHints generates a set of TopologyHints given the set of +// available CPUs and the number of CPUs being requested. +// +// It follows the convention of marking all hints that have the same number of +// bits set as the narrowest matching SocketAffinity with 'Preferred: true', and +// marking all others with 'Preferred: false'. +func (m *manager) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, request int) []topologymanager.TopologyHint { + // Discover topology in order to establish the number + // of available CPUs per socket. + topo, err := topology.Discover(m.machineInfo) + if err != nil { + klog.Warningf("[cpu manager] Error discovering topology for TopologyHint generation") + return nil + } + + // Initialize minAffinity to a full affinity mask. + minAffinity, _ := socketmask.NewSocketMask() + minAffinity.Fill() + + // Iterate through all combinations of socketMasks and build hints from them. + hints := []topologymanager.TopologyHint{} + socketmask.IterateSocketMasks(topo.CPUDetails.Sockets().ToSlice(), func(mask socketmask.SocketMask) { + // Check to see if we have enough CPUs available on the current + // SocketMask to satisfy the CPU request. + numMatching := 0 + for _, c := range availableCPUs.ToSlice() { + if mask.IsSet(topo.CPUDetails[c].SocketID) { + numMatching++ + } + } + + // If we don't, then move onto the next combination. + if numMatching < request { + return + } + + // Otherwise, create a new hint from the SocketMask and add it to the + // list of hints. We set all hint preferences to 'false' on the first + // pass through. + hints = append(hints, topologymanager.TopologyHint{ + SocketAffinity: mask, + Preferred: false, + }) + + // Update minAffinity if relevant + if mask.IsNarrowerThan(minAffinity) { + minAffinity = mask + } + }) + + // Loop back through all hints and update the 'Preferred' field based on + // counting the number of bits sets in the affinity mask and comparing it + // to the minAffinity. Only those with an equal number of bits set will be + // considered preferred. + for i := range hints { + if hints[i].SocketAffinity.Count() == minAffinity.Count() { + hints[i].Preferred = true + } + } + + return hints +}