diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index ff9e6a20e14..9eabd25034a 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+	"k8s.io/kubernetes/pkg/kubelet/metrics"
 )
 
 const (
@@ -252,47 +253,56 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
 	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
 }
 
-func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
-	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
-		klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
-		// container belongs in an exclusively allocated pool
-
-		if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
-			// Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted
-			// if the CPU requested is a multiple of the number of virtual cpus per physical cores.
-			// In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put
-			// in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores
-			// and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs,
-			// the pod would be placed on a node where there are enough physical cores available to be allocated.
-			// Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket
-			// and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all
-			// CPUs on a physical core. Allocation of individual threads would never have to occur.
-			return SMTAlignmentError{
-				RequestedCPUs: numCPUs,
-				CpusPerCore:   p.topology.CPUsPerCore(),
-			}
-		}
-		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-			p.updateCPUsToReuse(pod, container, cpuset)
-			klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
-			return nil
-		}
-
-		// Call Topology Manager to get the aligned socket affinity across all hint providers.
-		hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
-		klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
-
-		// Allocate CPUs according to the NUMA affinity contained in the hint.
-		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
-		if err != nil {
-			klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
-			return err
-		}
-		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
-		p.updateCPUsToReuse(pod, container, cpuset)
-
+func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
+	numCPUs := p.guaranteedCPUs(pod, container)
+	if numCPUs == 0 {
+		// container belongs in the shared pool (nothing to do; use default cpuset)
+		return nil
 	}
-	// container belongs in the shared pool (nothing to do; use default cpuset)
+
+	klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
+	// container belongs in an exclusively allocated pool
+	metrics.CPUManagerPinningRequestsTotal.Inc()
+	defer func() {
+		if rerr != nil {
+			metrics.CPUManagerPinningErrorsTotal.Inc()
+		}
+	}()
+
+	if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
+		// Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted
+		// if the CPU requested is a multiple of the number of virtual cpus per physical cores.
+		// In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put
+		// in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores
+		// and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs,
+		// the pod would be placed on a node where there are enough physical cores available to be allocated.
+		// Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket
+		// and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all
+		// CPUs on a physical core. Allocation of individual threads would never have to occur.
+		return SMTAlignmentError{
+			RequestedCPUs: numCPUs,
+			CpusPerCore:   p.topology.CPUsPerCore(),
+		}
+	}
+	if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+		p.updateCPUsToReuse(pod, container, cpuset)
+		klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
+		return nil
+	}
+
+	// Call Topology Manager to get the aligned socket affinity across all hint providers.
+	hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
+	klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
+
+	// Allocate CPUs according to the NUMA affinity contained in the hint.
+	cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
+	if err != nil {
+		klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
+		return err
+	}
+	s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+	p.updateCPUsToReuse(pod, container, cpuset)
+
 	return nil
 }
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 00fe1ec5572..cc18e128585 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -86,6 +86,10 @@ const (
 	// Metrics to track ephemeral container usage by this kubelet
 	ManagedEphemeralContainersKey = "managed_ephemeral_containers"
 
+	// Metrics to track the CPU manager behavior
+	CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total"
+	CPUManagerPinningErrorsTotalKey   = "cpu_manager_pinning_errors_total"
+
 	// Values used in metric labels
 	Container     = "container"
 	InitContainer = "init_container"
@@ -506,6 +510,26 @@ var (
 			StabilityLevel: metrics.ALPHA,
 		},
 	)
+
+	// CPUManagerPinningRequestsTotal tracks the number of times the pod spec will cause the cpu manager to pin cores
+	CPUManagerPinningRequestsTotal = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerPinningRequestsTotalKey,
+			Help:           "The number of cpu core allocations which required pinning.",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// CPUManagerPinningErrorsTotal tracks the number of times the pod spec required the cpu manager to pin cores, but the allocation failed
+	CPUManagerPinningErrorsTotal = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerPinningErrorsTotalKey,
+			Help:           "The number of cpu core allocations which required pinning failed.",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
 )
 
 var registerMetrics sync.Once
@@ -570,6 +594,11 @@ func Register(collectors ...metrics.StableCollector) {
 		if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
 			legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks)
 		}
+
+		if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
+			legacyregistry.MustRegister(CPUManagerPinningRequestsTotal)
+			legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
+		}
 	})
 }
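
Note (not part of the patch): the error accounting in Allocate relies on Go's named-return/deferred-closure idiom. Because rerr is a named return value, the deferred closure sees whatever error any return statement produced, so every failing exit path increments the error counter without touching the individual returns. The standalone sketch below illustrates only that pattern; the function name and the plain integer counters are hypothetical stand-ins for the kubelet's Prometheus counters.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for CPUManagerPinningRequestsTotal / CPUManagerPinningErrorsTotal.
var pinningRequests, pinningErrors int

// allocate mirrors the control flow of staticPolicy.Allocate: the request counter
// is bumped on entry, and the deferred closure bumps the error counter whenever
// the named return value rerr is non-nil on exit.
func allocate(fail bool) (rerr error) {
	pinningRequests++
	defer func() {
		if rerr != nil {
			pinningErrors++
		}
	}()

	if fail {
		return errors.New("not enough free CPUs")
	}
	return nil
}

func main() {
	_ = allocate(false)
	_ = allocate(true)
	fmt.Println(pinningRequests, pinningErrors) // prints: 2 1
}

Once registered, the real counters are exposed on the kubelet's /metrics endpoint as kubelet_cpu_manager_pinning_requests_total and kubelet_cpu_manager_pinning_errors_total (KubeletSubsystem supplies the kubelet_ prefix).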