diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index ec793cb5bb1..4777c132e81 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -133,6 +133,9 @@ type manager struct { // allocatableCPUs is the set of online CPUs as reported by the system allocatableCPUs cpuset.CPUSet + + // pendingAdmissionPod contain the pod during the admission phase + pendingAdmissionPod *v1.Pod } var _ Manager = &manager{} @@ -236,6 +239,10 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe } func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(p) + // Garbage collect any stranded resources before allocating CPUs. m.removeStaleState() @@ -304,6 +311,9 @@ func (m *manager) State() state.Reader { } func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) // Garbage collect any stranded resources before providing TopologyHints m.removeStaleState() // Delegate to active policy @@ -311,6 +321,9 @@ func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[str } func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) // Garbage collect any stranded resources before providing TopologyHints m.removeStaleState() // Delegate to active policy @@ -343,11 +356,14 @@ func (m *manager) removeStaleState() { defer m.Unlock() // Get the list of active pods. - activePods := m.activePods() + activeAndAdmittedPods := m.activePods() + if m.pendingAdmissionPod != nil { + activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod) + } // Build a list of (podUID, containerName) pairs for all containers in all active Pods. activeContainers := make(map[string]map[string]struct{}) - for _, pod := range activePods { + for _, pod := range activeAndAdmittedPods { activeContainers[string(pod.UID)] = make(map[string]struct{}) for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { activeContainers[string(pod.UID)][container.Name] = struct{}{} @@ -493,3 +509,10 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet { return m.state.GetCPUSetOrDefault(podUID, containerName) } + +func (m *manager) setPodPendingAdmission(pod *v1.Pod) { + m.Lock() + defer m.Unlock() + + m.pendingAdmissionPod = pod +} diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index dc818cb0f58..db0a3560a85 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -277,7 +277,7 @@ func TestCPUManagerAdd(t *testing.T) { pod := makePod("fakePod", "fakeContainer", "2", "2") container := &pod.Spec.Containers[0] - mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} } + mgr.activePods = func() []*v1.Pod { return nil } err := mgr.Allocate(pod, container) if !reflect.DeepEqual(err, testCase.expAllocateErr) { @@ -1043,7 +1043,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) { pod := makePod("fakePod", "fakeContainer", "2", "2") container := &pod.Spec.Containers[0] - mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} } + mgr.activePods = func() []*v1.Pod { return nil } err := mgr.Allocate(pod, container) if !reflect.DeepEqual(err, testCase.expAllocateErr) { diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go index e7ac5eb9669..9b8abe77488 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -188,6 +188,11 @@ func TestGetTopologyHints(t *testing.T) { if len(tc.expectedHints) == 0 && len(hints) == 0 { continue } + + if m.pendingAdmissionPod == nil { + t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()") + } + sort.SliceStable(hints, func(i, j int) bool { return hints[i].LessThan(hints[j]) }) @@ -236,6 +241,7 @@ func TestGetPodTopologyHints(t *testing.T) { if len(tc.expectedHints) == 0 && len(podHints) == 0 { continue } + sort.SliceStable(podHints, func(i, j int) bool { return podHints[i].LessThan(podHints[j]) }) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 3925a3fd6e6..424aacfc224 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -108,6 +108,9 @@ type ManagerImpl struct { // devicesToReuse contains devices that can be reused as they have been allocated to // init containers. devicesToReuse PodReusableDevices + + // pendingAdmissionPod contain the pod during the admission phase + pendingAdmissionPod *v1.Pod } type endpointInfo struct { @@ -367,6 +370,10 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool { // Allocate is the call that you can use to allocate a set of devices // from the registered device plugins. func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) + if _, ok := m.devicesToReuse[string(pod.UID)]; !ok { m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String) } @@ -619,14 +626,20 @@ func (m *ManagerImpl) readCheckpoint() error { // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. func (m *ManagerImpl) UpdateAllocatedDevices() { - activePods := m.activePods() if !m.sourcesReady.AllReady() { return } + m.mutex.Lock() defer m.mutex.Unlock() + + activeAndAdmittedPods := m.activePods() + if m.pendingAdmissionPod != nil { + activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod) + } + podsToBeRemoved := m.podDevices.pods() - for _, pod := range activePods { + for _, pod := range activeAndAdmittedPods { podsToBeRemoved.Delete(string(pod.UID)) } if len(podsToBeRemoved) <= 0 { @@ -1117,3 +1130,10 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool { } return false } + +func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.pendingAdmissionPod = pod +} diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go index 32f26f139a8..4b094c7adbf 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -29,6 +29,10 @@ import ( // ensures the Device Manager is consulted when Topology Aware Hints for each // container are created. func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) + // Garbage collect any stranded device resources before providing TopologyHints m.UpdateAllocatedDevices() @@ -83,6 +87,10 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map // GetPodTopologyHints implements the topologymanager.HintProvider Interface which // ensures the Device Manager is consulted when Topology Aware Hints for Pod are created. func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) + // Garbage collect any stranded device resources before providing TopologyHints m.UpdateAllocatedDevices() diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index 4c8a786c5e8..3bf4dc0bf0e 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -124,6 +124,9 @@ type manager struct { // allocatableMemory holds the allocatable memory for each NUMA node allocatableMemory []state.Block + + // pendingAdmissionPod contain the pod during the admission phase + pendingAdmissionPod *v1.Pod } var _ Manager = &manager{} @@ -230,6 +233,10 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets. // Allocate is called to pre-allocate memory resources during Pod admission. func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) + // Garbage collect any stranded resources before allocation m.removeStaleState() @@ -268,6 +275,10 @@ func (m *manager) State() state.Reader { // GetPodTopologyHints returns the topology hints for the topology manager func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) + // Garbage collect any stranded resources before providing TopologyHints m.removeStaleState() // Delegate to active policy @@ -276,6 +287,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager. // GetTopologyHints returns the topology hints for the topology manager func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { + // The pod is during the admission phase. We need to save the pod to avoid it + // being cleaned before the admission ended + m.setPodPendingAdmission(pod) + // Garbage collect any stranded resources before providing TopologyHints m.removeStaleState() // Delegate to active policy @@ -298,12 +313,15 @@ func (m *manager) removeStaleState() { m.Lock() defer m.Unlock() - // Get the list of active pods. - activePods := m.activePods() + // Get the list of admitted and active pods. + activeAndAdmittedPods := m.activePods() + if m.pendingAdmissionPod != nil { + activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod) + } // Build a list of (podUID, containerName) pairs for all containers in all active Pods. activeContainers := make(map[string]map[string]struct{}) - for _, pod := range activePods { + for _, pod := range activeAndAdmittedPods { activeContainers[string(pod.UID)] = make(map[string]struct{}) for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { activeContainers[string(pod.UID)][container.Name] = struct{}{} @@ -430,3 +448,10 @@ func (m *manager) GetAllocatableMemory() []state.Block { func (m *manager) GetMemory(podUID, containerName string) []state.Block { return m.state.GetMemoryBlocks(podUID, containerName) } + +func (m *manager) setPodPendingAdmission(pod *v1.Pod) { + m.Lock() + defer m.Unlock() + + m.pendingAdmissionPod = pod +} diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go index d21615919b8..846b63c51f0 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go @@ -2019,6 +2019,129 @@ func TestNewManager(t *testing.T) { func TestGetTopologyHints(t *testing.T) { testCases := []testMemoryManager{ + { + description: "Successful hint generation", + policyName: policyTypeStatic, + machineInfo: returnMachineInfo(), + reserved: systemReservedMemory{ + 0: map[v1.ResourceName]uint64{ + v1.ResourceMemory: 1 * gb, + }, + 1: map[v1.ResourceName]uint64{ + v1.ResourceMemory: 1 * gb, + }, + }, + assignments: state.ContainerMemoryAssignments{ + "fakePod1": map[string][]state.Block{ + "fakeContainer1": { + { + NUMAAffinity: []int{0}, + Type: v1.ResourceMemory, + Size: 1 * gb, + }, + { + NUMAAffinity: []int{0}, + Type: hugepages1Gi, + Size: 1 * gb, + }, + }, + "fakeContainer2": { + { + NUMAAffinity: []int{0}, + Type: v1.ResourceMemory, + Size: 1 * gb, + }, + { + NUMAAffinity: []int{0}, + Type: hugepages1Gi, + Size: 1 * gb, + }, + }, + }, + }, + machineState: state.NUMANodeMap{ + 0: &state.NUMANodeState{ + Cells: []int{0}, + NumberOfAssignments: 4, + MemoryMap: map[v1.ResourceName]*state.MemoryTable{ + v1.ResourceMemory: { + Allocatable: 9 * gb, + Free: 7 * gb, + Reserved: 2 * gb, + SystemReserved: 1 * gb, + TotalMemSize: 10 * gb, + }, + hugepages1Gi: { + Allocatable: 5 * gb, + Free: 3 * gb, + Reserved: 2 * gb, + SystemReserved: 0 * gb, + TotalMemSize: 5 * gb, + }, + }, + }, + 1: &state.NUMANodeState{ + Cells: []int{1}, + NumberOfAssignments: 0, + MemoryMap: map[v1.ResourceName]*state.MemoryTable{ + v1.ResourceMemory: { + Allocatable: 9 * gb, + Free: 9 * gb, + Reserved: 0 * gb, + SystemReserved: 1 * gb, + TotalMemSize: 10 * gb, + }, + hugepages1Gi: { + Allocatable: 5 * gb, + Free: 5 * gb, + Reserved: 0, + SystemReserved: 0, + TotalMemSize: 5 * gb, + }, + }, + }, + }, + expectedError: nil, + expectedHints: map[string][]topologymanager.TopologyHint{ + string(v1.ResourceMemory): { + { + NUMANodeAffinity: newNUMAAffinity(0), + Preferred: true, + }, + { + NUMANodeAffinity: newNUMAAffinity(1), + Preferred: true, + }, + }, + string(hugepages1Gi): { + { + NUMANodeAffinity: newNUMAAffinity(0), + Preferred: true, + }, + { + NUMANodeAffinity: newNUMAAffinity(1), + Preferred: true, + }, + }, + }, + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "fakePod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fakeContainer1", + }, + { + Name: "fakeContainer2", + }, + }, + }, + }, + }, + }, { description: "Successful hint generation", policyName: policyTypeStatic, @@ -2132,6 +2255,7 @@ func TestGetTopologyHints(t *testing.T) { }, }, }, + activePods: []*v1.Pod{}, }, } @@ -2144,14 +2268,14 @@ func TestGetTopologyHints(t *testing.T) { containerRuntime: mockRuntimeService{ err: nil, }, - activePods: func() []*v1.Pod { return nil }, + activePods: func() []*v1.Pod { return testCase.activePods }, podStatusProvider: mockPodStatusProvider{}, } mgr.sourcesReady = &sourcesReadyStub{} mgr.state.SetMachineState(testCase.machineState.Clone()) mgr.state.SetMemoryAssignments(testCase.assignments.Clone()) - pod := getPod("fakePod1", "fakeContainer1", requirementsGuaranteed) + pod := getPod("fakePod2", "fakeContainer1", requirementsGuaranteed) container := &pod.Spec.Containers[0] hints := mgr.GetTopologyHints(pod, container) if !reflect.DeepEqual(hints, testCase.expectedHints) {