Don't checkpoint ResizeStatus

This commit is contained in:
Tim Allclair 2024-11-04 15:30:48 -08:00
parent 4a4748d23c
commit ea53083c14
10 changed files with 35 additions and 105 deletions

View File

@ -114,7 +114,6 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/userns" "k8s.io/kubernetes/pkg/kubelet/userns"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/manager" "k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
@ -2799,25 +2798,22 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) {
func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) { func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) {
allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod) allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
if !updated { if !updated {
// Desired resources == allocated resources. Check whether a resize is in progress.
resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus) resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
if resizeInProgress { if resizeInProgress {
// If a resize in progress, make sure the cache has the correct state in case the Kubelet restarted. // If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted.
if err := kl.statusManager.SetPodResizeStatus(pod.UID, v1.PodResizeStatusInProgress); err != nil { kl.statusManager.SetPodResizeStatus(pod.UID, v1.PodResizeStatusInProgress)
klog.ErrorS(err, "Failed to set resize status to InProgress", "pod", format.Pod(pod))
}
} else { } else {
// (Desired == Allocated == Actual) => clear the resize status. // (Desired == Allocated == Actual) => clear the resize status.
if err := kl.statusManager.SetPodResizeStatus(pod.UID, ""); err != nil { kl.statusManager.SetPodResizeStatus(pod.UID, "")
klog.ErrorS(err, "Failed to clear resize status", "pod", format.Pod(pod))
}
} }
// Pod allocation does not need to be updated.
// Pod is not resizing, nothing more to do here.
return allocatedPod, nil return allocatedPod, nil
} }
kl.podResizeMutex.Lock() kl.podResizeMutex.Lock()
defer kl.podResizeMutex.Unlock() defer kl.podResizeMutex.Unlock()
// Desired resources != allocated resources. Can we update the allocation to the desired resources?
fit, resizeStatus := kl.canResizePod(pod) fit, resizeStatus := kl.canResizePod(pod)
if fit { if fit {
// Update pod resource allocation checkpoint // Update pod resource allocation checkpoint
@ -2827,9 +2823,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
allocatedPod = pod allocatedPod = pod
} }
if resizeStatus != "" { if resizeStatus != "" {
if err := kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus); err != nil { kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus)
klog.ErrorS(err, "Failed to set resize status", "pod", format.Pod(pod), "resizeStatus", resizeStatus)
}
} }
return allocatedPod, nil return allocatedPod, nil
} }

View File

@ -1750,13 +1750,11 @@ func (kl *Kubelet) determinePodResizeStatus(allocatedPod *v1.Pod, podStatus *kub
// If pod is terminal, clear the resize status. // If pod is terminal, clear the resize status.
if podIsTerminal { if podIsTerminal {
if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, ""); err != nil { kl.statusManager.SetPodResizeStatus(allocatedPod.UID, "")
klog.ErrorS(err, "SetPodResizeStatus failed for terminal pod", "pod", format.Pod(allocatedPod))
}
return "" return ""
} }
resizeStatus, _ := kl.statusManager.GetPodResizeStatus(string(allocatedPod.UID)) resizeStatus := kl.statusManager.GetPodResizeStatus(allocatedPod.UID)
return resizeStatus return resizeStatus
} }

View File

@ -2777,7 +2777,7 @@ func TestHandlePodResourcesResize(t *testing.T) {
require.True(t, found, "container allocation") require.True(t, found, "container allocation")
assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation") assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation")
resizeStatus, _ := kubelet.statusManager.GetPodResizeStatus(string(newPod.UID)) resizeStatus := kubelet.statusManager.GetPodResizeStatus(newPod.UID)
assert.Equal(t, tt.expectedResize, resizeStatus) assert.Equal(t, tt.expectedResize, resizeStatus)
}) })
} }

View File

@ -68,8 +68,8 @@ func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerNam
return m.state.GetContainerResourceAllocation(podUID, containerName) return m.state.GetContainerResourceAllocation(podUID, containerName)
} }
func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { func (m *fakeManager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
return m.state.GetPodResizeStatus(podUID) return m.state.GetPodResizeStatus(string(podUID))
} }
func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
@ -86,8 +86,8 @@ func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
return nil return nil
} }
func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
return m.state.SetPodResizeStatus(string(podUID), resizeStatus) m.state.SetPodResizeStatus(string(podUID), resizeStatus)
} }
// NewFakeManager creates empty/fake memory manager // NewFakeManager creates empty/fake memory manager

