From f1031be019c929c5c0731b1944aa78033441a7b8 Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Fri, 21 Feb 2025 11:14:55 +0000 Subject: [PATCH 1/2] node: cpumgr: metrics: add metrics for allocation per NUMA Signed-off-by: Swati Sehgal --- pkg/kubelet/cm/cpumanager/policy_static.go | 51 +++++++++++++++++----- pkg/kubelet/metrics/metrics.go | 13 ++++++ 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index a9828474d64..e183d71e4a6 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -18,6 +18,7 @@ package cpumanager import ( "fmt" + "strconv" v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -389,7 +390,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs) p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs) - p.updateMetricsOnAllocate(cpuAllocation) + p.updateMetricsOnAllocate(s, cpuAllocation) klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String()) return nil @@ -416,7 +417,8 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa // Mutate the shared pool, adding released cpus. toRelease = toRelease.Difference(cpusInUse) s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease)) - p.updateMetricsOnRelease(toRelease) + p.updateMetricsOnRelease(s, toRelease) + } return nil } @@ -755,33 +757,60 @@ func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableC func (p *staticPolicy) initializeMetrics(s state.State) { metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000)) - metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s))) metrics.ContainerAlignedComputeResourcesFailure.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedPhysicalCPU).Add(0) // ensure the value exists metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Add(0) // ensure the value exists + totalAssignedCPUs := getTotalAssignedExclusiveCPUs(s) + metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(totalAssignedCPUs.Size())) + updateAllocationPerNUMAMetric(p.topology, totalAssignedCPUs) } -func (p *staticPolicy) updateMetricsOnAllocate(cpuAlloc topology.Allocation) { +func (p *staticPolicy) updateMetricsOnAllocate(s state.State, cpuAlloc topology.Allocation) { ncpus := cpuAlloc.CPUs.Size() metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus)) metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000)) if cpuAlloc.Aligned.UncoreCache { metrics.ContainerAlignedComputeResources.WithLabelValues(metrics.AlignScopeContainer, metrics.AlignedUncoreCache).Inc() } + totalAssignedCPUs := getTotalAssignedExclusiveCPUs(s) + updateAllocationPerNUMAMetric(p.topology, totalAssignedCPUs) } -func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) { +func (p *staticPolicy) updateMetricsOnRelease(s state.State, cset cpuset.CPUSet) { ncpus := cset.Size() metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(-ncpus)) metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(ncpus * 1000)) + totalAssignedCPUs := 
getTotalAssignedExclusiveCPUs(s)
+	updateAllocationPerNUMAMetric(p.topology, totalAssignedCPUs.Difference(cset))
 }
 
-func countExclusiveCPUs(s state.State) int {
-	exclusiveCPUs := 0
-	for _, cpuAssign := range s.GetCPUAssignments() {
-		for _, cset := range cpuAssign {
-			exclusiveCPUs += cset.Size()
+func getTotalAssignedExclusiveCPUs(s state.State) cpuset.CPUSet {
+	totalAssignedCPUs := cpuset.New()
+	for _, assignment := range s.GetCPUAssignments() {
+		for _, cset := range assignment {
+			totalAssignedCPUs = totalAssignedCPUs.Union(cset)
 		}
+
+	}
+	return totalAssignedCPUs
+}
+
+func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpuset.CPUSet) {
+	numaCount := make(map[int]int)
+
+	// Count CPUs allocated per NUMA node
+	for _, cpuID := range allocatedCPUs.UnsortedList() {
+		numaNode, err := topo.CPUNUMANodeID(cpuID)
+		if err != nil {
+			// NOTE: We log the error here, but it is highly unlikely to occur because the CPUSet
+			// has already been computed and validated, so there is no room for user tampering.
+			klog.ErrorS(err, "Unable to determine NUMA node", "cpuID", cpuID)
+		}
+		numaCount[numaNode]++
+	}
+
+	// Update metric
+	for numaNode, count := range numaCount {
+		metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count))
 	}
-	return exclusiveCPUs
 }
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 04d62880b0b..7a04387ab6f 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -113,6 +113,7 @@ const (
 	CPUManagerPinningErrorsTotalKey           = "cpu_manager_pinning_errors_total"
 	CPUManagerSharedPoolSizeMilliCoresKey     = "cpu_manager_shared_pool_size_millicores"
 	CPUManagerExclusiveCPUsAllocationCountKey = "cpu_manager_exclusive_cpu_allocation_count"
+	CPUManagerAllocationPerNUMAKey            = "cpu_manager_allocation_per_numa"
 
 	// Metrics to track the Memory manager behavior
 	MemoryManagerPinningRequestsTotalKey = "memory_manager_pinning_requests_total"
@@ -815,6 +816,17 @@ var (
 		},
 	)
 
