mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #103979 from cynepco3hahue/save_admitted_pods
Do not clear state of pods pending admission for CPU/Memory/Device manager
This commit is contained in:
commit
6a043332be
@ -133,6 +133,9 @@ type manager struct {
|
||||
|
||||
// allocatableCPUs is the set of online CPUs as reported by the system
|
||||
allocatableCPUs cpuset.CPUSet
|
||||
|
||||
// pendingAdmissionPod contains the pod during the admission phase
|
||||
pendingAdmissionPod *v1.Pod
|
||||
}
|
||||
|
||||
var _ Manager = &manager{}
|
||||
@ -236,6 +239,10 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
|
||||
}
|
||||
|
||||
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(p)
|
||||
|
||||
// Garbage collect any stranded resources before allocating CPUs.
|
||||
m.removeStaleState()
|
||||
|
||||
@ -304,6 +311,9 @@ func (m *manager) State() state.Reader {
|
||||
}
|
||||
|
||||
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
// Garbage collect any stranded resources before providing TopologyHints
|
||||
m.removeStaleState()
|
||||
// Delegate to active policy
|
||||
@ -311,6 +321,9 @@ func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[str
|
||||
}
|
||||
|
||||
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
// Garbage collect any stranded resources before providing TopologyHints
|
||||
m.removeStaleState()
|
||||
// Delegate to active policy
|
||||
@ -343,11 +356,14 @@ func (m *manager) removeStaleState() {
|
||||
defer m.Unlock()
|
||||
|
||||
// Get the list of active pods.
|
||||
activePods := m.activePods()
|
||||
activeAndAdmittedPods := m.activePods()
|
||||
if m.pendingAdmissionPod != nil {
|
||||
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
|
||||
}
|
||||
|
||||
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
|
||||
activeContainers := make(map[string]map[string]struct{})
|
||||
for _, pod := range activePods {
|
||||
for _, pod := range activeAndAdmittedPods {
|
||||
activeContainers[string(pod.UID)] = make(map[string]struct{})
|
||||
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
|
||||
activeContainers[string(pod.UID)][container.Name] = struct{}{}
|
||||
@ -493,3 +509,10 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet)
|
||||
// GetCPUs returns the CPU set that the manager's state reports for the
// container identified by podUID and containerName. The lookup is delegated
// entirely to m.state.GetCPUSetOrDefault.
// NOTE(review): presumably this falls back to a default set when the container
// has no explicit assignment — confirm against the state package.
func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
|
||||
return m.state.GetCPUSetOrDefault(podUID, containerName)
|
||||
}
|
||||
|
||||
// setPodPendingAdmission stores pod in m.pendingAdmissionPod, replacing any
// previously stored pod. The assignment is made while holding the manager
// lock; removeStaleState reads the field under the same lock and, when it is
// non-nil, counts that pod alongside the active pods.
// NOTE(review): callers presumably must not already hold the manager lock —
// confirm the lock type is non-reentrant.
func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.pendingAdmissionPod = pod
|
||||
}
|
||||
|
@ -277,7 +277,7 @@ func TestCPUManagerAdd(t *testing.T) {
|
||||
|
||||
pod := makePod("fakePod", "fakeContainer", "2", "2")
|
||||
container := &pod.Spec.Containers[0]
|
||||
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
|
||||
mgr.activePods = func() []*v1.Pod { return nil }
|
||||
|
||||
err := mgr.Allocate(pod, container)
|
||||
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
|
||||
@ -1043,7 +1043,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
|
||||
|
||||
pod := makePod("fakePod", "fakeContainer", "2", "2")
|
||||
container := &pod.Spec.Containers[0]
|
||||
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
|
||||
mgr.activePods = func() []*v1.Pod { return nil }
|
||||
|
||||
err := mgr.Allocate(pod, container)
|
||||
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
|
||||
|
@ -188,6 +188,11 @@ func TestGetTopologyHints(t *testing.T) {
|
||||
if len(tc.expectedHints) == 0 && len(hints) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if m.pendingAdmissionPod == nil {
|
||||
t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()")
|
||||
}
|
||||
|
||||
sort.SliceStable(hints, func(i, j int) bool {
|
||||
return hints[i].LessThan(hints[j])
|
||||
})
|
||||
@ -236,6 +241,7 @@ func TestGetPodTopologyHints(t *testing.T) {
|
||||
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
sort.SliceStable(podHints, func(i, j int) bool {
|
||||
return podHints[i].LessThan(podHints[j])
|
||||
})
|
||||
|
@ -108,6 +108,9 @@ type ManagerImpl struct {
|
||||
// devicesToReuse contains devices that can be reused as they have been allocated to
|
||||
// init containers.
|
||||
devicesToReuse PodReusableDevices
|
||||
|
||||
// pendingAdmissionPod contains the pod during the admission phase
|
||||
pendingAdmissionPod *v1.Pod
|
||||
}
|
||||
|
||||
type endpointInfo struct {
|
||||
@ -367,6 +370,10 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
|
||||
// Allocate is the call that you can use to allocate a set of devices
|
||||
// from the registered device plugins.
|
||||
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
|
||||
if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
|
||||
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
|
||||
}
|
||||
@ -619,14 +626,20 @@ func (m *ManagerImpl) readCheckpoint() error {
|
||||
|
||||
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
|
||||
func (m *ManagerImpl) UpdateAllocatedDevices() {
|
||||
activePods := m.activePods()
|
||||
if !m.sourcesReady.AllReady() {
|
||||
return
|
||||
}
|
||||
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
activeAndAdmittedPods := m.activePods()
|
||||
if m.pendingAdmissionPod != nil {
|
||||
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
|
||||
}
|
||||
|
||||
podsToBeRemoved := m.podDevices.pods()
|
||||
for _, pod := range activePods {
|
||||
for _, pod := range activeAndAdmittedPods {
|
||||
podsToBeRemoved.Delete(string(pod.UID))
|
||||
}
|
||||
if len(podsToBeRemoved) <= 0 {
|
||||
@ -1117,3 +1130,10 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// setPodPendingAdmission stores pod in m.pendingAdmissionPod, replacing any
// previously stored pod. The assignment is made while holding m.mutex;
// UpdateAllocatedDevices reads the field under the same mutex and, when it is
// non-nil, excludes that pod from the set of pods to be removed.
// NOTE(review): callers presumably must not already hold m.mutex — confirm the
// mutex type is non-reentrant.
func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.pendingAdmissionPod = pod
|
||||
}
|
||||
|
@ -29,6 +29,10 @@ import (
|
||||
// ensures the Device Manager is consulted when Topology Aware Hints for each
|
||||
// container are created.
|
||||
func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
|
||||
// Garbage collect any stranded device resources before providing TopologyHints
|
||||
m.UpdateAllocatedDevices()
|
||||
|
||||
@ -83,6 +87,10 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
|
||||
// GetPodTopologyHints implements the topologymanager.HintProvider Interface which
|
||||
// ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
|
||||
func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
|
||||
// Garbage collect any stranded device resources before providing TopologyHints
|
||||
m.UpdateAllocatedDevices()
|
||||
|
||||
|
@ -124,6 +124,9 @@ type manager struct {
|
||||
|
||||
// allocatableMemory holds the allocatable memory for each NUMA node
|
||||
allocatableMemory []state.Block
|
||||
|
||||
// pendingAdmissionPod contains the pod during the admission phase
|
||||
pendingAdmissionPod *v1.Pod
|
||||
}
|
||||
|
||||
var _ Manager = &manager{}
|
||||
@ -230,6 +233,10 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.
|
||||
|
||||
// Allocate is called to pre-allocate memory resources during Pod admission.
|
||||
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
|
||||
// Garbage collect any stranded resources before allocation
|
||||
m.removeStaleState()
|
||||
|
||||
@ -268,6 +275,10 @@ func (m *manager) State() state.Reader {
|
||||
|
||||
// GetPodTopologyHints returns the topology hints for the topology manager
|
||||
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
|
||||
// Garbage collect any stranded resources before providing TopologyHints
|
||||
m.removeStaleState()
|
||||
// Delegate to active policy
|
||||
@ -276,6 +287,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.
|
||||
|
||||
// GetTopologyHints returns the topology hints for the topology manager
|
||||
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
|
||||
// The pod is in the admission phase. We need to save the pod to avoid it
|
||||
// being cleaned up before admission has ended
|
||||
m.setPodPendingAdmission(pod)
|
||||
|
||||
// Garbage collect any stranded resources before providing TopologyHints
|
||||
m.removeStaleState()
|
||||
// Delegate to active policy
|
||||
@ -298,12 +313,15 @@ func (m *manager) removeStaleState() {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Get the list of active pods.
|
||||
activePods := m.activePods()
|
||||
// Get the list of admitted and active pods.
|
||||
activeAndAdmittedPods := m.activePods()
|
||||
if m.pendingAdmissionPod != nil {
|
||||
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
|
||||
}
|
||||
|
||||
// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
|
||||
activeContainers := make(map[string]map[string]struct{})
|
||||
for _, pod := range activePods {
|
||||
for _, pod := range activeAndAdmittedPods {
|
||||
activeContainers[string(pod.UID)] = make(map[string]struct{})
|
||||
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
|
||||
activeContainers[string(pod.UID)][container.Name] = struct{}{}
|
||||
@ -430,3 +448,10 @@ func (m *manager) GetAllocatableMemory() []state.Block {
|
||||
// GetMemory returns the memory blocks that the manager's state reports for the
// container identified by podUID and containerName, by delegating to
// m.state.GetMemoryBlocks.
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
|
||||
return m.state.GetMemoryBlocks(podUID, containerName)
|
||||
}
|
||||
|
||||
// setPodPendingAdmission stores pod in m.pendingAdmissionPod, replacing any
// previously stored pod. The assignment is made while holding the manager
// lock; removeStaleState reads the field under the same lock and, when it is
// non-nil, counts that pod alongside the active pods.
// NOTE(review): callers presumably must not already hold the manager lock —
// confirm the lock type is non-reentrant.
func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.pendingAdmissionPod = pod
|
||||
}
|
||||
|
@ -2019,6 +2019,129 @@ func TestNewManager(t *testing.T) {
|
||||
|
||||
func TestGetTopologyHints(t *testing.T) {
|
||||
testCases := []testMemoryManager{
|
||||
{
|
||||
description: "Successful hint generation",
|
||||
policyName: policyTypeStatic,
|
||||
machineInfo: returnMachineInfo(),
|
||||
reserved: systemReservedMemory{
|
||||
0: map[v1.ResourceName]uint64{
|
||||
v1.ResourceMemory: 1 * gb,
|
||||
},
|
||||
1: map[v1.ResourceName]uint64{
|
||||
v1.ResourceMemory: 1 * gb,
|
||||
},
|
||||
},
|
||||
assignments: state.ContainerMemoryAssignments{
|
||||
"fakePod1": map[string][]state.Block{
|
||||
"fakeContainer1": {
|
||||
{
|
||||
NUMAAffinity: []int{0},
|
||||
Type: v1.ResourceMemory,
|
||||
Size: 1 * gb,
|
||||
},
|
||||
{
|
||||
NUMAAffinity: []int{0},
|
||||
Type: hugepages1Gi,
|
||||
Size: 1 * gb,
|
||||
},
|
||||
},
|
||||
"fakeContainer2": {
|
||||
{
|
||||
NUMAAffinity: []int{0},
|
||||
Type: v1.ResourceMemory,
|
||||
Size: 1 * gb,
|
||||
},
|
||||
{
|
||||
NUMAAffinity: []int{0},
|
||||
Type: hugepages1Gi,
|
||||
Size: 1 * gb,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
machineState: state.NUMANodeMap{
|
||||
0: &state.NUMANodeState{
|
||||
Cells: []int{0},
|
||||
NumberOfAssignments: 4,
|
||||
MemoryMap: map[v1.ResourceName]*state.MemoryTable{
|
||||
v1.ResourceMemory: {
|
||||
Allocatable: 9 * gb,
|
||||
Free: 7 * gb,
|
||||
Reserved: 2 * gb,
|
||||
SystemReserved: 1 * gb,
|
||||
TotalMemSize: 10 * gb,
|
||||
},
|
||||
hugepages1Gi: {
|
||||
Allocatable: 5 * gb,
|
||||
Free: 3 * gb,
|
||||
Reserved: 2 * gb,
|
||||
SystemReserved: 0 * gb,
|
||||
TotalMemSize: 5 * gb,
|
||||
},
|
||||
},
|
||||
},
|
||||
1: &state.NUMANodeState{
|
||||
Cells: []int{1},
|
||||
NumberOfAssignments: 0,
|
||||
MemoryMap: map[v1.ResourceName]*state.MemoryTable{
|
||||
v1.ResourceMemory: {
|
||||
Allocatable: 9 * gb,
|
||||
Free: 9 * gb,
|
||||
Reserved: 0 * gb,
|
||||
SystemReserved: 1 * gb,
|
||||
TotalMemSize: 10 * gb,
|
||||
},
|
||||
hugepages1Gi: {
|
||||
Allocatable: 5 * gb,
|
||||
Free: 5 * gb,
|
||||
Reserved: 0,
|
||||
SystemReserved: 0,
|
||||
TotalMemSize: 5 * gb,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: nil,
|
||||
expectedHints: map[string][]topologymanager.TopologyHint{
|
||||
string(v1.ResourceMemory): {
|
||||
{
|
||||
NUMANodeAffinity: newNUMAAffinity(0),
|
||||
Preferred: true,
|
||||
},
|
||||
{
|
||||
NUMANodeAffinity: newNUMAAffinity(1),
|
||||
Preferred: true,
|
||||
},
|
||||
},
|
||||
string(hugepages1Gi): {
|
||||
{
|
||||
NUMANodeAffinity: newNUMAAffinity(0),
|
||||
Preferred: true,
|
||||
},
|
||||
{
|
||||
NUMANodeAffinity: newNUMAAffinity(1),
|
||||
Preferred: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
activePods: []*v1.Pod{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: "fakePod1",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "fakeContainer1",
|
||||
},
|
||||
{
|
||||
Name: "fakeContainer2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "Successful hint generation",
|
||||
policyName: policyTypeStatic,
|
||||
@ -2132,6 +2255,7 @@ func TestGetTopologyHints(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
activePods: []*v1.Pod{},
|
||||
},
|
||||
}
|
||||
|
||||
@ -2144,14 +2268,14 @@ func TestGetTopologyHints(t *testing.T) {
|
||||
containerRuntime: mockRuntimeService{
|
||||
err: nil,
|
||||
},
|
||||
activePods: func() []*v1.Pod { return nil },
|
||||
activePods: func() []*v1.Pod { return testCase.activePods },
|
||||
podStatusProvider: mockPodStatusProvider{},
|
||||
}
|
||||
mgr.sourcesReady = &sourcesReadyStub{}
|
||||
mgr.state.SetMachineState(testCase.machineState.Clone())
|
||||
mgr.state.SetMemoryAssignments(testCase.assignments.Clone())
|
||||
|
||||
pod := getPod("fakePod1", "fakeContainer1", requirementsGuaranteed)
|
||||
pod := getPod("fakePod2", "fakeContainer1", requirementsGuaranteed)
|
||||
container := &pod.Spec.Containers[0]
|
||||
hints := mgr.GetTopologyHints(pod, container)
|
||||
if !reflect.DeepEqual(hints, testCase.expectedHints) {
|
||||
|
Loading…
Reference in New Issue
Block a user