Add stub support for TopologyManager to CPUManager

Co-Authored-By: Louise Daly <louise.m.daly@intel.com>
This commit is contained in:
Conor Nolan 2019-08-07 13:32:15 +02:00 committed by Kevin Klues
parent f392229104
commit e33af11add
8 changed files with 78 additions and 10 deletions

View File

@ -319,11 +319,13 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.Errorf("failed to initialize cpu manager: %v", err)
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
}
return cm, nil

View File

@ -10,6 +10,7 @@ go_library(
"policy.go",
"policy_none.go",
"policy_static.go",
"topology_hints.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager",
visibility = ["//visibility:public"],
@ -18,6 +19,7 @@ go_library(
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -43,6 +45,7 @@ go_test(
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
)
@ -64,6 +65,10 @@ type Manager interface {
// State returns a read-only interface to the internal CPU manager state.
State() state.Reader
// GetTopologyHints implements the Topology Manager Interface and is
// consulted to make Topology aware resource alignments
GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint
}
type manager struct {
@ -97,7 +102,7 @@ type manager struct {
var _ Manager = &manager{}
// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) {
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var policy Policy
switch policyName(cpuPolicyName) {
@ -129,7 +134,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy = NewStaticPolicy(topo, numReservedCPUs)
policy = NewStaticPolicy(topo, numReservedCPUs, affinity)
default:
return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
type mockState struct {
@ -195,7 +196,7 @@ func TestCPUManagerAdd(t *testing.T) {
2: {CoreID: 2, SocketID: 0},
3: {CoreID: 3, SocketID: 0},
},
}, 0)
}, 0, topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
@ -342,7 +343,7 @@ func TestCPUManagerGenerate(t *testing.T) {
}
defer os.RemoveAll(sDir)
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir)
mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())

View File

@ -20,6 +20,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/status"
)
@ -46,6 +47,11 @@ func (m *fakeManager) RemoveContainer(containerID string) error {
return nil
}
func (m *fakeManager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint {
klog.Infof("[fake cpumanager] Get Topology Hints")
return []topologymanager.TopologyHint{}
}
func (m *fakeManager) State() state.Reader {
return m.state
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
// PolicyStatic is the name of the static policy
@ -77,6 +78,8 @@ type staticPolicy struct {
// (pod, container) -> containerID
// for all containers a pod
containerMap containerMap
// topology manager reference to get container Topology affinity
affinity topologymanager.Store
}
// Ensure staticPolicy implements Policy interface
@ -85,7 +88,7 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy {
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy {
allCPUs := topology.CPUDetails.CPUs()
// takeByTopology allocates CPUs associated with low-numbered cores from
// allCPUs.
@ -104,6 +107,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy
topology: topology,
reserved: reserved,
containerMap: newContainerMap(),
affinity: affinity,
}
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
type staticPolicyTest struct {
@ -58,7 +59,7 @@ type staticPolicyMultiContainerTest struct {
}
func TestStaticPolicyName(t *testing.T) {
policy := NewStaticPolicy(topoSingleSocketHT, 1)
policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager())
policyName := policy.Name()
if policyName != "static" {
@ -135,7 +136,7 @@ func TestStaticPolicyStart(t *testing.T) {
t.Error("expected panic doesn't occurred")
}
}()
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy)
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
@ -419,7 +420,7 @@ func TestStaticPolicyAdd(t *testing.T) {
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
@ -632,7 +633,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) {
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,
@ -719,7 +720,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())
st := &mockState{
assignments: testCase.stAssignments,

View File

@ -0,0 +1,46 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint {
var cpuHints []topologymanager.TopologyHint
if requestedObj, ok := container.Resources.Requests[v1.ResourceCPU]; ok {
// Get a count of how many CPUs have been requested
requested := int(requestedObj.Value())
klog.Infof("[cpumanager] Guaranteed CPUs detected: %v", requested)
// Discover topology in order to establish the number
// of available CPUs per socket.
_, err := topology.Discover(m.machineInfo)
if err != nil {
klog.Infof("[cpu manager] error discovering topology")
return nil
}
// TODO: Fill in cpuHints with proper TopologyHints
}
klog.Infof("[cpumanager] Topology Hints for pod: %v", cpuHints)
return cpuHints
}