Merge pull request #130491 from swatisehgal/cpu-allocation-numa-spread-metric

Add metrics to capture CPU allocation spread across NUMA nodes
Kubernetes Prow Robot 2025-03-19 11:52:40 -07:00 committed by GitHub
commit 1039822a74
4 changed files with 184 additions and 11 deletions
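For orientation, on a node with two NUMA nodes the new gauge would show up in the kubelet /metrics output roughly as below. The values are illustrative, and the numa_node label name is an assumption based on the AlignedNUMANode label constant used in the diff; only the metric name and help text come from this change.

# HELP kubelet_cpu_manager_allocation_per_numa [ALPHA] Number of CPUs allocated per NUMA node
# TYPE kubelet_cpu_manager_allocation_per_numa gauge
kubelet_cpu_manager_allocation_per_numa{numa_node="0"} 4
kubelet_cpu_manager_allocation_per_numa{numa_node="1"} 4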


@@ -18,6 +18,7 @@ package cpumanager
 import (
     "fmt"
+    "strconv"

     v1 "k8s.io/api/core/v1"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -389,7 +390,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
     s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs)
     p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs)
-    p.updateMetricsOnAllocate(cpuAllocation)
+    p.updateMetricsOnAllocate(s, cpuAllocation)
     klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String())
     return nil
@@ -416,7 +417,8 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
         // Mutate the shared pool, adding released cpus.
         toRelease = toRelease.Difference(cpusInUse)
         s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
-        p.updateMetricsOnRelease(toRelease)
+        p.updateMetricsOnRelease(s, toRelease)
     }
     return nil
 }
@@ -755,33 +757,60 @@ func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableC
 func (p *staticPolicy) initializeMetrics(s state.State) {
     metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000))
-    metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s)))
     metrics.ContainerAlignedComputeResourcesFailure.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists
     metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists
     metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Add(0) // ensure the value exists
+    totalAssignedCPUs := getTotalAssignedExclusiveCPUs(s)
+    metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(totalAssignedCPUs.Size()))
+    updateAllocationPerNUMAMetric(p.topology, totalAssignedCPUs)
 }

-func (p *staticPolicy) updateMetricsOnAllocate(cpuAlloc topology.Allocation) {
+func (p *staticPolicy) updateMetricsOnAllocate(s state.State, cpuAlloc topology.Allocation) {
     ncpus := cpuAlloc.CPUs.Size()
     metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus))
     metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000))
     if cpuAlloc.Aligned.UncoreCache {
         metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Inc()
     }
+    totalAssignedCPUs := getTotalAssignedExclusiveCPUs(s)
+    updateAllocationPerNUMAMetric(p.topology, totalAssignedCPUs)
 }

-func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) {
+func (p *staticPolicy) updateMetricsOnRelease(s state.State, cset cpuset.CPUSet) {
     ncpus := cset.Size()
     metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(-ncpus))
     metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(ncpus * 1000))
+    totalAssignedCPUs := getTotalAssignedExclusiveCPUs(s)
+    updateAllocationPerNUMAMetric(p.topology, totalAssignedCPUs.Difference(cset))
 }

-func countExclusiveCPUs(s state.State) int {
-    exclusiveCPUs := 0
-    for _, cpuAssign := range s.GetCPUAssignments() {
-        for _, cset := range cpuAssign {
-            exclusiveCPUs += cset.Size()
-        }
-    }
-    return exclusiveCPUs
-}
+func getTotalAssignedExclusiveCPUs(s state.State) cpuset.CPUSet {
+    totalAssignedCPUs := cpuset.New()
+    for _, assignment := range s.GetCPUAssignments() {
+        for _, cset := range assignment {
+            totalAssignedCPUs = totalAssignedCPUs.Union(cset)
+        }
+    }
+    return totalAssignedCPUs
+}
+
+func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpuset.CPUSet) {
+    numaCount := make(map[int]int)
+
+    // Count CPUs allocated per NUMA node.
+    for _, cpuID := range allocatedCPUs.UnsortedList() {
+        numaNode, err := topo.CPUNUMANodeID(cpuID)
+        if err != nil {
+            // NOTE: We log the error, but it is highly unlikely to happen as the CPU set
+            // is already computed and evaluated, and there is no room for user tampering.
+            klog.ErrorS(err, "Unable to determine NUMA node", "cpuID", cpuID)
+        }
+        numaCount[numaNode]++
+    }
+
+    // Update the metric.
+    for numaNode, count := range numaCount {
+        metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count))
+    }
+}
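As a quick illustration of the counting step that updateAllocationPerNUMAMetric performs before setting the gauge, here is a minimal, self-contained Go sketch. It is not the kubelet code: the CPU-to-NUMA layout and the allocated CPU IDs are made up, and a plain map lookup stands in for topo.CPUNUMANodeID.

package main

import "fmt"

func main() {
	// Hypothetical 8-CPU box: CPUs 0-3 live on NUMA node 0, CPUs 4-7 on NUMA node 1.
	cpuToNUMA := map[int]int{0: 0, 1: 0, 2: 0, 3: 0, 4: 1, 5: 1, 6: 1, 7: 1}

	// Hypothetical set of exclusively allocated CPUs across all containers.
	allocated := []int{1, 2, 5, 6}

	// Count allocated CPUs per NUMA node, which is what updateAllocationPerNUMAMetric
	// does (via the topology lookup) before calling Set on the gauge.
	numaCount := make(map[int]int)
	for _, cpuID := range allocated {
		numaCount[cpuToNUMA[cpuID]]++
	}

	// Print one line per NUMA node, mirroring one gauge sample per numa_node label value.
	for node, count := range numaCount {
		fmt.Printf("kubelet_cpu_manager_allocation_per_numa{numa_node=\"%d\"} %d\n", node, count)
	}
}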


@@ -113,6 +113,7 @@ const (
     CPUManagerPinningErrorsTotalKey           = "cpu_manager_pinning_errors_total"
     CPUManagerSharedPoolSizeMilliCoresKey     = "cpu_manager_shared_pool_size_millicores"
     CPUManagerExclusiveCPUsAllocationCountKey = "cpu_manager_exclusive_cpu_allocation_count"
+    CPUManagerAllocationPerNUMAKey            = "cpu_manager_allocation_per_numa"

     // Metrics to track the Memory manager behavior
     MemoryManagerPinningRequestsTotalKey = "memory_manager_pinning_requests_total"
@@ -815,6 +816,17 @@ var (
         },
     )

+    // CPUManagerAllocationPerNUMA tracks the count of CPUs allocated per NUMA node
+    CPUManagerAllocationPerNUMA = metrics.NewGaugeVec(
+        &metrics.GaugeOpts{
+            Subsystem:      KubeletSubsystem,
+            Name:           CPUManagerAllocationPerNUMAKey,
+            Help:           "Number of CPUs allocated per NUMA node",
+            StabilityLevel: metrics.ALPHA,
+        },
+        []string{AlignedNUMANode},
+    )
+
     // ContainerAlignedComputeResources reports the count of resources allocation which granted aligned resources, per alignment boundary
     ContainerAlignedComputeResources = metrics.NewCounterVec(
         &metrics.CounterOpts{
@@ -1126,6 +1138,7 @@ func Register(collectors ...metrics.StableCollector) {
         legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
         legacyregistry.MustRegister(CPUManagerSharedPoolSizeMilliCores)
         legacyregistry.MustRegister(CPUManagerExclusiveCPUsAllocationCount)
+        legacyregistry.MustRegister(CPUManagerAllocationPerNUMA)
         legacyregistry.MustRegister(ContainerAlignedComputeResources)
         legacyregistry.MustRegister(ContainerAlignedComputeResourcesFailure)
         legacyregistry.MustRegister(MemoryManagerPinningRequestTotal)
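Once registered, the new series is exposed alongside the other kubelet metrics, so it can be spot-checked on a live node, for example via the API server proxy (the node name is a placeholder):

kubectl get --raw "/api/v1/nodes/<node-name>/proxy/metrics" | grep cpu_manager_allocation_per_numa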


@@ -34,6 +34,7 @@ import (
     kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
     "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+    "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/kubelet/util"
     "k8s.io/kubernetes/test/e2e/feature"
     "k8s.io/kubernetes/test/e2e/framework"
@@ -389,6 +390,129 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
         ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
         gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
     })
+
+    ginkgo.It("should report zero counters for allocation per NUMA after a fresh restart", func(ctx context.Context) {
+        cpuPolicyOptions := map[string]string{
+            cpumanager.DistributeCPUsAcrossNUMAOption: "true",
+            cpumanager.FullPCPUsOnlyOption:            "true",
+        }
+        newCfg := configureCPUManagerInKubelet(oldCfg,
+            &cpuManagerKubeletArguments{
+                policyName:              string(cpumanager.PolicyStatic),
+                reservedSystemCPUs:      cpuset.New(0),
+                enableCPUManagerOptions: true,
+                options:                 cpuPolicyOptions,
+            },
+        )
+
+        updateKubeletConfig(ctx, f, newCfg, true)
+
+        ginkgo.By("Checking the cpumanager allocation per NUMA metric right after the kubelet restart, with no pods running")
+        numaNodes, _, _ := hostCheck()
+        framework.Logf("numaNodes on the system %d", numaNodes)
+
+        keys := make(map[interface{}]types.GomegaMatcher)
+        idFn := makeCustomLabelID(metrics.AlignedNUMANode)
+        for i := 0; i < numaNodes; i++ {
+            keys["kubelet_cpu_manager_allocation_per_numa"] = gstruct.MatchAllElements(idFn, gstruct.Elements{
+                fmt.Sprintf("%d", i): timelessSample(0),
+            })
+        }
+        matchSpreadMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, keys)
+
+        ginkgo.By("Giving the Kubelet time to start up and produce metrics")
+        gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics)
+
+        ginkgo.By("Ensuring the metrics match the expectations a few more times")
+        gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics)
+    })
ginkgo.It("should report allocation per NUMA metric when handling guaranteed pods", func(ctx context.Context) {
var cpusNumPerNUMA, coresNumPerNUMA, numaNodes, threadsPerCore int
cpuPolicyOptions := map[string]string{
cpumanager.DistributeCPUsAcrossNUMAOption: "true",
cpumanager.FullPCPUsOnlyOption: "true",
}
newCfg := configureCPUManagerInKubelet(oldCfg,
&cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
reservedSystemCPUs: cpuset.New(0),
enableCPUManagerOptions: true,
options: cpuPolicyOptions,
},
)
updateKubeletConfig(ctx, f, newCfg, true)
numaNodes, coresNumPerNUMA, threadsPerCore = hostCheck()
cpusNumPerNUMA = coresNumPerNUMA * threadsPerCore
framework.Logf("numaNodes on the system %d", numaNodes)
framework.Logf("Cores per NUMA on the system %d", coresNumPerNUMA)
framework.Logf("Threads per Core on the system %d", threadsPerCore)
framework.Logf("CPUs per NUMA on the system %d", cpusNumPerNUMA)
smtLevel = getSMTLevel()
framework.Logf("SMT Level on the system %d", smtLevel)
ginkgo.By("Querying the podresources endpoint to get the baseline")
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
defer func() {
framework.ExpectNoError(conn.Close())
}()
ginkgo.By("Checking the pool allocatable resources from the kubelet")
resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectNoError(err, "failed to get the kubelet allocatable resources")
allocatableCPUs, _ := demuxCPUsAndDevicesFromGetAllocatableResources(resp)
// 'distribute-cpus-across-numa' policy option ensures that CPU allocations are evenly distributed
// across NUMA nodes in cases where more than one NUMA node is required to satisfy the allocation.
// So, we want to ensure that the CPU Request exceeds the number of CPUs that can fit within a single
// NUMA node. We have to pick cpuRequest such that:
// 1. CPURequest > cpusNumPerNUMA
// 2. Not occupy all the CPUs on the node ande leave room for reserved CPU
// 3. CPURequest is a multiple if number of NUMA nodes to allow equal CPU distribution across NUMA nodes
//
// In summary: cpusNumPerNUMA < CPURequest < ((cpusNumPerNuma * numaNodes) - reservedCPUscount)
// Considering all these constraints we select: CPURequest= (cpusNumPerNUMA-smtLevel)*numaNodes
cpuRequest := (cpusNumPerNUMA - smtLevel) * numaNodes
if cpuRequest > allocatableCPUs.Size() {
e2eskipper.Skipf("Pod requesting %d CPUs which is more than allocatable CPUs:%d", cpuRequest, allocatableCPUs.Size())
}
ginkgo.By("Creating the test pod")
testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("test-pod-allocation-per-numa", cpuRequest))
ginkgo.By("Checking the cpumanager metrics after pod creation")
keys := make(map[interface{}]types.GomegaMatcher)
idFn := makeCustomLabelID(metrics.AlignedNUMANode)
// On a clean environment with no other pods running if distribute-across-numa policy option is enabled
for i := 0; i < numaNodes; i++ {
keys["kubelet_cpu_manager_allocation_per_numa"] = gstruct.MatchAllElements(idFn, gstruct.Elements{
fmt.Sprintf("%d", i): timelessSample(2),
})
}
matchSpreadMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, keys)
ginkgo.By("Giving the Kubelet time to start up and produce metrics")
gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics)
ginkgo.By("Ensuring the metrics match the expectations a few more times")
gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics)
})
     })
 })
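To make the cpuRequest sizing comment in the test above concrete, here is a small Go sketch with made-up hardware numbers (not taken from any real test environment):

package main

import "fmt"

func main() {
	// Made-up topology: 2 NUMA nodes, 4 cores per NUMA node, 2 threads per core.
	numaNodes := 2
	coresNumPerNUMA := 4
	threadsPerCore := 2
	smtLevel := threadsPerCore                          // SMT level equals threads per core here
	cpusNumPerNUMA := coresNumPerNUMA * threadsPerCore  // 8

	cpuRequest := (cpusNumPerNUMA - smtLevel) * numaNodes // (8-2)*2 = 12

	fmt.Println(cpuRequest > cpusNumPerNUMA)             // true: cannot fit on a single NUMA node
	fmt.Println(cpuRequest < cpusNumPerNUMA*numaNodes-1) // true: leaves the reserved CPU free
	fmt.Println(cpuRequest%numaNodes == 0)               // true: splits evenly across NUMA nodes
}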


@@ -187,6 +187,13 @@ func makeCustomPairID(pri, sec string) func(interface{}) string {
     }
 }

+func makeCustomLabelID(label string) func(interface{}) string {
+    return func(element interface{}) string {
+        el := element.(*model.Sample)
+        return string(el.Metric[model.LabelName(label)])
+    }
+}
+
 func boundedSample(lower, upper interface{}) types.GomegaMatcher {
     return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{
         // We already check Metric when matching the Id
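For completeness, a tiny standalone sketch of how makeCustomLabelID keys samples by a label value; the numa_node label name and the sample contents are assumptions for illustration only:

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// Same shape as the helper added above: key gstruct elements by one label's value.
func makeCustomLabelID(label string) func(interface{}) string {
	return func(element interface{}) string {
		el := element.(*model.Sample)
		return string(el.Metric[model.LabelName(label)])
	}
}

func main() {
	// "numa_node" is assumed to be the rendered name of the AlignedNUMANode label.
	idFn := makeCustomLabelID("numa_node")
	sample := &model.Sample{
		Metric: model.Metric{"__name__": "kubelet_cpu_manager_allocation_per_numa", "numa_node": "1"},
		Value:  2,
	}
	fmt.Println(idFn(sample)) // prints "1": each sample is keyed by its NUMA node
}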