Merge pull request #112855 from fromanirh/cpumanager-metrics
node: metrics: cpumanager: add metrics about pinning
Commit: d0e86111ef
@@ -28,6 +28,7 @@ import (
    "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
    "k8s.io/kubernetes/pkg/kubelet/metrics"
)

const (
@@ -258,47 +259,56 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
    p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
}

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
    if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
        klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
        // container belongs in an exclusively allocated pool

        if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
            // Since the CPU Manager has been enabled with strict SMT alignment, a guaranteed pod can only be admitted
            // if the number of CPUs it requests is a multiple of the number of virtual CPUs per physical core.
            // If the CPU request is not such a multiple, the pod will be put in the Failed state with SMTAlignmentError
            // as the reason. Since the allocation happens in terms of physical cores and the scheduler is responsible
            // for ensuring that the workload goes to a node that has enough CPUs, the pod will be placed on a node
            // where there are enough physical cores available to be allocated.
            // Just like the default behaviour of the static policy, takeByTopology will try to allocate CPUs from the
            // same socket first, and only when the request cannot be satisfied on a single socket does the allocation
            // spread out, still occupying whole physical cores. Allocation of individual threads never has to occur.
            return SMTAlignmentError{
                RequestedCPUs: numCPUs,
                CpusPerCore:   p.topology.CPUsPerCore(),
            }
        }
        if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
            p.updateCPUsToReuse(pod, container, cpuset)
            klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
            return nil
        }

        // Call Topology Manager to get the aligned socket affinity across all hint providers.
        hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
        klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)

        // Allocate CPUs according to the NUMA affinity contained in the hint.
        cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
        if err != nil {
            klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
            return err
        }
        s.SetCPUSet(string(pod.UID), container.Name, cpuset)
        p.updateCPUsToReuse(pod, container, cpuset)

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
    numCPUs := p.guaranteedCPUs(pod, container)
    if numCPUs == 0 {
        // container belongs in the shared pool (nothing to do; use default cpuset)
        return nil
    }

    klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)

    // container belongs in an exclusively allocated pool
    metrics.CPUManagerPinningRequestsTotal.Inc()
    defer func() {
        if rerr != nil {
            metrics.CPUManagerPinningErrorsTotal.Inc()
        }
    }()

    if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
        // Since the CPU Manager has been enabled with strict SMT alignment, a guaranteed pod can only be admitted
        // if the number of CPUs it requests is a multiple of the number of virtual CPUs per physical core.
        // If the CPU request is not such a multiple, the pod will be put in the Failed state with SMTAlignmentError
        // as the reason. Since the allocation happens in terms of physical cores and the scheduler is responsible
        // for ensuring that the workload goes to a node that has enough CPUs, the pod will be placed on a node
        // where there are enough physical cores available to be allocated.
        // Just like the default behaviour of the static policy, takeByTopology will try to allocate CPUs from the
        // same socket first, and only when the request cannot be satisfied on a single socket does the allocation
        // spread out, still occupying whole physical cores. Allocation of individual threads never has to occur.
        return SMTAlignmentError{
            RequestedCPUs: numCPUs,
            CpusPerCore:   p.topology.CPUsPerCore(),
        }
    }
    if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
        p.updateCPUsToReuse(pod, container, cpuset)
        klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
        return nil
    }

    // Call Topology Manager to get the aligned socket affinity across all hint providers.
    hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
    klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)

    // Allocate CPUs according to the NUMA affinity contained in the hint.
    cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
    if err != nil {
        klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
        return err
    }
    s.SetCPUSet(string(pod.UID), container.Name, cpuset)
    p.updateCPUsToReuse(pod, container, cpuset)

    return nil
}
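
The rewritten Allocate above pairs a request counter with an error counter and relies on a named return value plus defer, so every failure path bumps the error counter exactly once. A minimal, self-contained sketch of that idiom, using plain prometheus/client_golang and hypothetical metric names rather than the kubelet's component-base wrappers:

package main

import (
    "errors"
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// Hypothetical counters mirroring the pattern in the diff above; the kubelet
// defines its counters through k8s.io/component-base/metrics instead.
var (
    pinningRequests = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "example_pinning_requests_total",
        Help: "Allocations that required pinning.",
    })
    pinningErrors = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "example_pinning_errors_total",
        Help: "Allocations that required pinning but failed.",
    })
)

// allocate demonstrates the named-return + defer idiom: the error counter is
// incremented exactly once, no matter which return path sets rerr.
func allocate(fail bool) (rerr error) {
    pinningRequests.Inc()
    defer func() {
        if rerr != nil {
            pinningErrors.Inc()
        }
    }()

    if fail {
        return errors.New("SMT alignment error")
    }
    return nil
}

func main() {
    prometheus.MustRegister(pinningRequests, pinningErrors)
    fmt.Println(allocate(false)) // <nil>; only the requests counter moves
    fmt.Println(allocate(true))  // error; both counters move
}

Because rerr is a named return, the deferred closure observes whatever value the function ultimately returns, so error paths added later are covered automatically.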
@@ -86,6 +86,10 @@ const (
    // Metrics to track ephemeral container usage by this kubelet
    ManagedEphemeralContainersKey = "managed_ephemeral_containers"

    // Metrics to track the CPU manager behavior
    CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total"
    CPUManagerPinningErrorsTotalKey   = "cpu_manager_pinning_errors_total"

    // Values used in metric labels
    Container     = "container"
    InitContainer = "init_container"
@@ -506,6 +510,26 @@ var (
            StabilityLevel: metrics.ALPHA,
        },
    )

    // CPUManagerPinningRequestsTotal tracks the number of times the pod spec will cause the cpu manager to pin cores
    CPUManagerPinningRequestsTotal = metrics.NewCounter(
        &metrics.CounterOpts{
            Subsystem:      KubeletSubsystem,
            Name:           CPUManagerPinningRequestsTotalKey,
            Help:           "The number of cpu core allocations which required pinning.",
            StabilityLevel: metrics.ALPHA,
        },
    )

    // CPUManagerPinningErrorsTotal tracks the number of times the pod spec required the cpu manager to pin cores, but the allocation failed
    CPUManagerPinningErrorsTotal = metrics.NewCounter(
        &metrics.CounterOpts{
            Subsystem:      KubeletSubsystem,
            Name:           CPUManagerPinningErrorsTotalKey,
            Help:           "The number of cpu core allocations which required pinning but failed.",
            StabilityLevel: metrics.ALPHA,
        },
    )
)

var registerMetrics sync.Once
@@ -570,6 +594,11 @@ func Register(collectors ...metrics.StableCollector) {
        if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
            legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks)
        }

        if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
            legacyregistry.MustRegister(CPUManagerPinningRequestsTotal)
            legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
        }
    })
}
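
Once registered, the counters are exported by the kubelet as kubelet_cpu_manager_pinning_requests_total and kubelet_cpu_manager_pinning_errors_total (the kubelet subsystem prefix plus the keys defined above). As a rough sketch of reading them off a node, assuming the kubelet read-only port 10255 is enabled (it is often disabled in hardened setups), one could parse the /metrics endpoint directly:

package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/prometheus/common/expfmt"
)

func main() {
    // Assumed endpoint: the kubelet read-only port; adjust host/port for your node.
    resp, err := http.Get("http://127.0.0.1:10255/metrics")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    // Parse the Prometheus text exposition format into metric families.
    var parser expfmt.TextParser
    families, err := parser.TextToMetricFamilies(resp.Body)
    if err != nil {
        log.Fatal(err)
    }

    for _, name := range []string{
        "kubelet_cpu_manager_pinning_requests_total",
        "kubelet_cpu_manager_pinning_errors_total",
    } {
        if mf, ok := families[name]; ok && len(mf.GetMetric()) > 0 {
            fmt.Printf("%s = %v\n", name, mf.GetMetric()[0].GetCounter().GetValue())
        } else {
            fmt.Printf("%s not reported (kubelet too old, or CPUManager gate off?)\n", name)
        }
    }
}

The new e2e test below does essentially the same thing through the framework helper GrabKubeletMetricsWithoutProxy.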
test/e2e_node/cpu_manager_metrics_test.go (new file, 206 lines)
@@ -0,0 +1,206 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2enode

import (
    "fmt"
    "time"

    "github.com/onsi/ginkgo/v2"
    "github.com/onsi/gomega"
    "github.com/onsi/gomega/gstruct"
    "github.com/onsi/gomega/types"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
    "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
    "k8s.io/kubernetes/test/e2e/framework"
    e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    admissionapi "k8s.io/pod-security-admission/api"
)
var _ = SIGDescribe("CPU Manager Metrics [Serial][Feature:CPUManager]", func() {
    f := framework.NewDefaultFramework("cpumanager-metrics")
    f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged

    ginkgo.Context("when querying /metrics", func() {
        var oldCfg *kubeletconfig.KubeletConfiguration
        var testPod *v1.Pod
        var smtLevel int

        ginkgo.BeforeEach(func() {
            var err error
            if oldCfg == nil {
                oldCfg, err = getCurrentKubeletConfig()
                framework.ExpectNoError(err)
            }

            fullCPUsOnlyOpt := fmt.Sprintf("option=%s", cpumanager.FullPCPUsOnlyOption)
            _, cpuAlloc, _ := getLocalNodeCPUDetails(f)
            smtLevel = getSMTLevel()

            // strict SMT alignment is trivially verified and granted on non-SMT systems
            if smtLevel < 2 {
                e2eskipper.Skipf("Skipping CPU Manager %s tests since SMT is disabled", fullCPUsOnlyOpt)
            }

            // our tests want to allocate up to a full core, so we need at least 2*2=4 virtual CPUs
            if cpuAlloc < int64(smtLevel*2) {
                e2eskipper.Skipf("Skipping CPU Manager %s tests since the CPU capacity < 4", fullCPUsOnlyOpt)
            }

            framework.Logf("SMT level %d", smtLevel)

            // TODO: we assume the first available CPUID is 0, which is pretty fair, but we should probably
            // check what we do have in the node.
            cpuPolicyOptions := map[string]string{
                cpumanager.FullPCPUsOnlyOption: "true",
            }
            newCfg := configureCPUManagerInKubelet(oldCfg,
                &cpuManagerKubeletArguments{
                    policyName:              string(cpumanager.PolicyStatic),
                    enableCPUManager:        true,
                    reservedSystemCPUs:      cpuset.NewCPUSet(0),
                    enableCPUManagerOptions: true,
                    options:                 cpuPolicyOptions,
                },
            )
            updateKubeletConfig(f, newCfg, true)
        })

        ginkgo.AfterEach(func() {
            if testPod != nil {
                deletePodSyncByName(f, testPod.Name)
            }
            updateKubeletConfig(f, oldCfg, true)
        })

        ginkgo.It("should report zero pinning counters after a fresh restart", func() {
            // we updated the kubelet config in BeforeEach, so we can assume we start fresh.
            // being [Serial], we can also assume no one else but us is running pods.
            ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with no pods running")

            matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
                "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{
                    "": timelessSample(0),
                }),
                "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{
                    "": timelessSample(0),
                }),
            })

            ginkgo.By("Giving the Kubelet time to start up and produce metrics")
            gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
            ginkgo.By("Ensuring the metrics match the expectations a few more times")
            gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
        })

        ginkgo.It("should report pinning failures when the cpumanager allocation is known to fail", func() {
            ginkgo.By("Creating the test pod which will be rejected for SMTAlignmentError")
            testPod = e2epod.NewPodClient(f).Create(makeGuaranteedCPUExclusiveSleeperPod("smt-align-err", 1))

            // we updated the kubelet config in BeforeEach, so we can assume we start fresh.
            // being [Serial], we can also assume no one else but us is running pods.
            ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with a pod that failed admission")

            matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
                "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{
                    "": timelessSample(1),
                }),
                "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{
                    "": timelessSample(1),
                }),
            })

            ginkgo.By("Giving the Kubelet time to start up and produce metrics")
            gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
            ginkgo.By("Ensuring the metrics match the expectations a few more times")
            gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
        })

        ginkgo.It("should not report any pinning failures when the cpumanager allocation is expected to succeed", func() {
            ginkgo.By("Creating the test pod")
            testPod = e2epod.NewPodClient(f).Create(makeGuaranteedCPUExclusiveSleeperPod("smt-align-ok", smtLevel))

            // we updated the kubelet config in BeforeEach, so we can assume we start fresh.
            // being [Serial], we can also assume no one else but us is running pods.
            ginkgo.By("Checking the cpumanager metrics right after the kubelet restart, with a pod that should be admitted")

            matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
                "kubelet_cpu_manager_pinning_requests_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{
                    "": timelessSample(1),
                }),
                "kubelet_cpu_manager_pinning_errors_total": gstruct.MatchAllElements(nodeID, gstruct.Elements{
                    "": timelessSample(0),
                }),
            })

            ginkgo.By("Giving the Kubelet time to start up and produce metrics")
            gomega.Eventually(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
            ginkgo.By("Ensuring the metrics match the expectations a few more times")
            gomega.Consistently(getCPUManagerMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
        })
    })
})
func getCPUManagerMetrics() (e2emetrics.KubeletMetrics, error) {
    // we are running out of good names, so we need to be unnecessarily specific to avoid clashes
    ginkgo.By("getting CPU Manager metrics from the metrics API")
    return e2emetrics.GrabKubeletMetricsWithoutProxy(framework.TestContext.NodeName+":10255", "/metrics")
}

func makeGuaranteedCPUExclusiveSleeperPod(name string, cpus int) *v1.Pod {
    return &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name: name + "-pod",
        },
        Spec: v1.PodSpec{
            RestartPolicy: v1.RestartPolicyNever,
            Containers: []v1.Container{
                {
                    Name:  name + "-cnt",
                    Image: busyboxImage,
                    Resources: v1.ResourceRequirements{
                        Requests: v1.ResourceList{
                            v1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", cpus)),
                            v1.ResourceMemory: resource.MustParse("64Mi"),
                        },
                        Limits: v1.ResourceList{
                            v1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", cpus)),
                            v1.ResourceMemory: resource.MustParse("64Mi"),
                        },
                    },
                    // keep the container alive; "sleep 1d" must be a single argument to sh -c
                    Command: []string{"sh", "-c", "sleep 1d"},
                },
            },
        },
    }
}

func timelessSample(value interface{}) types.GomegaMatcher {
    return gstruct.PointTo(gstruct.MatchAllFields(gstruct.Fields{
        // We already check Metric when matching the Id
        "Metric":    gstruct.Ignore(),
        "Value":     gomega.BeNumerically("==", value),
        "Timestamp": gstruct.Ignore(),
    }))
}
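
For context on why the two sleeper pods in the test behave differently: with the full-pcpus-only option enabled, the static policy admits an exclusive request only if it amounts to a whole number of physical cores, i.e. the numCPUs % CPUsPerCore() check from the first hunk. A tiny illustrative sketch (hypothetical helper, not kubelet code):

package main

import "fmt"

// smtAligned mirrors the check in staticPolicy.Allocate: an exclusive CPU
// request passes the full-pcpus-only check only if it is a multiple of the
// number of hardware threads per physical core.
func smtAligned(numCPUs, cpusPerCore int) bool {
    return numCPUs%cpusPerCore == 0
}

func main() {
    const smtLevel = 2 // typical x86 SMT: 2 threads per core

    // The "smt-align-err" pod requests 1 CPU, i.e. half a core, so it is
    // rejected with SMTAlignmentError and the errors counter increments.
    fmt.Println(smtAligned(1, smtLevel)) // false

    // The "smt-align-ok" pod requests smtLevel CPUs, i.e. one full core,
    // so it is admitted and only the requests counter increments.
    fmt.Println(smtAligned(smtLevel, smtLevel)) // true
}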