diff --git a/pkg/kubelet/cm/cpumanager/policy_options.go b/pkg/kubelet/cm/cpumanager/policy_options.go index 03d472fcb3c..d04b60420e1 100644 --- a/pkg/kubelet/cm/cpumanager/policy_options.go +++ b/pkg/kubelet/cm/cpumanager/policy_options.go @@ -73,7 +73,7 @@ type StaticPolicyOptions struct { // Flag to evenly distribute CPUs across NUMA nodes in cases where more // than one NUMA node is required to satisfy the allocation. DistributeCPUsAcrossNUMA bool - // Flag to ensure CPU's are considered aligned at socket boundary rather than + // Flag to ensure CPUs are considered aligned at socket boundary rather than // NUMA boundary AlignBySocket bool } @@ -114,12 +114,12 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption } func ValidateStaticPolicyOptions(opts StaticPolicyOptions, topology *topology.CPUTopology, topologyManager topologymanager.Store) error { - if opts.AlignBySocket == true { - //1. not compatible with topology manager single numa policy option + if opts.AlignBySocket { + // Not compatible with topology manager single-numa-node policy option. if topologyManager.GetPolicy().Name() == topologymanager.PolicySingleNumaNode { - return fmt.Errorf("Topolgy manager Single numa policy is incompatible with CPUManager Align by socket policy option") + return fmt.Errorf("Topology manager %s policy is incompatible with CPUManager %s policy option", topologymanager.PolicySingleNumaNode, AlignBySocketOption) } - //2. not comptuble with topology when num_socets > num_numa + // Not compatible with topology when the number of sockets is greater than the number of NUMA nodes. 
if topology.NumSockets > topology.NumNUMANodes { return fmt.Errorf("Align by socket is not compatible with hardware where number of sockets are more than number of NUMA") } diff --git a/pkg/kubelet/cm/cpumanager/policy_options_test.go b/pkg/kubelet/cm/cpumanager/policy_options_test.go index b50794fb4e6..8fc3bb8bdc3 100644 --- a/pkg/kubelet/cm/cpumanager/policy_options_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_options_test.go @@ -23,6 +23,8 @@ import ( "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" pkgfeatures "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) type optionAvailTest struct { @@ -54,7 +56,7 @@ func TestPolicyDefaultsAvailable(t *testing.T) { } } -func TestPolicyBetaOptionsAvailable(t *testing.T) { +func TestPolicyOptionsAvailable(t *testing.T) { testCases := []optionAvailTest{ { option: "this-option-does-not-exist", @@ -80,6 +82,18 @@ func TestPolicyBetaOptionsAvailable(t *testing.T) { featureGateEnable: false, expectedAvailable: false, }, + { + option: AlignBySocketOption, + featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions, + featureGateEnable: true, + expectedAvailable: true, + }, + { + option: AlignBySocketOption, + featureGate: pkgfeatures.CPUManagerPolicyBetaOptions, + featureGateEnable: true, + expectedAvailable: false, + }, } for _, testCase := range testCases { t.Run(testCase.option, func(t *testing.T) { @@ -92,3 +106,79 @@ func TestPolicyBetaOptionsAvailable(t *testing.T) { }) } } + +func TestValidateStaticPolicyOptions(t *testing.T) { + testCases := []struct { + description string + policyOption map[string]string + topology *topology.CPUTopology + topoMgrPolicy string + expectedErr bool + }{ + { + description: "Align by socket not enabled", + policyOption: map[string]string{FullPCPUsOnlyOption: "true"}, + topology: topoDualSocketMultiNumaPerSocketHT, + topoMgrPolicy: 
topologymanager.PolicySingleNumaNode, + expectedErr: false, + }, + { + description: "Align by socket enabled with topology manager single numa node", + policyOption: map[string]string{AlignBySocketOption: "true"}, + topology: topoDualSocketMultiNumaPerSocketHT, + topoMgrPolicy: topologymanager.PolicySingleNumaNode, + expectedErr: true, + }, + { + description: "Align by socket enabled with num_sockets > num_numa", + policyOption: map[string]string{AlignBySocketOption: "true"}, + topology: fakeTopoMultiSocketDualSocketPerNumaHT, + topoMgrPolicy: topologymanager.PolicyNone, + expectedErr: true, + }, + { + description: "Align by socket enabled: with topology manager None policy", + policyOption: map[string]string{AlignBySocketOption: "true"}, + topology: topoDualSocketMultiNumaPerSocketHT, + topoMgrPolicy: topologymanager.PolicyNone, + expectedErr: false, + }, + { + description: "Align by socket enabled: with topology manager best-effort policy", + policyOption: map[string]string{AlignBySocketOption: "true"}, + topology: topoDualSocketMultiNumaPerSocketHT, + topoMgrPolicy: topologymanager.PolicyBestEffort, + expectedErr: false, + }, + { + description: "Align by socket enabled: with topology manager restricted policy", + policyOption: map[string]string{AlignBySocketOption: "true"}, + topology: topoDualSocketMultiNumaPerSocketHT, + topoMgrPolicy: topologymanager.PolicyRestricted, + expectedErr: false, + }, + } + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + // Build the topology manager policy named by the test case; unrecognised names keep the none policy. 
+ topoMgrPolicy := topologymanager.NewNonePolicy() + switch testCase.topoMgrPolicy { + case topologymanager.PolicySingleNumaNode: + topoMgrPolicy = topologymanager.NewSingleNumaNodePolicy(nil) + case topologymanager.PolicyBestEffort: + topoMgrPolicy = topologymanager.NewBestEffortPolicy(nil) + case topologymanager.PolicyRestricted: + topoMgrPolicy = topologymanager.NewRestrictedPolicy(nil) + } + topoMgrStore := topologymanager.NewFakeManagerWithPolicy(topoMgrPolicy) + + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)() + policyOpt, _ := NewStaticPolicyOptions(testCase.policyOption) + err := 
ValidateStaticPolicyOptions(policyOpt, testCase.topology, topoMgrStore) + gotError := (err != nil) + if gotError != testCase.expectedErr { + t.Errorf("testCase %q failed, got %v expected %v", testCase.description, gotError, testCase.expectedErr) + } + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 7a3c314af27..ff9e6a20e14 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -572,7 +572,7 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu // to the minAffinitySize. Only those with an equal number of bits set (and // with a minimal set of numa nodes) will be considered preferred. for i := range hints { - if p.options.AlignBySocket && p.isHintSocketAligned(hints[i].NUMANodeAffinity) { + if p.options.AlignBySocket && p.isHintSocketAligned(hints[i], minAffinitySize) { hints[i].Preferred = true continue } @@ -584,18 +584,24 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu return hints } -func (p *staticPolicy) isHintSocketAligned(hint bitmask.BitMask) bool { - numaNodes := hint.GetBits() - if p.topology.CPUDetails.SocketsInNUMANodes(numaNodes[:]...).Size() == 1 { - return true +// isHintSocketAligned returns true if the NUMA nodes in hint are socket aligned. +func (p *staticPolicy) isHintSocketAligned(hint topologymanager.TopologyHint, minAffinitySize int) bool { + numaNodesBitMask := hint.NUMANodeAffinity.GetBits() + numaNodesPerSocket := p.topology.NumNUMANodes / p.topology.NumSockets + if numaNodesPerSocket == 0 { + return false } - return false + // minSockets refers to the minimum number of sockets required to satisfy the allocation. 
+ // A hint is considered socket aligned if sockets across which numa nodes span is equal to minSockets + minSockets := (minAffinitySize + numaNodesPerSocket - 1) / numaNodesPerSocket + return p.topology.CPUDetails.SocketsInNUMANodes(numaNodesBitMask...).Size() == minSockets } // getAlignedCPUs return set of aligned CPUs based on numa affinity mask and configured policy options. func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableCPUs cpuset.CPUSet) cpuset.CPUSet { alignedCPUs := cpuset.NewCPUSet() numaBits := numaAffinity.GetBits() + // If align-by-socket policy option is enabled, NUMA based hint is expanded to // socket aligned hint. It will ensure that first socket aligned available CPUs are // allocated before we try to find CPUs across socket to satisfy allocation request. diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index 4e3255fff01..4c10af065a4 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -22,6 +22,9 @@ import ( "testing" v1 "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + pkgfeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -39,6 +42,7 @@ type staticPolicyTest struct { stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet pod *v1.Pod + topologyHint *topologymanager.TopologyHint expErr error expCPUAlloc bool expCSet cpuset.CPUSet @@ -190,6 +194,7 @@ func TestStaticPolicyAdd(t *testing.T) { // these are the cases which must behave the same regardless the policy options. // So we will permutate the options to ensure this holds true. 
+ optionsInsensitiveTestCases := []staticPolicyTest{ { description: "GuPodSingleCore, SingleSocketHT, ExpectError", @@ -493,6 +498,42 @@ func TestStaticPolicyAdd(t *testing.T) { expCSet: cpuset.NewCPUSet(), }, } + newNUMAAffinity := func(bits ...int) bitmask.BitMask { + affinity, _ := bitmask.NewBitMask(bits...) + return affinity + } + alignBySocketOptionTestCases := []staticPolicyTest{ + { + description: "Align by socket: true, cpu's within same socket of numa in hint are part of allocation", + topo: topoDualSocketMultiNumaPerSocketHT, + options: map[string]string{ + AlignBySocketOption: "true", + }, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(2, 11, 21, 22), + pod: makePod("fakePod", "fakeContainer2", "2000m", "2000m"), + topologyHint: &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 2), Preferred: true}, + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(2, 11), + }, + { + description: "Align by socket: false, cpu's are taken strictly from NUMA nodes in hint", + topo: topoDualSocketMultiNumaPerSocketHT, + options: map[string]string{ + AlignBySocketOption: "false", + }, + numReservedCPUs: 1, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(2, 11, 21, 22), + pod: makePod("fakePod", "fakeContainer2", "2000m", "2000m"), + topologyHint: &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 2), Preferred: true}, + expErr: nil, + expCPUAlloc: true, + expCSet: cpuset.NewCPUSet(2, 21), + }, + } for _, testCase := range optionsInsensitiveTestCases { for _, options := range []map[string]string{ @@ -514,10 +555,17 @@ func TestStaticPolicyAdd(t *testing.T) { for _, testCase := range smtalignOptionTestCases { runStaticPolicyTestCase(t, testCase) } + for _, testCase := range alignBySocketOptionTestCases { + runStaticPolicyTestCaseWithFeatureGate(t, testCase) + } } func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { 
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options) + tm := topologymanager.NewFakeManager() + if testCase.topologyHint != nil { + tm = topologymanager.NewFakeManagerWithHint(testCase.topologyHint) + } + policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), tm, testCase.options) st := &mockState{ assignments: testCase.stAssignments, @@ -558,6 +606,11 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { } } +func runStaticPolicyTestCaseWithFeatureGate(t *testing.T, testCase staticPolicyTest) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)() + runStaticPolicyTestCase(t, testCase) +} + func TestStaticPolicyReuseCPUs(t *testing.T) { testCases := []struct { staticPolicyTest diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go index 9b8abe77488..02d064e71e3 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -24,6 +24,9 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + pkgfeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -254,6 +257,186 @@ func TestGetPodTopologyHints(t *testing.T) { } } +func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) { + testPod1 := makePod("fakePod", "fakeContainer", "2", "2") + testContainer1 := &testPod1.Spec.Containers[0] + + testPod2 := makePod("fakePod", "fakeContainer", "41", "41") + testContainer2 := &testPod2.Spec.Containers[0] + 
cpuSetAcrossSocket, _ := cpuset.Parse("0-28,40-57") + + m0001, _ := bitmask.NewBitMask(0) + m0011, _ := bitmask.NewBitMask(0, 1) + m0101, _ := bitmask.NewBitMask(0, 2) + m1001, _ := bitmask.NewBitMask(0, 3) + m0111, _ := bitmask.NewBitMask(0, 1, 2) + m1011, _ := bitmask.NewBitMask(0, 1, 3) + m1101, _ := bitmask.NewBitMask(0, 2, 3) + m1111, _ := bitmask.NewBitMask(0, 1, 2, 3) + + testCases := []struct { + description string + pod v1.Pod + container v1.Container + assignments state.ContainerCPUAssignments + defaultCPUSet cpuset.CPUSet + policyOptions map[string]string + topology *topology.CPUTopology + expectedHints []topologymanager.TopologyHint + }{ + { + // CPU available on numa node[0 ,1]. CPU on numa node 0 can satisfy request of 2 CPU's + description: "AlignBySocket:false, Preferred hints does not contains socket aligned hints", + pod: *testPod1, + container: *testContainer1, + defaultCPUSet: cpuset.NewCPUSet(2, 3, 11), + topology: topoDualSocketMultiNumaPerSocketHT, + policyOptions: map[string]string{AlignBySocketOption: "false"}, + expectedHints: []topologymanager.TopologyHint{ + { + NUMANodeAffinity: m0001, + Preferred: true, + }, + { + NUMANodeAffinity: m0011, + Preferred: false, + }, + { + NUMANodeAffinity: m0101, + Preferred: false, + }, + { + NUMANodeAffinity: m1001, + Preferred: false, + }, + { + NUMANodeAffinity: m0111, + Preferred: false, + }, + { + NUMANodeAffinity: m1011, + Preferred: false, + }, + { + NUMANodeAffinity: m1101, + Preferred: false, + }, + { + NUMANodeAffinity: m1111, + Preferred: false, + }, + }, + }, + { + // CPU available on numa node[0 ,1]. 
CPU on numa node 0 can satisfy request of 2 CPU's + description: "AlignBySocket:true Preferred hints contains socket aligned hints", + pod: *testPod1, + container: *testContainer1, + defaultCPUSet: cpuset.NewCPUSet(2, 3, 11), + topology: topoDualSocketMultiNumaPerSocketHT, + policyOptions: map[string]string{AlignBySocketOption: "true"}, + expectedHints: []topologymanager.TopologyHint{ + { + NUMANodeAffinity: m0001, + Preferred: true, + }, + { + NUMANodeAffinity: m0011, + Preferred: true, + }, + { + NUMANodeAffinity: m0101, + Preferred: false, + }, + { + NUMANodeAffinity: m1001, + Preferred: false, + }, + { + NUMANodeAffinity: m0111, + Preferred: false, + }, + { + NUMANodeAffinity: m1011, + Preferred: false, + }, + { + NUMANodeAffinity: m1101, + Preferred: false, + }, + { + NUMANodeAffinity: m1111, + Preferred: false, + }, + }, + }, + { + // testPod2 requests 41 CPUs; per the description below, satisfying it requires NUMA nodes on both sockets + description: "AlignBySocket:true Preferred hints are spread across socket since 2 sockets are required", + pod: *testPod2, + container: *testContainer2, + defaultCPUSet: cpuSetAcrossSocket, + topology: topoDualSocketMultiNumaPerSocketHT, + policyOptions: map[string]string{AlignBySocketOption: "true"}, + expectedHints: []topologymanager.TopologyHint{ + { + NUMANodeAffinity: m0111, + Preferred: true, + }, + { + NUMANodeAffinity: m1111, + Preferred: true, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)() + + var activePods []*v1.Pod + for p := range testCase.assignments { + pod := v1.Pod{} + pod.UID = types.UID(p) + for c := range testCase.assignments[p] { + container := v1.Container{} + container.Name = c + pod.Spec.Containers = append(pod.Spec.Containers, container) + } + activePods = append(activePods, &pod) + } + policyOpt, _ := 
NewStaticPolicyOptions(testCase.policyOptions) + m := manager{ + policy: &staticPolicy{ + topology: testCase.topology, + options: policyOpt, + }, + state: &mockState{ + assignments: testCase.assignments, + defaultCPUSet: testCase.defaultCPUSet, + }, + topology: testCase.topology, + activePods: func() []*v1.Pod { return activePods }, + podStatusProvider: mockPodStatusProvider{}, + sourcesReady: &sourcesReadyStub{}, + } + + podHints := m.GetPodTopologyHints(&testCase.pod)[string(v1.ResourceCPU)] + sort.SliceStable(podHints, func(i, j int) bool { + return podHints[i].LessThan(podHints[j]) + }) + sort.SliceStable(testCase.expectedHints, func(i, j int) bool { + return testCase.expectedHints[i].LessThan(testCase.expectedHints[j]) + }) + if !reflect.DeepEqual(testCase.expectedHints, podHints) { + t.Errorf("Expected in result to be %v , got %v", testCase.expectedHints, podHints) + } + }) + } +} + func returnTestCases() []testCase { testPod1 := makePod("fakePod", "fakeContainer", "2", "2") testContainer1 := &testPod1.Spec.Containers[0] diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go index da9910fba87..cee5e48d51e 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go @@ -39,6 +39,10 @@ func (m *mockAffinityStore) GetAffinity(podUID string, containerName string) top return m.hint } +func (m *mockAffinityStore) GetPolicy() topologymanager.Policy { + return nil +} + func makeNUMADevice(id string, numa int) pluginapi.Device { return pluginapi.Device{ ID: id, diff --git a/pkg/kubelet/cm/topologymanager/fake_topology_manager.go b/pkg/kubelet/cm/topologymanager/fake_topology_manager.go index 18d47e954ea..921b47dab3b 100644 --- a/pkg/kubelet/cm/topologymanager/fake_topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/fake_topology_manager.go @@ -24,7 +24,8 @@ import ( ) type fakeManager struct { - hint *TopologyHint + hint *TopologyHint + 
policy Policy } // NewFakeManager returns an instance of FakeManager @@ -37,7 +38,16 @@ func NewFakeManager() Manager { func NewFakeManagerWithHint(hint *TopologyHint) Manager { klog.InfoS("NewFakeManagerWithHint") return &fakeManager{ - hint: hint, + hint: hint, + policy: NewNonePolicy(), + } +} + +// NewFakeManagerWithPolicy returns an instance of fake topology manager with specified policy +func NewFakeManagerWithPolicy(policy Policy) Manager { + klog.InfoS("NewFakeManagerWithPolicy") + return &fakeManager{ + policy: policy, } } @@ -51,7 +61,11 @@ func (m *fakeManager) GetAffinity(podUID string, containerName string) TopologyH } func (m *fakeManager) GetPolicy() Policy { - return NewNonePolicy() + // Managers built without an explicit policy keep the previous default of the none policy. + if m.policy == nil { + return NewNonePolicy() + } + return m.policy } func (m *fakeManager) AddHintProvider(h HintProvider) { diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go index b1d174299c6..ed149df5bad 100644 --- a/pkg/kubelet/cm/topologymanager/scope.go +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -90,7 +90,7 @@ func (s *scope) GetAffinity(podUID string, containerName string) TopologyHint { } func (s *scope) GetPolicy() Policy { - return s.policy + return s.policy } func (s *scope) AddHintProvider(h HintProvider) { diff --git a/pkg/kubelet/cm/topologymanager/topology_manager.go b/pkg/kubelet/cm/topologymanager/topology_manager.go index 476479bae43..ea5ac91560d 100644 --- a/pkg/kubelet/cm/topologymanager/topology_manager.go +++ b/pkg/kubelet/cm/topologymanager/topology_manager.go @@ -186,7 +186,7 @@ func (m *manager) GetAffinity(podUID string, containerName string) TopologyHint } func (m *manager) GetPolicy() Policy { - return m.scope.GetPolicy() + return m.scope.GetPolicy() } func (m *manager) AddHintProvider(h HintProvider) {