diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 1fe4d94d4a8..f7bd53db6f8 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -312,11 +312,13 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I machineInfo, cm.GetNodeAllocatableReservation(), nodeConfig.KubeletRootDir, + cm.topologyManager, ) if err != nil { klog.Errorf("failed to initialize cpu manager: %v", err) return nil, err } + cm.topologyManager.AddHintProvider(cm.cpuManager) } return cm, nil diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index 9982e73cbb9..24cf06638f0 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -10,6 +10,7 @@ go_library( "policy.go", "policy_none.go", "policy_static.go", + "topology_hints.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager", visibility = ["//visibility:public"], @@ -18,6 +19,8 @@ go_library( "//pkg/kubelet/cm/cpumanager/state:go_default_library", "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", + "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/status:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -37,12 +40,15 @@ go_test( "policy_none_test.go", "policy_static_test.go", "policy_test.go", + "topology_hints_test.go", ], embed = [":go_default_library"], deps = [ "//pkg/kubelet/cm/cpumanager/state:go_default_library", "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", + "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index f3503d6ba8c..06e268d04c3 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" ) @@ -64,6 +65,10 @@ type Manager interface { // State returns a read-only interface to the internal CPU manager state. State() state.Reader + + // GetTopologyHints implements the Topology Manager Interface and is + // consulted to make Topology aware resource alignments + GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint } type manager struct { @@ -97,7 +102,7 @@ type manager struct { var _ Manager = &manager{} // NewManager creates new cpu manager based on provided policy -func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) { +func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var policy Policy switch policyName(cpuPolicyName) { @@ -129,7 +134,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo // exclusively allocated. reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) - policy = NewStaticPolicy(topo, numReservedCPUs) + policy = NewStaticPolicy(topo, numReservedCPUs, affinity) default: return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index ea4144567cb..404ceece252 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type mockState struct { @@ -195,7 +196,7 @@ func TestCPUManagerAdd(t *testing.T) { 2: {CoreID: 2, SocketID: 0}, 3: {CoreID: 3, SocketID: 0}, }, - }, 0) + }, 0, topologymanager.NewFakeManager()) testCases := []struct { description string updateErr error @@ -342,7 +343,7 @@ func TestCPUManagerGenerate(t *testing.T) { } defer os.RemoveAll(sDir) - mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir) + mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) if testCase.expectedError != nil { if !strings.Contains(err.Error(), testCase.expectedError.Error()) { t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error()) diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 8240bbd497d..7682895340f 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -20,6 +20,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/klog" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/status" ) @@ -46,6 +47,11 @@ func (m *fakeManager) RemoveContainer(containerID string) error { return nil } +func (m *fakeManager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint { + klog.Infof("[fake cpumanager] Get Topology Hints") + return []topologymanager.TopologyHint{} +} + func (m *fakeManager) State() state.Reader { return m.state } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index c374cb42e55..90b214c55ab 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -25,6 +25,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask" ) // PolicyStatic is the name of the static policy @@ -77,6 +79,8 @@ type staticPolicy struct { // (pod, container) -> containerID // for all containers a pod containerMap containerMap + // topology manager reference to get container Topology affinity + affinity topologymanager.Store } // Ensure staticPolicy implements Policy interface @@ -85,7 +89,7 @@ var _ Policy = &staticPolicy{} // NewStaticPolicy returns a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main // container process starts. -func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy { +func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy { allCPUs := topology.CPUDetails.CPUs() // takeByTopology allocates CPUs associated with low-numbered cores from // allCPUs. @@ -104,6 +108,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy topology: topology, reserved: reserved, containerMap: newContainerMap(), + affinity: affinity, } } @@ -186,7 +191,7 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co } }() - if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 { + if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 { klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) // container belongs in an exclusively allocated pool @@ -211,7 +216,12 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co } } - cpuset, err := p.allocateCPUs(s, numCPUs) + // Call Topology Manager to get the aligned socket affinity across all hint providers. + hint := p.affinity.GetAffinity(string(pod.UID), container.Name) + klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint) + + // Allocate CPUs according to the socket affinity contained in the hint. + cpuset, err := p.allocateCPUs(s, numCPUs, hint.SocketAffinity) if err != nil { klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err) return err @@ -240,12 +250,37 @@ func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr return nil } -func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) { - klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d)", numCPUs) - result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs) +func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, socketmask socketmask.SocketMask) (cpuset.CPUSet, error) { + klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, socketmask) + + // If there are aligned CPUs in the socketmask, attempt to take those first. + result := cpuset.NewCPUSet() + if socketmask != nil { + alignedCPUs := cpuset.NewCPUSet() + for _, socketID := range socketmask.GetSockets() { + alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInSocket(socketID))) + } + + numAlignedToAlloc := alignedCPUs.Size() + if numCPUs < numAlignedToAlloc { + numAlignedToAlloc = numCPUs + } + + alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc) + if err != nil { + return cpuset.NewCPUSet(), err + } + + result = result.Union(alignedCPUs) + } + + // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. + remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size()) if err != nil { return cpuset.NewCPUSet(), err } + result = result.Union(remainingCPUs) + // Remove allocated CPUs from the shared CPUSet. s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result)) @@ -253,7 +288,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, return result, nil } -func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { +func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return 0 } diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index 9103f9c0c85..f4500c7dd01 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -25,6 +25,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask" ) type staticPolicyTest struct { @@ -58,7 +60,7 @@ type staticPolicyMultiContainerTest struct { } func TestStaticPolicyName(t *testing.T) { - policy := NewStaticPolicy(topoSingleSocketHT, 1) + policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager()) policyName := policy.Name() if policyName != "static" { @@ -135,7 +137,7 @@ func TestStaticPolicyStart(t *testing.T) { t.Error("expected panic doesn't occurred") } }() - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, defaultCPUSet: testCase.stDefaultCPUSet, @@ -419,7 +421,7 @@ func TestStaticPolicyAdd(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -632,7 +634,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -719,7 +721,7 @@ func TestStaticPolicyRemove(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -739,3 +741,93 @@ func TestStaticPolicyRemove(t *testing.T) { } } } + +func TestTopologyAwareAllocateCPUs(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + stAssignments state.ContainerCPUAssignments + stDefaultCPUSet cpuset.CPUSet + numRequested int + socketMask socketmask.SocketMask + expCSet cpuset.CPUSet + }{ + { + description: "Request 2 CPUs, No SocketMask", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + numRequested: 2, + socketMask: nil, + expCSet: cpuset.NewCPUSet(0, 6), + }, + { + description: "Request 2 CPUs, SocketMask on Socket 0", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + numRequested: 2, + socketMask: func() socketmask.SocketMask { + mask, _ := socketmask.NewSocketMask(0) + return mask + }(), + expCSet: cpuset.NewCPUSet(0, 6), + }, + { + description: "Request 2 CPUs, SocketMask on Socket 1", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + numRequested: 2, + socketMask: func() socketmask.SocketMask { + mask, _ := socketmask.NewSocketMask(1) + return mask + }(), + expCSet: cpuset.NewCPUSet(1, 7), + }, + { + description: "Request 8 CPUs, SocketMask on Socket 0", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + numRequested: 8, + socketMask: func() socketmask.SocketMask { + mask, _ := socketmask.NewSocketMask(0) + return mask + }(), + expCSet: cpuset.NewCPUSet(0, 6, 2, 8, 4, 10, 1, 7), + }, + { + description: "Request 8 CPUs, SocketMask on Socket 1", + topo: topoDualSocketHT, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + numRequested: 8, + socketMask: func() socketmask.SocketMask { + mask, _ := socketmask.NewSocketMask(1) + return mask + }(), + expCSet: cpuset.NewCPUSet(1, 7, 3, 9, 5, 11, 0, 6), + }, + } + for _, tc := range testCases { + policy := NewStaticPolicy(tc.topo, 0, topologymanager.NewFakeManager()).(*staticPolicy) + st := &mockState{ + assignments: tc.stAssignments, + defaultCPUSet: tc.stDefaultCPUSet, + } + policy.Start(st) + + cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask) + if err != nil { + t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v", + tc.description, tc.expCSet, err) + continue + } + + if !reflect.DeepEqual(tc.expCSet, cset) { + t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v", + tc.description, tc.expCSet, cset) + } + } +} diff --git a/pkg/kubelet/cm/cpumanager/topology_hints.go b/pkg/kubelet/cm/cpumanager/topology_hints.go new file mode 100644 index 00000000000..f4be9d6d3e6 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/topology_hints.go @@ -0,0 +1,127 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "k8s.io/api/core/v1" + "k8s.io/klog" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask" +) + +func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint { + // The 'none' policy does not generate topology hints. + if m.policy.Name() == string(PolicyNone) { + return nil + } + + // For all other policies, if there are no CPU resources requested for this + // container, we do not generate any topology hints. + if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok { + return nil + } + + // Otherwise, attempt to generate TopologyHints for the CPUManager. + // For now, this implementation assumes the 'static' CPUManager policy. + // TODO: Generalize this so that its applicable to future CPUManager polices. + + // Get a count of how many guaranteed CPUs have been requested. + requested := m.policy.(*staticPolicy).guaranteedCPUs(&pod, &container) + + // If there are no guaranteed CPUs being requested, we do not generate + // any topology hints. This can happen, for example, because init + // containers don't have to have guaranteed CPUs in order for the pod + // to still be in the Guaranteed QOS tier. + if requested == 0 { + return nil + } + + // Get a list of available CPUs. + available := m.policy.(*staticPolicy).assignableCPUs(m.state) + + // Generate hints. + cpuHints := m.generateCPUTopologyHints(available, requested) + klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", pod.Name, container.Name, cpuHints) + + return cpuHints +} + +// generateCPUtopologyHints generates a set of TopologyHints given the set of +// available CPUs and the number of CPUs being requested. +// +// It follows the convention of marking all hints that have the same number of +// bits set as the narrowest matching SocketAffinity with 'Preferred: true', and +// marking all others with 'Preferred: false'. +func (m *manager) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, request int) []topologymanager.TopologyHint { + // Discover topology in order to establish the number + // of available CPUs per socket. + topo, err := topology.Discover(m.machineInfo) + if err != nil { + klog.Warningf("[cpu manager] Error discovering topology for TopologyHint generation") + return nil + } + + // Initialize minAffinity to a full affinity mask. + minAffinity, _ := socketmask.NewSocketMask() + minAffinity.Fill() + + // Iterate through all combinations of socketMasks and build hints from them. + hints := []topologymanager.TopologyHint{} + socketmask.IterateSocketMasks(topo.CPUDetails.Sockets().ToSlice(), func(mask socketmask.SocketMask) { + // Check to see if we have enough CPUs available on the current + // SocketMask to satisfy the CPU request. + numMatching := 0 + for _, c := range availableCPUs.ToSlice() { + if mask.IsSet(topo.CPUDetails[c].SocketID) { + numMatching++ + } + } + + // If we don't, then move onto the next combination. + if numMatching < request { + return + } + + // Otherwise, create a new hint from the SocketMask and add it to the + // list of hints. We set all hint preferences to 'false' on the first + // pass through. + hints = append(hints, topologymanager.TopologyHint{ + SocketAffinity: mask, + Preferred: false, + }) + + // Update minAffinity if relevant + if mask.IsNarrowerThan(minAffinity) { + minAffinity = mask + } + }) + + // Loop back through all hints and update the 'Preferred' field based on + // counting the number of bits sets in the affinity mask and comparing it + // to the minAffinity. Only those with an equal number of bits set will be + // considered preferred. + for i := range hints { + if hints[i].SocketAffinity.Count() == minAffinity.Count() { + hints[i].Preferred = true + } + } + + return hints +} diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go new file mode 100644 index 00000000000..3b74a9247ed --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "reflect" + "sort" + "testing" + + cadvisorapi "github.com/google/cadvisor/info/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask" +) + +func topologyHintLessThan(a topologymanager.TopologyHint, b topologymanager.TopologyHint) bool { + if a.Preferred != b.Preferred { + return a.Preferred == true + } + return a.SocketAffinity.IsNarrowerThan(b.SocketAffinity) +} + +func TestGetTopologyHints(t *testing.T) { + testPod1 := makePod("2", "2") + testContainer1 := &testPod1.Spec.Containers[0] + testPod2 := makePod("5", "5") + testContainer2 := &testPod2.Spec.Containers[0] + testPod3 := makePod("7", "7") + testContainer3 := &testPod3.Spec.Containers[0] + testPod4 := makePod("11", "11") + testContainer4 := &testPod4.Spec.Containers[0] + + firstSocketMask, _ := socketmask.NewSocketMask(0) + secondSocketMask, _ := socketmask.NewSocketMask(1) + crossSocketMask, _ := socketmask.NewSocketMask(0, 1) + + m := manager{ + policy: &staticPolicy{}, + machineInfo: &cadvisorapi.MachineInfo{ + NumCores: 12, + Topology: []cadvisorapi.Node{ + {Id: 0, + Cores: []cadvisorapi.Core{ + {Id: 0, Threads: []int{0, 6}}, + {Id: 1, Threads: []int{1, 7}}, + {Id: 2, Threads: []int{2, 8}}, + }, + }, + {Id: 1, + Cores: []cadvisorapi.Core{ + {Id: 0, Threads: []int{3, 9}}, + {Id: 1, Threads: []int{4, 10}}, + {Id: 2, Threads: []int{5, 11}}, + }, + }, + }, + }, + state: &mockState{ + defaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + }, + } + + tcases := []struct { + name string + pod v1.Pod + container v1.Container + expectedHints []topologymanager.TopologyHint + }{ + { + name: "Request 2 CPUs; 4 available on Socket 0, 6 available on Socket 1", + pod: *testPod1, + container: *testContainer1, + expectedHints: []topologymanager.TopologyHint{ + { + SocketAffinity: firstSocketMask, + Preferred: true, + }, + { + SocketAffinity: secondSocketMask, + Preferred: true, + }, + { + SocketAffinity: crossSocketMask, + Preferred: false, + }, + }, + }, + { + name: "Request 5 CPUs; 4 available on Socket 0, 6 available on Socket 1", + pod: *testPod2, + container: *testContainer2, + expectedHints: []topologymanager.TopologyHint{ + { + SocketAffinity: secondSocketMask, + Preferred: true, + }, + { + SocketAffinity: crossSocketMask, + Preferred: false, + }, + }, + }, + { + name: "Request 7 CPUs, 4 available on Socket 0, 6 available on Socket 1", + pod: *testPod3, + container: *testContainer3, + expectedHints: []topologymanager.TopologyHint{ + { + SocketAffinity: crossSocketMask, + Preferred: true, + }, + }, + }, + { + name: "Request 11 CPUs, 4 available on Socket 0, 6 available on Socket 1", + pod: *testPod4, + container: *testContainer4, + expectedHints: []topologymanager.TopologyHint{}, + }, + } + for _, tc := range tcases { + hints := m.GetTopologyHints(tc.pod, tc.container) + if len(tc.expectedHints) == 0 && len(hints) == 0 { + continue + } + sort.SliceStable(hints, func(i, j int) bool { + return topologyHintLessThan(hints[i], hints[j]) + }) + sort.SliceStable(tc.expectedHints, func(i, j int) bool { + return topologyHintLessThan(tc.expectedHints[i], tc.expectedHints[j]) + }) + if !reflect.DeepEqual(tc.expectedHints, hints) { + t.Errorf("Expected in result to be %v , got %v", tc.expectedHints, hints) + } + + } +} diff --git a/pkg/kubelet/cm/topologymanager/socketmask/socketmask.go b/pkg/kubelet/cm/topologymanager/socketmask/socketmask.go index 016725a46d0..8befaec0bce 100644 --- a/pkg/kubelet/cm/topologymanager/socketmask/socketmask.go +++ b/pkg/kubelet/cm/topologymanager/socketmask/socketmask.go @@ -172,3 +172,23 @@ func Or(first SocketMask, masks ...SocketMask) SocketMask { s.Or(masks...) return &s } + +// IterateSocketMasks iterates all possible masks from a list of sockets, +// issuing a callback on each mask. +func IterateSocketMasks(sockets []int, callback func(SocketMask)) { + var iterate func(sockets, accum []int, size int) + iterate = func(sockets, accum []int, size int) { + if len(accum) == size { + mask, _ := NewSocketMask(accum...) + callback(mask) + return + } + for i := range sockets { + iterate(sockets[i+1:], append(accum, sockets[i]), size) + } + } + + for i := 1; i <= len(sockets); i++ { + iterate(sockets, []int{}, i) + } +} diff --git a/pkg/kubelet/cm/topologymanager/socketmask/socketmask_test.go b/pkg/kubelet/cm/topologymanager/socketmask/socketmask_test.go index 2ff720be0db..38424890896 100644 --- a/pkg/kubelet/cm/topologymanager/socketmask/socketmask_test.go +++ b/pkg/kubelet/cm/topologymanager/socketmask/socketmask_test.go @@ -342,3 +342,54 @@ func TestIsNarrowerThan(t *testing.T) { } } } + +func TestIterateSocketMasks(t *testing.T) { + tcases := []struct { + name string + numSockets int + }{ + { + name: "1 Socket", + numSockets: 1, + }, + { + name: "2 Sockets", + numSockets: 2, + }, + { + name: "4 Sockets", + numSockets: 4, + }, + { + name: "8 Sockets", + numSockets: 8, + }, + { + name: "16 Sockets", + numSockets: 16, + }, + } + for _, tc := range tcases { + // Generate a list of sockets from tc.numSockets. + var sockets []int + for i := 0; i < tc.numSockets; i++ { + sockets = append(sockets, i) + } + + // Calculate the expected number of masks. Since we always have masks + // with sockets from 0..n, this is just (2^n - 1) since we want 1 mask + // represented by each integer between 1 and 2^n-1. + expectedNumMasks := (1 << uint(tc.numSockets)) - 1 + + // Iterate all masks and count them. + numMasks := 0 + IterateSocketMasks(sockets, func(SocketMask) { + numMasks++ + }) + + // Compare the number of masks generated to the expected amount. + if expectedNumMasks != numMasks { + t.Errorf("Expected to iterate %v masks, got %v", expectedNumMasks, numMasks) + } + } +}