Merge pull request #130133 from ffromani/split-uncore-metrics

node: cpumgr: metrics: add uncore cache alignment metrics
This commit is contained in:
Kubernetes Prow Robot 2025-03-14 08:55:46 -07:00 committed by GitHub
commit 83d33a927a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 412 additions and 51 deletions

View File

@ -330,6 +330,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
}
return
}
// TODO: move in updateMetricsOnAllocate
if p.options.FullPhysicalCPUsOnly {
// increment only if we know we allocate aligned resources
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Inc()
@ -369,8 +370,8 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
}
}
}
if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
p.updateCPUsToReuse(pod, container, cpuset)
if cset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
p.updateCPUsToReuse(pod, container, cset)
klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
return nil
}
@ -380,17 +381,17 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
// Allocate CPUs according to the NUMA affinity contained in the hint.
cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
cpuAllocation, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
if err != nil {
klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
return err
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
p.updateCPUsToReuse(pod, container, cpuset)
p.updateMetricsOnAllocate(cpuset)
s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs)
p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs)
p.updateMetricsOnAllocate(cpuAllocation)
klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuset)
klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String())
return nil
}
@ -420,13 +421,13 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
return nil
}
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (topology.Allocation, error) {
klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)
// If there are aligned CPUs in numaAffinity, attempt to take those first.
result := cpuset.New()
result := topology.EmptyAllocation()
if numaAffinity != nil {
alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)
@ -435,25 +436,26 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
numAlignedToAlloc = numCPUs
}
alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
allocatedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
if err != nil {
return cpuset.New(), err
return topology.EmptyAllocation(), err
}
result = result.Union(alignedCPUs)
result.CPUs = result.CPUs.Union(allocatedCPUs)
}
// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result.CPUs), numCPUs-result.CPUs.Size())
if err != nil {
return cpuset.New(), err
return topology.EmptyAllocation(), err
}
result = result.Union(remainingCPUs)
result.CPUs = result.CPUs.Union(remainingCPUs)
result.Aligned = p.topology.CheckAlignment(result.CPUs)
// Remove allocated CPUs from the shared CPUSet.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result.CPUs))
klog.InfoS("AllocateCPUs", "result", result)
klog.InfoS("AllocateCPUs", "result", result.String())
return result, nil
}
@ -755,12 +757,17 @@ func (p *staticPolicy) initializeMetrics(s state.State) {
metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000))
metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s)))
metrics.ContainerAlignedComputeResourcesFailure.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Add(0) // ensure the value exists
}
func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) {
ncpus := cset.Size()
func (p *staticPolicy) updateMetricsOnAllocate(cpuAlloc topology.Allocation) {
ncpus := cpuAlloc.CPUs.Size()
metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus))
metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000))
if cpuAlloc.Aligned.UncoreCache {
metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Inc()
}
}
func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) {

View File

@ -70,7 +70,10 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
}
func TestStaticPolicyName(t *testing.T) {
policy, _ := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil)
policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
policyName := policy.Name()
if policyName != "static" {
@ -168,13 +171,16 @@ func TestStaticPolicyStart(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
p, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options)
p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
policy := p.(*staticPolicy)
st := &mockState{
assignments: testCase.stAssignments,
defaultCPUSet: testCase.stDefaultCPUSet,
}
err := policy.Start(st)
err = policy.Start(st)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
@ -637,7 +643,10 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
if testCase.reservedCPUs != nil {
cpus = testCase.reservedCPUs.Clone()
}
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options)
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
st := &mockState{
assignments: testCase.stAssignments,
@ -645,7 +654,7 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
}
container := &testCase.pod.Spec.Containers[0]
err := policy.Allocate(st, testCase.pod, container)
err = policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %q but got: %q",
testCase.description, testCase.expErr, err)
@ -658,13 +667,13 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
if !cset.Equals(testCase.expCSet) {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disoint from the shared cpuset %s",
testCase.description, cset, st.defaultCPUSet)
}
}
@ -708,7 +717,10 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
st := &mockState{
assignments: testCase.stAssignments,
@ -720,16 +732,16 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
policy.Allocate(st, pod, &container)
}
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
}
// remove
policy.RemoveContainer(st, string(pod.UID), testCase.containerName)
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterRemove) {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
if !st.defaultCPUSet.Equals(testCase.expCSetAfterRemove) {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %sv but got %s",
testCase.description, testCase.expCSetAfterRemove, st.defaultCPUSet)
}
if _, found := st.assignments[string(pod.UID)][testCase.containerName]; found {
@ -761,7 +773,10 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
st := &mockState{
assignments: testCase.stAssignments,
@ -777,8 +792,8 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
testCase.description, err)
}
}
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSetAfterAlloc) {
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %v but got %v",
if !st.defaultCPUSet.Equals(testCase.expCSetAfterAlloc) {
t.Errorf("StaticPolicy Allocate() error (%v). expected default cpuset %s but got %s",
testCase.description, testCase.expCSetAfterAlloc, st.defaultCPUSet)
}
}
@ -843,7 +858,10 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
st := &mockState{
assignments: testCase.stAssignments,
@ -852,8 +870,8 @@ func TestStaticPolicyRemove(t *testing.T) {
policy.RemoveContainer(st, testCase.podUID, testCase.containerName)
if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v",
if !st.defaultCPUSet.Equals(testCase.expCSet) {
t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %s but got %s",
testCase.description, testCase.expCSet, st.defaultCPUSet)
}
@ -933,28 +951,31 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
},
}
for _, tc := range testCases {
p, _ := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
policy := p.(*staticPolicy)
st := &mockState{
assignments: tc.stAssignments,
defaultCPUSet: tc.stDefaultCPUSet,
}
err := policy.Start(st)
err = policy.Start(st)
if err != nil {
t.Errorf("StaticPolicy Start() error (%v)", err)
continue
}
cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New())
cpuAlloc, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.New())
if err != nil {
t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
tc.description, tc.expCSet, err)
continue
}
if !reflect.DeepEqual(tc.expCSet, cset) {
if !tc.expCSet.Equals(cpuAlloc.CPUs) {
t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v but got %v",
tc.description, tc.expCSet, cset)
tc.description, tc.expCSet, cpuAlloc.CPUs)
}
}
}
@ -1107,7 +1128,10 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
for _, testCase := range testCases {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
st := &mockState{
assignments: testCase.stAssignments,
@ -1115,7 +1139,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
}
container := &testCase.pod.Spec.Containers[0]
err := policy.Allocate(st, testCase.pod, container)
err = policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("StaticPolicy Allocate() error (%v). expected add error: %v but got: %v",
testCase.description, testCase.expErr, err)
@ -1128,13 +1152,13 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
testCase.description, container.Name, st.assignments)
}
if !reflect.DeepEqual(cset, testCase.expCSet) {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v but got %v",
if !cset.Equals(testCase.expCSet) {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s but got %s",
testCase.description, testCase.expCSet, cset)
}
if !cset.Intersection(st.defaultCPUSet).IsEmpty() {
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %v to be disoint from the shared cpuset %v",
t.Errorf("StaticPolicy Allocate() error (%v). expected cpuset %s to be disoint from the shared cpuset %s",
testCase.description, cset, st.defaultCPUSet)
}
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topology
import (
"fmt"
"k8s.io/utils/cpuset"
)
// Alignment is metadata about a cpuset allocation
type Alignment struct {
// UncoreCache is true if all the CPUs are uncore-cache aligned,
// IOW if they all share the same Uncore cache block.
// If the allocated CPU count is greater than a Uncore Group size,
// CPUs can't be uncore-aligned; otherwise, they are.
// This flag tracks alignment, not interference or lack thereof.
UncoreCache bool
}
func (ca Alignment) String() string {
return fmt.Sprintf("aligned=<uncore:%v>", ca.UncoreCache)
}
// Allocation represents a CPU set plus alignment metadata
type Allocation struct {
CPUs cpuset.CPUSet
Aligned Alignment
}
func (ca Allocation) String() string {
return ca.CPUs.String() + " " + ca.Aligned.String()
}
// EmptyAllocation returns a new zero-valued CPU allocation. Please note that
// a empty cpuset is aligned according to every possible way we can consider
func EmptyAllocation() Allocation {
return Allocation{
CPUs: cpuset.New(),
Aligned: Alignment{
UncoreCache: true,
},
}
}
func isAlignedAtUncoreCache(topo *CPUTopology, cpuList ...int) bool {
if len(cpuList) <= 1 {
return true
}
reference, ok := topo.CPUDetails[cpuList[0]]
if !ok {
return false
}
for _, cpu := range cpuList[1:] {
info, ok := topo.CPUDetails[cpu]
if !ok {
return false
}
if info.UncoreCacheID != reference.UncoreCacheID {
return false
}
}
return true
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topology
import (
"reflect"
"testing"
"k8s.io/utils/cpuset"
)
func TestNewAlignment(t *testing.T) {
topo := &CPUTopology{
NumCPUs: 32,
NumSockets: 1,
NumCores: 32,
NumNUMANodes: 1,
NumUncoreCache: 8,
CPUDetails: map[int]CPUInfo{
0: {CoreID: 0, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
1: {CoreID: 1, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
2: {CoreID: 2, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
3: {CoreID: 3, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
4: {CoreID: 4, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
5: {CoreID: 5, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
6: {CoreID: 6, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
7: {CoreID: 7, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
8: {CoreID: 8, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
9: {CoreID: 9, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
10: {CoreID: 10, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
11: {CoreID: 11, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
12: {CoreID: 12, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
13: {CoreID: 13, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
14: {CoreID: 14, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
15: {CoreID: 15, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
16: {CoreID: 16, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
17: {CoreID: 17, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
18: {CoreID: 18, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
19: {CoreID: 19, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 4},
20: {CoreID: 20, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
21: {CoreID: 21, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
22: {CoreID: 22, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
23: {CoreID: 23, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 5},
24: {CoreID: 24, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
25: {CoreID: 25, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
26: {CoreID: 26, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
27: {CoreID: 27, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 6},
28: {CoreID: 28, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
29: {CoreID: 29, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
30: {CoreID: 30, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
31: {CoreID: 31, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 7},
},
}
tests := []struct {
name string
topo *CPUTopology
cpus cpuset.CPUSet
want Alignment
}{{
name: "empty cpuset",
topo: topo,
cpus: cpuset.New(),
want: Alignment{
UncoreCache: true,
},
}, {
name: "single random CPU",
topo: topo,
cpus: cpuset.New(11), // any single id is fine, no special meaning
want: Alignment{
UncoreCache: true,
},
}, {
name: "less CPUs than a uncore cache group",
topo: topo,
cpus: cpuset.New(29, 30, 31), // random cpus as long as they belong to the same uncore cache
want: Alignment{
UncoreCache: true,
},
}, {
name: "enough CPUs to fill a uncore cache group",
topo: topo,
cpus: cpuset.New(8, 9, 10, 11), // random cpus as long as they belong to the same uncore cache
want: Alignment{
UncoreCache: true,
},
}, {
name: "more CPUs than a full a uncore cache group",
topo: topo,
cpus: cpuset.New(9, 10, 11, 23), // random cpus as long as they belong to the same uncore cache
want: Alignment{
UncoreCache: false,
},
}, {
name: "enough CPUs to exactly fill multiple uncore cache groups",
topo: topo,
cpus: cpuset.New(8, 9, 10, 11, 16, 17, 18, 19), // random cpus as long as they belong to the same uncore cache
want: Alignment{
UncoreCache: false,
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.topo.CheckAlignment(tt.cpus)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("AlignmentFromCPUSet() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -101,6 +101,15 @@ func (topo *CPUTopology) CPUNUMANodeID(cpu int) (int, error) {
return info.NUMANodeID, nil
}
// CheckAlignment returns alignment information for the given cpuset in
// the context of the current CPU topology
func (topo *CPUTopology) CheckAlignment(cpus cpuset.CPUSet) Alignment {
cpuList := cpus.UnsortedList()
return Alignment{
UncoreCache: isAlignedAtUncoreCache(topo, cpuList...),
}
}
// CPUInfo contains the NUMA, socket, UncoreCache and core IDs associated with a CPU.
type CPUInfo struct {
NUMANodeID int

View File

@ -150,6 +150,7 @@ const (
AlignedPhysicalCPU = "physical_cpu"
AlignedNUMANode = "numa_node"
AlignedUncoreCache = "uncore_cache"
// Metrics to track kubelet admission rejections.
AdmissionRejectionsTotalKey = "admission_rejections_total"

View File

@ -19,6 +19,7 @@ package e2enode
import (
"context"
"fmt"
"strconv"
"time"
"github.com/onsi/ginkgo/v2"
@ -50,7 +51,10 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
ginkgo.Context("when querying /metrics", func() {
var oldCfg *kubeletconfig.KubeletConfiguration
var testPod *v1.Pod
var cpuAlloc int64
var smtLevel int
var uncoreGroupSize int
var hasSplitUncore bool
ginkgo.BeforeEach(func(ctx context.Context) {
var err error
@ -60,7 +64,7 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
}
fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
_, cpuAlloc, _ := getLocalNodeCPUDetails(ctx, f)
_, cpuAlloc, _ = getLocalNodeCPUDetails(ctx, f)
smtLevel = getSMTLevel()
// strict SMT alignment is trivially verified and granted on non-SMT systems
@ -75,10 +79,22 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
framework.Logf("SMT level %d", smtLevel)
uncoreGroupSize = getUncoreCPUGroupSize()
if uncoreGroupSize == 0 {
hasSplitUncore = false
} else {
// check we do physically have split Uncore but also we have enough CPUs available to run
// meaningful tests. We need them both.
hasSplitUncore = (cpuAlloc > int64(uncoreGroupSize))
}
framework.Logf("Uncore Group Size %d; Split Uncore detected=%v", uncoreGroupSize, hasSplitUncore)
// TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably
// check what we do have in the node.
cpuPolicyOptions := map[string]string{
cpumanager.FullPCPUsOnlyOption: "true",
cpumanager.FullPCPUsOnlyOption: "true",
cpumanager.PreferAlignByUnCoreCacheOption: strconv.FormatBool(hasSplitUncore),
}
newCfg := configureCPUManagerInKubelet(oldCfg,
&cpuManagerKubeletArguments{
@ -177,7 +193,7 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
})
ginkgo.It("should return updated alignment counters when pod successfully run", func(ctx context.Context) {
ginkgo.It("should return updated SMT alignment counters when pod successfully run", func(ctx context.Context) {
ginkgo.By("Creating the test pod")
testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-smt-ok", smtLevel))
@ -289,6 +305,90 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
ginkgo.By("Ensuring the metrics match the expectations a few more times")
gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle)
})
ginkgo.It("should update alignment counters when pod successfully run taking less than uncore cache group", func(ctx context.Context) {
if !hasSplitUncore {
e2eskipper.Skip("Skipping CPU Manager uncore alignment test - not split Uncore detected")
}
if smtLevel >= uncoreGroupSize {
// this doesn't make sense according to the very definition of uncore cache (a cache which spans across core blocks,
// and thread siblings belong to the same block and they share a exclusive cache block)
// so it has to be a configuration or detection issue. Fail loudly.
framework.Failf("Failed preconditions for CPU Manager uncore alignment test - SMT level more than Uncore group size - this is unexpected")
}
ginkgo.By("Creating the test pod")
testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", smtLevel))
// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
// being [Serial], we can also assume noone else but us is running pods.
ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted")
idFn := makeCustomPairID("scope", "boundary")
matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
"kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{
"container::uncore_cache": timelessSample(1),
}),
})
ginkgo.By("Giving the Kubelet time to update the alignment metrics")
gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
})
ginkgo.It("should update alignment counters when pod successfully run taking a full uncore cache group", func(ctx context.Context) {
if !hasSplitUncore {
e2eskipper.Skip("Skipping CPU Manager uncore alignment test - not split Uncore detected")
}
ginkgo.By("Creating the test pod")
testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", uncoreGroupSize))
// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
// being [Serial], we can also assume noone else but us is running pods.
ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted")
idFn := makeCustomPairID("scope", "boundary")
matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
"kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{
"container::uncore_cache": timelessSample(1),
}),
})
ginkgo.By("Giving the Kubelet time to update the alignment metrics")
gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
})
ginkgo.It("should not update alignment counters when pod successfully run taking more than a uncore cache group", func(ctx context.Context) {
if !hasSplitUncore {
e2eskipper.Skip("Skipping CPU Manager uncore alignment test - not split Uncore detected")
}
if cpuAlloc < int64(uncoreGroupSize+smtLevel) {
e2eskipper.Skipf("Skipping CPU Manager uncore alignment test - not enough available CPUs (needs %d allocatable %d)", uncoreGroupSize+smtLevel, cpuAlloc)
}
ginkgo.By("Creating the test pod")
testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("count-align-uncore-ok", uncoreGroupSize+smtLevel))
// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
// being [Serial], we can also assume noone else but us is running pods.
ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted")
idFn := makeCustomPairID("scope", "boundary")
matchAlignmentMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
"kubelet_container_aligned_compute_resources_count": gstruct.MatchElements(idFn, gstruct.IgnoreExtras, gstruct.Elements{
"container::uncore_cache": timelessSample(0),
}),
})
ginkgo.By("Giving the Kubelet time to update the alignment metrics")
gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
})
})
})

View File

@ -18,7 +18,10 @@ package e2enode
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"os/exec"
"regexp"
"strconv"
@ -253,6 +256,19 @@ func getSMTLevel() int {
return cpus.Size()
}
func getUncoreCPUGroupSize() int {
cpuID := 0 // this is just the most likely cpu to be present in a random system. No special meaning besides this.
out, err := os.ReadFile(fmt.Sprintf("/sys/devices/system/cpu/cpu%d/cache/index3/shared_cpu_list", cpuID))
if errors.Is(err, fs.ErrNotExist) {
return 0 // no Uncore/LLC cache detected, nothing to do
}
framework.ExpectNoError(err)
// how many cores share a same Uncore/LLC block?
cpus, err := cpuset.Parse(strings.TrimSpace(string(out)))
framework.ExpectNoError(err)
return cpus.Size()
}
func getCPUSiblingList(cpuRes int64) string {
out, err := exec.Command("/bin/sh", "-c", fmt.Sprintf("cat /sys/devices/system/cpu/cpu%d/topology/thread_siblings_list | tr -d \"\n\r\"", cpuRes)).Output()
framework.ExpectNoError(err)