diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index ed16b0e032e..0cd987bf242 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -319,11 +319,13 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I machineInfo, cm.GetNodeAllocatableReservation(), nodeConfig.KubeletRootDir, + cm.topologyManager, ) if err != nil { klog.Errorf("failed to initialize cpu manager: %v", err) return nil, err } + cm.topologyManager.AddHintProvider(cm.cpuManager) } return cm, nil diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index 9982e73cbb9..ea0b7c2cc48 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -10,6 +10,7 @@ go_library( "policy.go", "policy_none.go", "policy_static.go", + "topology_hints.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager", visibility = ["//visibility:public"], @@ -18,6 +19,7 @@ go_library( "//pkg/kubelet/cm/cpumanager/state:go_default_library", "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/status:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -43,6 +45,7 @@ go_test( "//pkg/kubelet/cm/cpumanager/state:go_default_library", "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index f3503d6ba8c..06e268d04c3 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" ) @@ -64,6 +65,10 @@ type Manager interface { // State returns a read-only interface to the internal CPU manager state. State() state.Reader + + // GetTopologyHints implements the Topology Manager Interface and is + // consulted to make Topology aware resource alignments + GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint } type manager struct { @@ -97,7 +102,7 @@ type manager struct { var _ Manager = &manager{} // NewManager creates new cpu manager based on provided policy -func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) { +func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var policy Policy switch policyName(cpuPolicyName) { @@ -129,7 +134,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo // exclusively allocated. reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000 numReservedCPUs := int(math.Ceil(reservedCPUsFloat)) - policy = NewStaticPolicy(topo, numReservedCPUs) + policy = NewStaticPolicy(topo, numReservedCPUs, affinity) default: return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index ea4144567cb..404ceece252 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type mockState struct { @@ -195,7 +196,7 @@ func TestCPUManagerAdd(t *testing.T) { 2: {CoreID: 2, SocketID: 0}, 3: {CoreID: 3, SocketID: 0}, }, - }, 0) + }, 0, topologymanager.NewFakeManager()) testCases := []struct { description string updateErr error @@ -342,7 +343,7 @@ func TestCPUManagerGenerate(t *testing.T) { } defer os.RemoveAll(sDir) - mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir) + mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager()) if testCase.expectedError != nil { if !strings.Contains(err.Error(), testCase.expectedError.Error()) { t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error()) diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 8240bbd497d..7682895340f 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -20,6 +20,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/klog" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/status" ) @@ -46,6 +47,11 @@ func (m *fakeManager) RemoveContainer(containerID string) error { return nil } +func (m *fakeManager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint { + klog.Infof("[fake cpumanager] Get Topology Hints") + return []topologymanager.TopologyHint{} +} + func (m *fakeManager) State() state.Reader { return m.state } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index c374cb42e55..7200e0fcec6 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) // PolicyStatic is the name of the static policy @@ -77,6 +78,8 @@ type staticPolicy struct { // (pod, container) -> containerID // for all containers a pod containerMap containerMap + // topology manager reference to get container Topology affinity + affinity topologymanager.Store } // Ensure staticPolicy implements Policy interface @@ -85,7 +88,7 @@ var _ Policy = &staticPolicy{} // NewStaticPolicy returns a CPU manager policy that does not change CPU // assignments for exclusively pinned guaranteed containers after the main // container process starts. -func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy { +func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy { allCPUs := topology.CPUDetails.CPUs() // takeByTopology allocates CPUs associated with low-numbered cores from // allCPUs. @@ -104,6 +107,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy topology: topology, reserved: reserved, containerMap: newContainerMap(), + affinity: affinity, } } diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index 9103f9c0c85..ae35d9403c3 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type staticPolicyTest struct { @@ -58,7 +59,7 @@ type staticPolicyMultiContainerTest struct { } func TestStaticPolicyName(t *testing.T) { - policy := NewStaticPolicy(topoSingleSocketHT, 1) + policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager()) policyName := policy.Name() if policyName != "static" { @@ -135,7 +136,7 @@ func TestStaticPolicyStart(t *testing.T) { t.Error("expected panic doesn't occurred") } }() - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, defaultCPUSet: testCase.stDefaultCPUSet, @@ -419,7 +420,7 @@ func TestStaticPolicyAdd(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -632,7 +633,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, @@ -719,7 +720,7 @@ func TestStaticPolicyRemove(t *testing.T) { } for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs) + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()) st := &mockState{ assignments: testCase.stAssignments, diff --git a/pkg/kubelet/cm/cpumanager/topology_hints.go b/pkg/kubelet/cm/cpumanager/topology_hints.go new file mode 100644 index 00000000000..aafb3b43aef --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/topology_hints.go @@ -0,0 +1,46 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "k8s.io/api/core/v1" + "k8s.io/klog" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" +) + +func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint { + var cpuHints []topologymanager.TopologyHint + if requestedObj, ok := container.Resources.Requests[v1.ResourceCPU]; ok { + // Get a count of how many CPUs have been requested + requested := int(requestedObj.Value()) + klog.Infof("[cpumanager] Guaranteed CPUs detected: %v", requested) + + // Discover topology in order to establish the number + // of available CPUs per socket. + _, err := topology.Discover(m.machineInfo) + if err != nil { + klog.Infof("[cpu manager] error discovering topology") + return nil + } + + // TODO: Fill in cpuHints with proper TopologyHints + } + klog.Infof("[cpumanager] Topology Hints for pod: %v", cpuHints) + return cpuHints +}