From a07510fade8813a44a2bc915ac1aa14f15582f13 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Thu, 30 Jan 2025 09:31:37 +0100 Subject: [PATCH 1/5] node: cpumgr: test: abort on failure abort the test in the unlikely case we fail to create the Policy object. Exactly because this is unlikely we should fail loudly. In my case this happened because unrelated bad parameters, which was hard to catch Signed-off-by: Francesco Romani --- .../cm/cpumanager/policy_static_test.go | 48 ++++++++++++++----- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index e21136e8f61..8020e78bc5d 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -70,7 +70,10 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest { } func TestStaticPolicyName(t *testing.T) { - policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } policyName := policy.Name() if policyName != "static" { @@ -168,13 +171,16 @@ func TestStaticPolicyStart(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true) - p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options) + p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } policy := p.(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, defaultCPUSet: testCase.stDefaultCPUSet, } - err := policy.Start(st) + err = policy.Start(st) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v", testCase.description, testCase.expErr, err) @@ -637,7 +643,10 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { if testCase.reservedCPUs != nil { cpus = testCase.reservedCPUs.Clone() } - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -645,7 +654,7 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { } container := &testCase.pod.Spec.Containers[0] - err := policy.Allocate(st, testCase.pod, container) + err = policy.Allocate(st, testCase.pod, container) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy Allocate() error (%v). 
expected add error: %q but got: %q", testCase.description, testCase.expErr, err) @@ -708,7 +717,10 @@ func TestStaticPolicyReuseCPUs(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -761,7 +773,10 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -843,7 +858,10 @@ func TestStaticPolicyRemove(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -933,13 +951,16 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { }, } for _, tc := range testCases { - p, _ := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil) + p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } policy := p.(*staticPolicy) st := &mockState{ assignments: tc.stAssignments, defaultCPUSet: tc.stDefaultCPUSet, } - err := policy.Start(st) + err = policy.Start(st) if err != nil { t.Errorf("StaticPolicy Start() error (%v)", err) continue @@ -1107,7 +1128,10 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -1115,7 +1139,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } container := &testCase.pod.Spec.Containers[0] - err := policy.Allocate(st, testCase.pod, container) + err = policy.Allocate(st, testCase.pod, container) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v", testCase.description, testCase.expErr, err) From 52e0f522ba0c31c7012d8b9396f332ed25e30af2 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Thu, 30 Jan 2025 09:35:06 +0100 Subject: [PATCH 2/5] node: cpumgr: test: use Equals for comparison There's no need to use the generic reflect.DeepEqual, we can use the cpuset Equals method in tests, which is more idiomatic and a tad clearer. 
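
For illustration only (this snippet is not part of the diff), the difference reads like this, assuming the k8s.io/utils/cpuset package already used by these tests:

    package main

    import (
        "fmt"
        "reflect"

        "k8s.io/utils/cpuset"
    )

    func main() {
        a := cpuset.New(1, 2, 3)
        b := cpuset.New(3, 2, 1)
        fmt.Println(a.Equals(b))             // true: compares set membership, states the intent directly
        fmt.Println(reflect.DeepEqual(a, b)) // also true today, but leans on the unexported internal representation
    }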
Signed-off-by: Francesco Romani --- pkg/kubelet/cm/cpumanager/policy_static_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index 8020e78bc5d..901d45bfe87 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -667,7 +667,7 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { testCase.description, container.Name, st.assignments) } - if !reflect.DeepEqual(cset, testCase.expCSet) { + if !cset.Equals(testCase.expCSet) { t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", testCase.description, testCase.expCSet, cset) } @@ -732,7 +732,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) { for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { policy.Allocate(st, pod, &container) } - if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) { + if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) { t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v", testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet) } @@ -740,7 +740,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) { // remove policy.RemoveContainer(st, string(pod.UID), testCase.containerName) - if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterRemove) { + if !st.defaultCPUSet.Equals(testCase.expCSetAfterRemove) { t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v", testCase.description, testCase.expCSetAfterRemove, st.defaultCPUSet) } @@ -792,7 +792,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) { testCase.description, err) } } - if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) { + if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) { t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v", testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet) } @@ -870,7 +870,7 @@ func TestStaticPolicyRemove(t *testing.T) { policy.RemoveContainer(st, testCase.podUID, testCase.containerName) - if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) { + if !st.defaultCPUSet.Equals(testCase.expCSet) { t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v", testCase.description, testCase.expCSet, st.defaultCPUSet) } @@ -973,7 +973,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { continue } - if !reflect.DeepEqual(tc.expCSet, cset) { + if !tc.expCSet.Equals(cset) { t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v", tc.description, tc.expCSet, cset) } @@ -1152,7 +1152,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { testCase.description, container.Name, st.assignments) } - if !reflect.DeepEqual(cset, testCase.expCSet) { + if !cset.Equals(testCase.expCSet) { t.Errorf("StaticPolicy Allocate() error (%v). 
expected cpuset %v but got %v",
			testCase.description, testCase.expCSet, cset)
	}
From cea7d97d8bc66c49deeeb6792db8a0ca2b18f0f9 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Thu, 30 Jan 2025 14:06:35 +0100
Subject: [PATCH 3/5] node: cpumgr: use String representation in output

get nicer output for free

Signed-off-by: Francesco Romani
---
 pkg/kubelet/cm/cpumanager/policy_static_test.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 901d45bfe87..b7b43c135b7 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -668,12 +668,12 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
 	}
 
 	if !cset.Equals(testCase.expCSet) {
-		t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
+		t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s",
 			testCase.description, testCase.expCSet, cset)
 	}
 
 	if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
-		t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
+		t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disjoint from the shared cpuset %s",
 			testCase.description, cset, st.defaultCPUSet)
 	}
 }
@@ -733,7 +733,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
 			policy.Allocate(st, pod, &container)
 		}
 		if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
-			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
+			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
 		}
 
@@ -741,7 +741,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
 		policy.RemoveContainer(st, string(pod.UID), testCase.containerName)
 
 		if !st.defaultCPUSet.Equals(testCase.expCSetAfterRemove) {
-			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
+			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSetAfterRemove, st.defaultCPUSet)
 		}
 		if _, found := st.assignments[string(pod.UID)][testCase.containerName]; found {
@@ -793,7 +793,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
 			}
 		}
 		if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
-			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
+			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
 		}
 	}
@@ -871,7 +871,7 @@ func TestStaticPolicyRemove(t *testing.T) {
 		policy.RemoveContainer(st, testCase.podUID, testCase.containerName)
 
 		if !st.defaultCPUSet.Equals(testCase.expCSet) {
-			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
+			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSet, st.defaultCPUSet)
 		}
 
@@ -1153,12 +1153,12 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
 		}
 
 		if !cset.Equals(testCase.expCSet) {
-			t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
+			t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s",
 				testCase.description, testCase.expCSet, cset)
 		}
 
 		if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
-			t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
+			t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disjoint from the shared cpuset %s",
 				testCase.description, cset, st.defaultCPUSet)
 		}
 	}
From 9cfe1863ce9a736705ad0012b955b0fd62e34efd Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Thu, 30 Jan 2025 09:36:45 +0100
Subject: [PATCH 4/5] node: e2e: clarify test description

fix test name. Cosmetic only.

Signed-off-by: Francesco Romani
---
 test/e2e_node/cpu_manager_metrics_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go
index 03afe0c1e79..6a9fe491544 100644
--- a/test/e2e_node/cpu_manager_metrics_test.go
+++ b/test/e2e_node/cpu_manager_metrics_test.go
@@ -177,7 +177,7 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
 		gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 	})
 
-	ginkgo.It("should return updated alignment counters when pod successfully run", func(ctx context.Context) {
+	ginkgo.It("should return updated SMT alignment counters when pod successfully run", func(ctx context.Context) {
 		ginkgo.By("Creating the test pod")
 		testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-smt-ok", smtLevel))
 
From 5c17e7bd9c689387d9f694b3064b34cff0037fdd Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Thu, 30 Jan 2025 10:01:49 +0100
Subject: [PATCH 5/5] node: cpumgr: metrics: add uncore cache alignment metrics

Add the missing metric about uncore / L3 / Last-Level cache alignment,
plus its e2e tests.

Exposing uncore alignment requires a bit of refactoring in the static
policy implementation because, unlike full PCPUs alignment and NUMA
alignment, it cannot be easily and safely inferred by construction.
The main reason is that uncore cache alignment is preferred, not
mandatory, so the cpu allocator can legally allocate across uncore
caches. Because of that, the final cpuset union step can produce a
cpuset which is not uncore-aligned even though all of its parts are.
The safest approach is therefore to run a single uncore-alignment
check once the final cpuset is computed.
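
As a toy illustration of the union problem (not part of the diff; it
assumes CPUs 0-7 share uncore cache 0 and CPUs 8-15 share uncore cache 1),
each part below is uncore-aligned while their union is not:

    package main

    import (
        "fmt"

        "k8s.io/utils/cpuset"
    )

    func main() {
        a := cpuset.New(0, 1)   // fits entirely within uncore cache 0 -> aligned
        b := cpuset.New(8, 9)   // fits entirely within uncore cache 1 -> aligned
        u := a.Union(b)         // spans both caches -> not uncore-aligned
        fmt.Println(u.String()) // prints "0-1,8-9"
    }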
Signed-off-by: Francesco Romani --- pkg/kubelet/cm/cpumanager/policy_static.go | 45 ++++--- .../cm/cpumanager/policy_static_test.go | 6 +- .../cm/cpumanager/topology/alignment.go | 78 +++++++++++ .../cm/cpumanager/topology/alignment_test.go | 126 ++++++++++++++++++ .../cm/cpumanager/topology/topology.go | 9 ++ pkg/kubelet/metrics/metrics.go | 1 + test/e2e_node/cpu_manager_metrics_test.go | 104 ++++++++++++++- test/e2e_node/cpu_manager_test.go | 16 +++ 8 files changed, 361 insertions(+), 24 deletions(-) create mode 100644 pkg/kubelet/cm/cpumanager/topology/alignment.go create mode 100644 pkg/kubelet/cm/cpumanager/topology/alignment_test.go diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 56236e95096..a9828474d64 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -330,6 +330,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai } return } + // TODO: move in updateMetricsOnAllocate if p.options.FullPhysicalCPUsOnly { // increment only if we know we allocate aligned resources metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Inc() @@ -369,8 +370,8 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai } } } - if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { - p.updateCPUsToReuse(pod, container, cpuset) + if cset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { + p.updateCPUsToReuse(pod, container, cset) klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) return nil } @@ -380,17 +381,17 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint) // Allocate CPUs according to the NUMA affinity contained in the hint. 
- cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)]) + cpuAllocation, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)]) if err != nil { klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs) return err } - s.SetCPUSet(string(pod.UID), container.Name, cpuset) - p.updateCPUsToReuse(pod, container, cpuset) - p.updateMetricsOnAllocate(cpuset) + s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs) + p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs) + p.updateMetricsOnAllocate(cpuAllocation) - klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuset) + klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String()) return nil } @@ -420,13 +421,13 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa return nil } -func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) { +func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (topology.Allocation, error) { klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity) allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs) // If there are aligned CPUs in numaAffinity, attempt to take those first. - result := cpuset.New() + result := topology.EmptyAllocation() if numaAffinity != nil { alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs) @@ -435,25 +436,26 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit numAlignedToAlloc = numCPUs } - alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc) + allocatedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc) if err != nil { - return cpuset.New(), err + return topology.EmptyAllocation(), err } - result = result.Union(alignedCPUs) + result.CPUs = result.CPUs.Union(allocatedCPUs) } // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. - remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size()) + remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result.CPUs), numCPUs-result.CPUs.Size()) if err != nil { - return cpuset.New(), err + return topology.EmptyAllocation(), err } - result = result.Union(remainingCPUs) + result.CPUs = result.CPUs.Union(remainingCPUs) + result.Aligned = p.topology.CheckAlignment(result.CPUs) // Remove allocated CPUs from the shared CPUSet. 
- s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result)) + s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result.CPUs)) - klog.InfoS("AllocateCPUs", "result", result) + klog.InfoS("AllocateCPUs", "result", result.String()) return result, nil } @@ -755,12 +757,17 @@ func (p *staticPolicy) initializeMetrics(s state.State) { metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000)) metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s))) metrics.ContainerAlignedComputeResourcesFailure.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists + metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists + metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Add(0) // ensure the value exists } -func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) { - ncpus := cset.Size() +func (p *staticPolicy) updateMetricsOnAllocate(cpuAlloc topology.Allocation) { + ncpus := cpuAlloc.CPUs.Size() metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus)) metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000)) + if cpuAlloc.Aligned.UncoreCache { + metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Inc() + } } func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) { diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index b7b43c135b7..73003a2a5c4 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -966,16 +966,16 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { continue } - cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New()) + cpuAlloc, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New()) if err != nil { t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v", tc.description, tc.expCSet, err) continue } - if !tc.expCSet.Equals(cset) { + if !tc.expCSet.Equals(cpuAlloc.CPUs) { t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v", - tc.description, tc.expCSet, cset) + tc.description, tc.expCSet, cpuAlloc.CPUs) } } } diff --git a/pkg/kubelet/cm/cpumanager/topology/alignment.go b/pkg/kubelet/cm/cpumanager/topology/alignment.go new file mode 100644 index 00000000000..4f3dfe29762 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/topology/alignment.go @@ -0,0 +1,78 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topology + +import ( + "fmt" + + "k8s.io/utils/cpuset" +) + +// Alignment is metadata about a cpuset allocation +type Alignment struct { + // UncoreCache is true if all the CPUs are uncore-cache aligned, + // IOW if they all share the same Uncore cache block. 
+	// If the allocated CPU count is greater than an Uncore Group size,
+	// CPUs can't be uncore-aligned; otherwise, they are.
+	// This flag tracks alignment, not interference or lack thereof.
+	UncoreCache bool
+}
+
+func (ca Alignment) String() string {
+	return fmt.Sprintf("aligned=<uncorecache=%v>", ca.UncoreCache)
+}
+
+// Allocation represents a CPU set plus alignment metadata
+type Allocation struct {
+	CPUs    cpuset.CPUSet
+	Aligned Alignment
+}
+
+func (ca Allocation) String() string {
+	return ca.CPUs.String() + " " + ca.Aligned.String()
+}
+
+// EmptyAllocation returns a new zero-valued CPU allocation. Please note that
+// an empty cpuset is considered aligned in every possible way.
+func EmptyAllocation() Allocation {
+	return Allocation{
+		CPUs: cpuset.New(),
+		Aligned: Alignment{
+			UncoreCache: true,
+		},
+	}
+}
+
+func isAlignedAtUncoreCache(topo *CPUTopology, cpuList ...int) bool {
+	if len(cpuList) <= 1 {
+		return true
+	}
+	reference, ok := topo.CPUDetails[cpuList[0]]
+	if !ok {
+		return false
+	}
+	for _, cpu := range cpuList[1:] {
+		info, ok := topo.CPUDetails[cpu]
+		if !ok {
+			return false
+		}
+		if info.UncoreCacheID != reference.UncoreCacheID {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/kubelet/cm/cpumanager/topology/alignment_test.go b/pkg/kubelet/cm/cpumanager/topology/alignment_test.go
new file mode 100644
index 00000000000..2fc0dd08a67
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/topology/alignment_test.go
@@ -0,0 +1,126 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package topology + +import ( + "reflect" + "testing" + + "k8s.io/utils/cpuset" +) + +func TestNewAlignment(t *testing.T) { + topo := &CPUTopology{ + NumCPUs: 32, + NumSockets: 1, + NumCores: 32, + NumNUMANodes: 1, + NumUncoreCache: 8, + CPUDetails: map[int]CPUInfo{ + 0: {CoreID: 0, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0}, + 1: {CoreID: 1, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0}, + 2: {CoreID: 2, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0}, + 3: {CoreID: 3, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0}, + 4: {CoreID: 4, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1}, + 5: {CoreID: 5, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1}, + 6: {CoreID: 6, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1}, + 7: {CoreID: 7, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1}, + 8: {CoreID: 8, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2}, + 9: {CoreID: 9, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2}, + 10: {CoreID: 10, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2}, + 11: {CoreID: 11, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2}, + 12: {CoreID: 12, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3}, + 13: {CoreID: 13, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3}, + 14: {CoreID: 14, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3}, + 15: {CoreID: 15, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3}, + 16: {CoreID: 16, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4}, + 17: {CoreID: 17, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4}, + 18: {CoreID: 18, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4}, + 19: {CoreID: 19, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4}, + 20: {CoreID: 20, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5}, + 21: {CoreID: 21, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5}, + 22: {CoreID: 22, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5}, + 23: {CoreID: 23, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5}, + 24: {CoreID: 24, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6}, + 25: {CoreID: 25, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6}, + 26: {CoreID: 26, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6}, + 27: {CoreID: 27, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6}, + 28: {CoreID: 28, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7}, + 29: {CoreID: 29, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7}, + 30: {CoreID: 30, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7}, + 31: {CoreID: 31, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7}, + }, + } + + tests := []struct { + name string + topo *CPUTopology + cpus cpuset.CPUSet + want Alignment + }{{ + name: "empty cpuset", + topo: topo, + cpus: cpuset.New(), + want: Alignment{ + UncoreCache: true, + }, + }, { + name: "single random CPU", + topo: topo, + cpus: cpuset.New(11), // any single id is fine, no special meaning + want: Alignment{ + UncoreCache: true, + }, + }, { + name: "less CPUs than a uncore cache group", + topo: topo, + cpus: cpuset.New(29, 30, 31), // random cpus as long as they belong to the same uncore cache + want: Alignment{ + UncoreCache: true, + }, + }, { + name: "enough CPUs to fill a uncore cache group", + topo: topo, + cpus: cpuset.New(8, 9, 10, 11), // random cpus as long as they belong to the same uncore cache + want: Alignment{ + UncoreCache: true, + }, + }, { + name: "more CPUs than a full a uncore cache group", + topo: topo, + cpus: cpuset.New(9, 10, 11, 23), // random cpus as long as they belong to the same uncore cache + want: Alignment{ + UncoreCache: false, + }, + }, { + name: "enough CPUs to exactly fill multiple uncore cache groups", + topo: topo, + cpus: cpuset.New(8, 9, 10, 11, 16, 17, 18, 19), // random cpus as long as 
they belong to the same uncore cache + want: Alignment{ + UncoreCache: false, + }, + }} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.topo.CheckAlignment(tt.cpus) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("AlignmentFromCPUSet() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/topology/topology.go b/pkg/kubelet/cm/cpumanager/topology/topology.go index 18b38233666..36291e6f5af 100644 --- a/pkg/kubelet/cm/cpumanager/topology/topology.go +++ b/pkg/kubelet/cm/cpumanager/topology/topology.go @@ -101,6 +101,15 @@ func (topo *CPUTopology) CPUNUMANodeID(cpu int) (int, error) { return info.NUMANodeID, nil } +// CheckAlignment returns alignment information for the given cpuset in +// the context of the current CPU topology +func (topo *CPUTopology) CheckAlignment(cpus cpuset.CPUSet) Alignment { + cpuList := cpus.UnsortedList() + return Alignment{ + UncoreCache: isAlignedAtUncoreCache(topo, cpuList...), + } +} + // CPUInfo contains the NUMA, socket, UncoreCache and core IDs associated with a CPU. type CPUInfo struct { NUMANodeID int diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index b4c58b7ca26..04d62880b0b 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -150,6 +150,7 @@ const ( AlignedPhysicalCPU = "physical_cpu" AlignedNUMANode = "numa_node" + AlignedUncoreCache = "uncore_cache" // Metrics to track kubelet admission rejections. AdmissionRejectionsTotalKey = "admission_rejections_total" diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go index 6a9fe491544..b4db0f40806 100644 --- a/test/e2e_node/cpu_manager_metrics_test.go +++ b/test/e2e_node/cpu_manager_metrics_test.go @@ -19,6 +19,7 @@ package e2enode import ( "context" "fmt" + "strconv" "time" "github.com/onsi/ginkgo/v2" @@ -50,7 +51,10 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa ginkgo.Context("when querying /metrics", func() { var oldCfg *kubeletconfig.KubeletConfiguration var testPod *v1.Pod + var cpuAlloc int64 var smtLevel int + var uncoreGroupSize int + var hasSplitUncore bool ginkgo.BeforeEach(func(ctx context.Context) { var err error @@ -60,7 +64,7 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa } fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption) - _, cpuAlloc, _ := getLocalNodeCPUDetails(ctx, f) + _, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f) smtLevel = getSMTLevel() // strict SMT alignment is trivially verified and granted on non-SMT systems @@ -75,10 +79,22 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa framework.Logf("SMT level %d", smtLevel) + uncoreGroupSize = getUncoreCPUGroupSize() + if uncoreGroupSize == 0 { + hasSplitUncore = false + } else { + // check we do physically have split Uncore but also we have enough CPUs available to run + // meaningful tests. We need them both. + hasSplitUncore = (cpuAlloc > int64(uncoreGroupSize)) + } + + framework.Logf("Uncore Group Size %d; Split Uncore detected=%v", uncoreGroupSize, hasSplitUncore) + // TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably // check what we do have in the node. 
cpuPolicyOptions := map[string]string{ - cpumanager.FullPCPUsOnlyOption: "true", + cpumanager.FullPCPUsOnlyOption: "true", + cpumanager.PreferAlignByUnCoreCacheOption: strconv.FormatBool(hasSplitUncore), } newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ @@ -289,6 +305,90 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa ginkgo.By("Ensuring the metrics match the expectations a few more times") gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle) }) + + ginkgo.It("should update alignment counters when pod successfully run taking less than uncore cache group", func(ctx context.Context) { + if !hasSplitUncore { + e2eskipper.Skip("Skipping CPU Manager uncore alignment test - not split Uncore detected") + } + if smtLevel >= uncoreGroupSize { + // this doesn't make sense according to the very definition of uncore cache (a cache which spans across core blocks, + // and thread siblings belong to the same block and they share a exclusive cache block) + // so it has to be a configuration or detection issue. Fail loudly. + framework.Failf("Failed preconditions for CPU Manager uncore alignment test - SMT level more than Uncore group size - this is unexpected") + } + + ginkgo.By("Creating the test pod") + testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", smtLevel)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. + ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted") + + idFn := makeCustomPairID("scope", "boundary") + matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{ + "container::uncore_cache": timelessSample(1), + }), + }) + + ginkgo.By("Giving the Kubelet time to update the alignment metrics") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) + ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) + }) + + ginkgo.It("should update alignment counters when pod successfully run taking a full uncore cache group", func(ctx context.Context) { + if !hasSplitUncore { + e2eskipper.Skip("Skipping CPU Manager uncore alignment test - not split Uncore detected") + } + + ginkgo.By("Creating the test pod") + testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", uncoreGroupSize)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. 
+ ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted") + + idFn := makeCustomPairID("scope", "boundary") + matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{ + "container::uncore_cache": timelessSample(1), + }), + }) + + ginkgo.By("Giving the Kubelet time to update the alignment metrics") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) + ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) + }) + + ginkgo.It("should not update alignment counters when pod successfully run taking more than a uncore cache group", func(ctx context.Context) { + if !hasSplitUncore { + e2eskipper.Skip("Skipping CPU Manager uncore alignment test - not split Uncore detected") + } + if cpuAlloc < int64(uncoreGroupSize+smtLevel) { + e2eskipper.Skipf("Skipping CPU Manager uncore alignment test - not enough available CPUs (needs %d allocatable %d)", uncoreGroupSize+smtLevel, cpuAlloc) + } + + ginkgo.By("Creating the test pod") + testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", uncoreGroupSize+smtLevel)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. + ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted") + + idFn := makeCustomPairID("scope", "boundary") + matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{ + "container::uncore_cache": timelessSample(0), + }), + }) + + ginkgo.By("Giving the Kubelet time to update the alignment metrics") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) + ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) + }) }) }) diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go index c46e96166d3..c6546dc773a 100644 --- a/test/e2e_node/cpu_manager_test.go +++ b/test/e2e_node/cpu_manager_test.go @@ -18,7 +18,10 @@ package e2enode import ( "context" + "errors" "fmt" + "io/fs" + "os" "os/exec" "regexp" "strconv" @@ -253,6 +256,19 @@ func getSMTLevel() int { return cpus.Size() } +func getUncoreCPUGroupSize() int { + cpuID := 0 // this is just the most likely cpu to be present in a random system. No special meaning besides this. + out, err := os.ReadFile(fmt.Sprintf("/sys/devices/system/cpu/cpu%d/cache/index3/shared_cpu_list", cpuID)) + if errors.Is(err, fs.ErrNotExist) { + return 0 // no Uncore/LLC cache detected, nothing to do + } + framework.ExpectNoError(err) + // how many cores share a same Uncore/LLC block? 
+ cpus, err := cpuset.Parse(strings.TrimSpace(string(out))) + framework.ExpectNoError(err) + return cpus.Size() +} + func getCPUSiblingList(cpuRes int64) string { out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuRes)).Output() framework.ExpectNoError(err)