From 212c4c485172ebda99cd639969f1cc1c32ba6ba7 Mon Sep 17 00:00:00 2001
From: likakuli <1154584512@qq.com>
Date: Thu, 14 Sep 2023 17:13:04 +0800
Subject: [PATCH 1/2] feat: revert #103979 as it is duplicated

Signed-off-by: likakuli <1154584512@qq.com>
---
 pkg/kubelet/cm/cpumanager/cpu_manager.go      | 27 +---
 pkg/kubelet/cm/cpumanager/cpu_manager_test.go |  4 +-
 .../cm/cpumanager/topology_hints_test.go      |  6 -
 pkg/kubelet/cm/devicemanager/manager.go       | 24 +---
 .../cm/devicemanager/topology_hints.go        |  8 --
 .../cm/memorymanager/memory_manager.go        | 31 +----
 .../cm/memorymanager/memory_manager_test.go   | 128 +-----------------
 7 files changed, 11 insertions(+), 217 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index b17af7ff8fa..0986e57bea1 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -146,9 +146,6 @@ type manager struct {
 	// allocatableCPUs is the set of online CPUs as reported by the system,
 	// and available for allocation, minus the reserved set
 	allocatableCPUs cpuset.CPUSet
-
-	// pendingAdmissionPod contain the pod during the admission phase
-	pendingAdmissionPod *v1.Pod
 }
 
 var _ Manager = &manager{}
@@ -254,10 +251,6 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
 }
 
 func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(p)
-
 	// Garbage collect any stranded resources before allocating CPUs.
 	m.removeStaleState()
 
@@ -326,9 +319,6 @@ func (m *manager) State() state.Reader {
 }
 
 func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
 	// Garbage collect any stranded resources before providing TopologyHints
 	m.removeStaleState()
 	// Delegate to active policy
@@ -336,9 +326,6 @@ func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[str
 }
 
 func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
 	// Garbage collect any stranded resources before providing TopologyHints
 	m.removeStaleState()
 	// Delegate to active policy
@@ -375,14 +362,11 @@ func (m *manager) removeStaleState() {
 	defer m.Unlock()
 
 	// Get the list of active pods.
-	activeAndAdmittedPods := m.activePods()
-	if m.pendingAdmissionPod != nil {
-		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
-	}
+	activePods := m.activePods()
 
 	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
 	activeContainers := make(map[string]map[string]struct{})
-	for _, pod := range activeAndAdmittedPods {
+	for _, pod := range activePods {
 		activeContainers[string(pod.UID)] = make(map[string]struct{})
 		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...)
{ activeContainers[string(pod.UID)][container.Name] = struct{}{} @@ -554,10 +538,3 @@ func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet { func (m *manager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet { return m.state.GetCPUSetOrDefault(podUID, containerName) } - -func (m *manager) setPodPendingAdmission(pod *v1.Pod) { - m.Lock() - defer m.Unlock() - - m.pendingAdmissionPod = pod -} diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 0630032c511..0e4767b6cc9 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -325,7 +325,7 @@ func TestCPUManagerAdd(t *testing.T) { pod := makePod("fakePod", "fakeContainer", "2", "2") container := &pod.Spec.Containers[0] - mgr.activePods = func() []*v1.Pod { return nil } + mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} } err := mgr.Allocate(pod, container) if !reflect.DeepEqual(err, testCase.expAllocateErr) { @@ -1321,7 +1321,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) { pod := makePod("fakePod", "fakeContainer", "2", "2") container := &pod.Spec.Containers[0] - mgr.activePods = func() []*v1.Pod { return nil } + mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} } err := mgr.Allocate(pod, container) if !reflect.DeepEqual(err, testCase.expAllocateErr) { diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go index dc8f0d49834..322f45baa77 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -245,11 +245,6 @@ func TestGetTopologyHints(t *testing.T) { if len(tc.expectedHints) == 0 && len(hints) == 0 { continue } - - if m.pendingAdmissionPod == nil { - t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()") - } - sort.SliceStable(hints, func(i, j int) bool { return hints[i].LessThan(hints[j]) }) @@ -298,7 +293,6 @@ func TestGetPodTopologyHints(t *testing.T) { if len(tc.expectedHints) == 0 && len(podHints) == 0 { continue } - sort.SliceStable(podHints, func(i, j int) bool { return podHints[i].LessThan(podHints[j]) }) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 32a618e7221..c6ccb4e7f83 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -101,9 +101,6 @@ type ManagerImpl struct { // init containers. devicesToReuse PodReusableDevices - // pendingAdmissionPod contain the pod during the admission phase - pendingAdmissionPod *v1.Pod - // containerMap provides a mapping from (pod, container) -> containerID // for all containers in a pod. Used to detect pods running across a restart containerMap containermap.ContainerMap @@ -364,10 +361,6 @@ func (m *ManagerImpl) Stop() error { // Allocate is the call that you can use to allocate a set of devices // from the registered device plugins. func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error { - // The pod is during the admission phase. 
We need to save the pod to avoid it - // being cleaned before the admission ended - m.setPodPendingAdmission(pod) - if _, ok := m.devicesToReuse[string(pod.UID)]; !ok { m.devicesToReuse[string(pod.UID)] = make(map[string]sets.Set[string]) } @@ -548,20 +541,14 @@ func (m *ManagerImpl) getCheckpoint() (checkpoint.DeviceManagerCheckpoint, error // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. func (m *ManagerImpl) UpdateAllocatedDevices() { + activePods := m.activePods() if !m.sourcesReady.AllReady() { return } - m.mutex.Lock() defer m.mutex.Unlock() - - activeAndAdmittedPods := m.activePods() - if m.pendingAdmissionPod != nil { - activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod) - } - podsToBeRemoved := m.podDevices.pods() - for _, pod := range activeAndAdmittedPods { + for _, pod := range activePods { podsToBeRemoved.Delete(string(pod.UID)) } if len(podsToBeRemoved) <= 0 { @@ -1171,13 +1158,6 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool { return len(checkpoints) == 0 } -func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.pendingAdmissionPod = pod -} - func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool { cntID, err := m.containerMap.GetContainerID(podUID, cntName) if err != nil { diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 16630daad81..c7fd4a790e0 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -31,10 +31,6 @@ import ( // ensures the Device Manager is consulted when Topology Aware Hints for each // container are created. func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { - // The pod is during the admission phase. We need to save the pod to avoid it - // being cleaned before the admission ended - m.setPodPendingAdmission(pod) - // Garbage collect any stranded device resources before providing TopologyHints m.UpdateAllocatedDevices() @@ -87,10 +83,6 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map // GetPodTopologyHints implements the topologymanager.HintProvider Interface which // ensures the Device Manager is consulted when Topology Aware Hints for Pod are created. func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { - // The pod is during the admission phase. We need to save the pod to avoid it - // being cleaned before the admission ended - m.setPodPendingAdmission(pod) - // Garbage collect any stranded device resources before providing TopologyHints m.UpdateAllocatedDevices() diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index 7acc5f54229..831afb193ea 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -126,9 +126,6 @@ type manager struct { // allocatableMemory holds the allocatable memory for each NUMA node allocatableMemory []state.Block - - // pendingAdmissionPod contain the pod during the admission phase - pendingAdmissionPod *v1.Pod } var _ Manager = &manager{} @@ -242,10 +239,6 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets. // Allocate is called to pre-allocate memory resources during Pod admission. 
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error { - // The pod is during the admission phase. We need to save the pod to avoid it - // being cleaned before the admission ended - m.setPodPendingAdmission(pod) - // Garbage collect any stranded resources before allocation m.removeStaleState() @@ -284,10 +277,6 @@ func (m *manager) State() state.Reader { // GetPodTopologyHints returns the topology hints for the topology manager func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { - // The pod is during the admission phase. We need to save the pod to avoid it - // being cleaned before the admission ended - m.setPodPendingAdmission(pod) - // Garbage collect any stranded resources before providing TopologyHints m.removeStaleState() // Delegate to active policy @@ -296,10 +285,6 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager. // GetTopologyHints returns the topology hints for the topology manager func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { - // The pod is during the admission phase. We need to save the pod to avoid it - // being cleaned before the admission ended - m.setPodPendingAdmission(pod) - // Garbage collect any stranded resources before providing TopologyHints m.removeStaleState() // Delegate to active policy @@ -322,15 +307,12 @@ func (m *manager) removeStaleState() { m.Lock() defer m.Unlock() - // Get the list of admitted and active pods. - activeAndAdmittedPods := m.activePods() - if m.pendingAdmissionPod != nil { - activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod) - } + // Get the list of active pods. + activePods := m.activePods() // Build a list of (podUID, containerName) pairs for all containers in all active Pods. activeContainers := make(map[string]map[string]struct{}) - for _, pod := range activeAndAdmittedPods { + for _, pod := range activePods { activeContainers[string(pod.UID)] = make(map[string]struct{}) for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) 
{ activeContainers[string(pod.UID)][container.Name] = struct{}{} @@ -464,10 +446,3 @@ func (m *manager) GetAllocatableMemory() []state.Block { func (m *manager) GetMemory(podUID, containerName string) []state.Block { return m.state.GetMemoryBlocks(podUID, containerName) } - -func (m *manager) setPodPendingAdmission(pod *v1.Pod) { - m.Lock() - defer m.Unlock() - - m.pendingAdmissionPod = pod -} diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go index 5b5a3e1999e..44c654be157 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go @@ -2019,129 +2019,6 @@ func TestNewManager(t *testing.T) { func TestGetTopologyHints(t *testing.T) { testCases := []testMemoryManager{ - { - description: "Successful hint generation", - policyName: policyTypeStatic, - machineInfo: returnMachineInfo(), - reserved: systemReservedMemory{ - 0: map[v1.ResourceName]uint64{ - v1.ResourceMemory: 1 * gb, - }, - 1: map[v1.ResourceName]uint64{ - v1.ResourceMemory: 1 * gb, - }, - }, - assignments: state.ContainerMemoryAssignments{ - "fakePod1": map[string][]state.Block{ - "fakeContainer1": { - { - NUMAAffinity: []int{0}, - Type: v1.ResourceMemory, - Size: 1 * gb, - }, - { - NUMAAffinity: []int{0}, - Type: hugepages1Gi, - Size: 1 * gb, - }, - }, - "fakeContainer2": { - { - NUMAAffinity: []int{0}, - Type: v1.ResourceMemory, - Size: 1 * gb, - }, - { - NUMAAffinity: []int{0}, - Type: hugepages1Gi, - Size: 1 * gb, - }, - }, - }, - }, - machineState: state.NUMANodeMap{ - 0: &state.NUMANodeState{ - Cells: []int{0}, - NumberOfAssignments: 4, - MemoryMap: map[v1.ResourceName]*state.MemoryTable{ - v1.ResourceMemory: { - Allocatable: 9 * gb, - Free: 7 * gb, - Reserved: 2 * gb, - SystemReserved: 1 * gb, - TotalMemSize: 10 * gb, - }, - hugepages1Gi: { - Allocatable: 5 * gb, - Free: 3 * gb, - Reserved: 2 * gb, - SystemReserved: 0 * gb, - TotalMemSize: 5 * gb, - }, - }, - }, - 1: &state.NUMANodeState{ - Cells: []int{1}, - NumberOfAssignments: 0, - MemoryMap: map[v1.ResourceName]*state.MemoryTable{ - v1.ResourceMemory: { - Allocatable: 9 * gb, - Free: 9 * gb, - Reserved: 0 * gb, - SystemReserved: 1 * gb, - TotalMemSize: 10 * gb, - }, - hugepages1Gi: { - Allocatable: 5 * gb, - Free: 5 * gb, - Reserved: 0, - SystemReserved: 0, - TotalMemSize: 5 * gb, - }, - }, - }, - }, - expectedError: nil, - expectedHints: map[string][]topologymanager.TopologyHint{ - string(v1.ResourceMemory): { - { - NUMANodeAffinity: newNUMAAffinity(0), - Preferred: true, - }, - { - NUMANodeAffinity: newNUMAAffinity(1), - Preferred: true, - }, - }, - string(hugepages1Gi): { - { - NUMANodeAffinity: newNUMAAffinity(0), - Preferred: true, - }, - { - NUMANodeAffinity: newNUMAAffinity(1), - Preferred: true, - }, - }, - }, - activePods: []*v1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - UID: "fakePod1", - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "fakeContainer1", - }, - { - Name: "fakeContainer2", - }, - }, - }, - }, - }, - }, { description: "Successful hint generation", policyName: policyTypeStatic, @@ -2255,7 +2132,6 @@ func TestGetTopologyHints(t *testing.T) { }, }, }, - activePods: []*v1.Pod{}, }, } @@ -2268,14 +2144,14 @@ func TestGetTopologyHints(t *testing.T) { containerRuntime: mockRuntimeService{ err: nil, }, - activePods: func() []*v1.Pod { return testCase.activePods }, + activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } mgr.sourcesReady = &sourcesReadyStub{} 
 			mgr.state.SetMachineState(testCase.machineState.Clone())
 			mgr.state.SetMemoryAssignments(testCase.assignments.Clone())
 
-			pod := getPod("fakePod2", "fakeContainer1", requirementsGuaranteed)
+			pod := getPod("fakePod1", "fakeContainer1", requirementsGuaranteed)
 			container := &pod.Spec.Containers[0]
 			hints := mgr.GetTopologyHints(pod, container)
 			if !reflect.DeepEqual(hints, testCase.expectedHints) {

From 14ec0edd10eaeca0139a00d48548513b5efccff0 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Tue, 24 Sep 2024 09:53:56 +0200
Subject: [PATCH 2/2] node: metrics: add metrics about cpu pool sizes

Add metrics about the sizing of the cpu pools.
Currently the cpumanager maintains 2 cpu pools:
- shared pool: this is where all pods with non-exclusive cpu allocation run
- exclusive pool: this is the union of the set of exclusive cpus allocated to containers, if any (requires static policy in use).

By reporting the size of the pools, the users (humans or machines) can get better insights and more feedback about how the resources are actually allocated to the workload and how the node resources are used.
---
 pkg/kubelet/cm/cpumanager/policy_static.go | 31 ++++++++
 pkg/kubelet/metrics/metrics.go             | 30 ++++++-
 test/e2e_node/cpu_manager_metrics_test.go  | 92 ++++++++++++++++++++++
 test/e2e_node/podresources_test.go         | 30 ++++---
 4 files changed, 168 insertions(+), 15 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index db8770f96f6..d0278c03f87 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -194,6 +194,7 @@ func (p *staticPolicy) Start(s state.State) error {
 		klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
 		return err
 	}
+	p.initializeMetrics(s)
 	return nil
 }
 
@@ -370,8 +371,10 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 			klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
 			return err
 		}
+
 		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
 		p.updateCPUsToReuse(pod, container, cpuset)
+		p.updateMetricsOnAllocate(cpuset)
 
 		return nil
 	}
@@ -397,6 +400,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 		// Mutate the shared pool, adding released cpus.
toRelease = toRelease.Difference(cpusInUse) s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease)) + p.updateMetricsOnRelease(toRelease) } return nil } @@ -720,3 +724,30 @@ func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableC return alignedCPUs } + +func (p *staticPolicy) initializeMetrics(s state.State) { + metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000)) + metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s))) +} + +func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) { + ncpus := cset.Size() + metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus)) + metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000)) +} + +func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) { + ncpus := cset.Size() + metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(-ncpus)) + metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(ncpus * 1000)) +} + +func countExclusiveCPUs(s state.State) int { + exclusiveCPUs := 0 + for _, cpuAssign := range s.GetCPUAssignments() { + for _, cset := range cpuAssign { + exclusiveCPUs += cset.Size() + } + } + return exclusiveCPUs +} diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 30a194a3a66..da8492ab78f 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -108,8 +108,10 @@ const ( ManagedEphemeralContainersKey = "managed_ephemeral_containers" // Metrics to track the CPU manager behavior - CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total" - CPUManagerPinningErrorsTotalKey = "cpu_manager_pinning_errors_total" + CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total" + CPUManagerPinningErrorsTotalKey = "cpu_manager_pinning_errors_total" + CPUManagerSharedPoolSizeMilliCoresKey = "cpu_manager_shared_pool_size_millicores" + CPUManagerExclusiveCPUsAllocationCountKey = "cpu_manager_exclusive_cpu_allocation_count" // Metrics to track the Memory manager behavior MemoryManagerPinningRequestsTotalKey = "memory_manager_pinning_requests_total" @@ -773,6 +775,27 @@ var ( }, ) + // CPUManagerSharedPoolSizeMilliCores reports the current size of the shared CPU pool for non-guaranteed pods + CPUManagerSharedPoolSizeMilliCores = metrics.NewGauge( + &metrics.GaugeOpts{ + Subsystem: KubeletSubsystem, + Name: CPUManagerSharedPoolSizeMilliCoresKey, + Help: "The size of the shared CPU pool for non-guaranteed QoS pods, in millicores.", + StabilityLevel: metrics.ALPHA, + }, + ) + + // CPUManagerExclusiveCPUsAllocationCount reports the total number of CPUs exclusively allocated to containers running on this node + CPUManagerExclusiveCPUsAllocationCount = metrics.NewGauge( + &metrics.GaugeOpts{ + Subsystem: KubeletSubsystem, + Name: CPUManagerExclusiveCPUsAllocationCountKey, + Help: "The total number of CPUs exclusively allocated to containers running on this node", + StabilityLevel: metrics.ALPHA, + }, + ) + + // ContainerAlignedComputeResources reports the count of resources allocation which granted aligned resources, per alignment boundary ContainerAlignedComputeResources = metrics.NewCounterVec( &metrics.CounterOpts{ Subsystem: KubeletSubsystem, @@ -782,7 +805,6 @@ var ( }, []string{ContainerAlignedComputeResourcesScopeLabelKey, ContainerAlignedComputeResourcesBoundaryLabelKey}, ) - // MemoryManagerPinningRequestTotal tracks the number of times the pod spec required the memory manager to pin memory pages MemoryManagerPinningRequestTotal = 
metrics.NewCounter( &metrics.CounterOpts{ @@ -1006,6 +1028,8 @@ func Register(collectors ...metrics.StableCollector) { legacyregistry.MustRegister(RunPodSandboxErrors) legacyregistry.MustRegister(CPUManagerPinningRequestsTotal) legacyregistry.MustRegister(CPUManagerPinningErrorsTotal) + legacyregistry.MustRegister(CPUManagerSharedPoolSizeMilliCores) + legacyregistry.MustRegister(CPUManagerExclusiveCPUsAllocationCount) legacyregistry.MustRegister(ContainerAlignedComputeResources) if utilfeature.DefaultFeatureGate.Enabled(features.MemoryManager) { legacyregistry.MustRegister(MemoryManagerPinningRequestTotal) diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go index aacde715cab..7ec1db48451 100644 --- a/test/e2e_node/cpu_manager_metrics_test.go +++ b/test/e2e_node/cpu_manager_metrics_test.go @@ -29,8 +29,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" @@ -182,6 +185,95 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times") gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics) }) + + ginkgo.It("should report the default idle cpu pool size", func(ctx context.Context) { + ginkgo.By("Querying the podresources endpoint to get the baseline") + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + defer func() { + framework.ExpectNoError(conn.Close()) + }() + + ginkgo.By("Checking the pool allocatable resources from the kubelet") + resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{}) + framework.ExpectNoError(err, "failed to get the kubelet allocatable resources") + allocatableCPUs, _ := demuxCPUsAndDevicesFromGetAllocatableResources(resp) + + matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_shared_pool_size_millicores": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(int(allocatableCPUs.Size() * 1000)), + }), + "kubelet_cpu_manager_exclusive_cpu_allocation_count": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics about idle pool size") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetrics) + ginkgo.By("Ensuring the metrics match the expectations about idle pool size a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetrics) + }) + + ginkgo.It("should report mutating cpu pool size when handling guaranteed pods", func(ctx context.Context) { + ginkgo.By("Querying the podresources endpoint to get the 
baseline") + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err, "GetV1Client() failed err: %v", err) + defer func() { + framework.ExpectNoError(conn.Close()) + }() + + ginkgo.By("Checking the pool allocatable resources from the kubelet") + resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{}) + framework.ExpectNoError(err, "failed to get the kubelet allocatable resources") + allocatableCPUs, _ := demuxCPUsAndDevicesFromGetAllocatableResources(resp) + + allocatableCPUsIdleMillis := int(allocatableCPUs.Size() * 1000) + + matchResourceMetricsIdle := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_shared_pool_size_millicores": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(allocatableCPUsIdleMillis), + }), + "kubelet_cpu_manager_exclusive_cpu_allocation_count": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(0), + }), + }) + ginkgo.By(fmt.Sprintf("Pool allocatable resources from the kubelet: shared pool %d cpus %d millis", allocatableCPUs.Size(), allocatableCPUsIdleMillis)) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics about idle pool size") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetricsIdle) + ginkgo.By("Ensuring the metrics match the expectations about idle pool size a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle) + + ginkgo.By("Creating the test pod to consume exclusive cpus from the pool") + testPod = e2epod.NewPodClient(f).CreateSync(ctx, makeGuaranteedCPUExclusiveSleeperPod("smt-cpupool", smtLevel)) + + matchResourceMetricsBusy := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{ + "kubelet_cpu_manager_shared_pool_size_millicores": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(allocatableCPUsIdleMillis - (smtLevel * 1000)), + }), + "kubelet_cpu_manager_exclusive_cpu_allocation_count": gstruct.MatchAllElements(nodeID, gstruct.Elements{ + "": timelessSample(smtLevel), + }), + }) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetricsBusy) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsBusy) + + deletePodSyncByName(ctx, f, testPod.Name) + + ginkgo.By("Giving the Kubelet time to start up and produce metrics") + gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetricsIdle) + ginkgo.By("Ensuring the metrics match the expectations a few more times") + gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle) + }) }) }) diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go index 494730414a3..b29eb5fba65 100644 --- a/test/e2e_node/podresources_test.go +++ b/test/e2e_node/podresources_test.go @@ -719,19 +719,16 @@ func podresourcesListTests(ctx context.Context, f *framework.Framework, cli kube } func podresourcesGetAllocatableResourcesTests(ctx context.Context, cli 
kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData, onlineCPUs, reservedSystemCPUs cpuset.CPUSet) { + ginkgo.GinkgoHelper() + ginkgo.By("checking the devices known to the kubelet") resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{}) - framework.ExpectNoErrorWithOffset(1, err) - devs := resp.GetDevices() - var cpus []int - for _, cpuid := range resp.GetCpuIds() { - cpus = append(cpus, int(cpuid)) - } - allocatableCPUs := cpuset.New(cpus...) + framework.ExpectNoError(err, "cannot get allocatable CPUs from podresources") + allocatableCPUs, devs := demuxCPUsAndDevicesFromGetAllocatableResources(resp) if onlineCPUs.Size() == 0 { ginkgo.By("expecting no CPUs reported") - gomega.ExpectWithOffset(1, onlineCPUs.Size()).To(gomega.Equal(reservedSystemCPUs.Size()), "with no online CPUs, no CPUs should be reserved") + gomega.Expect(onlineCPUs.Size()).To(gomega.Equal(reservedSystemCPUs.Size()), "with no online CPUs, no CPUs should be reserved") } else { ginkgo.By(fmt.Sprintf("expecting online CPUs reported - online=%v (%d) reserved=%v (%d)", onlineCPUs, onlineCPUs.Size(), reservedSystemCPUs, reservedSystemCPUs.Size())) if reservedSystemCPUs.Size() > onlineCPUs.Size() { @@ -740,23 +737,32 @@ func podresourcesGetAllocatableResourcesTests(ctx context.Context, cli kubeletpo expectedCPUs := onlineCPUs.Difference(reservedSystemCPUs) ginkgo.By(fmt.Sprintf("expecting CPUs '%v'='%v'", allocatableCPUs, expectedCPUs)) - gomega.ExpectWithOffset(1, allocatableCPUs.Equals(expectedCPUs)).To(gomega.BeTrueBecause("mismatch expecting CPUs")) + gomega.Expect(allocatableCPUs.Equals(expectedCPUs)).To(gomega.BeTrueBecause("mismatch expecting CPUs")) } if sd == nil { // no devices in the environment, so expect no devices ginkgo.By("expecting no devices reported") - gomega.ExpectWithOffset(1, devs).To(gomega.BeEmpty(), fmt.Sprintf("got unexpected devices %#v", devs)) + gomega.Expect(devs).To(gomega.BeEmpty(), fmt.Sprintf("got unexpected devices %#v", devs)) return } ginkgo.By(fmt.Sprintf("expecting some %q devices reported", sd.resourceName)) - gomega.ExpectWithOffset(1, devs).ToNot(gomega.BeEmpty()) + gomega.Expect(devs).ToNot(gomega.BeEmpty()) for _, dev := range devs { gomega.Expect(dev.ResourceName).To(gomega.Equal(sd.resourceName)) - gomega.ExpectWithOffset(1, dev.DeviceIds).ToNot(gomega.BeEmpty()) + gomega.Expect(dev.DeviceIds).ToNot(gomega.BeEmpty()) } } +func demuxCPUsAndDevicesFromGetAllocatableResources(resp *kubeletpodresourcesv1.AllocatableResourcesResponse) (cpuset.CPUSet, []*kubeletpodresourcesv1.ContainerDevices) { + devs := resp.GetDevices() + var cpus []int + for _, cpuid := range resp.GetCpuIds() { + cpus = append(cpus, int(cpuid)) + } + return cpuset.New(cpus...), devs +} + func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sidecarContainersEnabled bool) { //var err error ginkgo.By("checking the output when no pods are present")
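
For reference, the accounting behind the two new gauges under the static policy is simple: every exclusively allocated CPU moves 1000 millicores out of the shared pool, and moves them back when the container is removed, so shared_pool_millicores + 1000 * exclusive_cpu_count stays constant at the node's allocatable CPU capacity. The sketch below is not part of either patch; poolMetrics and its methods are hypothetical stand-ins for the updateMetricsOnAllocate/updateMetricsOnRelease bookkeeping added in policy_static.go, and the 8-CPU node is an assumed example.

package main

import "fmt"

// poolMetrics mirrors the two new gauges:
//   kubelet_cpu_manager_shared_pool_size_millicores
//   kubelet_cpu_manager_exclusive_cpu_allocation_count
type poolMetrics struct {
	sharedPoolMillis   int
	exclusiveCPUsCount int
}

// allocateExclusive models updateMetricsOnAllocate: exclusive CPUs leave the shared pool.
func (m *poolMetrics) allocateExclusive(ncpus int) {
	m.exclusiveCPUsCount += ncpus
	m.sharedPoolMillis -= ncpus * 1000
}

// releaseExclusive models updateMetricsOnRelease: released CPUs rejoin the shared pool.
func (m *poolMetrics) releaseExclusive(ncpus int) {
	m.exclusiveCPUsCount -= ncpus
	m.sharedPoolMillis += ncpus * 1000
}

func main() {
	// Assume a node with 8 allocatable CPUs: the shared pool starts at 8000 millicores.
	m := poolMetrics{sharedPoolMillis: 8000}

	m.allocateExclusive(2) // a Guaranteed pod pins 2 exclusive CPUs
	fmt.Println(m.sharedPoolMillis, m.exclusiveCPUsCount) // 6000 2

	m.releaseExclusive(2) // the pod is deleted, the CPUs return to the shared pool
	fmt.Println(m.sharedPoolMillis, m.exclusiveCPUsCount) // 8000 0
}

Running it prints 6000 2 and then 8000 0, which is the same idle/busy/idle progression the cpu_manager_metrics_test.go cases above expect before and after the guaranteed sleeper pod runs.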