+	// CPUManagerAllocationPerNUMA tracks the count of CPUs allocated per NUMA node
+	CPUManagerAllocationPerNUMA = metrics.NewGaugeVec(
+		&metrics.GaugeOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerAllocationPerNUMAKey,
+			Help:           "Number of CPUs allocated per NUMA node",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{AlignedNUMANode},
+	)
+
 	// ContainerAlignedComputeResources reports the count of resources allocation which granted aligned resources, per alignment boundary
 	ContainerAlignedComputeResources = metrics.NewCounterVec(
 		&metrics.CounterOpts{
@@ -1126,6 +1138,7 @@ func Register(collectors ...metrics.StableCollector) {
 		legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
 		legacyregistry.MustRegister(CPUManagerSharedPoolSizeMilliCores)
 		legacyregistry.MustRegister(CPUManagerExclusiveCPUsAllocationCount)
+		legacyregistry.MustRegister(CPUManagerAllocationPerNUMA)
 		legacyregistry.MustRegister(ContainerAlignedComputeResources)
 		legacyregistry.MustRegister(ContainerAlignedComputeResourcesFailure)
 		legacyregistry.MustRegister(MemoryManagerPinningRequestTotal)

From b6a3ac47de0780e2f74b17ea3f2af732b030be2d Mon Sep 17 00:00:00 2001
From: Swati Sehgal 
Date: Fri, 28 Feb 2025 11:04:53 +0000
Subject: [PATCH 2/2] node: cpumgr: e2e: tests for NUMA allocation spread metrics

Signed-off-by: Swati Sehgal 
---
 test/e2e_node/cpu_manager_metrics_test.go | 124 ++++++++++++++++++++++
 test/e2e_node/resource_metrics_test.go    |   7 ++
 2 files changed, 131 insertions(+)

diff --git a/test/e2e_node/cpu_manager_metrics_test.go 
b/test/e2e_node/cpu_manager_metrics_test.go index b4db0f40806..26e5a428d43 100644 --- a/test/e2e_node/cpu_manager_metrics_test.go +++ b/test/e2e_node/cpu_manager_metrics_test.go @@ -34,6 +34,7 @@ import ( kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" @@ -389,6 +390,129 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times") gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) }) + ginkgo.It("should report zero counters for allocation per NUMA after a fresh restart", func(ctx context.Context) { + + cpuPolicyOptions := map[string]string{ + cpumanager.DistributeCPUsAcrossNUMAOption: "true", + cpumanager.FullPCPUsOnlyOption: "true", + } + newCfg := configureCPUManagerInKubelet(oldCfg, + &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: cpuset.New(0), + enableCPUManagerOptions: true, + options: cpuPolicyOptions, + }, + ) + + updateKubeletConfig(ctx, f, newCfg, true) + + ginkgo.By("Checking the cpumanager allocation per NUMA metric right after the kubelet restart, with no pods running") + numaNodes, _, _ := hostCheck() + + framework.Logf("numaNodes on the system %d", numaNodes) + + keys := make(map[interface{}]types.GomegaMatcher) + idFn := makeCustomLabelID(metrics.AlignedNUMANode) + + for i := 0; i < numaNodes; i++ { + keys["kubelet_cpu_manager_allocation_per_numa"] = gstruct.MatchAllElements(idFn, gstruct.Elements{ + fmt.Sprintf("%d", i): timelessSample(0), + }) + + } + + matchSpreadMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, keys) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics) + + }) + + ginkgo.It("should report allocation per NUMA metric when handling guaranteed pods", func(ctx context.Context) { + var cpusNumPerNUMA, coresNumPerNUMA, numaNodes, threadsPerCore int + cpuPolicyOptions := map[string]string{ + cpumanager.DistributeCPUsAcrossNUMAOption: "true", + cpumanager.FullPCPUsOnlyOption: "true", + } + newCfg := configureCPUManagerInKubelet(oldCfg, + &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: cpuset.New(0), + enableCPUManagerOptions: true, + options: cpuPolicyOptions, + }, + ) + + updateKubeletConfig(ctx, f, newCfg, true) + + numaNodes, coresNumPerNUMA, threadsPerCore = hostCheck() + cpusNumPerNUMA = coresNumPerNUMA * threadsPerCore + + framework.Logf("numaNodes on the system %d", numaNodes) + framework.Logf("Cores per NUMA on the system %d", coresNumPerNUMA) + framework.Logf("Threads per Core on the system %d", threadsPerCore) + framework.Logf("CPUs per NUMA on the system %d", cpusNumPerNUMA) + + smtLevel = getSMTLevel() + framework.Logf("SMT Level on the system %d", smtLevel) + + ginkgo.By("Querying the podresources endpoint to get the baseline") + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + 
framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
+
+		cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+		framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+		defer func() {
+			framework.ExpectNoError(conn.Close())
+		}()
+
+		ginkgo.By("Checking the pool allocatable resources from the kubelet")
+		resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
+		framework.ExpectNoError(err, "failed to get the kubelet allocatable resources")
+		allocatableCPUs, _ := demuxCPUsAndDevicesFromGetAllocatableResources(resp)
+
+		// The 'distribute-cpus-across-numa' policy option ensures that CPU allocations are evenly distributed
+		// across NUMA nodes in cases where more than one NUMA node is required to satisfy the allocation.
+		// So, we want to ensure that the CPU Request exceeds the number of CPUs that can fit within a single
+		// NUMA node. We have to pick cpuRequest such that:
+		// 1. CPURequest > cpusNumPerNUMA
+		// 2. Not occupy all the CPUs on the node and leave room for the reserved CPUs
+		// 3. CPURequest is a multiple of the number of NUMA nodes to allow equal CPU distribution across NUMA nodes
+		//
+		// In summary: cpusNumPerNUMA < CPURequest < ((cpusNumPerNUMA * numaNodes) - reservedCPUsCount)
+		// Considering all these constraints, we select: CPURequest = (cpusNumPerNUMA - smtLevel) * numaNodes
+		cpuRequest := (cpusNumPerNUMA - smtLevel) * numaNodes
+		if cpuRequest > allocatableCPUs.Size() {
+			e2eskipper.Skipf("Pod requesting %d CPUs which is more than allocatable CPUs: %d", cpuRequest, allocatableCPUs.Size())
+		}
+
+		ginkgo.By("Creating the test pod")
+		testPod = e2epod.NewPodClient(f).Create(ctx, makeGuaranteedCPUExclusiveSleeperPod("test-pod-allocation-per-numa", cpuRequest))
+
+		ginkgo.By("Checking the cpumanager metrics after pod creation")
+
+		keys := make(map[interface{}]types.GomegaMatcher)
+		idFn := makeCustomLabelID(metrics.AlignedNUMANode)
+
+		// On a clean environment with no other pods running, the distribute-cpus-across-numa policy option is expected to spread the allocated CPUs evenly across all NUMA nodes.
+		for i := 0; i < numaNodes; i++ {
+			keys["kubelet_cpu_manager_allocation_per_numa"] = gstruct.MatchAllElements(idFn, gstruct.Elements{
+				fmt.Sprintf("%d", i): timelessSample(2),
+			})
+
+		}
+
+		matchSpreadMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, keys)
+
+		ginkgo.By("Giving the Kubelet time to start up and produce metrics")
+		gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics)
+		ginkgo.By("Ensuring the metrics match the expectations a few more times")
+		gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchSpreadMetrics)
+	})
+
 	})
 })
diff --git a/test/e2e_node/resource_metrics_test.go b/test/e2e_node/resource_metrics_test.go
index 011866eb7bd..36079c6f23e 100644
--- a/test/e2e_node/resource_metrics_test.go
+++ b/test/e2e_node/resource_metrics_test.go
@@ -187,6 +187,13 @@ func makeCustomPairID(pri, sec string) func(interface{}) string {
 	}
 }
 
+func makeCustomLabelID(label string) func(interface{}) string {
+	return func(element interface{}) string {
+		el := element.(*model.Sample)
+		return string(el.Metric[model.LabelName(label)])
+	}
+}
+
 func boundedSample(lower, upper interface{}) types.GomegaMatcher {
 	return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{
 		// We already check Metric when matching the Id