Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 11:21:47 +00:00)
Merge pull request #73920 from nolancon/topology-manager-cpu-manager

Changes to make CPU Manager a Hint Provider for Topology Manager

commit f2dd24820a
@@ -312,11 +312,13 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
			machineInfo,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.KubeletRootDir,
+			cm.topologyManager,
		)
		if err != nil {
			klog.Errorf("failed to initialize cpu manager: %v", err)
			return nil, err
		}
+		cm.topologyManager.AddHintProvider(cm.cpuManager)
	}

	return cm, nil
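The two added lines above register the CPU manager with the Topology Manager as a hint provider, so that CPU placement preferences are consulted when a container is admitted. Below is a minimal, self-contained sketch of that registration pattern; every type and name in it (Hint, HintProvider, TopologyManager, fakeCPUManager) is a hypothetical stand-in, not the Kubernetes API.

package main

import "fmt"

// Hint is a simplified stand-in for topologymanager.TopologyHint:
// a set of sockets the resource could be aligned to, plus a preference flag.
type Hint struct {
	Sockets   map[int]bool
	Preferred bool
}

// HintProvider mirrors the role the CPU manager takes on in this PR:
// anything that can report socket-affinity hints for a container.
type HintProvider interface {
	GetTopologyHints(podUID, containerName string) []Hint
}

// TopologyManager collects hints from all registered providers.
type TopologyManager struct {
	providers []HintProvider
}

// AddHintProvider registers another source of topology hints.
func (m *TopologyManager) AddHintProvider(p HintProvider) {
	m.providers = append(m.providers, p)
}

// CollectHints asks every registered provider for its hints for one container.
func (m *TopologyManager) CollectHints(podUID, containerName string) [][]Hint {
	var all [][]Hint
	for _, p := range m.providers {
		all = append(all, p.GetTopologyHints(podUID, containerName))
	}
	return all
}

// fakeCPUManager stands in for the real cpumanager.Manager hint provider.
type fakeCPUManager struct{}

func (f fakeCPUManager) GetTopologyHints(podUID, containerName string) []Hint {
	return []Hint{{Sockets: map[int]bool{0: true}, Preferred: true}}
}

func main() {
	tm := &TopologyManager{}
	tm.AddHintProvider(fakeCPUManager{}) // analogous to cm.topologyManager.AddHintProvider(cm.cpuManager)
	fmt.Println(tm.CollectHints("pod-uid", "ctr"))
}

In the real change, the provider role is filled by the GetTopologyHints method added to the cpumanager Manager interface further down in this diff.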
@@ -10,6 +10,7 @@ go_library(
        "policy.go",
        "policy_none.go",
        "policy_static.go",
+       "topology_hints.go",
    ],
    importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager",
    visibility = ["//visibility:public"],
@@ -18,6 +19,8 @@ go_library(
        "//pkg/kubelet/cm/cpumanager/state:go_default_library",
        "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
        "//pkg/kubelet/cm/cpuset:go_default_library",
+       "//pkg/kubelet/cm/topologymanager:go_default_library",
+       "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library",
        "//pkg/kubelet/container:go_default_library",
        "//pkg/kubelet/status:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -37,12 +40,15 @@ go_test(
        "policy_none_test.go",
        "policy_static_test.go",
        "policy_test.go",
+       "topology_hints_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
        "//pkg/kubelet/cm/cpumanager/state:go_default_library",
        "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
        "//pkg/kubelet/cm/cpuset:go_default_library",
+       "//pkg/kubelet/cm/topologymanager:go_default_library",
+       "//pkg/kubelet/cm/topologymanager/socketmask:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -31,6 +31,7 @@ import (
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/status"
)
@@ -64,6 +65,10 @@ type Manager interface {

	// State returns a read-only interface to the internal CPU manager state.
	State() state.Reader
+
+	// GetTopologyHints implements the Topology Manager Interface and is
+	// consulted to make Topology aware resource alignments
+	GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint
}

type manager struct {
@@ -97,7 +102,7 @@ type manager struct {
var _ Manager = &manager{}

// NewManager creates new cpu manager based on provided policy
-func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) {
+func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
	var policy Policy

	switch policyName(cpuPolicyName) {
@@ -129,7 +134,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
		// exclusively allocated.
		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
-		policy = NewStaticPolicy(topo, numReservedCPUs)
+		policy = NewStaticPolicy(topo, numReservedCPUs, affinity)

	default:
		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
@@ -35,6 +35,7 @@ import (
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

type mockState struct {
@@ -195,7 +196,7 @@ func TestCPUManagerAdd(t *testing.T) {
			2: {CoreID: 2, SocketID: 0},
			3: {CoreID: 3, SocketID: 0},
		},
-	}, 0)
+	}, 0, topologymanager.NewFakeManager())
	testCases := []struct {
		description string
		updateErr   error
@@ -342,7 +343,7 @@ func TestCPUManagerGenerate(t *testing.T) {
			}
			defer os.RemoveAll(sDir)

-			mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir)
+			mgr, err := NewManager(testCase.cpuPolicyName, 5*time.Second, machineInfo, testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
			if testCase.expectedError != nil {
				if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
					t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
@@ -20,6 +20,7 @@ import (
	"k8s.io/api/core/v1"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/status"
)

@@ -46,6 +47,11 @@ func (m *fakeManager) RemoveContainer(containerID string) error {
	return nil
}

+func (m *fakeManager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint {
+	klog.Infof("[fake cpumanager] Get Topology Hints")
+	return []topologymanager.TopologyHint{}
+}
+
func (m *fakeManager) State() state.Reader {
	return m.state
}
@@ -25,6 +25,8 @@ import (
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
)

// PolicyStatic is the name of the static policy
@@ -77,6 +79,8 @@ type staticPolicy struct {
	// (pod, container) -> containerID
	// for all containers a pod
	containerMap containerMap
+	// topology manager reference to get container Topology affinity
+	affinity topologymanager.Store
}

// Ensure staticPolicy implements Policy interface
@@ -85,7 +89,7 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, affinity topologymanager.Store) Policy {
	allCPUs := topology.CPUDetails.CPUs()
	// takeByTopology allocates CPUs associated with low-numbered cores from
	// allCPUs.
@@ -104,6 +108,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int) Policy
		topology:     topology,
		reserved:     reserved,
		containerMap: newContainerMap(),
+		affinity:     affinity,
	}
}

@@ -186,7 +191,7 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
		}
	}()

-	if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
+	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
		klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
		// container belongs in an exclusively allocated pool

@@ -211,7 +216,12 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co
			}
		}

-		cpuset, err := p.allocateCPUs(s, numCPUs)
+		// Call Topology Manager to get the aligned socket affinity across all hint providers.
+		hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
+		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
+
+		// Allocate CPUs according to the socket affinity contained in the hint.
+		cpuset, err := p.allocateCPUs(s, numCPUs, hint.SocketAffinity)
		if err != nil {
			klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)
			return err
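The hunk above is where the static policy consumes the merged affinity: it looks the hint up in a topologymanager.Store keyed by pod UID and container name, then passes hint.SocketAffinity into allocateCPUs. A rough, self-contained sketch of that lookup side, using hypothetical stand-in types (Affinity, Store, fixedStore) rather than the real package:

package main

import "fmt"

// Affinity is a simplified stand-in for the hint as it is consumed here:
// just the set of sockets the container was aligned to.
type Affinity struct {
	Sockets []int
}

// Store mirrors the shape of the topologymanager.Store dependency the static
// policy now receives: a lookup of the merged affinity computed at admission.
type Store interface {
	GetAffinity(podUID, containerName string) Affinity
}

// fixedStore is a hypothetical Store used only for this sketch.
type fixedStore map[string]Affinity

func (s fixedStore) GetAffinity(podUID, containerName string) Affinity {
	return s[podUID+"/"+containerName]
}

func main() {
	var store Store = fixedStore{
		"pod-uid/ctr": {Sockets: []int{1}},
	}
	// The policy asks the store for the pre-computed alignment and then
	// allocates CPUs with that socket preference (see allocateCPUs below).
	hint := store.GetAffinity("pod-uid", "ctr")
	fmt.Println("preferred sockets:", hint.Sockets)
}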
@@ -240,12 +250,37 @@ func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr
	return nil
}

-func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) {
-	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d)", numCPUs)
-	result, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs)
+func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, socketmask socketmask.SocketMask) (cpuset.CPUSet, error) {
+	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, socketmask)
+
+	// If there are aligned CPUs in the socketmask, attempt to take those first.
+	result := cpuset.NewCPUSet()
+	if socketmask != nil {
+		alignedCPUs := cpuset.NewCPUSet()
+		for _, socketID := range socketmask.GetSockets() {
+			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInSocket(socketID)))
+		}
+
+		numAlignedToAlloc := alignedCPUs.Size()
+		if numCPUs < numAlignedToAlloc {
+			numAlignedToAlloc = numCPUs
+		}
+
+		alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc)
+		if err != nil {
+			return cpuset.NewCPUSet(), err
+		}
+
+		result = result.Union(alignedCPUs)
+	}
+
+	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
+	remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
	if err != nil {
		return cpuset.NewCPUSet(), err
	}
+	result = result.Union(remainingCPUs)

	// Remove allocated CPUs from the shared CPUSet.
	s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))

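The rewritten allocateCPUs above works in two phases: it first tries to satisfy the request with CPUs on the hinted sockets, then tops up from whatever else is assignable. The stand-alone sketch below shows the same ordering with plain maps and slices in place of cpuset.CPUSet and takeByTopology; the function name and example topology are hypothetical.

package main

import (
	"fmt"
	"sort"
)

// allocateAlignedFirst is an illustrative version of the allocation order used
// above: take CPUs on the preferred sockets first, then fill the remainder of
// the request from any other assignable CPU.
func allocateAlignedFirst(assignable map[int]int, preferredSockets map[int]bool, numCPUs int) []int {
	var aligned, others []int
	for cpu, socket := range assignable {
		if preferredSockets[socket] {
			aligned = append(aligned, cpu)
		} else {
			others = append(others, cpu)
		}
	}
	sort.Ints(aligned)
	sort.Ints(others)

	// Take from the aligned pool first, then from the rest, until satisfied.
	result := []int{}
	for _, cpu := range append(aligned, others...) {
		if len(result) == numCPUs {
			break
		}
		result = append(result, cpu)
	}
	return result
}

func main() {
	// CPU ID -> socket ID for a hypothetical 2-socket, 8-CPU machine.
	assignable := map[int]int{0: 0, 1: 1, 2: 0, 3: 1, 4: 0, 5: 1, 6: 0, 7: 1}
	// Prefer socket 1 and request 3 CPUs: prints [1 3 5].
	fmt.Println(allocateAlignedFirst(assignable, map[int]bool{1: true}, 3))
}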
@@ -253,7 +288,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet,
	return result, nil
}

-func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
+func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
		return 0
	}
@@ -25,6 +25,8 @@ import (
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
)

type staticPolicyTest struct {
@@ -58,7 +60,7 @@ type staticPolicyMultiContainerTest struct {
}

func TestStaticPolicyName(t *testing.T) {
-	policy := NewStaticPolicy(topoSingleSocketHT, 1)
+	policy := NewStaticPolicy(topoSingleSocketHT, 1, topologymanager.NewFakeManager())

	policyName := policy.Name()
	if policyName != "static" {
@@ -135,7 +137,7 @@ func TestStaticPolicyStart(t *testing.T) {
				t.Error("expected panic doesn't occurred")
			}
		}()
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs).(*staticPolicy)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager()).(*staticPolicy)
		st := &mockState{
			assignments:   testCase.stAssignments,
			defaultCPUSet: testCase.stDefaultCPUSet,
@@ -419,7 +421,7 @@ func TestStaticPolicyAdd(t *testing.T) {
	}

	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())

		st := &mockState{
			assignments:   testCase.stAssignments,
@@ -632,7 +634,7 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) {
	}

	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())

		st := &mockState{
			assignments:   testCase.stAssignments,
@@ -719,7 +721,7 @@ func TestStaticPolicyRemove(t *testing.T) {
	}

	for _, testCase := range testCases {
-		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs)
+		policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, topologymanager.NewFakeManager())

		st := &mockState{
			assignments:   testCase.stAssignments,
@@ -739,3 +741,93 @@ func TestStaticPolicyRemove(t *testing.T) {
		}
	}
}
+
+func TestTopologyAwareAllocateCPUs(t *testing.T) {
+	testCases := []struct {
+		description     string
+		topo            *topology.CPUTopology
+		stAssignments   state.ContainerCPUAssignments
+		stDefaultCPUSet cpuset.CPUSet
+		numRequested    int
+		socketMask      socketmask.SocketMask
+		expCSet         cpuset.CPUSet
+	}{
+		{
+			description:     "Request 2 CPUs, No SocketMask",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    2,
+			socketMask:      nil,
+			expCSet:         cpuset.NewCPUSet(0, 6),
+		},
+		{
+			description:     "Request 2 CPUs, SocketMask on Socket 0",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    2,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(0)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(0, 6),
+		},
+		{
+			description:     "Request 2 CPUs, SocketMask on Socket 1",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    2,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(1)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(1, 7),
+		},
+		{
+			description:     "Request 8 CPUs, SocketMask on Socket 0",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    8,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(0)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(0, 6, 2, 8, 4, 10, 1, 7),
+		},
+		{
+			description:     "Request 8 CPUs, SocketMask on Socket 1",
+			topo:            topoDualSocketHT,
+			stAssignments:   state.ContainerCPUAssignments{},
+			stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+			numRequested:    8,
+			socketMask: func() socketmask.SocketMask {
+				mask, _ := socketmask.NewSocketMask(1)
+				return mask
+			}(),
+			expCSet: cpuset.NewCPUSet(1, 7, 3, 9, 5, 11, 0, 6),
+		},
+	}
+	for _, tc := range testCases {
+		policy := NewStaticPolicy(tc.topo, 0, topologymanager.NewFakeManager()).(*staticPolicy)
+		st := &mockState{
+			assignments:   tc.stAssignments,
+			defaultCPUSet: tc.stDefaultCPUSet,
+		}
+		policy.Start(st)
+
+		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+		if err != nil {
+			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
+				tc.description, tc.expCSet, err)
+			continue
+		}
+
+		if !reflect.DeepEqual(tc.expCSet, cset) {
+			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v",
+				tc.description, tc.expCSet, cset)
+		}
+	}
+}
pkg/kubelet/cm/cpumanager/topology_hints.go (new file, 127 lines)
@@ -0,0 +1,127 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cpumanager

import (
	"k8s.io/api/core/v1"
	"k8s.io/klog"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
)

func (m *manager) GetTopologyHints(pod v1.Pod, container v1.Container) []topologymanager.TopologyHint {
	// The 'none' policy does not generate topology hints.
	if m.policy.Name() == string(PolicyNone) {
		return nil
	}

	// For all other policies, if there are no CPU resources requested for this
	// container, we do not generate any topology hints.
	if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
		return nil
	}

	// Otherwise, attempt to generate TopologyHints for the CPUManager.
	// For now, this implementation assumes the 'static' CPUManager policy.
	// TODO: Generalize this so that its applicable to future CPUManager polices.

	// Get a count of how many guaranteed CPUs have been requested.
	requested := m.policy.(*staticPolicy).guaranteedCPUs(&pod, &container)

	// If there are no guaranteed CPUs being requested, we do not generate
	// any topology hints. This can happen, for example, because init
	// containers don't have to have guaranteed CPUs in order for the pod
	// to still be in the Guaranteed QOS tier.
	if requested == 0 {
		return nil
	}

	// Get a list of available CPUs.
	available := m.policy.(*staticPolicy).assignableCPUs(m.state)

	// Generate hints.
	cpuHints := m.generateCPUTopologyHints(available, requested)
	klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", pod.Name, container.Name, cpuHints)

	return cpuHints
}

// generateCPUtopologyHints generates a set of TopologyHints given the set of
// available CPUs and the number of CPUs being requested.
//
// It follows the convention of marking all hints that have the same number of
// bits set as the narrowest matching SocketAffinity with 'Preferred: true', and
// marking all others with 'Preferred: false'.
func (m *manager) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, request int) []topologymanager.TopologyHint {
	// Discover topology in order to establish the number
	// of available CPUs per socket.
	topo, err := topology.Discover(m.machineInfo)
	if err != nil {
		klog.Warningf("[cpu manager] Error discovering topology for TopologyHint generation")
		return nil
	}

	// Initialize minAffinity to a full affinity mask.
	minAffinity, _ := socketmask.NewSocketMask()
	minAffinity.Fill()

	// Iterate through all combinations of socketMasks and build hints from them.
	hints := []topologymanager.TopologyHint{}
	socketmask.IterateSocketMasks(topo.CPUDetails.Sockets().ToSlice(), func(mask socketmask.SocketMask) {
		// Check to see if we have enough CPUs available on the current
		// SocketMask to satisfy the CPU request.
		numMatching := 0
		for _, c := range availableCPUs.ToSlice() {
			if mask.IsSet(topo.CPUDetails[c].SocketID) {
				numMatching++
			}
		}

		// If we don't, then move onto the next combination.
		if numMatching < request {
			return
		}

		// Otherwise, create a new hint from the SocketMask and add it to the
		// list of hints. We set all hint preferences to 'false' on the first
		// pass through.
		hints = append(hints, topologymanager.TopologyHint{
			SocketAffinity: mask,
			Preferred:      false,
		})

		// Update minAffinity if relevant
		if mask.IsNarrowerThan(minAffinity) {
			minAffinity = mask
		}
	})

	// Loop back through all hints and update the 'Preferred' field based on
	// counting the number of bits sets in the affinity mask and comparing it
	// to the minAffinity. Only those with an equal number of bits set will be
	// considered preferred.
	for i := range hints {
		if hints[i].SocketAffinity.Count() == minAffinity.Count() {
			hints[i].Preferred = true
		}
	}

	return hints
}
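generateCPUTopologyHints above encodes the preference rule: every socket combination with enough free CPUs becomes a hint, and only the narrowest feasible combinations are preferred. A compact stand-alone illustration of that rule, hard-coded for a two-socket machine (markPreferred and its inputs are hypothetical, not part of the package):

package main

import "fmt"

// markPreferred is a miniature version of the "narrowest mask wins" convention:
// every feasible socket set becomes a hint, and only the hints as narrow as the
// narrowest feasible one end up preferred.
func markPreferred(freePerSocket map[int]int, request int) map[string]bool {
	// Hard-code the three possible masks for two sockets to keep the sketch short;
	// the real code iterates all combinations via IterateSocketMasks.
	candidates := map[string][]int{"{0}": {0}, "{1}": {1}, "{0,1}": {0, 1}}

	hints := map[string]bool{}
	minWidth := 3 // wider than any candidate
	for name, sockets := range candidates {
		free := 0
		for _, s := range sockets {
			free += freePerSocket[s]
		}
		if free < request {
			continue // mask cannot satisfy the request, no hint generated
		}
		hints[name] = false
		if len(sockets) < minWidth {
			minWidth = len(sockets)
		}
	}
	// Second pass: only masks as narrow as the narrowest feasible one are preferred.
	for name := range hints {
		if len(candidates[name]) == minWidth {
			hints[name] = true
		}
	}
	return hints
}

func main() {
	// 4 CPUs free on socket 0 and 6 on socket 1, request 5:
	// {1} and {0,1} are feasible, and {1} is preferred.
	fmt.Println(markPreferred(map[int]int{0: 4, 1: 6}, 5))
}

With 4 CPUs free on socket 0 and 6 on socket 1, a request for 5 yields {1} preferred and {0,1} not preferred, which is the 5-CPU case exercised by the new test file below.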
pkg/kubelet/cm/cpumanager/topology_hints_test.go (new file, 152 lines)
@@ -0,0 +1,152 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cpumanager

import (
	"reflect"
	"sort"
	"testing"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
)

func topologyHintLessThan(a topologymanager.TopologyHint, b topologymanager.TopologyHint) bool {
	if a.Preferred != b.Preferred {
		return a.Preferred == true
	}
	return a.SocketAffinity.IsNarrowerThan(b.SocketAffinity)
}

func TestGetTopologyHints(t *testing.T) {
	testPod1 := makePod("2", "2")
	testContainer1 := &testPod1.Spec.Containers[0]
	testPod2 := makePod("5", "5")
	testContainer2 := &testPod2.Spec.Containers[0]
	testPod3 := makePod("7", "7")
	testContainer3 := &testPod3.Spec.Containers[0]
	testPod4 := makePod("11", "11")
	testContainer4 := &testPod4.Spec.Containers[0]

	firstSocketMask, _ := socketmask.NewSocketMask(0)
	secondSocketMask, _ := socketmask.NewSocketMask(1)
	crossSocketMask, _ := socketmask.NewSocketMask(0, 1)

	m := manager{
		policy: &staticPolicy{},
		machineInfo: &cadvisorapi.MachineInfo{
			NumCores: 12,
			Topology: []cadvisorapi.Node{
				{Id: 0,
					Cores: []cadvisorapi.Core{
						{Id: 0, Threads: []int{0, 6}},
						{Id: 1, Threads: []int{1, 7}},
						{Id: 2, Threads: []int{2, 8}},
					},
				},
				{Id: 1,
					Cores: []cadvisorapi.Core{
						{Id: 0, Threads: []int{3, 9}},
						{Id: 1, Threads: []int{4, 10}},
						{Id: 2, Threads: []int{5, 11}},
					},
				},
			},
		},
		state: &mockState{
			defaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
		},
	}

	tcases := []struct {
		name          string
		pod           v1.Pod
		container     v1.Container
		expectedHints []topologymanager.TopologyHint
	}{
		{
			name:      "Request 2 CPUs; 4 available on Socket 0, 6 available on Socket 1",
			pod:       *testPod1,
			container: *testContainer1,
			expectedHints: []topologymanager.TopologyHint{
				{
					SocketAffinity: firstSocketMask,
					Preferred:      true,
				},
				{
					SocketAffinity: secondSocketMask,
					Preferred:      true,
				},
				{
					SocketAffinity: crossSocketMask,
					Preferred:      false,
				},
			},
		},
		{
			name:      "Request 5 CPUs; 4 available on Socket 0, 6 available on Socket 1",
			pod:       *testPod2,
			container: *testContainer2,
			expectedHints: []topologymanager.TopologyHint{
				{
					SocketAffinity: secondSocketMask,
					Preferred:      true,
				},
				{
					SocketAffinity: crossSocketMask,
					Preferred:      false,
				},
			},
		},
		{
			name:      "Request 7 CPUs, 4 available on Socket 0, 6 available on Socket 1",
			pod:       *testPod3,
			container: *testContainer3,
			expectedHints: []topologymanager.TopologyHint{
				{
					SocketAffinity: crossSocketMask,
					Preferred:      true,
				},
			},
		},
		{
			name:          "Request 11 CPUs, 4 available on Socket 0, 6 available on Socket 1",
			pod:           *testPod4,
			container:     *testContainer4,
			expectedHints: []topologymanager.TopologyHint{},
		},
	}
	for _, tc := range tcases {
		hints := m.GetTopologyHints(tc.pod, tc.container)
		if len(tc.expectedHints) == 0 && len(hints) == 0 {
			continue
		}
		sort.SliceStable(hints, func(i, j int) bool {
			return topologyHintLessThan(hints[i], hints[j])
		})
		sort.SliceStable(tc.expectedHints, func(i, j int) bool {
			return topologyHintLessThan(tc.expectedHints[i], tc.expectedHints[j])
		})
		if !reflect.DeepEqual(tc.expectedHints, hints) {
			t.Errorf("Expected in result to be %v , got %v", tc.expectedHints, hints)
		}

	}
}
@@ -172,3 +172,23 @@ func Or(first SocketMask, masks ...SocketMask) SocketMask {
	s.Or(masks...)
	return &s
}
+
+// IterateSocketMasks iterates all possible masks from a list of sockets,
+// issuing a callback on each mask.
+func IterateSocketMasks(sockets []int, callback func(SocketMask)) {
+	var iterate func(sockets, accum []int, size int)
+	iterate = func(sockets, accum []int, size int) {
+		if len(accum) == size {
+			mask, _ := NewSocketMask(accum...)
+			callback(mask)
+			return
+		}
+		for i := range sockets {
+			iterate(sockets[i+1:], append(accum, sockets[i]), size)
+		}
+	}
+
+	for i := 1; i <= len(sockets); i++ {
+		iterate(sockets, []int{}, i)
+	}
+}
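IterateSocketMasks visits every combination of the given sockets one subset size at a time, so a list of n sockets produces 2^n - 1 callbacks. A small usage sketch, assuming the import path shown in the BUILD changes above and relying only on SocketMask methods that appear elsewhere in this diff (Count, IsSet):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/socketmask"
)

func main() {
	// Visit every non-empty combination of sockets 0 and 1 (2^2 - 1 = 3 masks).
	sockets := []int{0, 1}
	socketmask.IterateSocketMasks(sockets, func(mask socketmask.SocketMask) {
		fmt.Printf("sockets in mask: %d, contains socket 0: %v\n", mask.Count(), mask.IsSet(0))
	})
}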
@@ -342,3 +342,54 @@ func TestIsNarrowerThan(t *testing.T) {
		}
	}
}
+
+func TestIterateSocketMasks(t *testing.T) {
+	tcases := []struct {
+		name       string
+		numSockets int
+	}{
+		{
+			name:       "1 Socket",
+			numSockets: 1,
+		},
+		{
+			name:       "2 Sockets",
+			numSockets: 2,
+		},
+		{
+			name:       "4 Sockets",
+			numSockets: 4,
+		},
+		{
+			name:       "8 Sockets",
+			numSockets: 8,
+		},
+		{
+			name:       "16 Sockets",
+			numSockets: 16,
+		},
+	}
+	for _, tc := range tcases {
+		// Generate a list of sockets from tc.numSockets.
+		var sockets []int
+		for i := 0; i < tc.numSockets; i++ {
+			sockets = append(sockets, i)
+		}
+
+		// Calculate the expected number of masks. Since we always have masks
+		// with sockets from 0..n, this is just (2^n - 1) since we want 1 mask
+		// represented by each integer between 1 and 2^n-1.
+		expectedNumMasks := (1 << uint(tc.numSockets)) - 1
+
+		// Iterate all masks and count them.
+		numMasks := 0
+		IterateSocketMasks(sockets, func(SocketMask) {
+			numMasks++
+		})
+
+		// Compare the number of masks generated to the expected amount.
+		if expectedNumMasks != numMasks {
+			t.Errorf("Expected to iterate %v masks, got %v", expectedNumMasks, numMasks)
+		}
+	}
+}