diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 56236e95096..a9828474d64 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -330,6 +330,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai } return } + // TODO: move in updateMetricsOnAllocate if p.options.FullPhysicalCPUsOnly { // increment only if we know we allocate aligned resources metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Inc() @@ -369,8 +370,8 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai } } } - if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { - p.updateCPUsToReuse(pod, container, cpuset) + if cset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { + p.updateCPUsToReuse(pod, container, cset) klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) return nil } @@ -380,17 +381,17 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint) // Allocate CPUs according to the NUMA affinity contained in the hint. - cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)]) + cpuAllocation, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)]) if err != nil { klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs) return err } - s.SetCPUSet(string(pod.UID), container.Name, cpuset) - p.updateCPUsToReuse(pod, container, cpuset) - p.updateMetricsOnAllocate(cpuset) + s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs) + p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs) + p.updateMetricsOnAllocate(cpuAllocation) - klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuset) + klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String()) return nil } @@ -420,13 +421,13 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa return nil } -func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) { +func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (topology.Allocation, error) { klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity) allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs) // If there are aligned CPUs in numaAffinity, attempt to take those first. 
- result := cpuset.New() + result := topology.EmptyAllocation() if numaAffinity != nil { alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs) @@ -435,25 +436,26 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit numAlignedToAlloc = numCPUs } - alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc) + allocatedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc) if err != nil { - return cpuset.New(), err + return topology.EmptyAllocation(), err } - result = result.Union(alignedCPUs) + result.CPUs = result.CPUs.Union(allocatedCPUs) } // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. - remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size()) + remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result.CPUs), numCPUs-result.CPUs.Size()) if err != nil { - return cpuset.New(), err + return topology.EmptyAllocation(), err } - result = result.Union(remainingCPUs) + result.CPUs = result.CPUs.Union(remainingCPUs) + result.Aligned = p.topology.CheckAlignment(result.CPUs) // Remove allocated CPUs from the shared CPUSet. - s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result)) + s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result.CPUs)) - klog.InfoS("AllocateCPUs", "result", result) + klog.InfoS("AllocateCPUs", "result", result.String()) return result, nil } @@ -755,12 +757,17 @@ func (p *staticPolicy) initializeMetrics(s state.State) { metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000)) metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s))) metrics.ContainerAlignedComputeResourcesFailure.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists + metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists + metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Add(0) // ensure the value exists } -func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) { - ncpus := cset.Size() +func (p *staticPolicy) updateMetricsOnAllocate(cpuAlloc topology.Allocation) { + ncpus := cpuAlloc.CPUs.Size() metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus)) metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000)) + if cpuAlloc.Aligned.UncoreCache { + metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Inc() + } } func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) { diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index e21136e8f61..73003a2a5c4 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -70,7 +70,10 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest { } func TestStaticPolicyName(t *testing.T) { - policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } policyName := policy.Name() if policyName != "static" { @@ -168,13 +171,16 @@ func TestStaticPolicyStart(t *testing.T) { for _, testCase := range testCases { 
t.Run(testCase.description, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true) - p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options) + p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } policy := p.(*staticPolicy) st := &mockState{ assignments: testCase.stAssignments, defaultCPUSet: testCase.stDefaultCPUSet, } - err := policy.Start(st) + err = policy.Start(st) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v", testCase.description, testCase.expErr, err) @@ -637,7 +643,10 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { if testCase.reservedCPUs != nil { cpus = testCase.reservedCPUs.Clone() } - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -645,7 +654,7 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { } container := &testCase.pod.Spec.Containers[0] - err := policy.Allocate(st, testCase.pod, container) + err = policy.Allocate(st, testCase.pod, container) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %q but got: %q", testCase.description, testCase.expErr, err) @@ -658,13 +667,13 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) { testCase.description, container.Name, st.assignments) } - if !reflect.DeepEqual(cset, testCase.expCSet) { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", + if !cset.Equals(testCase.expCSet) { + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s", testCase.description, testCase.expCSet, cset) } if !cset.Intersection(st.defaultCPUSet).IsEmpty() { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v", + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disoint from the shared cpuset %s", testCase.description, cset, st.defaultCPUSet) } } @@ -708,7 +717,10 @@ func TestStaticPolicyReuseCPUs(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -720,16 +732,16 @@ func TestStaticPolicyReuseCPUs(t *testing.T) { for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { policy.Allocate(st, pod, &container) } - if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) { - t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v", + if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) { + t.Errorf("StaticPolicy Allocate() error (%v). 
expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
 		}
 
 		// remove
 		policy.RemoveContainer(st, string(pod.UID), testCase.containerName)
-		if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterRemove) {
-			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
+		if !st.defaultCPUSet.Equals(testCase.expCSetAfterRemove) {
+			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSetAfterRemove, st.defaultCPUSet)
 		}
 		if _, found := st.assignments[string(pod.UID)][testCase.containerName]; found {
@@ -761,7 +773,10 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		if err != nil {
+			t.Fatalf("NewStaticPolicy() failed: %v", err)
+		}
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -777,8 +792,8 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
 					testCase.description, err)
 			}
 		}
-		if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
-			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
+		if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
+			t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
 		}
 	}
@@ -843,7 +858,10 @@ func TestStaticPolicyRemove(t *testing.T) {
 	}
 
 	for _, testCase := range testCases {
-		policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		if err != nil {
+			t.Fatalf("NewStaticPolicy() failed: %v", err)
+		}
 
 		st := &mockState{
 			assignments:   testCase.stAssignments,
@@ -852,8 +870,8 @@ func TestStaticPolicyRemove(t *testing.T) {
 
 		policy.RemoveContainer(st, testCase.podUID, testCase.containerName)
 
-		if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) {
-			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
+		if !st.defaultCPUSet.Equals(testCase.expCSet) {
+			t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
 				testCase.description, testCase.expCSet, st.defaultCPUSet)
 		}
 
@@ -933,28 +951,31 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 		},
 	}
 	for _, tc := range testCases {
-		p, _ := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
+		if err != nil {
+			t.Fatalf("NewStaticPolicy() failed: %v", err)
+		}
 		policy := p.(*staticPolicy)
 		st := &mockState{
 			assignments:   tc.stAssignments,
 			defaultCPUSet: tc.stDefaultCPUSet,
 		}
-		err := policy.Start(st)
+		err = policy.Start(st)
 		if err != nil {
 			t.Errorf("StaticPolicy Start() error (%v)", err)
 			continue
 		}
 
-		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New())
+		cpuAlloc, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New())
 		if err != nil {
 			t.Errorf("StaticPolicy allocateCPUs() error (%v). 
expected CPUSet %v not error %v", tc.description, tc.expCSet, err) continue } - if !reflect.DeepEqual(tc.expCSet, cset) { + if !tc.expCSet.Equals(cpuAlloc.CPUs) { t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v", - tc.description, tc.expCSet, cset) + tc.description, tc.expCSet, cpuAlloc.CPUs) } } } @@ -1107,7 +1128,10 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } for _, testCase := range testCases { - policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) + policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil) + if err != nil { + t.Fatalf("NewStaticPolicy() failed: %v", err) + } st := &mockState{ assignments: testCase.stAssignments, @@ -1115,7 +1139,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { } container := &testCase.pod.Spec.Containers[0] - err := policy.Allocate(st, testCase.pod, container) + err = policy.Allocate(st, testCase.pod, container) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v", testCase.description, testCase.expErr, err) @@ -1128,13 +1152,13 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { testCase.description, container.Name, st.assignments) } - if !reflect.DeepEqual(cset, testCase.expCSet) { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v", + if !cset.Equals(testCase.expCSet) { + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s", testCase.description, testCase.expCSet, cset) } if !cset.Intersection(st.defaultCPUSet).IsEmpty() { - t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v", + t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disoint from the shared cpuset %s", testCase.description, cset, st.defaultCPUSet) } } diff --git a/pkg/kubelet/cm/cpumanager/topology/alignment.go b/pkg/kubelet/cm/cpumanager/topology/alignment.go new file mode 100644 index 00000000000..4f3dfe29762 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/topology/alignment.go @@ -0,0 +1,78 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topology + +import ( + "fmt" + + "k8s.io/utils/cpuset" +) + +// Alignment is metadata about a cpuset allocation +type Alignment struct { + // UncoreCache is true if all the CPUs are uncore-cache aligned, + // IOW if they all share the same Uncore cache block. + // If the allocated CPU count is greater than a Uncore Group size, + // CPUs can't be uncore-aligned; otherwise, they are. + // This flag tracks alignment, not interference or lack thereof. 
+	UncoreCache bool
+}
+
+func (ca Alignment) String() string {
+	return fmt.Sprintf("aligned=<%v>", ca.UncoreCache)
+}
+
+// Allocation represents a CPU set plus alignment metadata
+type Allocation struct {
+	CPUs    cpuset.CPUSet
+	Aligned Alignment
+}
+
+func (ca Allocation) String() string {
+	return ca.CPUs.String() + " " + ca.Aligned.String()
+}
+
+// EmptyAllocation returns a new zero-valued CPU allocation. Please note that
+// an empty cpuset is considered aligned according to every criterion we track.
+func EmptyAllocation() Allocation {
+	return Allocation{
+		CPUs: cpuset.New(),
+		Aligned: Alignment{
+			UncoreCache: true,
+		},
+	}
+}
+
+func isAlignedAtUncoreCache(topo *CPUTopology, cpuList ...int) bool {
+	if len(cpuList) <= 1 {
+		return true
+	}
+	reference, ok := topo.CPUDetails[cpuList[0]]
+	if !ok {
+		return false
+	}
+	for _, cpu := range cpuList[1:] {
+		info, ok := topo.CPUDetails[cpu]
+		if !ok {
+			return false
+		}
+		if info.UncoreCacheID != reference.UncoreCacheID {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/kubelet/cm/cpumanager/topology/alignment_test.go b/pkg/kubelet/cm/cpumanager/topology/alignment_test.go
new file mode 100644
index 00000000000..2fc0dd08a67
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/topology/alignment_test.go
@@ -0,0 +1,126 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package topology
+
+import (
+	"reflect"
+	"testing"
+
+	"k8s.io/utils/cpuset"
+)
+
+func TestNewAlignment(t *testing.T) {
+	topo := &CPUTopology{
+		NumCPUs:        32,
+		NumSockets:     1,
+		NumCores:       32,
+		NumNUMANodes:   1,
+		NumUncoreCache: 8,
+		CPUDetails: map[int]CPUInfo{
+			0:  {CoreID: 0, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			1:  {CoreID: 1, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			2:  {CoreID: 2, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			3:  {CoreID: 3, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			4:  {CoreID: 4, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			5:  {CoreID: 5, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			6:  {CoreID: 6, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			7:  {CoreID: 7, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			8:  {CoreID: 8, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			9:  {CoreID: 9, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			10: {CoreID: 10, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			11: {CoreID: 11, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			12: {CoreID: 12, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			13: {CoreID: 13, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			14: {CoreID: 14, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			15: {CoreID: 15, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			16: {CoreID: 16, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
+			17: {CoreID: 17, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
+			18: {CoreID: 18, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
+			19: {CoreID: 19, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
+			20: {CoreID: 20, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
+			21: {CoreID: 21, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
+			22: {CoreID: 22, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
+			23: {CoreID: 23, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
+			24: {CoreID: 24, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
+			25: {CoreID: 25, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
+			26: {CoreID: 26, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
+			27: {CoreID: 27, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
+			28: {CoreID: 28, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
+			29: {CoreID: 29, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
+			30: {CoreID: 30, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
+			31: {CoreID: 31, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
+		},
+	}
+
+	tests := []struct {
+		name string
+		topo *CPUTopology
+		cpus cpuset.CPUSet
+		want Alignment
+	}{{
+		name: "empty cpuset",
+		topo: topo,
+		cpus: cpuset.New(),
+		want: Alignment{
+			UncoreCache: true,
+		},
+	}, {
+		name: "single random CPU",
+		topo: topo,
+		cpus: cpuset.New(11), // any single id is fine, no special meaning
+		want: Alignment{
+			UncoreCache: true,
+		},
+	}, {
+		name: "fewer CPUs than an uncore cache group",
+		topo: topo,
+		cpus: cpuset.New(29, 30, 31), // random cpus as long as they belong to the same uncore cache
+		want: Alignment{
+			UncoreCache: true,
+		},
+	}, {
+		name: "enough CPUs to fill an uncore cache group",
+		topo: topo,
+		cpus: cpuset.New(8, 9, 10, 11), // random cpus as long as they belong to the same uncore cache
+		want: Alignment{
+			UncoreCache: true,
+		},
+	}, {
+		name: "more CPUs than a full uncore cache group",
+		topo: topo,
+		cpus: cpuset.New(9, 10, 11, 23), // cpus 9-11 share one uncore cache, cpu 23 belongs to a different one
+		want: Alignment{
+			UncoreCache: false,
+		},
+	}, {
+		name: "enough CPUs to exactly fill multiple uncore cache groups",
+		topo: topo,
+		cpus: cpuset.New(8, 9, 10, 11, 16, 17, 18, 19), // two full uncore cache groups, so it is not true that
they belong to the same uncore cache + want: Alignment{ + UncoreCache: false, + }, + }} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.topo.CheckAlignment(tt.cpus) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("AlignmentFromCPUSet() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/topology/topology.go b/pkg/kubelet/cm/cpumanager/topology/topology.go index 18b38233666..36291e6f5af 100644 --- a/pkg/kubelet/cm/cpumanager/topology/topology.go +++ b/pkg/kubelet/cm/cpumanager/topology/topology.go @@ -101,6 +101,15 @@ func (topo *CPUTopology) CPUNUMANodeID(cpu int) (int, error) { return info.NUMANodeID, nil } +// CheckAlignment returns alignment information for the given cpuset in +// the context of the current CPU topology +func (topo *CPUTopology) CheckAlignment(cpus cpuset.CPUSet) Alignment { + cpuList := cpus.UnsortedList() + return Alignment{ + UncoreCache: isAlignedAtUncoreCache(topo, cpuList...), + } +} + // CPUInfo contains the NUMA, socket, UncoreCache and core IDs associated with a CPU. type CPUInfo struct { NUMANodeID int diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index b4c58b7ca26..04d62880b0b 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -150,6 +150,7 @@ const ( AlignedPhysicalCPU = "physical_cpu" AlignedNUMANode = "numa_node" + AlignedUncoreCache = "uncore_cache" // Metrics to track kubelet admission rejections. AdmissionRejectionsTotalKey = "admission_rejections_total" diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go index 03afe0c1e79..b4db0f40806 100644 --- a/test/e2e_node/cpu_manager_metrics_test.go +++ b/test/e2e_node/cpu_manager_metrics_test.go @@ -19,6 +19,7 @@ package e2enode import ( "context" "fmt" + "strconv" "time" "github.com/onsi/ginkgo/v2" @@ -50,7 +51,10 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa ginkgo.Context("when querying /metrics", func() { var oldCfg *kubeletconfig.KubeletConfiguration var testPod *v1.Pod + var cpuAlloc int64 var smtLevel int + var uncoreGroupSize int + var hasSplitUncore bool ginkgo.BeforeEach(func(ctx context.Context) { var err error @@ -60,7 +64,7 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa } fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption) - _, cpuAlloc, _ := getLocalNodeCPUDetails(ctx, f) + _, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f) smtLevel = getSMTLevel() // strict SMT alignment is trivially verified and granted on non-SMT systems @@ -75,10 +79,22 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa framework.Logf("SMT level %d", smtLevel) + uncoreGroupSize = getUncoreCPUGroupSize() + if uncoreGroupSize == 0 { + hasSplitUncore = false + } else { + // check we do physically have split Uncore but also we have enough CPUs available to run + // meaningful tests. We need them both. + hasSplitUncore = (cpuAlloc > int64(uncoreGroupSize)) + } + + framework.Logf("Uncore Group Size %d; Split Uncore detected=%v", uncoreGroupSize, hasSplitUncore) + // TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably // check what we do have in the node. 
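Aside (editor's sketch, not part of the patch): the BeforeEach logic above decides whether the node has a "split uncore" by reading how many CPUs share cpu0's last-level cache and comparing that group size with the allocatable CPU count. A minimal, self-contained Go sketch of that probe follows; it mirrors the getUncoreCPUGroupSize helper added to test/e2e_node/cpu_manager_test.go further down in this diff. The hard-coded cpu0 sysfs path and the placeholder allocatable count are assumptions for illustration only.

// Standalone sketch of the split-uncore probe used by the e2e test above.
// Error handling is simplified compared to the real helper.
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"strings"

	"k8s.io/utils/cpuset"
)

// uncoreGroupSize reports how many CPUs share cpu0's L3/LLC ("index3") block,
// or 0 if the node does not expose an L3 cache at all.
func uncoreGroupSize() (int, error) {
	data, err := os.ReadFile("/sys/devices/system/cpu/cpu0/cache/index3/shared_cpu_list")
	if errors.Is(err, fs.ErrNotExist) {
		return 0, nil // no Uncore/LLC exposed: treat as "no split uncore"
	}
	if err != nil {
		return 0, err
	}
	cpus, err := cpuset.Parse(strings.TrimSpace(string(data)))
	if err != nil {
		return 0, err
	}
	return cpus.Size(), nil
}

func main() {
	size, err := uncoreGroupSize()
	if err != nil {
		fmt.Println("probe failed:", err)
		return
	}
	// The e2e test compares the group size against the node's real allocatable
	// CPU count; the value below is a placeholder for illustration.
	allocatableCPUs := int64(16)
	hasSplitUncore := size > 0 && allocatableCPUs > int64(size)
	fmt.Printf("uncore group size=%d, split uncore usable for the test=%v\n", size, hasSplitUncore)
}
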
 			cpuPolicyOptions := map[string]string{
-				cpumanager.FullPCPUsOnlyOption: "true",
+				cpumanager.FullPCPUsOnlyOption:            "true",
+				cpumanager.PreferAlignByUnCoreCacheOption: strconv.FormatBool(hasSplitUncore),
 			}
 			newCfg := configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
@@ -177,7 +193,7 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
 			gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 		})
 
-		ginkgo.It("should return updated alignment counters when pod successfully run", func(ctx context.Context) {
+		ginkgo.It("should return updated SMT alignment counters when pod successfully run", func(ctx context.Context) {
 			ginkgo.By("Creating the test pod")
 			testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-smt-ok", smtLevel))
@@ -289,6 +305,90 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
 			ginkgo.By("Ensuring the metrics match the expectations a few more times")
 			gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle)
 		})
+
+		ginkgo.It("should update alignment counters when pod successfully run taking less than uncore cache group", func(ctx context.Context) {
+			if !hasSplitUncore {
+				e2eskipper.Skip("Skipping CPU Manager uncore alignment test - no split Uncore detected")
+			}
+			if smtLevel >= uncoreGroupSize {
+				// this doesn't make sense according to the very definition of uncore cache (a cache which spans across core blocks,
+				// and thread siblings belong to the same block and they share an exclusive cache block)
+				// so it has to be a configuration or detection issue. Fail loudly.
+				framework.Failf("Failed preconditions for CPU Manager uncore alignment test - SMT level greater than or equal to Uncore group size - this is unexpected")
+			}
+
+			ginkgo.By("Creating the test pod")
+			testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", smtLevel))
+
+			// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
+			// being [Serial], we can also assume no one else but us is running pods.
+			ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted")
+
+			idFn := makeCustomPairID("scope", "boundary")
+			matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
+				"kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{
+					"container::uncore_cache": timelessSample(1),
+				}),
+			})
+
+			ginkgo.By("Giving the Kubelet time to update the alignment metrics")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
+			ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
+		})
+
+		ginkgo.It("should update alignment counters when pod successfully run taking a full uncore cache group", func(ctx context.Context) {
+			if !hasSplitUncore {
+				e2eskipper.Skip("Skipping CPU Manager uncore alignment test - no split Uncore detected")
+			}
+
+			ginkgo.By("Creating the test pod")
+			testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", uncoreGroupSize))
+
+			// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
+			// being [Serial], we can also assume no one else but us is running pods.
+			ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted")
+
+			idFn := makeCustomPairID("scope", "boundary")
+			matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
+				"kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{
+					"container::uncore_cache": timelessSample(1),
+				}),
+			})
+
+			ginkgo.By("Giving the Kubelet time to update the alignment metrics")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
+			ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
+		})
+
+		ginkgo.It("should not update alignment counters when pod successfully run taking more than a uncore cache group", func(ctx context.Context) {
+			if !hasSplitUncore {
+				e2eskipper.Skip("Skipping CPU Manager uncore alignment test - no split Uncore detected")
+			}
+			if cpuAlloc < int64(uncoreGroupSize+smtLevel) {
+				e2eskipper.Skipf("Skipping CPU Manager uncore alignment test - not enough available CPUs (needs %d, allocatable %d)", uncoreGroupSize+smtLevel, cpuAlloc)
+			}
+
+			ginkgo.By("Creating the test pod")
+			testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", uncoreGroupSize+smtLevel))
+
+			// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
+			// being [Serial], we can also assume no one else but us is running pods.
+			ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted")
+
+			idFn := makeCustomPairID("scope", "boundary")
+			matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
+				"kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{
+					"container::uncore_cache": timelessSample(0),
+				}),
+			})
+
+			ginkgo.By("Giving the Kubelet time to update the alignment metrics")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
+			ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
+		})
 	})
 })
diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go
index c46e96166d3..c6546dc773a 100644
--- a/test/e2e_node/cpu_manager_test.go
+++ b/test/e2e_node/cpu_manager_test.go
@@ -18,7 +18,10 @@ package e2enode
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"io/fs"
+	"os"
 	"os/exec"
 	"regexp"
 	"strconv"
@@ -253,6 +256,19 @@ func getSMTLevel() int {
 	return cpus.Size()
 }
 
+func getUncoreCPUGroupSize() int {
+	cpuID := 0 // this is just the most likely cpu to be present in a random system. No special meaning besides this.
+	out, err := os.ReadFile(fmt.Sprintf("/sys/devices/system/cpu/cpu%d/cache/index3/shared_cpu_list", cpuID))
+	if errors.Is(err, fs.ErrNotExist) {
+		return 0 // no Uncore/LLC cache detected, nothing to do
+	}
+	framework.ExpectNoError(err)
+	// how many CPUs share the same Uncore/LLC block?
+ cpus, err := cpuset.Parse(strings.TrimSpace(string(out))) + framework.ExpectNoError(err) + return cpus.Size() +} + func getCPUSiblingList(cpuRes int64) string { out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuRes)).Output() framework.ExpectNoError(err)
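A minimal, self-contained sketch (not part of the patch) of the alignment rule implemented by the new CheckAlignment/isAlignedAtUncoreCache code in pkg/kubelet/cm/cpumanager/topology: an allocation counts as uncore-cache aligned only when every allocated CPU maps to the same uncore cache ID, with empty and single-CPU sets trivially aligned. The uncoreOf map and the uncoreAligned helper below are made-up names for illustration; they are not identifiers from the patch.

// Standalone illustration of the uncore-cache alignment rule.
package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// uncoreOf is a made-up 8-CPU topology with two uncore caches.
var uncoreOf = map[int]int{
	0: 0, 1: 0, 2: 0, 3: 0, // CPUs 0-3 share uncore cache 0
	4: 1, 5: 1, 6: 1, 7: 1, // CPUs 4-7 share uncore cache 1
}

// uncoreAligned reports whether every CPU in the set maps to the same uncore
// cache ID; empty and single-CPU sets are trivially aligned.
func uncoreAligned(cpus cpuset.CPUSet) bool {
	ids := cpus.UnsortedList()
	if len(ids) <= 1 {
		return true
	}
	ref, ok := uncoreOf[ids[0]]
	if !ok {
		return false
	}
	for _, cpu := range ids[1:] {
		id, ok := uncoreOf[cpu]
		if !ok || id != ref {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(uncoreAligned(cpuset.New(1, 2, 3)))    // true: all on uncore cache 0
	fmt.Println(uncoreAligned(cpuset.New(2, 3, 4, 5))) // false: spans both uncore caches
}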