diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 2983765870f..bb82c53406e 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+	"k8s.io/kubernetes/pkg/kubelet/metrics"
 )
 
 const (
@@ -258,47 +259,56 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
 	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
 }
 
-func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
-	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
-		klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
-		// container belongs in an exclusively allocated pool
-
-		if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
-			// Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted
-			// if the CPU requested is a multiple of the number of virtual cpus per physical cores.
-			// In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put
-			// in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores
-			// and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs,
-			// the pod would be placed on a node where there are enough physical cores available to be allocated.
-			// Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket
-			// and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all
-			// CPUs on a physical core. Allocation of individual threads would never have to occur.
-			return SMTAlignmentError{
-				RequestedCPUs: numCPUs,
-				CpusPerCore:   p.topology.CPUsPerCore(),
-			}
-		}
-		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-			p.updateCPUsToReuse(pod, container, cpuset)
-			klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
-			return nil
-		}
-
-		// Call Topology Manager to get the aligned socket affinity across all hint providers.
-		hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
-		klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
-
-		// Allocate CPUs according to the NUMA affinity contained in the hint.
-		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
-		if err != nil {
-			klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
-			return err
-		}
-		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
-		p.updateCPUsToReuse(pod, container, cpuset)
-
+func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
+	numCPUs := p.guaranteedCPUs(pod, container)
+	if numCPUs == 0 {
+		// container belongs in the shared pool (nothing to do; use default cpuset)
+		return nil
 	}
-	// container belongs in the shared pool (nothing to do; use default cpuset)
+
+	klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
+	// container belongs in an exclusively allocated pool
+	metrics.CPUManagerPinningRequestsTotal.Inc()
+	defer func() {
+		if rerr != nil {
+			metrics.CPUManagerPinningErrorsTotal.Inc()
+		}
+	}()
+
+	if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
+		// Since the CPU Manager was configured to enforce strict SMT alignment, a guaranteed pod can only be admitted
+		// if its CPU request is a multiple of the number of virtual CPUs per physical core.
+		// If the CPU request is not such a multiple, the pod will be put in the Failed state with SMTAlignmentError
+		// as the reason. Since the allocation happens in terms of physical cores, and the scheduler is responsible
+		// for ensuring that the workload goes to a node that has enough CPUs, the pod will be placed on a node with
+		// enough physical cores available to be allocated.
+		// Just like in the plain static policy, takeByTopology first tries to allocate CPUs from the same socket, and
+		// only if the request cannot be satisfied on a single socket does the allocation spill over, still occupying
+		// all the CPUs of each physical core. Allocation of individual threads never has to occur.
+		return SMTAlignmentError{
+			RequestedCPUs: numCPUs,
+			CpusPerCore:   p.topology.CPUsPerCore(),
+		}
+	}
+	if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+		p.updateCPUsToReuse(pod, container, cpuset)
+		klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
+		return nil
+	}
+
+	// Call Topology Manager to get the aligned socket affinity across all hint providers.
+	hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
+	klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
+
+	// Allocate CPUs according to the NUMA affinity contained in the hint.
+	cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
+	if err != nil {
+		klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
+		return err
+	}
+	s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+	p.updateCPUsToReuse(pod, container, cpuset)
+	return nil
 }
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 00fe1ec5572..cc18e128585 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -86,6 +86,10 @@ const (
 	// Metrics to track ephemeral container usage by this kubelet
 	ManagedEphemeralContainersKey = "managed_ephemeral_containers"
 
+	// Metrics to track the CPU manager behavior
+	CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total"
+	CPUManagerPinningErrorsTotalKey   = "cpu_manager_pinning_errors_total"
+
 	// Values used in metric labels
 	Container     = "container"
 	InitContainer = "init_container"
@@ -506,6 +510,26 @@ var (
 			StabilityLevel: metrics.ALPHA,
 		},
 	)
+
+	// CPUManagerPinningRequestsTotal tracks the number of times the pod spec required the CPU manager to pin cores
+	CPUManagerPinningRequestsTotal = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerPinningRequestsTotalKey,
+			Help:           "The number of CPU core allocations which required pinning.",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// CPUManagerPinningErrorsTotal tracks the number of times the pod spec required the CPU manager to pin cores, but the allocation failed
+	CPUManagerPinningErrorsTotal = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerPinningErrorsTotalKey,
+			Help:           "The number of CPU core allocations which required pinning and failed.",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
 )
 
 var registerMetrics sync.Once
@@ -570,6 +594,11 @@ func Register(collectors ...metrics.StableCollector) {
 		if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
 			legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks)
 		}