View File

@ -28,8 +28,7 @@ import (
var _ checkpointmanager.Checkpoint = &Checkpoint{} var _ checkpointmanager.Checkpoint = &Checkpoint{}
type PodResourceAllocationInfo struct { type PodResourceAllocationInfo struct {
AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"`
} }
// Checkpoint represents a structure to store pod resource allocation checkpoint data // Checkpoint represents a structure to store pod resource allocation checkpoint data

View File

@ -42,15 +42,13 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation {
type Reader interface { type Reader interface {
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
GetPodResourceAllocation() PodResourceAllocation GetPodResourceAllocation() PodResourceAllocation
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) GetPodResizeStatus(podUID string) v1.PodResizeStatus
GetResizeStatus() PodResizeStatus
} }
type writer interface { type writer interface {
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(PodResourceAllocation) error SetPodResourceAllocation(PodResourceAllocation) error
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
SetResizeStatus(PodResizeStatus) error
Delete(podUID string, containerName string) error Delete(podUID string, containerName string) error
ClearState() error ClearState() error
} }

View File

@ -80,10 +80,6 @@ func (sc *stateCheckpoint) restoreState() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to set pod resource allocation: %w", err) return fmt.Errorf("failed to set pod resource allocation: %w", err)
} }
err = sc.cache.SetResizeStatus(praInfo.ResizeStatusEntries)
if err != nil {
return fmt.Errorf("failed to set resize status: %w", err)
}
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return nil return nil
} }
@ -92,10 +88,8 @@ func (sc *stateCheckpoint) restoreState() error {
func (sc *stateCheckpoint) storeState() error { func (sc *stateCheckpoint) storeState() error {
podAllocation := sc.cache.GetPodResourceAllocation() podAllocation := sc.cache.GetPodResourceAllocation()
podResizeStatus := sc.cache.GetResizeStatus()
checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{ checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
AllocationEntries: podAllocation, AllocationEntries: podAllocation,
ResizeStatusEntries: podResizeStatus,
}) })
if err != nil { if err != nil {
return fmt.Errorf("failed to create checkpoint: %w", err) return fmt.Errorf("failed to create checkpoint: %w", err)
@ -123,19 +117,12 @@ func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
} }
// GetPodResizeStatus returns the last resize decision for a pod // GetPodResizeStatus returns the last resize decision for a pod
func (sc *stateCheckpoint) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { func (sc *stateCheckpoint) GetPodResizeStatus(podUID string) v1.PodResizeStatus {
sc.mux.RLock() sc.mux.RLock()
defer sc.mux.RUnlock() defer sc.mux.RUnlock()
return sc.cache.GetPodResizeStatus(podUID) return sc.cache.GetPodResizeStatus(podUID)
} }
// GetResizeStatus returns the set of resize decisions made
func (sc *stateCheckpoint) GetResizeStatus() PodResizeStatus {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetResizeStatus()
}
// SetContainerResourceAllocation sets resources allocated to a pod's container // SetContainerResourceAllocation sets resources allocated to a pod's container
func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error {
sc.mux.Lock() sc.mux.Lock()
@ -153,19 +140,10 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) err
} }
// SetPodResizeStatus sets the last resize decision for a pod // SetPodResizeStatus sets the last resize decision for a pod
func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error { func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
sc.mux.Lock() sc.mux.Lock()
defer sc.mux.Unlock() defer sc.mux.Unlock()
sc.cache.SetPodResizeStatus(podUID, resizeStatus) sc.cache.SetPodResizeStatus(podUID, resizeStatus)
return sc.storeState()
}
// SetResizeStatus sets the resize decisions
func (sc *stateCheckpoint) SetResizeStatus(rs PodResizeStatus) error {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetResizeStatus(rs)
return sc.storeState()
} }
// Delete deletes allocations for specified pod // Delete deletes allocations for specified pod
@ -199,12 +177,8 @@ func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation
return nil return nil
} }
func (sc *noopStateCheckpoint) GetPodResizeStatus(_ string) (v1.PodResizeStatus, bool) { func (sc *noopStateCheckpoint) GetPodResizeStatus(_ string) v1.PodResizeStatus {
return "", false return ""
}
func (sc *noopStateCheckpoint) GetResizeStatus() PodResizeStatus {
return nil
} }
func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error { func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error {
@ -215,13 +189,7 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation)
return nil return nil
} }
func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) error { func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {}
return nil
}
func (sc *noopStateCheckpoint) SetResizeStatus(_ PodResizeStatus) error {
return nil
}
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error { func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
return nil return nil

View File

@ -146,7 +146,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
}, },
}, },
}, },
ResizeStatusEntries: map[string]v1.PodResizeStatus{},
} }
checkpoint := &Checkpoint{} checkpoint := &Checkpoint{}
err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent)) err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent))
@ -160,7 +159,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
actualPodResourceAllocationInfo := &PodResourceAllocationInfo{} actualPodResourceAllocationInfo := &PodResourceAllocationInfo{}
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
actualPodResourceAllocationInfo.ResizeStatusEntries = sc.cache.GetResizeStatus()
require.NoError(t, err, "failed to get pod resource allocation info") require.NoError(t, err, "failed to get pod resource allocation info")
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
} }

