From 47d329978135810912393ec199d3927a1887affe Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 4 Oct 2022 14:36:42 +0200 Subject: [PATCH 1/2] node: metrics: cpumanager: add pinning metrics In order to improve the observability of the cpumanager, add and populate metrics to track if the combination of the kubelet configuration and podspec would trigger exclusive core allocation and pinning. We should avoid leaking any node/machine specific information (e.g. core ids, even though this is admittedly an extreme example); tracking these metrics seems to be a good first step, because it allows us to get feedback without exposing details. Signed-off-by: Francesco Romani --- pkg/kubelet/cm/cpumanager/policy_static.go | 90 ++++++++++++---------- pkg/kubelet/metrics/metrics.go | 29 +++++++ 2 files changed, 79 insertions(+), 40 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index ff9e6a20e14..9eabd25034a 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" + "k8s.io/kubernetes/pkg/kubelet/metrics" ) const ( @@ -252,47 +253,56 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset) } -func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error { - if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 { - klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name) - // container belongs in an exclusively allocated pool - - if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) { - // Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted - // if the CPU requested is a multiple of the number of virtual cpus per physical cores. - // In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put - // in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores - // and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs, - // the pod would be placed on a node where there are enough physical cores available to be allocated. - // Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket - // and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all - // CPUs on a physical core. Allocation of individual threads would never have to occur. - return SMTAlignmentError{ - RequestedCPUs: numCPUs, - CpusPerCore: p.topology.CPUsPerCore(), - } - } - if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { - p.updateCPUsToReuse(pod, container, cpuset) - klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) - return nil - } - - // Call Topology Manager to get the aligned socket affinity across all hint providers. - hint := p.affinity.GetAffinity(string(pod.UID), container.Name) - klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint) - - // Allocate CPUs according to the NUMA affinity contained in the hint. - cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)]) - if err != nil { - klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs) - return err - } - s.SetCPUSet(string(pod.UID), container.Name, cpuset) - p.updateCPUsToReuse(pod, container, cpuset) - +func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { + numCPUs := p.guaranteedCPUs(pod, container) + if numCPUs == 0 { + // container belongs in the shared pool (nothing to do; use default cpuset) + return nil } - // container belongs in the shared pool (nothing to do; use default cpuset) + + klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name) + // container belongs in an exclusively allocated pool + metrics.CPUManagerPinningRequestsTotal.Inc() + defer func() { + if rerr != nil { + metrics.CPUManagerPinningErrorsTotal.Inc() + } + }() + + if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) { + // Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted + // if the CPU requested is a multiple of the number of virtual cpus per physical cores. + // In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put + // in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores + // and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs, + // the pod would be placed on a node where there are enough physical cores available to be allocated. + // Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket + // and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all + // CPUs on a physical core. Allocation of individual threads would never have to occur. + return SMTAlignmentError{ + RequestedCPUs: numCPUs, + CpusPerCore: p.topology.CPUsPerCore(), + } + } + if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { + p.updateCPUsToReuse(pod, container, cpuset) + klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) + return nil + } + + // Call Topology Manager to get the aligned socket affinity across all hint providers. + hint := p.affinity.GetAffinity(string(pod.UID), container.Name) + klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint) + + // Allocate CPUs according to the NUMA affinity contained in the hint. + cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)]) + if err != nil { + klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs) + return err + } + s.SetCPUSet(string(pod.UID), container.Name, cpuset) + p.updateCPUsToReuse(pod, container, cpuset) + return nil } diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 00fe1ec5572..cc18e128585 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -86,6 +86,10 @@ const ( // Metrics to track ephemeral container usage by this kubelet ManagedEphemeralContainersKey = "managed_ephemeral_containers" + // Metrics to track the CPU manager behavior + CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total" + CPUManagerPinningErrorsTotalKey = "cpu_manager_pinning_errors_total" + // Values used in metric labels Container = "container" InitContainer = "init_container" @@ -506,6 +510,26 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + // CPUManagerPinningRequestsTotal tracks the number of times the pod spec will cause the cpu manager to pin cores + CPUManagerPinningRequestsTotal = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: CPUManagerPinningRequestsTotalKey, + Help: "The number of cpu core allocations which required pinning.", + StabilityLevel: metrics.ALPHA, + }, + ) + + // CPUManagerPinningErrorsTotal tracks the number of times the pod spec required the cpu manager to pin cores, but the allocation failed + CPUManagerPinningErrorsTotal = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: CPUManagerPinningErrorsTotalKey, + Help: "The number of cpu core allocations which required pinning failed.", + StabilityLevel: metrics.ALPHA, + }, + ) ) var registerMetrics sync.Once @@ -570,6 +594,11 @@ func Register(collectors ...metrics.StableCollector) { if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) { legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks) } + + if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) { + legacyregistry.MustRegister(CPUManagerPinningRequestsTotal) + legacyregistry.MustRegister(CPUManagerPinningErrorsTotal) + } }) } From bdc08eaa4b9ec38fa20573691a27e99862b4729c Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Wed, 5 Oct 2022 19:21:27 +0200 Subject: [PATCH 2/2] e2e: node: add tests for cpumanager metrics Add tests to verify the cpumanager metrics are populated. Signed-off-by: Francesco Romani --- test/e2e_node/cpu_manager_metrics_test.go | 206 ++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 test/e2e_node/cpu_manager_metrics_test.go diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go new file mode 100644 index 00000000000..a899d393210 --- /dev/null +++ b/test/e2e_node/cpu_manager_metrics_test.go @@ -0,0 +1,206 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2enode + +import ( + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" + "github.com/onsi/gomega/types" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/test/e2e/framework" + e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + admissionapi "k8s.io/pod-security-admission/api" +) + +var _ = SIGDescribe("CPU Manager Metrics [Serial][Feature:CPUManager]", func() { + f := framework.NewDefaultFramework("cpumanager-metrics") + f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + ginkgo.Context("when querying /metrics", func() { + var oldCfg *kubeletconfig.KubeletConfiguration + var testPod *v1.Pod + var smtLevel int + + ginkgo.BeforeEach(func() { + var err error + if oldCfg == nil { + oldCfg, err = getCurrentKubeletConfig() + framework.ExpectNoError(err) + } + + fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption) + _, cpuAlloc, _ := getLocalNodeCPUDetails(f) + smtLevel = getSMTLevel() + + // strict SMT alignment is trivially verified and granted on non-SMT systems + if smtLevel < 2 { + e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT disabled", fullCPUsOnlyOpt) + } + + // our tests want to allocate up to a full core, so we need at last 2*2=4 virtual cpus + if cpuAlloc < int64(smtLevel*2) { + e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt) + } + + framework.Logf("SMT level %d", smtLevel) + + // TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably + // check what we do have in the node. + cpuPolicyOptions := map[string]string{ + cpumanager.FullPCPUsOnlyOption: "true", + } + newCfg := configureCPUManagerInKubelet(oldCfg, + &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + enableCPUManager: true, + reservedSystemCPUs: cpuset.NewCPUSet(0), + enableCPUManagerOptions: true, + options: cpuPolicyOptions, + }, + ) + updateKubeletConfig(f, newCfg, true) + }) + + ginkgo.AfterEach(func() { + if testPod != nil { + deletePodSyncByName(f, testPod.Name) + } + updateKubeletConfig(f, oldCfg, true) + }) + + ginkgo.It("should report zero pinning counters after a fresh restart", func() { + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. + ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with no pods running") + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + }) + + ginkgo.It("should report pinning failures when the cpumanager allocation is known to fail", func() { + ginkgo.By("Creating the test pod which will be rejected for SMTAlignmentError") + testPod = e2epod.NewPodClient(f).Create(makeGuaranteedCPUExclusiveSleeperPod("smt-align-err", 1)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. + ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod failed to admit") + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(1), + }), + "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(1), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + }) + + ginkgo.It("should not report any pinning failures when the cpumanager allocation is expected to succeed", func() { + ginkgo.By("Creating the test pod") + testPod = e2epod.NewPodClient(f).Create(makeGuaranteedCPUExclusiveSleeperPod("smt-align-ok", smtLevel)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. + ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted") + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(1), + }), + "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + }) + }) +}) + +func getCPUManagerMetrics() (e2emetrics.KubeletMetrics, error) { + // we are running out of good names, so we need to be unnecessarily specific to avoid clashes + ginkgo.By("getting CPU Manager metrics from the metrics API") + return e2emetrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics") +} + +func makeGuaranteedCPUExclusiveSleeperPod(name string, cpus int) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "-pod", + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Name: name + "-cnt", + Image: busyboxImage, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", cpus)), + v1.ResourceMemory: resource.MustParse("64Mi"), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", cpus)), + v1.ResourceMemory: resource.MustParse("64Mi"), + }, + }, + Command: []string{"sh", "-c", "sleep", "1d"}, + }, + }, + }, + } +} + +func timelessSample(value interface{}) types.GomegaMatcher { + return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{ + // We already check Metric when matching the Id + "Metric": gstruct.Ignore(), + "Value": gomega.BeNumerically("==", value), + "Timestamp": gstruct.Ignore(), + })) +}