Merge pull request #128551 from tallclair/allocated-checkpoint

[FG:InPlacePodVerticalScaling] Don't checkpoint ResizeStatus
Commit 5e0b818ff9, authored by Kubernetes Prow Robot on 2024-11-06 04:19:36 +00:00, committed by GitHub.
10 changed files with 94 additions and 128 deletions
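
The substance of the change: a pod's resize status is derived state, so it no longer needs to survive kubelet restarts via the state checkpoint. Only container resource allocations are checkpointed; the resize status moves to an in-memory cache and is reconstructed after a restart by comparing the checkpointed allocation against what the runtime actually reports. A minimal sketch of that recovery rule (illustrative only, not kubelet code; allocatedResourcesMatchStatus stands in for the real helper that appears in the hunks below):

	// recoverResizeStatus recomputes the cached status from the two pieces of
	// state that do survive a restart: the checkpointed allocation and the
	// actual container resources reported by the runtime.
	func recoverResizeStatus(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodResizeStatus {
		if !allocatedResourcesMatchStatus(allocatedPod, podStatus) {
			// Allocated but not yet actuated: the resize is still in progress.
			return v1.PodResizeStatusInProgress
		}
		// Desired == allocated == actual: nothing in flight.
		return ""
	}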

pkg/kubelet/kubelet.go

@@ -114,7 +114,6 @@ import (
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/kubelet/userns"
 	"k8s.io/kubernetes/pkg/kubelet/util"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/util/manager"
 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
@@ -1829,7 +1828,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
 		// this conveniently retries any Deferred resize requests
 		// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
 		// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
-		pod, err = kl.handlePodResourcesResize(pod)
+		pod, err = kl.handlePodResourcesResize(pod, podStatus)
 		if err != nil {
 			return false, err
 		}
@@ -2794,23 +2793,28 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) {
 	return true, v1.PodResizeStatusInProgress
 }
 
-func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) {
+// handlePodResourcesResize returns the "allocated pod", which should be used for all resource
+// calculations after this function is called. It also updates the cached ResizeStatus according to
+// the allocation decision and pod status.
+func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) {
 	allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
 	if !updated {
-		// Unless a resize is in-progress, clear the resize status.
-		resizeStatus, _ := kl.statusManager.GetPodResizeStatus(string(pod.UID))
-		if resizeStatus != v1.PodResizeStatusInProgress {
-			if err := kl.statusManager.SetPodResizeStatus(pod.UID, ""); err != nil {
-				klog.ErrorS(err, "Failed to clear resize status", "pod", format.Pod(pod))
-			}
+		// Desired resources == allocated resources. Check whether a resize is in progress.
+		resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
+		if resizeInProgress {
+			// If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted.
+			kl.statusManager.SetPodResizeStatus(pod.UID, v1.PodResizeStatusInProgress)
+		} else {
+			// (Desired == Allocated == Actual) => clear the resize status.
+			kl.statusManager.SetPodResizeStatus(pod.UID, "")
 		}
-		// Pod is not resizing, nothing more to do here.
+		// Pod allocation does not need to be updated.
		return allocatedPod, nil
 	}
 
 	kl.podResizeMutex.Lock()
 	defer kl.podResizeMutex.Unlock()
+	// Desired resources != allocated resources. Can we update the allocation to the desired resources?
 	fit, resizeStatus := kl.canResizePod(pod)
 	if fit {
 		// Update pod resource allocation checkpoint
@@ -2820,11 +2824,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) {
 		allocatedPod = pod
 	}
 	if resizeStatus != "" {
-		// Save resize decision to checkpoint
-		if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, resizeStatus); err != nil {
-			//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
-			klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(allocatedPod))
-		}
+		kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus)
 	}
 	return allocatedPod, nil
 }
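
The rewritten handlePodResourcesResize distinguishes three cases. A condensed sketch of the decision table (illustrative; desiredMatchesAllocated corresponds to UpdatePodFromAllocation reporting no update, actualMatchesAllocated to allocatedResourcesMatchStatus, and fitStatus to what canResizePod returns):

	func decideResizeStatus(desiredMatchesAllocated, actualMatchesAllocated bool, fitStatus v1.PodResizeStatus) v1.PodResizeStatus {
		if desiredMatchesAllocated {
			if actualMatchesAllocated {
				return "" // desired == allocated == actual: clear any stale status
			}
			return v1.PodResizeStatusInProgress // allocated but not yet actuated
		}
		// Desired != allocated: canResizePod decides whether the new requests fit,
		// yielding InProgress, Deferred, or Infeasible.
		return fitStatus
	}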

pkg/kubelet/kubelet_pods.go

@@ -1750,22 +1750,11 @@ func (kl *Kubelet) determinePodResizeStatus(allocatedPod *v1.Pod, podStatus *kub
 	// If pod is terminal, clear the resize status.
 	if podIsTerminal {
-		if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, ""); err != nil {
-			klog.ErrorS(err, "SetPodResizeStatus failed for terminal pod", "pod", format.Pod(allocatedPod))
-		}
+		kl.statusManager.SetPodResizeStatus(allocatedPod.UID, "")
 		return ""
 	}
-	resizeStatus, _ := kl.statusManager.GetPodResizeStatus(string(allocatedPod.UID))
-	// If the resize was in-progress and the actual resources match the allocated resources, mark
-	// the resize as complete by clearing the resize status.
-	if resizeStatus == v1.PodResizeStatusInProgress &&
-		allocatedResourcesMatchStatus(allocatedPod, podStatus) {
-		if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, ""); err != nil {
-			klog.ErrorS(err, "SetPodResizeStatus failed", "pod", format.Pod(allocatedPod))
-		}
-		return ""
-	}
+	resizeStatus := kl.statusManager.GetPodResizeStatus(allocatedPod.UID)
 	return resizeStatus
 }

pkg/kubelet/kubelet_test.go

@@ -2665,11 +2665,12 @@ func TestHandlePodResourcesResize(t *testing.T) {
 	defer kubelet.podManager.RemovePod(testPod1)
 
 	tests := []struct {
-		name                string
-		pod                 *v1.Pod
-		newRequests         v1.ResourceList
-		expectedAllocations v1.ResourceList
-		expectedResize      v1.PodResizeStatus
+		name                 string
+		pod                  *v1.Pod
+		newRequests          v1.ResourceList
+		newRequestsAllocated bool // Whether the new requests have already been allocated (but not actuated)
+		expectedAllocations  v1.ResourceList
+		expectedResize       v1.PodResizeStatus
 	}{
 		{
 			name: "Request CPU and memory decrease - expect InProgress",
@@ -2720,25 +2721,63 @@ func TestHandlePodResourcesResize(t *testing.T) {
 			expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
 			expectedResize:      v1.PodResizeStatusInfeasible,
 		},
+		{
+			name:                 "CPU increase in progress - expect InProgress",
+			pod:                  testPod2,
+			newRequests:          v1.ResourceList{v1.ResourceCPU: cpu1500m, v1.ResourceMemory: mem1000M},
+			newRequestsAllocated: true,
+			expectedAllocations:  v1.ResourceList{v1.ResourceCPU: cpu1500m, v1.ResourceMemory: mem1000M},
+			expectedResize:       v1.PodResizeStatusInProgress,
+		},
+		{
+			name:                "No resize",
+			pod:                 testPod2,
+			newRequests:         v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
+			expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
+			expectedResize:      "",
+		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			kubelet.statusManager = status.NewFakeManager()
-			require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod))
-			pod := tt.pod.DeepCopy()
-			pod.Spec.Containers[0].Resources.Requests = tt.newRequests
-			updatedPod, err := kubelet.handlePodResourcesResize(pod)
+			newPod := tt.pod.DeepCopy()
+			newPod.Spec.Containers[0].Resources.Requests = tt.newRequests
+			if !tt.newRequestsAllocated {
+				require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod))
+			} else {
+				require.NoError(t, kubelet.statusManager.SetPodAllocation(newPod))
+			}
+			podStatus := &kubecontainer.PodStatus{
+				ID:                tt.pod.UID,
+				Name:              tt.pod.Name,
+				Namespace:         tt.pod.Namespace,
+				ContainerStatuses: make([]*kubecontainer.Status, len(tt.pod.Spec.Containers)),
+			}
+			for i, c := range tt.pod.Spec.Containers {
+				podStatus.ContainerStatuses[i] = &kubecontainer.Status{
+					Name:  c.Name,
+					State: kubecontainer.ContainerStateRunning,
+					Resources: &kubecontainer.ContainerResources{
+						CPURequest:  c.Resources.Requests.Cpu(),
+						CPULimit:    c.Resources.Limits.Cpu(),
+						MemoryLimit: c.Resources.Limits.Memory(),
+					},
+				}
+			}
+			updatedPod, err := kubelet.handlePodResourcesResize(newPod, podStatus)
 			require.NoError(t, err)
 			assert.Equal(t, tt.expectedAllocations, updatedPod.Spec.Containers[0].Resources.Requests, "updated pod spec resources")
-			alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(pod.UID), pod.Spec.Containers[0].Name)
+			alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(newPod.UID), newPod.Spec.Containers[0].Name)
 			require.True(t, found, "container allocation")
 			assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation")
-			resizeStatus, found := kubelet.statusManager.GetPodResizeStatus(string(pod.UID))
-			require.True(t, found, "pod resize status")
+			resizeStatus := kubelet.statusManager.GetPodResizeStatus(newPod.UID)
 			assert.Equal(t, tt.expectedResize, resizeStatus)
 		})
 	}

pkg/kubelet/status/fake_status_manager.go

@@ -68,8 +68,8 @@ func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerNam
 	return m.state.GetContainerResourceAllocation(podUID, containerName)
 }
 
-func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
-	return m.state.GetPodResizeStatus(podUID)
+func (m *fakeManager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
+	return m.state.GetPodResizeStatus(string(podUID))
 }
 
 func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
@@ -86,8 +86,8 @@ func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
 	return nil
 }
 
-func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error {
-	return m.state.SetPodResizeStatus(string(podUID), resizeStatus)
+func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
+	m.state.SetPodResizeStatus(string(podUID), resizeStatus)
 }
 
 // NewFakeManager creates empty/fake memory manager

pkg/kubelet/status/state/checkpoint.go

@@ -28,8 +28,7 @@ import (
 var _ checkpointmanager.Checkpoint = &Checkpoint{}
 
 type PodResourceAllocationInfo struct {
-	AllocationEntries   map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
-	ResizeStatusEntries map[string]v1.PodResizeStatus                 `json:"resizeStatusEntries,omitempty"`
+	AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
 }
 
 // Checkpoint represents a structure to store pod resource allocation checkpoint data
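
With ResizeStatusEntries gone, the checkpoint payload carries only the allocation map keyed by pod UID and container name. A hypothetical payload for one pod with a single container would serialize roughly as follows (checkpoint envelope and checksum omitted; the UID, container name, and quantities are made up):

	{
	  "allocationEntries": {
	    "pod-uid-1234": {
	      "app-container": {
	        "requests": {"cpu": "1", "memory": "1Gi"}
	      }
	    }
	  }
	}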

pkg/kubelet/status/state/state.go

@@ -42,15 +42,13 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation {
 type Reader interface {
 	GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
 	GetPodResourceAllocation() PodResourceAllocation
-	GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
-	GetResizeStatus() PodResizeStatus
+	GetPodResizeStatus(podUID string) v1.PodResizeStatus
 }
 
 type writer interface {
 	SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
 	SetPodResourceAllocation(PodResourceAllocation) error
-	SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error
-	SetResizeStatus(PodResizeStatus) error
+	SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
 	Delete(podUID string, containerName string) error
 	ClearState() error
 }

pkg/kubelet/status/state/state_checkpoint.go

@@ -80,10 +80,6 @@ func (sc *stateCheckpoint) restoreState() error {
 	if err != nil {
 		return fmt.Errorf("failed to set pod resource allocation: %w", err)
 	}
-	err = sc.cache.SetResizeStatus(praInfo.ResizeStatusEntries)
-	if err != nil {
-		return fmt.Errorf("failed to set resize status: %w", err)
-	}
 	klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
 	return nil
 }
@@ -92,10 +88,8 @@ func (sc *stateCheckpoint) restoreState() error {
 func (sc *stateCheckpoint) storeState() error {
 	podAllocation := sc.cache.GetPodResourceAllocation()
-	podResizeStatus := sc.cache.GetResizeStatus()
 	checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
-		AllocationEntries:   podAllocation,
-		ResizeStatusEntries: podResizeStatus,
+		AllocationEntries: podAllocation,
 	})
 	if err != nil {
 		return fmt.Errorf("failed to create checkpoint: %w", err)
@@ -123,19 +117,12 @@ func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
 }
 
 // GetPodResizeStatus returns the last resize decision for a pod
-func (sc *stateCheckpoint) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
+func (sc *stateCheckpoint) GetPodResizeStatus(podUID string) v1.PodResizeStatus {
 	sc.mux.RLock()
 	defer sc.mux.RUnlock()
 	return sc.cache.GetPodResizeStatus(podUID)
 }
 
-// GetResizeStatus returns the set of resize decisions made
-func (sc *stateCheckpoint) GetResizeStatus() PodResizeStatus {
-	sc.mux.RLock()
-	defer sc.mux.RUnlock()
-	return sc.cache.GetResizeStatus()
-}
-
 // SetContainerResourceAllocation sets resources allocated to a pod's container
 func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error {
 	sc.mux.Lock()
@@ -153,19 +140,10 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) err
 }
 
 // SetPodResizeStatus sets the last resize decision for a pod
-func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error {
+func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
 	sc.mux.Lock()
 	defer sc.mux.Unlock()
 	sc.cache.SetPodResizeStatus(podUID, resizeStatus)
-	return sc.storeState()
 }
 
-// SetResizeStatus sets the resize decisions
-func (sc *stateCheckpoint) SetResizeStatus(rs PodResizeStatus) error {
-	sc.mux.Lock()
-	defer sc.mux.Unlock()
-	sc.cache.SetResizeStatus(rs)
-	return sc.storeState()
-}
-
 // Delete deletes allocations for specified pod
@@ -199,12 +177,8 @@ func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation
 	return nil
 }
 
-func (sc *noopStateCheckpoint) GetPodResizeStatus(_ string) (v1.PodResizeStatus, bool) {
-	return "", false
-}
-
-func (sc *noopStateCheckpoint) GetResizeStatus() PodResizeStatus {
-	return nil
+func (sc *noopStateCheckpoint) GetPodResizeStatus(_ string) v1.PodResizeStatus {
+	return ""
 }
 
 func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error {
@@ -215,13 +189,7 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation)
 	return nil
 }
 
-func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) error {
-	return nil
-}
-
-func (sc *noopStateCheckpoint) SetResizeStatus(_ PodResizeStatus) error {
-	return nil
-}
+func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {}
 
 func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
 	return nil

pkg/kubelet/status/state/state_checkpoint_test.go

@@ -146,7 +146,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
 				},
 			},
 		},
-		ResizeStatusEntries: map[string]v1.PodResizeStatus{},
 	}
 	checkpoint := &Checkpoint{}
 	err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent))
@@ -160,7 +159,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
 	actualPodResourceAllocationInfo := &PodResourceAllocationInfo{}
 	actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
-	actualPodResourceAllocationInfo.ResizeStatusEntries = sc.cache.GetResizeStatus()
 	require.NoError(t, err, "failed to get pod resource allocation info")
 	require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
 }

pkg/kubelet/status/state/state_mem.go

@@ -54,22 +54,11 @@ func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation {
 	return s.podAllocation.Clone()
 }
 
-func (s *stateMemory) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
+func (s *stateMemory) GetPodResizeStatus(podUID string) v1.PodResizeStatus {
 	s.RLock()
 	defer s.RUnlock()
-	resizeStatus, ok := s.podResizeStatus[podUID]
-	return resizeStatus, ok
-}
-
-func (s *stateMemory) GetResizeStatus() PodResizeStatus {
-	s.RLock()
-	defer s.RUnlock()
-	prs := make(map[string]v1.PodResizeStatus)
-	for k, v := range s.podResizeStatus {
-		prs[k] = v
-	}
-	return prs
+	return s.podResizeStatus[podUID]
 }
 
 func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error {
@@ -94,7 +83,7 @@ func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error {
 	return nil
 }
 
-func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error {
+func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
 	s.Lock()
 	defer s.Unlock()
@@ -104,19 +93,6 @@ func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResiz
 		delete(s.podResizeStatus, podUID)
 	}
 	klog.V(3).InfoS("Updated pod resize state", "podUID", podUID, "resizeStatus", resizeStatus)
-	return nil
 }
 
-func (s *stateMemory) SetResizeStatus(rs PodResizeStatus) error {
-	s.Lock()
-	defer s.Unlock()
-	prs := make(map[string]v1.PodResizeStatus)
-	for k, v := range rs {
-		prs[k] = v
-	}
-	s.podResizeStatus = prs
-	klog.V(3).InfoS("Updated pod resize state", "resizes", rs)
-	return nil
-}
-
 func (s *stateMemory) deleteContainer(podUID string, containerName string) {

pkg/kubelet/status/status_manager.go

@@ -143,11 +143,11 @@ type Manager interface {
 	// the provided podUIDs.
 	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
 
-	// GetPodResizeStatus returns checkpointed PodStatus.Resize value
-	GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
+	// GetPodResizeStatus returns cached PodStatus.Resize value
+	GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus
 
-	// SetPodResizeStatus checkpoints the last resizing decision for the pod.
-	SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error
+	// SetPodResizeStatus caches the last resizing decision for the pod.
+	SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus)
 
 	allocationManager
 }
@@ -285,12 +285,11 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
 	return pod, updated
 }
 
-// GetPodResizeStatus returns the last checkpointed ResizeStaus value
-// If checkpoint manager has not been initialized, it returns nil, false
-func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
+// GetPodResizeStatus returns the last cached ResizeStatus value.
+func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
 	m.podStatusesLock.RLock()
 	defer m.podStatusesLock.RUnlock()
-	return m.state.GetPodResizeStatus(podUID)
+	return m.state.GetPodResizeStatus(string(podUID))
 }
 
 // SetPodAllocation checkpoints the resources allocated to a pod's containers
@@ -307,10 +306,10 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
 }
 
 // SetPodResizeStatus checkpoints the last resizing decision for the pod.
-func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error {
+func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
 	m.podStatusesLock.RLock()
 	defer m.podStatusesLock.RUnlock()
-	return m.state.SetPodResizeStatus(string(podUID), resizeStatus)
+	m.state.SetPodResizeStatus(string(podUID), resizeStatus)
 }
 
 func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
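
Net effect on callers: SetPodResizeStatus now only mutates the in-memory cache, so it can no longer fail, and GetPodResizeStatus drops its found flag since the empty status already means "no resize in flight". A before/after sketch of a typical call site (illustrative, pieced together from the hunks above):

	// Before: a checkpoint write could fail, so call sites carried error handling.
	if err := kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus); err != nil {
		klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(pod))
	}

	// After: the cache update cannot fail, and lookups return the zero value when unset.
	kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus)
	resizeStatus := kl.statusManager.GetPodResizeStatus(pod.UID)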