View File

@ -54,22 +54,11 @@ func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation {
return s.podAllocation.Clone() return s.podAllocation.Clone()
} }
func (s *stateMemory) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { func (s *stateMemory) GetPodResizeStatus(podUID string) v1.PodResizeStatus {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
resizeStatus, ok := s.podResizeStatus[podUID] return s.podResizeStatus[podUID]
return resizeStatus, ok
}
func (s *stateMemory) GetResizeStatus() PodResizeStatus {
s.RLock()
defer s.RUnlock()
prs := make(map[string]v1.PodResizeStatus)
for k, v := range s.podResizeStatus {
prs[k] = v
}
return prs
} }
func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error {
@ -94,7 +83,7 @@ func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error {
return nil return nil
} }
func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error { func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -104,19 +93,6 @@ func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResiz
delete(s.podResizeStatus, podUID) delete(s.podResizeStatus, podUID)
} }
klog.V(3).InfoS("Updated pod resize state", "podUID", podUID, "resizeStatus", resizeStatus) klog.V(3).InfoS("Updated pod resize state", "podUID", podUID, "resizeStatus", resizeStatus)
return nil
}
func (s *stateMemory) SetResizeStatus(rs PodResizeStatus) error {
s.Lock()
defer s.Unlock()
prs := make(map[string]v1.PodResizeStatus)
for k, v := range rs {
prs[k] = v
}
s.podResizeStatus = prs
klog.V(3).InfoS("Updated pod resize state", "resizes", rs)
return nil
} }
func (s *stateMemory) deleteContainer(podUID string, containerName string) { func (s *stateMemory) deleteContainer(podUID string, containerName string) {

View File

@ -143,11 +143,11 @@ type Manager interface {
// the provided podUIDs. // the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool) RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
// GetPodResizeStatus returns checkpointed PodStatus.Resize value // GetPodResizeStatus returns cached PodStatus.Resize value
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus
// SetPodResizeStatus checkpoints the last resizing decision for the pod. // SetPodResizeStatus caches the last resizing decision for the pod.
SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus)
allocationManager allocationManager
} }
@ -285,12 +285,11 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
return pod, updated return pod, updated
} }
// GetPodResizeStatus returns the last checkpointed ResizeStaus value // GetPodResizeStatus returns the last cached ResizeStatus value.
// If checkpoint manager has not been initialized, it returns nil, false func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
m.podStatusesLock.RLock() m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock() defer m.podStatusesLock.RUnlock()
return m.state.GetPodResizeStatus(podUID) return m.state.GetPodResizeStatus(string(podUID))
} }
// SetPodAllocation checkpoints the resources allocated to a pod's containers // SetPodAllocation checkpoints the resources allocated to a pod's containers
@ -307,10 +306,10 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
} }
// SetPodResizeStatus checkpoints the last resizing decision for the pod. // SetPodResizeStatus checkpoints the last resizing decision for the pod.
func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
m.podStatusesLock.RLock() m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock() defer m.podStatusesLock.RUnlock()
return m.state.SetPodResizeStatus(string(podUID), resizeStatus) m.state.SetPodResizeStatus(string(podUID), resizeStatus)
} }
func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {