diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index b17af7ff8fa..0986e57bea1 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -146,9 +146,6 @@ type manager struct {
 	// allocatableCPUs is the set of online CPUs as reported by the system,
 	// and available for allocation, minus the reserved set
 	allocatableCPUs cpuset.CPUSet
-
-	// pendingAdmissionPod contain the pod during the admission phase
-	pendingAdmissionPod *v1.Pod
 }
 
 var _ Manager = &manager{}
@@ -254,10 +251,6 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
 }
 
 func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(p)
-
 	// Garbage collect any stranded resources before allocating CPUs.
 	m.removeStaleState()
 
@@ -326,9 +319,6 @@ func (m *manager) State() state.Reader {
 }
 
 func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
 	// Garbage collect any stranded resources before providing TopologyHints
 	m.removeStaleState()
 	// Delegate to active policy
@@ -336,9 +326,6 @@ func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[str
 }
 
 func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
 	// Garbage collect any stranded resources before providing TopologyHints
 	m.removeStaleState()
 	// Delegate to active policy
@@ -375,14 +362,11 @@ func (m *manager) removeStaleState() {
 	defer m.Unlock()
 
 	// Get the list of active pods.
-	activeAndAdmittedPods := m.activePods()
-	if m.pendingAdmissionPod != nil {
-		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
-	}
+	activePods := m.activePods()
 
 	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
 	activeContainers := make(map[string]map[string]struct{})
-	for _, pod := range activeAndAdmittedPods {
+	for _, pod := range activePods {
 		activeContainers[string(pod.UID)] = make(map[string]struct{})
 		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
 			activeContainers[string(pod.UID)][container.Name] = struct{}{}
@@ -554,10 +538,3 @@ func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
 func (m *manager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet {
 	return m.state.GetCPUSetOrDefault(podUID, containerName)
 }
-
-func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
-	m.Lock()
-	defer m.Unlock()
-
-	m.pendingAdmissionPod = pod
-}
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 0630032c511..0e4767b6cc9 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -325,7 +325,7 @@ func TestCPUManagerAdd(t *testing.T) {
 		pod := makePod("fakePod", "fakeContainer", "2", "2")
 		container := &pod.Spec.Containers[0]
 
-		mgr.activePods = func() []*v1.Pod { return nil }
+		mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
 
 		err := mgr.Allocate(pod, container)
 		if !reflect.DeepEqual(err, testCase.expAllocateErr) {
@@ -1321,7 +1321,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
 		pod := makePod("fakePod", "fakeContainer", "2", "2")
 		container := &pod.Spec.Containers[0]
 
-		mgr.activePods = func() []*v1.Pod { return nil }
+		mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
 
 		err := mgr.Allocate(pod, container)
 		if !reflect.DeepEqual(err, testCase.expAllocateErr) {
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index db8770f96f6..d0278c03f87 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -194,6 +194,7 @@ func (p *staticPolicy) Start(s state.State) error {
 		klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
 		return err
 	}
+	p.initializeMetrics(s)
 	return nil
 }
 
@@ -370,8 +371,10 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
 		return err
 	}
+
 	s.SetCPUSet(string(pod.UID), container.Name, cpuset)
 	p.updateCPUsToReuse(pod, container, cpuset)
+	p.updateMetricsOnAllocate(cpuset)
 
 	return nil
 }
@@ -397,6 +400,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 		// Mutate the shared pool, adding released cpus.
 		toRelease = toRelease.Difference(cpusInUse)
 		s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
+		p.updateMetricsOnRelease(toRelease)
 	}
 	return nil
 }
@@ -720,3 +724,30 @@ func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableC
 
 	return alignedCPUs
 }
+
+func (p *staticPolicy) initializeMetrics(s state.State) {
+	metrics.CPUManagerSharedPoolSizeMilliCores.Set(float64(p.GetAvailableCPUs(s).Size() * 1000))
+	metrics.CPUManagerExclusiveCPUsAllocationCount.Set(float64(countExclusiveCPUs(s)))
+}
+
+func (p *staticPolicy) updateMetricsOnAllocate(cset cpuset.CPUSet) {
+	ncpus := cset.Size()
+	metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(ncpus))
+	metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(-ncpus * 1000))
+}
+
+func (p *staticPolicy) updateMetricsOnRelease(cset cpuset.CPUSet) {
+	ncpus := cset.Size()
+	metrics.CPUManagerExclusiveCPUsAllocationCount.Add(float64(-ncpus))
+	metrics.CPUManagerSharedPoolSizeMilliCores.Add(float64(ncpus * 1000))
+}
+
+func countExclusiveCPUs(s state.State) int {
+	exclusiveCPUs := 0
+	for _, cpuAssign := range s.GetCPUAssignments() {
+		for _, cset := range cpuAssign {
+			exclusiveCPUs += cset.Size()
+		}
+	}
+	return exclusiveCPUs
+}
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index dc8f0d49834..322f45baa77 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -245,11 +245,6 @@ func TestGetTopologyHints(t *testing.T) {
 		if len(tc.expectedHints) == 0 && len(hints) == 0 {
 			continue
 		}
-
-		if m.pendingAdmissionPod == nil {
-			t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()")
-		}
-
 		sort.SliceStable(hints, func(i, j int) bool {
 			return hints[i].LessThan(hints[j])
 		})
@@ -298,7 +293,6 @@ func TestGetPodTopologyHints(t *testing.T) {
 		if len(tc.expectedHints) == 0 && len(podHints) == 0 {
 			continue
 		}
-
 		sort.SliceStable(podHints, func(i, j int) bool {
 			return podHints[i].LessThan(podHints[j])
 		})
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 32a618e7221..c6ccb4e7f83 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -101,9 +101,6 @@ type ManagerImpl struct {
 	// init containers.
 	devicesToReuse PodReusableDevices
 
-	// pendingAdmissionPod contain the pod during the admission phase
-	pendingAdmissionPod *v1.Pod
-
 	// containerMap provides a mapping from (pod, container) -> containerID
 	// for all containers in a pod. Used to detect pods running across a restart
 	containerMap containermap.ContainerMap
@@ -364,10 +361,6 @@ func (m *ManagerImpl) Stop() error {
 // Allocate is the call that you can use to allocate a set of devices
 // from the registered device plugins.
 func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
-
 	if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
 		m.devicesToReuse[string(pod.UID)] = make(map[string]sets.Set[string])
 	}
@@ -548,20 +541,14 @@ func (m *ManagerImpl) getCheckpoint() (checkpoint.DeviceManagerCheckpoint, error
 
 // UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
 func (m *ManagerImpl) UpdateAllocatedDevices() {
+	activePods := m.activePods()
 	if !m.sourcesReady.AllReady() {
 		return
 	}
-
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
-
-	activeAndAdmittedPods := m.activePods()
-	if m.pendingAdmissionPod != nil {
-		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
-	}
-
 	podsToBeRemoved := m.podDevices.pods()
-	for _, pod := range activeAndAdmittedPods {
+	for _, pod := range activePods {
 		podsToBeRemoved.Delete(string(pod.UID))
 	}
 	if len(podsToBeRemoved) <= 0 {
@@ -1171,13 +1158,6 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
 	return len(checkpoints) == 0
 }
 
-func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
-
-	m.pendingAdmissionPod = pod
-}
-
 func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
 	cntID, err := m.containerMap.GetContainerID(podUID, cntName)
 	if err != nil {
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go
index 9b69f38eae1..7b5c9f4c9dd 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints.go
@@ -31,10 +31,6 @@ import (
 // ensures the Device Manager is consulted when Topology Aware Hints for each
 // container are created.
 func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
-
 	// Garbage collect any stranded device resources before providing TopologyHints
 	m.UpdateAllocatedDevices()
 
@@ -87,10 +83,6 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
 // GetPodTopologyHints implements the topologymanager.HintProvider Interface which
 // ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
 func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
-
 	// Garbage collect any stranded device resources before providing TopologyHints
 	m.UpdateAllocatedDevices()
 
diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go
index 7acc5f54229..831afb193ea 100644
--- a/pkg/kubelet/cm/memorymanager/memory_manager.go
+++ b/pkg/kubelet/cm/memorymanager/memory_manager.go
@@ -126,9 +126,6 @@ type manager struct {
 
 	// allocatableMemory holds the allocatable memory for each NUMA node
 	allocatableMemory []state.Block
-
-	// pendingAdmissionPod contain the pod during the admission phase
-	pendingAdmissionPod *v1.Pod
 }
 
 var _ Manager = &manager{}
@@ -242,10 +239,6 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.
 
 // Allocate is called to pre-allocate memory resources during Pod admission.
 func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
-
 	// Garbage collect any stranded resources before allocation
 	m.removeStaleState()
 
@@ -284,10 +277,6 @@ func (m *manager) State() state.Reader {
 
 // GetPodTopologyHints returns the topology hints for the topology manager
 func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
-
 	// Garbage collect any stranded resources before providing TopologyHints
 	m.removeStaleState()
 	// Delegate to active policy
@@ -296,10 +285,6 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.
 
 // GetTopologyHints returns the topology hints for the topology manager
 func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
-	// The pod is during the admission phase. We need to save the pod to avoid it
-	// being cleaned before the admission ended
-	m.setPodPendingAdmission(pod)
-
 	// Garbage collect any stranded resources before providing TopologyHints
 	m.removeStaleState()
 	// Delegate to active policy
@@ -322,15 +307,12 @@ func (m *manager) removeStaleState() {
 	m.Lock()
 	defer m.Unlock()
 
-	// Get the list of admitted and active pods.
-	activeAndAdmittedPods := m.activePods()
-	if m.pendingAdmissionPod != nil {
-		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
-	}
+	// Get the list of active pods.
+	activePods := m.activePods()
 
 	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
 	activeContainers := make(map[string]map[string]struct{})
-	for _, pod := range activeAndAdmittedPods {
+	for _, pod := range activePods {
 		activeContainers[string(pod.UID)] = make(map[string]struct{})
 		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
 			activeContainers[string(pod.UID)][container.Name] = struct{}{}
@@ -464,10 +446,3 @@ func (m *manager) GetAllocatableMemory() []state.Block {
 func (m *manager) GetMemory(podUID, containerName string) []state.Block {
 	return m.state.GetMemoryBlocks(podUID, containerName)
 }
-
-func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
-	m.Lock()
-	defer m.Unlock()
-
-	m.pendingAdmissionPod = pod
-}
diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go
index 5b5a3e1999e..44c654be157 100644
--- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go
+++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go
@@ -2019,129 +2019,6 @@ func TestNewManager(t *testing.T) {
 
 func TestGetTopologyHints(t *testing.T) {
 	testCases := []testMemoryManager{
-		{
-			description: "Successful hint generation",
-			policyName:  policyTypeStatic,
-			machineInfo: returnMachineInfo(),
-			reserved: systemReservedMemory{
-				0: map[v1.ResourceName]uint64{
-					v1.ResourceMemory: 1 * gb,
-				},
-				1: map[v1.ResourceName]uint64{
-					v1.ResourceMemory: 1 * gb,
-				},
-			},
-			assignments: state.ContainerMemoryAssignments{
-				"fakePod1": map[string][]state.Block{
-					"fakeContainer1": {
-						{
-							NUMAAffinity: []int{0},
-							Type:         v1.ResourceMemory,
-							Size:         1 * gb,
-						},
-						{
-							NUMAAffinity: []int{0},
-							Type:         hugepages1Gi,
-							Size:         1 * gb,
-						},
-					},
-					"fakeContainer2": {
-						{
-							NUMAAffinity: []int{0},
-							Type:         v1.ResourceMemory,
-							Size:         1 * gb,
-						},
-						{
-							NUMAAffinity: []int{0},
-							Type:         hugepages1Gi,
-							Size:         1 * gb,
-						},
-					},
-				},
-			},
-			machineState: state.NUMANodeMap{
-				0: &state.NUMANodeState{
-					Cells:               []int{0},
-					NumberOfAssignments: 4,
-					MemoryMap: map[v1.ResourceName]*state.MemoryTable{
-						v1.ResourceMemory: {
-							Allocatable:    9 * gb,
-							Free:           7 * gb,
-							Reserved:       2 * gb,
-							SystemReserved: 1 * gb,
-							TotalMemSize:   10 * gb,
-						},
-						hugepages1Gi: {
-							Allocatable:    5 * gb,
-							Free:           3 * gb,
-							Reserved:       2 * gb,
-							SystemReserved: 0 * gb,
-							TotalMemSize:   5 * gb,
-						},
-					},
-				},
-				1: &state.NUMANodeState{
-					Cells:               []int{1},
-					NumberOfAssignments: 0,
-					MemoryMap: map[v1.ResourceName]*state.MemoryTable{
-						v1.ResourceMemory: {
-							Allocatable:    9 * gb,
-							Free:           9 * gb,
-							Reserved:       0 * gb,
-							SystemReserved: 1 * gb,
-							TotalMemSize:   10 * gb,
-						},
-						hugepages1Gi: {
-							Allocatable:    5 * gb,
-							Free:           5 * gb,
-							Reserved:       0,
-							SystemReserved: 0,
-							TotalMemSize:   5 * gb,
-						},
-					},
-				},
-			},
-			expectedError: nil,
-			expectedHints: map[string][]topologymanager.TopologyHint{
-				string(v1.ResourceMemory): {
-					{
-						NUMANodeAffinity: newNUMAAffinity(0),
-						Preferred:        true,
-					},
-					{
-						NUMANodeAffinity: newNUMAAffinity(1),
-						Preferred:        true,
-					},
-				},
-				string(hugepages1Gi): {
-					{
-						NUMANodeAffinity: newNUMAAffinity(0),
-						Preferred:        true,
-					},
-					{
-						NUMANodeAffinity: newNUMAAffinity(1),
-						Preferred:        true,
-					},
-				},
-			},
-			activePods: []*v1.Pod{
-				{
-					ObjectMeta: metav1.ObjectMeta{
-						UID: "fakePod1",
-					},
-					Spec: v1.PodSpec{
-						Containers: []v1.Container{
-							{
-								Name: "fakeContainer1",
-							},
-							{
-								Name: "fakeContainer2",
-							},
-						},
-					},
-				},
-			},
-		},
 		{
 			description: "Successful hint generation",
 			policyName:  policyTypeStatic,
@@ -2255,7 +2132,6 @@ func TestGetTopologyHints(t *testing.T) {
 					},
 				},
 			},
-			activePods: []*v1.Pod{},
 		},
 	}
 
@@ -2268,14 +2144,14 @@ func TestGetTopologyHints(t *testing.T) {
 			containerRuntime: mockRuntimeService{
 				err: nil,
 			},
-			activePods:        func() []*v1.Pod { return testCase.activePods },
+			activePods:        func() []*v1.Pod { return nil },
 			podStatusProvider: mockPodStatusProvider{},
 		}
 		mgr.sourcesReady = &sourcesReadyStub{}
 		mgr.state.SetMachineState(testCase.machineState.Clone())
 		mgr.state.SetMemoryAssignments(testCase.assignments.Clone())
 
-		pod := getPod("fakePod2", "fakeContainer1", requirementsGuaranteed)
+		pod := getPod("fakePod1", "fakeContainer1", requirementsGuaranteed)
 		container := &pod.Spec.Containers[0]
 		hints := mgr.GetTopologyHints(pod, container)
 		if !reflect.DeepEqual(hints, testCase.expectedHints) {
diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go
index 319aedd286d..3050f3c5663 100644
--- a/pkg/kubelet/metrics/metrics.go
+++ b/pkg/kubelet/metrics/metrics.go
@@ -109,8 +109,10 @@ const (
 	ManagedEphemeralContainersKey = "managed_ephemeral_containers"
 
 	// Metrics to track the CPU manager behavior
-	CPUManagerPinningRequestsTotalKey = "cpu_manager_pinning_requests_total"
-	CPUManagerPinningErrorsTotalKey   = "cpu_manager_pinning_errors_total"
+	CPUManagerPinningRequestsTotalKey         = "cpu_manager_pinning_requests_total"
+	CPUManagerPinningErrorsTotalKey           = "cpu_manager_pinning_errors_total"
+	CPUManagerSharedPoolSizeMilliCoresKey     = "cpu_manager_shared_pool_size_millicores"
+	CPUManagerExclusiveCPUsAllocationCountKey = "cpu_manager_exclusive_cpu_allocation_count"
 
 	// Metrics to track the Memory manager behavior
 	MemoryManagerPinningRequestsTotalKey = "memory_manager_pinning_requests_total"
@@ -783,6 +785,27 @@ var (
 		},
 	)
 
+	// CPUManagerSharedPoolSizeMilliCores reports the current size of the shared CPU pool for non-guaranteed pods
+	CPUManagerSharedPoolSizeMilliCores = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerSharedPoolSizeMilliCoresKey,
+			Help:           "The size of the shared CPU pool for non-guaranteed QoS pods, in millicores.",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+	// CPUManagerExclusiveCPUsAllocationCount reports the total number of CPUs exclusively allocated to containers running on this node
+	CPUManagerExclusiveCPUsAllocationCount = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      KubeletSubsystem,
+			Name:           CPUManagerExclusiveCPUsAllocationCountKey,
+			Help:           "The total number of CPUs exclusively allocated to containers running on this node",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+
+
 	// ContainerAlignedComputeResources reports the count of resources allocation which granted aligned resources, per alignment boundary
 	ContainerAlignedComputeResources = metrics.NewCounterVec(
 		&metrics.CounterOpts{
@@ -792,7 +815,6 @@ var (
 		},
 		[]string{ContainerAlignedComputeResourcesScopeLabelKey, ContainerAlignedComputeResourcesBoundaryLabelKey},
 	)
-
 	// MemoryManagerPinningRequestTotal tracks the number of times the pod spec required the memory manager to pin memory pages
 	MemoryManagerPinningRequestTotal = metrics.NewCounter(
 		&metrics.CounterOpts{
@@ -1040,6 +1062,8 @@ func Register(collectors ...metrics.StableCollector) {
 		legacyregistry.MustRegister(RunPodSandboxErrors)
 		legacyregistry.MustRegister(CPUManagerPinningRequestsTotal)
 		legacyregistry.MustRegister(CPUManagerPinningErrorsTotal)
+		legacyregistry.MustRegister(CPUManagerSharedPoolSizeMilliCores)
+		legacyregistry.MustRegister(CPUManagerExclusiveCPUsAllocationCount)
 		legacyregistry.MustRegister(ContainerAlignedComputeResources)
 		if utilfeature.DefaultFeatureGate.Enabled(features.MemoryManager) {
 			legacyregistry.MustRegister(MemoryManagerPinningRequestTotal)
diff --git a/test/e2e_node/cpu_manager_metrics_test.go b/test/e2e_node/cpu_manager_metrics_test.go
index aacde715cab..7ec1db48451 100644
--- a/test/e2e_node/cpu_manager_metrics_test.go
+++ b/test/e2e_node/cpu_manager_metrics_test.go
@@ -29,8 +29,11 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
+	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+	"k8s.io/kubernetes/pkg/kubelet/util"
 	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
@@ -182,6 +185,95 @@ var _ = SIGDescribe("CPU Manager Metrics", framework.WithSerial(), feature.CPUMa
 			ginkgo.By("Ensuring the metrics match the expectations about alignment metrics a few more times")
 			gomega.Consistently(ctx, getKubeletMetrics, 1*time.Minute, 15*time.Second).Should(matchAlignmentMetrics)
 		})
+
+		ginkgo.It("should report the default idle cpu pool size", func(ctx context.Context) {
+			ginkgo.By("Querying the podresources endpoint to get the baseline")
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
+
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+			defer func() {
+				framework.ExpectNoError(conn.Close())
+			}()
+
+			ginkgo.By("Checking the pool allocatable resources from the kubelet")
+			resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
+			framework.ExpectNoError(err, "failed to get the kubelet allocatable resources")
+			allocatableCPUs, _ := demuxCPUsAndDevicesFromGetAllocatableResources(resp)
+
+			matchResourceMetrics := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
+				"kubelet_cpu_manager_shared_pool_size_millicores": gstruct.MatchAllElements(nodeID, gstruct.Elements{
+					"": timelessSample(int(allocatableCPUs.Size() * 1000)),
+				}),
+				"kubelet_cpu_manager_exclusive_cpu_allocation_count": gstruct.MatchAllElements(nodeID, gstruct.Elements{
+					"": timelessSample(0),
+				}),
+			})
+
+			ginkgo.By("Giving the Kubelet time to start up and produce metrics about idle pool size")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetrics)
+			ginkgo.By("Ensuring the metrics match the expectations about idle pool size a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetrics)
+		})
+
+		ginkgo.It("should report mutating cpu pool size when handling guaranteed pods", func(ctx context.Context) {
+			ginkgo.By("Querying the podresources endpoint to get the baseline")
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err, "LocalEndpoint() failed err: %v", err)
+
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err, "GetV1Client() failed err: %v", err)
+			defer func() {
+				framework.ExpectNoError(conn.Close())
+			}()
+
+			ginkgo.By("Checking the pool allocatable resources from the kubelet")
+			resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
+			framework.ExpectNoError(err, "failed to get the kubelet allocatable resources")
+			allocatableCPUs, _ := demuxCPUsAndDevicesFromGetAllocatableResources(resp)
+
+			allocatableCPUsIdleMillis := int(allocatableCPUs.Size() * 1000)
+
+			matchResourceMetricsIdle := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
+				"kubelet_cpu_manager_shared_pool_size_millicores": gstruct.MatchAllElements(nodeID, gstruct.Elements{
+					"": timelessSample(allocatableCPUsIdleMillis),
+				}),
+				"kubelet_cpu_manager_exclusive_cpu_allocation_count": gstruct.MatchAllElements(nodeID, gstruct.Elements{
+					"": timelessSample(0),
+				}),
+			})
+			ginkgo.By(fmt.Sprintf("Pool allocatable resources from the kubelet: shared pool %d cpus %d millis", allocatableCPUs.Size(), allocatableCPUsIdleMillis))
+
+			ginkgo.By("Giving the Kubelet time to start up and produce metrics about idle pool size")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetricsIdle)
+			ginkgo.By("Ensuring the metrics match the expectations about idle pool size a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle)
+
+			ginkgo.By("Creating the test pod to consume exclusive cpus from the pool")
+			testPod = e2epod.NewPodClient(f).CreateSync(ctx, makeGuaranteedCPUExclusiveSleeperPod("smt-cpupool", smtLevel))
+
+			matchResourceMetricsBusy := gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
+				"kubelet_cpu_manager_shared_pool_size_millicores": gstruct.MatchAllElements(nodeID, gstruct.Elements{
+					"": timelessSample(allocatableCPUsIdleMillis - (smtLevel * 1000)),
+				}),
+				"kubelet_cpu_manager_exclusive_cpu_allocation_count": gstruct.MatchAllElements(nodeID, gstruct.Elements{
+					"": timelessSample(smtLevel),
+				}),
+			})
+
+			ginkgo.By("Giving the Kubelet time to start up and produce metrics")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetricsBusy)
+			ginkgo.By("Ensuring the metrics match the expectations a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsBusy)
+
+			deletePodSyncByName(ctx, f, testPod.Name)
+
+			ginkgo.By("Giving the Kubelet time to start up and produce metrics")
+			gomega.Eventually(ctx, getKubeletMetrics, 1*time.Minute, 10*time.Second).Should(matchResourceMetricsIdle)
+			ginkgo.By("Ensuring the metrics match the expectations a few more times")
+			gomega.Consistently(ctx, getKubeletMetrics, 30*time.Second, 10*time.Second).Should(matchResourceMetricsIdle)
+		})
 	})
 })
 
diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go
index 494730414a3..b29eb5fba65 100644
--- a/test/e2e_node/podresources_test.go
+++ b/test/e2e_node/podresources_test.go
@@ -719,19 +719,16 @@ func podresourcesListTests(ctx context.Context, f *framework.Framework, cli kube
 }
 
 func podresourcesGetAllocatableResourcesTests(ctx context.Context, cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData, onlineCPUs, reservedSystemCPUs cpuset.CPUSet) {
+	ginkgo.GinkgoHelper()
+
 	ginkgo.By("checking the devices known to the kubelet")
 	resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
-	framework.ExpectNoErrorWithOffset(1, err)
-	devs := resp.GetDevices()
-	var cpus []int
-	for _, cpuid := range resp.GetCpuIds() {
-		cpus = append(cpus, int(cpuid))
-	}
-	allocatableCPUs := cpuset.New(cpus...)
+	framework.ExpectNoError(err, "cannot get allocatable CPUs from podresources")
+	allocatableCPUs, devs := demuxCPUsAndDevicesFromGetAllocatableResources(resp)
 
 	if onlineCPUs.Size() == 0 {
 		ginkgo.By("expecting no CPUs reported")
-		gomega.ExpectWithOffset(1, onlineCPUs.Size()).To(gomega.Equal(reservedSystemCPUs.Size()), "with no online CPUs, no CPUs should be reserved")
+		gomega.Expect(onlineCPUs.Size()).To(gomega.Equal(reservedSystemCPUs.Size()), "with no online CPUs, no CPUs should be reserved")
 	} else {
 		ginkgo.By(fmt.Sprintf("expecting online CPUs reported - online=%v (%d) reserved=%v (%d)", onlineCPUs, onlineCPUs.Size(), reservedSystemCPUs, reservedSystemCPUs.Size()))
 		if reservedSystemCPUs.Size() > onlineCPUs.Size() {
@@ -740,23 +737,32 @@ func podresourcesGetAllocatableResourcesTests(ctx context.Context, cli kubeletpo
 		expectedCPUs := onlineCPUs.Difference(reservedSystemCPUs)
 
 		ginkgo.By(fmt.Sprintf("expecting CPUs '%v'='%v'", allocatableCPUs, expectedCPUs))
-		gomega.ExpectWithOffset(1, allocatableCPUs.Equals(expectedCPUs)).To(gomega.BeTrueBecause("mismatch expecting CPUs"))
+		gomega.Expect(allocatableCPUs.Equals(expectedCPUs)).To(gomega.BeTrueBecause("mismatch expecting CPUs"))
 	}
 
 	if sd == nil { // no devices in the environment, so expect no devices
 		ginkgo.By("expecting no devices reported")
-		gomega.ExpectWithOffset(1, devs).To(gomega.BeEmpty(), fmt.Sprintf("got unexpected devices %#v", devs))
+		gomega.Expect(devs).To(gomega.BeEmpty(), fmt.Sprintf("got unexpected devices %#v", devs))
 		return
 	}
 
 	ginkgo.By(fmt.Sprintf("expecting some %q devices reported", sd.resourceName))
-	gomega.ExpectWithOffset(1, devs).ToNot(gomega.BeEmpty())
+	gomega.Expect(devs).ToNot(gomega.BeEmpty())
 	for _, dev := range devs {
 		gomega.Expect(dev.ResourceName).To(gomega.Equal(sd.resourceName))
-		gomega.ExpectWithOffset(1, dev.DeviceIds).ToNot(gomega.BeEmpty())
+		gomega.Expect(dev.DeviceIds).ToNot(gomega.BeEmpty())
 	}
 }
 
+func demuxCPUsAndDevicesFromGetAllocatableResources(resp *kubeletpodresourcesv1.AllocatableResourcesResponse) (cpuset.CPUSet, []*kubeletpodresourcesv1.ContainerDevices) {
+	devs := resp.GetDevices()
+	var cpus []int
+	for _, cpuid := range resp.GetCpuIds() {
+		cpus = append(cpus, int(cpuid))
+	}
+	return cpuset.New(cpus...), devs
+}
+
 func podresourcesGetTests(ctx context.Context, f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sidecarContainersEnabled bool) {
 	//var err error
 	ginkgo.By("checking the output when no pods are present")