+
+		if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
+			legacyregistry.MustRegister(CPUManagerPinningRequestsTotal)
+			legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
+		}
 	})
 }
diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go
new file mode 100644
index 00000000000..a899d393210
--- /dev/null
+++ b/test/e2e_node/cpu_manager_metrics_test.go
@@ -0,0 +1,206 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2enode
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/onsi/gomega/gstruct"
+	"github.com/onsi/gomega/types"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	admissionapi "k8s.io/pod-security-admission/api"
+)
+
+var _ = SIGDescribe("CPU Manager Metrics [Serial][Feature:CPUManager]", func() {
+	f := framework.NewDefaultFramework("cpumanager-metrics")
+	f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
+
+	ginkgo.Context("when querying /metrics", func() {
+		var oldCfg *kubeletconfig.KubeletConfiguration
+		var testPod *v1.Pod
+		var smtLevel int
+
+		ginkgo.BeforeEach(func() {
+			var err error
+			if oldCfg == nil {
+				oldCfg, err = getCurrentKubeletConfig()
+				framework.ExpectNoError(err)
+			}
+
+			fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
+			_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
+			smtLevel = getSMTLevel()
+
+			// strict SMT alignment is trivially verified and granted on non-SMT systems
+			if smtLevel < 2 {
+				e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT is disabled", fullCPUsOnlyOpt)
+			}
+
+			// our tests want to allocate up to a full core, so we need at least smtLevel*2 virtual CPUs (2*2=4 on two-way SMT)
+			if cpuAlloc < int64(smtLevel*2) {
+				e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < %d", fullCPUsOnlyOpt, smtLevel*2)
+			}
+
+			framework.Logf("SMT level %d", smtLevel)
+
+			// TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably
+			// check what we do have in the node.
+			cpuPolicyOptions := map[string]string{
+				cpumanager.FullPCPUsOnlyOption: "true",
+			}
+			newCfg := configureCPUManagerInKubelet(oldCfg,
+				&cpuManagerKubeletArguments{
+					policyName:              string(cpumanager.PolicyStatic),
+					enableCPUManager:        true,
+					reservedSystemCPUs:      cpuset.NewCPUSet(0),
+					enableCPUManagerOptions: true,
+					options:                 cpuPolicyOptions,
+				},
+			)
+			updateKubeletConfig(f, newCfg, true)
+		})
+
+		ginkgo.AfterEach(func() {
+			if testPod != nil {
+				deletePodSyncByName(f, testPod.Name)
+			}
+			updateKubeletConfig(f, oldCfg, true)
+		})
+
+		ginkgo.It("should report zero pinning counters after a fresh restart", func() {
+			// we updated the kubelet config in BeforeEach, so we can assume we start fresh.
+			// being [Serial], we can also assume no one else but us is running pods.
+ ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with no pods running") + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + }) + + ginkgo.It("should report pinning failures when the cpumanager allocation is known to fail", func() { + ginkgo.By("Creating the test pod which will be rejected for SMTAlignmentError") + testPod = e2epod.NewPodClient(f).Create(makeGuaranteedCPUExclusiveSleeperPod("smt-align-err", 1)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. + ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod failed to admit") + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(1), + }), + "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(1), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + }) + + ginkgo.It("should not report any pinning failures when the cpumanager allocation is expected to succeed", func() { + ginkgo.By("Creating the test pod") + testPod = e2epod.NewPodClient(f).Create(makeGuaranteedCPUExclusiveSleeperPod("smt-align-ok", smtLevel)) + + // we updated the kubelet config in BeforeEach, so we can assume we start fresh. + // being [Serial], we can also assume noone else but us is running pods. 
+ ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with pod should be admitted") + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(1), + }), + "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics) + }) + }) +}) + +func getCPUManagerMetrics() (e2emetrics.KubeletMetrics, error) { + // we are running out of good names, so we need to be unnecessarily specific to avoid clashes + ginkgo.By("getting CPU Manager metrics from the metrics API") + return e2emetrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics") +} + +func makeGuaranteedCPUExclusiveSleeperPod(name string, cpus int) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "-pod", + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Name: name + "-cnt", + Image: busyboxImage, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", cpus)), + v1.ResourceMemory: resource.MustParse("64Mi"), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", cpus)), + v1.ResourceMemory: resource.MustParse("64Mi"), + }, + }, + Command: []string{"sh", "-c", "sleep", "1d"}, + }, + }, + }, + } +} + +func timelessSample(value interface{}) types.GomegaMatcher { + return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{ + // We already check Metric when matching the Id + "Metric": gstruct.Ignore(), + "Value": gomega.BeNumerically("==", value), + "Timestamp": gstruct.Ignore(), + })) +}