diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 76f8083576f..56cff502cec 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -114,7 +114,6 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/userns" "k8s.io/kubernetes/pkg/kubelet/util" - "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/manager" "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" @@ -1829,7 +1828,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // this conveniently retries any Deferred resize requests // TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan // See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060 - pod, err = kl.handlePodResourcesResize(pod) + pod, err = kl.handlePodResourcesResize(pod, podStatus) if err != nil { return false, err } @@ -2794,23 +2793,28 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) { return true, v1.PodResizeStatusInProgress } -func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) { +// handlePodResourcesResize returns the "allocated pod", which should be used for all resource +// calculations after this function is called. It also updates the cached ResizeStatus according to +// the allocation decision and pod status. +func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) { allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod) if !updated { - // Unless a resize is in-progress, clear the resize status. - resizeStatus, _ := kl.statusManager.GetPodResizeStatus(string(pod.UID)) - if resizeStatus != v1.PodResizeStatusInProgress { - if err := kl.statusManager.SetPodResizeStatus(pod.UID, ""); err != nil { - klog.ErrorS(err, "Failed to clear resize status", "pod", format.Pod(pod)) - } + // Desired resources == allocated resources. Check whether a resize is in progress. + resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus) + if resizeInProgress { + // If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted. + kl.statusManager.SetPodResizeStatus(pod.UID, v1.PodResizeStatusInProgress) + } else { + // (Desired == Allocated == Actual) => clear the resize status. + kl.statusManager.SetPodResizeStatus(pod.UID, "") } - - // Pod is not resizing, nothing more to do here. + // Pod allocation does not need to be updated. return allocatedPod, nil } kl.podResizeMutex.Lock() defer kl.podResizeMutex.Unlock() + // Desired resources != allocated resources. Can we update the allocation to the desired resources? fit, resizeStatus := kl.canResizePod(pod) if fit { // Update pod resource allocation checkpoint @@ -2820,11 +2824,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) { allocatedPod = pod } if resizeStatus != "" { - // Save resize decision to checkpoint - if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, resizeStatus); err != nil { - //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate - klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(allocatedPod)) - } + kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus) } return allocatedPod, nil } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index faddc801ccb..e671c7af337 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1750,22 +1750,11 @@ func (kl *Kubelet) determinePodResizeStatus(allocatedPod *v1.Pod, podStatus *kub // If pod is terminal, clear the resize status. if podIsTerminal { - if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, ""); err != nil { - klog.ErrorS(err, "SetPodResizeStatus failed for terminal pod", "pod", format.Pod(allocatedPod)) - } + kl.statusManager.SetPodResizeStatus(allocatedPod.UID, "") return "" } - resizeStatus, _ := kl.statusManager.GetPodResizeStatus(string(allocatedPod.UID)) - // If the resize was in-progress and the actual resources match the allocated resources, mark - // the resize as complete by clearing the resize status. - if resizeStatus == v1.PodResizeStatusInProgress && - allocatedResourcesMatchStatus(allocatedPod, podStatus) { - if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, ""); err != nil { - klog.ErrorS(err, "SetPodResizeStatus failed", "pod", format.Pod(allocatedPod)) - } - return "" - } + resizeStatus := kl.statusManager.GetPodResizeStatus(allocatedPod.UID) return resizeStatus } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 70e05796b21..fba760c237d 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2665,11 +2665,12 @@ func TestHandlePodResourcesResize(t *testing.T) { defer kubelet.podManager.RemovePod(testPod1) tests := []struct { - name string - pod *v1.Pod - newRequests v1.ResourceList - expectedAllocations v1.ResourceList - expectedResize v1.PodResizeStatus + name string + pod *v1.Pod + newRequests v1.ResourceList + newRequestsAllocated bool // Whether the new requests have already been allocated (but not actuated) + expectedAllocations v1.ResourceList + expectedResize v1.PodResizeStatus }{ { name: "Request CPU and memory decrease - expect InProgress", @@ -2720,25 +2721,63 @@ func TestHandlePodResourcesResize(t *testing.T) { expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M}, expectedResize: v1.PodResizeStatusInfeasible, }, + { + name: "CPU increase in progress - expect InProgress", + pod: testPod2, + newRequests: v1.ResourceList{v1.ResourceCPU: cpu1500m, v1.ResourceMemory: mem1000M}, + newRequestsAllocated: true, + expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1500m, v1.ResourceMemory: mem1000M}, + expectedResize: v1.PodResizeStatusInProgress, + }, + { + name: "No resize", + pod: testPod2, + newRequests: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M}, + expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M}, + expectedResize: "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { kubelet.statusManager = status.NewFakeManager() - require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod)) - pod := tt.pod.DeepCopy() - pod.Spec.Containers[0].Resources.Requests = tt.newRequests - updatedPod, err := kubelet.handlePodResourcesResize(pod) + newPod := tt.pod.DeepCopy() + newPod.Spec.Containers[0].Resources.Requests = tt.newRequests + + if !tt.newRequestsAllocated { + require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod)) + } else { + require.NoError(t, kubelet.statusManager.SetPodAllocation(newPod)) + } + + podStatus := &kubecontainer.PodStatus{ + ID: tt.pod.UID, + Name: tt.pod.Name, + Namespace: tt.pod.Namespace, + ContainerStatuses: make([]*kubecontainer.Status, len(tt.pod.Spec.Containers)), + } + for i, c := range tt.pod.Spec.Containers { + podStatus.ContainerStatuses[i] = &kubecontainer.Status{ + Name: c.Name, + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: c.Resources.Requests.Cpu(), + CPULimit: c.Resources.Limits.Cpu(), + MemoryLimit: c.Resources.Limits.Memory(), + }, + } + } + + updatedPod, err := kubelet.handlePodResourcesResize(newPod, podStatus) require.NoError(t, err) assert.Equal(t, tt.expectedAllocations, updatedPod.Spec.Containers[0].Resources.Requests, "updated pod spec resources") - alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(pod.UID), pod.Spec.Containers[0].Name) + alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(newPod.UID), newPod.Spec.Containers[0].Name) require.True(t, found, "container allocation") assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation") - resizeStatus, found := kubelet.statusManager.GetPodResizeStatus(string(pod.UID)) - require.True(t, found, "pod resize status") + resizeStatus := kubelet.statusManager.GetPodResizeStatus(newPod.UID) assert.Equal(t, tt.expectedResize, resizeStatus) }) } diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go index ebb3970c6e4..6d2031419df 100644 --- a/pkg/kubelet/status/fake_status_manager.go +++ b/pkg/kubelet/status/fake_status_manager.go @@ -68,8 +68,8 @@ func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerNam return m.state.GetContainerResourceAllocation(podUID, containerName) } -func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { - return m.state.GetPodResizeStatus(podUID) +func (m *fakeManager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus { + return m.state.GetPodResizeStatus(string(podUID)) } func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { @@ -86,8 +86,8 @@ func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error { return nil } -func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { - return m.state.SetPodResizeStatus(string(podUID), resizeStatus) +func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) { + m.state.SetPodResizeStatus(string(podUID), resizeStatus) } // NewFakeManager creates empty/fake memory manager diff --git a/pkg/kubelet/status/state/checkpoint.go b/pkg/kubelet/status/state/checkpoint.go index 68692f8c1f3..8bc85ac1891 100644 --- a/pkg/kubelet/status/state/checkpoint.go +++ b/pkg/kubelet/status/state/checkpoint.go @@ -28,8 +28,7 @@ import ( var _ checkpointmanager.Checkpoint = &Checkpoint{} type PodResourceAllocationInfo struct { - AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` - ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"` + AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` } // Checkpoint represents a structure to store pod resource allocation checkpoint data diff --git a/pkg/kubelet/status/state/state.go b/pkg/kubelet/status/state/state.go index 4b85f164ab8..10a66be46c3 100644 --- a/pkg/kubelet/status/state/state.go +++ b/pkg/kubelet/status/state/state.go @@ -42,15 +42,13 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation { type Reader interface { GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) GetPodResourceAllocation() PodResourceAllocation - GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) - GetResizeStatus() PodResizeStatus + GetPodResizeStatus(podUID string) v1.PodResizeStatus } type writer interface { SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error SetPodResourceAllocation(PodResourceAllocation) error - SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error - SetResizeStatus(PodResizeStatus) error + SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) Delete(podUID string, containerName string) error ClearState() error } diff --git a/pkg/kubelet/status/state/state_checkpoint.go b/pkg/kubelet/status/state/state_checkpoint.go index 2cd17ba926f..2e1ca729100 100644 --- a/pkg/kubelet/status/state/state_checkpoint.go +++ b/pkg/kubelet/status/state/state_checkpoint.go @@ -80,10 +80,6 @@ func (sc *stateCheckpoint) restoreState() error { if err != nil { return fmt.Errorf("failed to set pod resource allocation: %w", err) } - err = sc.cache.SetResizeStatus(praInfo.ResizeStatusEntries) - if err != nil { - return fmt.Errorf("failed to set resize status: %w", err) - } klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") return nil } @@ -92,10 +88,8 @@ func (sc *stateCheckpoint) restoreState() error { func (sc *stateCheckpoint) storeState() error { podAllocation := sc.cache.GetPodResourceAllocation() - podResizeStatus := sc.cache.GetResizeStatus() checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{ - AllocationEntries: podAllocation, - ResizeStatusEntries: podResizeStatus, + AllocationEntries: podAllocation, }) if err != nil { return fmt.Errorf("failed to create checkpoint: %w", err) @@ -123,19 +117,12 @@ func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation { } // GetPodResizeStatus returns the last resize decision for a pod -func (sc *stateCheckpoint) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { +func (sc *stateCheckpoint) GetPodResizeStatus(podUID string) v1.PodResizeStatus { sc.mux.RLock() defer sc.mux.RUnlock() return sc.cache.GetPodResizeStatus(podUID) } -// GetResizeStatus returns the set of resize decisions made -func (sc *stateCheckpoint) GetResizeStatus() PodResizeStatus { - sc.mux.RLock() - defer sc.mux.RUnlock() - return sc.cache.GetResizeStatus() -} - // SetContainerResourceAllocation sets resources allocated to a pod's container func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { sc.mux.Lock() @@ -153,19 +140,10 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) err } // SetPodResizeStatus sets the last resize decision for a pod -func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error { +func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { sc.mux.Lock() defer sc.mux.Unlock() sc.cache.SetPodResizeStatus(podUID, resizeStatus) - return sc.storeState() -} - -// SetResizeStatus sets the resize decisions -func (sc *stateCheckpoint) SetResizeStatus(rs PodResizeStatus) error { - sc.mux.Lock() - defer sc.mux.Unlock() - sc.cache.SetResizeStatus(rs) - return sc.storeState() } // Delete deletes allocations for specified pod @@ -199,12 +177,8 @@ func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation return nil } -func (sc *noopStateCheckpoint) GetPodResizeStatus(_ string) (v1.PodResizeStatus, bool) { - return "", false -} - -func (sc *noopStateCheckpoint) GetResizeStatus() PodResizeStatus { - return nil +func (sc *noopStateCheckpoint) GetPodResizeStatus(_ string) v1.PodResizeStatus { + return "" } func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error { @@ -215,13 +189,7 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation) return nil } -func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) error { - return nil -} - -func (sc *noopStateCheckpoint) SetResizeStatus(_ PodResizeStatus) error { - return nil -} +func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {} func (sc *noopStateCheckpoint) Delete(_ string, _ string) error { return nil diff --git a/pkg/kubelet/status/state/state_checkpoint_test.go b/pkg/kubelet/status/state/state_checkpoint_test.go index ca3546553ff..03be8c1c46d 100644 --- a/pkg/kubelet/status/state/state_checkpoint_test.go +++ b/pkg/kubelet/status/state/state_checkpoint_test.go @@ -146,7 +146,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { }, }, }, - ResizeStatusEntries: map[string]v1.PodResizeStatus{}, } checkpoint := &Checkpoint{} err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent)) @@ -160,7 +159,6 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) { actualPodResourceAllocationInfo := &PodResourceAllocationInfo{} actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() - actualPodResourceAllocationInfo.ResizeStatusEntries = sc.cache.GetResizeStatus() require.NoError(t, err, "failed to get pod resource allocation info") require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") } diff --git a/pkg/kubelet/status/state/state_mem.go b/pkg/kubelet/status/state/state_mem.go index 8e90c06f554..c8107dacbe2 100644 --- a/pkg/kubelet/status/state/state_mem.go +++ b/pkg/kubelet/status/state/state_mem.go @@ -54,22 +54,11 @@ func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation { return s.podAllocation.Clone() } -func (s *stateMemory) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { +func (s *stateMemory) GetPodResizeStatus(podUID string) v1.PodResizeStatus { s.RLock() defer s.RUnlock() - resizeStatus, ok := s.podResizeStatus[podUID] - return resizeStatus, ok -} - -func (s *stateMemory) GetResizeStatus() PodResizeStatus { - s.RLock() - defer s.RUnlock() - prs := make(map[string]v1.PodResizeStatus) - for k, v := range s.podResizeStatus { - prs[k] = v - } - return prs + return s.podResizeStatus[podUID] } func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { @@ -94,7 +83,7 @@ func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error { return nil } -func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error { +func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { s.Lock() defer s.Unlock() @@ -104,19 +93,6 @@ func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResiz delete(s.podResizeStatus, podUID) } klog.V(3).InfoS("Updated pod resize state", "podUID", podUID, "resizeStatus", resizeStatus) - return nil -} - -func (s *stateMemory) SetResizeStatus(rs PodResizeStatus) error { - s.Lock() - defer s.Unlock() - prs := make(map[string]v1.PodResizeStatus) - for k, v := range rs { - prs[k] = v - } - s.podResizeStatus = prs - klog.V(3).InfoS("Updated pod resize state", "resizes", rs) - return nil } func (s *stateMemory) deleteContainer(podUID string, containerName string) { diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 818b638afa7..d3f5ce831c0 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -143,11 +143,11 @@ type Manager interface { // the provided podUIDs. RemoveOrphanedStatuses(podUIDs map[types.UID]bool) - // GetPodResizeStatus returns checkpointed PodStatus.Resize value - GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) + // GetPodResizeStatus returns cached PodStatus.Resize value + GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus - // SetPodResizeStatus checkpoints the last resizing decision for the pod. - SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error + // SetPodResizeStatus caches the last resizing decision for the pod. + SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) allocationManager } @@ -285,12 +285,11 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (* return pod, updated } -// GetPodResizeStatus returns the last checkpointed ResizeStaus value -// If checkpoint manager has not been initialized, it returns nil, false -func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { +// GetPodResizeStatus returns the last cached ResizeStatus value. +func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() - return m.state.GetPodResizeStatus(podUID) + return m.state.GetPodResizeStatus(string(podUID)) } // SetPodAllocation checkpoints the resources allocated to a pod's containers @@ -307,10 +306,10 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error { } // SetPodResizeStatus checkpoints the last resizing decision for the pod. -func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { +func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() - return m.state.SetPodResizeStatus(string(podUID), resizeStatus) + m.state.SetPodResizeStatus(string(podUID), resizeStatus) } func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {