diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 0c82c833a30..90b214c55ab 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask" ) // PolicyStatic is the name of the static policy @@ -215,7 +216,12 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co } } - cpuset, err := p.allocateCPUs(s, numCPUs) + // Call Topology Manager to get the aligned socket affinity across all hint providers. + hint := p.affinity.GetAffinity(string(pod.UID), container.Name) + klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint) + + // Allocate CPUs according to the socket affinity contained in the hint. + cpuset, err := p.allocateCPUs(s, numCPUs, hint.SocketAffinity) if err != nil { klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err) return err @@ -244,12 +250,37 @@ func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr return nil } -func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) { - klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d)", numCPUs) - result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs) +func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, socketmask socketmask.SocketMask) (cpuset.CPUSet, error) { + klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, socketmask) + + // If there are aligned CPUs in the socketmask, attempt to take those first. + result := cpuset.NewCPUSet() + if socketmask != nil { + alignedCPUs := cpuset.NewCPUSet() + for _, socketID := range socketmask.GetSockets() { + alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInSocket(socketID))) + } + + numAlignedToAlloc := alignedCPUs.Size() + if numCPUs < numAlignedToAlloc { + numAlignedToAlloc = numCPUs + } + + alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc) + if err != nil { + return cpuset.NewCPUSet(), err + } + + result = result.Union(alignedCPUs) + } + + // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. + remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size()) if err != nil { return cpuset.NewCPUSet(), err } + result = result.Union(remainingCPUs) + // Remove allocated CPUs from the shared CPUSet. s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))