diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 4e684a5b902..0832fb3c43b 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -26,13 +26,10 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - utilfeature "k8s.io/apiserver/pkg/util/feature" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1resource "k8s.io/kubernetes/pkg/api/v1/resource" - "k8s.io/kubernetes/pkg/features" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" volumeutils "k8s.io/kubernetes/pkg/volume/util" @@ -1252,12 +1249,6 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats for _, container := range containers { if container.Name == containerStats.Name { requests := container.Resources.Requests[resourceToReclaim] - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && - (resourceToReclaim == v1.ResourceMemory || resourceToReclaim == v1.ResourceCPU) { - if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { - requests = cs.AllocatedResources[resourceToReclaim] - } - } var usage *resource.Quantity switch resourceToReclaim { case v1.ResourceEphemeralStorage: diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index cff0e866ac7..604614992db 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -31,11 +31,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" - "k8s.io/kubernetes/pkg/features" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -3366,56 +3363,6 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool { return true } -func TestEvictonMessageWithResourceResize(t *testing.T) { - testpod := newPod("testpod", 1, []v1.Container{ - newContainer("testcontainer", newResourceList("", "200Mi", ""), newResourceList("", "", "")), - }, nil) - testpod.Status = v1.PodStatus{ - ContainerStatuses: []v1.ContainerStatus{ - { - Name: "testcontainer", - AllocatedResources: newResourceList("", "100Mi", ""), - }, - }, - } - testpodMemory := resource.MustParse("150Mi") - testpodStats := newPodMemoryStats(testpod, testpodMemory) - testpodMemoryBytes := uint64(testpodMemory.Value()) - testpodStats.Containers = []statsapi.ContainerStats{ - { - Name: "testcontainer", - Memory: &statsapi.MemoryStats{ - WorkingSetBytes: &testpodMemoryBytes, - }, - }, - } - stats := map[*v1.Pod]statsapi.PodStats{ - testpod: testpodStats, - } - statsFn := func(pod *v1.Pod) (statsapi.PodStats, bool) { - result, found := stats[pod] - return result, found - } - threshold := []evictionapi.Threshold{} - observations := signalObservations{} - - for _, enabled := range []bool{true, false} { - t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled) - msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn, threshold, observations) - if enabled { - if !strings.Contains(msg, 
"testcontainer was using 150Mi, request is 100Mi") { - t.Errorf("Expected 'exceeds memory' eviction message was not found.") - } - } else { - if strings.Contains(msg, "which exceeds its request") { - t.Errorf("Found 'exceeds memory' eviction message which was not expected.") - } - } - }) - } -} - func TestStatsNotFoundForPod(t *testing.T) { pod1 := newPod("fake-pod1", defaultPriority, []v1.Container{ newContainer("fake-container1", newResourceList("", "", ""), newResourceList("", "", "")), diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7201d7ea4df..65264fa41e4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -33,7 +33,6 @@ import ( "time" cadvisorapi "github.com/google/cadvisor/info/v1" - "github.com/google/go-cmp/cmp" "github.com/opencontainers/selinux/go-selinux" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -1568,7 +1567,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { os.Exit(1) } // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs - kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod) + kl.evictionManager.Start(kl.StatsProvider, kl.getAllocatedPods, kl.PodIsFinished, evictionMonitoringPeriod) // container log manager must start after container runtime is up to retrieve information from container runtime // and inform container to reopen log file after log rotation. @@ -1789,6 +1788,17 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType } } + if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && !kubetypes.IsStaticPod(pod) { + // Handle pod resize here instead of doing it in HandlePodUpdates because + // this conveniently retries any Deferred resize requests + // TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan + // See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060 + pod, err = kl.handlePodResourcesResize(pod) + if err != nil { + return false, err + } + } + // Generate final API pod status with pod and status manager status apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false) // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) @@ -1943,16 +1953,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // Ensure the pod is being probed kl.probeManager.AddPod(pod) - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - // Handle pod resize here instead of doing it in HandlePodUpdates because - // this conveniently retries any Deferred resize requests - // TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan - // See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060 - if kl.podWorkers.CouldHaveRunningContainers(pod.UID) && !kubetypes.IsStaticPod(pod) { - pod = kl.handlePodResourcesResize(pod) - } - } - // TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker. // Currently, using cancellation from that context causes test failures. 
To remove this WithoutCancel, // any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling @@ -1975,7 +1975,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType return false, nil } - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) { + if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) { // While resize is in progress, periodically call PLEG to update pod cache runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { @@ -2296,9 +2296,7 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin // Use allocated resources values from checkpoint store (source of truth) to determine fit otherPods := make([]*v1.Pod, 0, len(pods)) for _, p := range pods { - op := p.DeepCopy() - kl.updateContainerResourceAllocation(op) - + op, _ := kl.statusManager.UpdatePodFromAllocation(p) otherPods = append(otherPods, op) } attrs.OtherPods = otherPods @@ -2579,16 +2577,15 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { // To handle kubelet restarts, test pod admissibility using AllocatedResources values // (for cpu & memory) from checkpoint store. If found, that is the source of truth. - podCopy := pod.DeepCopy() - kl.updateContainerResourceAllocation(podCopy) + allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod) // Check if we can admit the pod; if not, reject it. - if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok { + if ok, reason, message := kl.canAdmitPod(activePods, allocatedPod); !ok { kl.rejectPod(pod, reason, message) continue } // For new pod, checkpoint the resource values at which the Pod has been admitted - if err := kl.statusManager.SetPodAllocation(podCopy); err != nil { + if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil { //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod)) } @@ -2609,17 +2606,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } -// updateContainerResourceAllocation updates AllocatedResources values -// (for cpu & memory) from checkpoint store -func (kl *Kubelet) updateContainerResourceAllocation(pod *v1.Pod) { - for i, c := range pod.Spec.Containers { - allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name) - if found { - pod.Spec.Containers[i].Resources = allocatedResources - } - } -} - // HandlePodUpdates is the callback in the SyncHandler interface for pods // being updated from a config source. 
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { @@ -2749,36 +2735,47 @@ func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { } } -func isPodResizeInProgress(pod *v1.Pod, podStatus *v1.PodStatus) bool { +func isPodResizeInProgress(pod *v1.Pod, podStatus *kubecontainer.PodStatus) bool { for _, c := range pod.Spec.Containers { - if cs, ok := podutil.GetContainerStatus(podStatus.ContainerStatuses, c.Name); ok { - if cs.Resources == nil { + if cs := podStatus.FindContainerStatusByName(c.Name); cs != nil { + if cs.State != kubecontainer.ContainerStateRunning || cs.Resources == nil { continue } - if !cmp.Equal(c.Resources.Limits, cs.Resources.Limits) || !cmp.Equal(cs.AllocatedResources, cs.Resources.Requests) { - return true + if c.Resources.Requests != nil { + if cs.Resources.CPURequest != nil && !cs.Resources.CPURequest.Equal(*c.Resources.Requests.Cpu()) { + return true + } + } + if c.Resources.Limits != nil { + if cs.Resources.CPULimit != nil && !cs.Resources.CPULimit.Equal(*c.Resources.Limits.Cpu()) { + return true + } + if cs.Resources.MemoryLimit != nil && !cs.Resources.MemoryLimit.Equal(*c.Resources.Limits.Memory()) { + return true + } } } } return false } -func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) { +// canResizePod determines if the requested resize is currently feasible. +// Returns true if the resize can proceed. +func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) { var otherActivePods []*v1.Pod node, err := kl.getNodeAnyWay() if err != nil { klog.ErrorS(err, "getNodeAnyway function failed") - return false, nil, "" + return false, "" } - podCopy := pod.DeepCopy() cpuAvailable := node.Status.Allocatable.Cpu().MilliValue() memAvailable := node.Status.Allocatable.Memory().Value() - cpuRequests := resource.GetResourceRequest(podCopy, v1.ResourceCPU) - memRequests := resource.GetResourceRequest(podCopy, v1.ResourceMemory) + cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU) + memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory) if cpuRequests > cpuAvailable || memRequests > memAvailable { - klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", podCopy.Name) - return false, podCopy, v1.PodResizeStatusInfeasible + klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", klog.KObj(pod)) + return false, v1.PodResizeStatusInfeasible } // Treat the existing pod needing resize as a new pod with desired resources seeking admit. @@ -2790,76 +2787,42 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) } } - if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, podCopy); !ok { + if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, pod); !ok { // Log reason and return. 
Let the next sync iteration retry the resize - klog.V(3).InfoS("Resize cannot be accommodated", "pod", podCopy.Name, "reason", failReason, "message", failMessage) - return false, podCopy, v1.PodResizeStatusDeferred + klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage) + return false, v1.PodResizeStatusDeferred } - for _, container := range podCopy.Spec.Containers { - idx, found := podutil.GetIndexOfContainerStatus(podCopy.Status.ContainerStatuses, container.Name) - if found { - for rName, rQuantity := range container.Resources.Requests { - podCopy.Status.ContainerStatuses[idx].AllocatedResources[rName] = rQuantity - } - } - } - return true, podCopy, v1.PodResizeStatusInProgress + return true, v1.PodResizeStatusInProgress } -func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) *v1.Pod { - if pod.Status.Phase != v1.PodRunning { - return pod - } - podResized := false - for _, container := range pod.Spec.Containers { - if len(container.Resources.Requests) == 0 { - continue - } - containerStatus, found := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name) - if !found { - klog.V(5).InfoS("ContainerStatus not found", "pod", pod.Name, "container", container.Name) - break - } - if len(containerStatus.AllocatedResources) != len(container.Resources.Requests) { - klog.V(5).InfoS("ContainerStatus.AllocatedResources length mismatch", "pod", pod.Name, "container", container.Name) - break - } - if !cmp.Equal(container.Resources.Requests, containerStatus.AllocatedResources) { - podResized = true - break - } - } - if !podResized { - return pod +func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) { + allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod) + if !updated { + // Pod is not resizing, nothing more to do here. + return pod, nil } kl.podResizeMutex.Lock() defer kl.podResizeMutex.Unlock() - fit, updatedPod, resizeStatus := kl.canResizePod(pod) - if updatedPod == nil { - return pod - } + fit, resizeStatus := kl.canResizePod(pod) if fit { // Update pod resource allocation checkpoint - if err := kl.statusManager.SetPodAllocation(updatedPod); err != nil { - //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate - klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(updatedPod)) - return pod + if err := kl.statusManager.SetPodAllocation(pod); err != nil { + return nil, err } + } else { + // If resize isn't immediately feasible, proceed with the allocated pod. + pod = allocatedPod } if resizeStatus != "" { // Save resize decision to checkpoint - if err := kl.statusManager.SetPodResizeStatus(updatedPod.UID, resizeStatus); err != nil { + if err := kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus); err != nil { //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate - klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(updatedPod)) - return pod + klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(pod)) } - updatedPod.Status.Resize = resizeStatus } - kl.podManager.UpdatePod(updatedPod) - kl.statusManager.SetPodStatus(updatedPod, updatedPod.Status) - return updatedPod + return pod, nil } // LatestLoopEntryTime returns the last time in the sync loop monitor. 
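[Reviewer note, outside the patch] The rewritten handlePodResourcesResize above leans on statusManager.UpdatePodFromAllocation, which is introduced later in this patch (pkg/kubelet/status). As a rough sketch of the contract it assumes — allocations keyed by container name, copy-on-write semantics, and a boolean that is true only when the spec had to be rewritten (i.e. a resize is pending) — something like the following; the package name and map shape here are simplifications for illustration, not the real implementation:

package allocsketch

import (
	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
)

// updateFromAllocation overwrites container resources in the pod spec with the
// checkpointed allocation, deep-copying the pod only when something changes.
// A false return means the spec already matches the allocation (no pending resize).
func updateFromAllocation(pod *v1.Pod, alloc map[string]v1.ResourceRequirements) (*v1.Pod, bool) {
	updated := false
	for i, c := range pod.Spec.Containers {
		a, ok := alloc[c.Name]
		if !ok || apiequality.Semantic.DeepEqual(c.Resources, a) {
			continue // no stored allocation, or spec already matches it
		}
		if !updated {
			pod = pod.DeepCopy() // copy-on-write: the caller's pod is left untouched
			updated = true
		}
		pod.Spec.Containers[i].Resources = a
	}
	return pod, updated
}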
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index c2cf2acc0be..6072a48027c 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -207,6 +207,21 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod { return activePods } +// getAllocatedPods returns the active pods (see GetActivePods), but updates the pods to their +// allocated state. +func (kl *Kubelet) getAllocatedPods() []*v1.Pod { + activePods := kl.GetActivePods() + if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { + return activePods + } + + allocatedPods := make([]*v1.Pod, len(activePods)) + for i, pod := range activePods { + allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod) + } + return allocatedPods +} + // makeBlockVolumes maps the raw block devices specified in the path of the container // Experimental func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumepathhandler.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) { @@ -2081,13 +2096,14 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon } } container := kubecontainer.GetContainerSpec(pod, cName) - // AllocatedResources values come from checkpoint. It is the source-of-truth. + + // Always set the status to the latest allocated resources, even if it differs from the + // allocation used by the current sync loop. alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName) if found { status.AllocatedResources = alloc.Requests } else if !(container.Resources.Requests == nil && container.Resources.Limits == nil) { - // Log error and fallback to AllocatedResources in oldStatus if it exists - klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName) + // This case is expected for ephemeral containers. 
if oldStatusFound { status.AllocatedResources = oldStatus.AllocatedResources } @@ -2108,46 +2124,46 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon } // Convert Limits - if container.Resources.Limits != nil { + if alloc.Limits != nil { limits = make(v1.ResourceList) if cStatus.Resources != nil && cStatus.Resources.CPULimit != nil { limits[v1.ResourceCPU] = cStatus.Resources.CPULimit.DeepCopy() } else { - determineResource(v1.ResourceCPU, container.Resources.Limits, oldStatus.Resources.Limits, limits) + determineResource(v1.ResourceCPU, alloc.Limits, oldStatus.Resources.Limits, limits) } if cStatus.Resources != nil && cStatus.Resources.MemoryLimit != nil { limits[v1.ResourceMemory] = cStatus.Resources.MemoryLimit.DeepCopy() } else { - determineResource(v1.ResourceMemory, container.Resources.Limits, oldStatus.Resources.Limits, limits) + determineResource(v1.ResourceMemory, alloc.Limits, oldStatus.Resources.Limits, limits) } - if ephemeralStorage, found := container.Resources.Limits[v1.ResourceEphemeralStorage]; found { + if ephemeralStorage, found := alloc.Limits[v1.ResourceEphemeralStorage]; found { limits[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy() } - if storage, found := container.Resources.Limits[v1.ResourceStorage]; found { + if storage, found := alloc.Limits[v1.ResourceStorage]; found { limits[v1.ResourceStorage] = storage.DeepCopy() } - convertCustomResources(container.Resources.Limits, limits) + convertCustomResources(alloc.Limits, limits) } // Convert Requests - if status.AllocatedResources != nil { + if alloc.Requests != nil { requests = make(v1.ResourceList) if cStatus.Resources != nil && cStatus.Resources.CPURequest != nil { requests[v1.ResourceCPU] = cStatus.Resources.CPURequest.DeepCopy() } else { - determineResource(v1.ResourceCPU, status.AllocatedResources, oldStatus.Resources.Requests, requests) + determineResource(v1.ResourceCPU, alloc.Requests, oldStatus.Resources.Requests, requests) } - if memory, found := status.AllocatedResources[v1.ResourceMemory]; found { + if memory, found := alloc.Requests[v1.ResourceMemory]; found { requests[v1.ResourceMemory] = memory.DeepCopy() } - if ephemeralStorage, found := status.AllocatedResources[v1.ResourceEphemeralStorage]; found { + if ephemeralStorage, found := alloc.Requests[v1.ResourceEphemeralStorage]; found { requests[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy() } - if storage, found := status.AllocatedResources[v1.ResourceStorage]; found { + if storage, found := alloc.Requests[v1.ResourceStorage]; found { requests[v1.ResourceStorage] = storage.DeepCopy() } - convertCustomResources(status.AllocatedResources, requests) + convertCustomResources(alloc.Requests, requests) } resources := &v1.ResourceRequirements{ diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 05ddb14feba..68743f71138 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2391,6 +2391,13 @@ func TestPodResourceAllocationReset(t *testing.T) { kubelet := testKubelet.kubelet kubelet.statusManager = status.NewFakeManager() + // fakePodWorkers trigger syncPodFn synchronously on update, but entering + // kubelet.SyncPod while holding the podResizeMutex can lead to deadlock. 
+ kubelet.podWorkers.(*fakePodWorkers).syncPodFn = + func(_ context.Context, _ kubetypes.SyncPodType, _, _ *v1.Pod, _ *kubecontainer.PodStatus) (bool, error) { + return false, nil + } + nodes := []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, @@ -2575,7 +2582,6 @@ func TestHandlePodResourcesResize(t *testing.T) { testKubelet := newTestKubelet(t, false) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet - kubelet.statusManager = status.NewFakeManager() cpu500m := resource.MustParse("500m") cpu1000m := resource.MustParse("1") @@ -2717,14 +2723,24 @@ func TestHandlePodResourcesResize(t *testing.T) { } for _, tt := range tests { - tt.pod.Spec.Containers[0].Resources.Requests = tt.newRequests - tt.pod.Status.ContainerStatuses[0].AllocatedResources = v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M} - kubelet.handlePodResourcesResize(tt.pod) - updatedPod, found := kubelet.podManager.GetPodByName(tt.pod.Namespace, tt.pod.Name) - assert.True(t, found, "expected to find pod %s", tt.pod.Name) - assert.Equal(t, tt.expectedAllocations, updatedPod.Status.ContainerStatuses[0].AllocatedResources, tt.name) - assert.Equal(t, tt.expectedResize, updatedPod.Status.Resize, tt.name) - testKubelet.fakeKubeClient.ClearActions() + t.Run(tt.name, func(t *testing.T) { + kubelet.statusManager = status.NewFakeManager() + require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod)) + + pod := tt.pod.DeepCopy() + pod.Spec.Containers[0].Resources.Requests = tt.newRequests + updatedPod, err := kubelet.handlePodResourcesResize(pod) + require.NoError(t, err) + assert.Equal(t, tt.expectedAllocations, updatedPod.Spec.Containers[0].Resources.Requests, "updated pod spec resources") + + alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(pod.UID), pod.Spec.Containers[0].Name) + require.True(t, found, "container allocation") + assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation") + + resizeStatus, found := kubelet.statusManager.GetPodResizeStatus(string(pod.UID)) + require.True(t, found, "pod resize status") + assert.Equal(t, tt.expectedResize, resizeStatus) + }) } } @@ -3276,3 +3292,132 @@ func TestSyncPodSpans(t *testing.T) { assert.Equalf(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), "runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()) } } + +func TestIsPodResizeInProgress(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "12345", + Name: "test", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "c1", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI), + }, + }, + }, { + Name: "c2", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI), + }, + }, + }}, + }, + } + steadyStateC1Status := &kubecontainer.Status{ + Name: "c1", + State: 
kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(400, resource.DecimalSI), + }, + } + resizeMemC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(800, resource.DecimalSI), + }, + } + resizeCPUReqC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(200, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(400, resource.DecimalSI), + }, + } + resizeCPULimitC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(600, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(400, resource.DecimalSI), + }, + } + steadyStateC2Status := &kubecontainer.Status{ + Name: "c2", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(500, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(700, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(800, resource.DecimalSI), + }, + } + mkPodStatus := func(containerStatuses ...*kubecontainer.Status) *kubecontainer.PodStatus { + return &kubecontainer.PodStatus{ + ID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + ContainerStatuses: containerStatuses, + } + } + tests := []struct { + name string + status *kubecontainer.PodStatus + expectResize bool + }{{ + name: "steady state", + status: mkPodStatus(steadyStateC1Status, steadyStateC2Status), + expectResize: false, + }, { + name: "terminated container", + status: mkPodStatus(&kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateExited, + Resources: resizeMemC1Status.Resources, + }, steadyStateC2Status), + expectResize: false, + }, { + name: "missing container", + status: mkPodStatus(steadyStateC2Status), + expectResize: false, + }, { + name: "resizing memory limit", + status: mkPodStatus(resizeMemC1Status, steadyStateC2Status), + expectResize: true, + }, { + name: "resizing cpu request", + status: mkPodStatus(resizeCPUReqC1Status, steadyStateC2Status), + expectResize: true, + }, { + name: "resizing cpu limit", + status: mkPodStatus(resizeCPULimitC1Status, steadyStateC2Status), + expectResize: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expectResize, isPodResizeInProgress(pod, test.status)) + }) + } +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go index 2cbf83bb621..37f0cf3562c 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go @@ -805,291 +805,67 @@ func TestGenerateLinuxContainerResources(t *testing.T) { }, }, }, - Status: v1.PodStatus{}, } for _, tc := range []struct { name string - scalingFg bool limits v1.ResourceList 
requests v1.ResourceList - cStatus []v1.ContainerStatus expected *runtimeapi.LinuxContainerResources cgroupVersion CgroupVersion }{ { - "requests & limits, cpu & memory, guaranteed qos - no container status", - true, + "requests & limits, cpu & memory, guaranteed qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, cgroupV1, }, { - "requests & limits, cpu & memory, burstable qos - no container status", - true, + "requests & limits, cpu & memory, burstable qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, cgroupV1, }, { - "best-effort qos - no container status", - true, + "best-effort qos", nil, nil, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, cgroupV1, }, { - "requests & limits, cpu & memory, guaranteed qos - empty resources container status", - true, + "requests & limits, cpu & memory, guaranteed qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, burstable qos - empty resources container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999}, - cgroupV1, - }, - { - "best-effort qos - empty resources container status", - true, - nil, - nil, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - 
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - no container status", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "best-effort qos - no container status", - false, - nil, - nil, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - no container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, cgroupV2, }, { - "requests & limits, cpu & memory, burstable qos - no container status", - true, + "requests & limits, cpu & memory, burstable qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, cgroupV2, }, { - "best-effort qos - no container status", - true, + "best-effort qos", nil, nil, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - empty resources 
container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, burstable qos - empty resources container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "best-effort qos - empty resources container status", - true, - nil, - nil, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - no container status", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: 
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "best-effort qos - no container status", - false, - nil, - nil, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, cgroupV2, }, } { - t.Run(tc.name, func(t *testing.T) { + t.Run(fmt.Sprintf("cgroup%s:%s", tc.cgroupVersion, tc.name), func(t *testing.T) { defer setSwapControllerAvailableDuringTest(false)() - if tc.scalingFg { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) - } setCgroupVersionDuringTest(tc.cgroupVersion) pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests} - if len(tc.cStatus) > 0 { - pod.Status.ContainerStatuses = tc.cStatus - } + resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false) tc.expected.HugepageLimits = resources.HugepageLimits - if !cmp.Equal(resources, tc.expected) { - t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources) - } + assert.Equal(t, tc.expected, resources) }) } } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 4ef341f7ab5..2f0ddfaa55f 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -26,7 +26,6 @@ import ( "time" cadvisorapi "github.com/google/cadvisor/info/v1" - "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/trace" grpcstatus "google.golang.org/grpc/status" crierror "k8s.io/cri-api/pkg/errors" @@ -47,7 +46,6 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/kubernetes/pkg/api/legacyscheme" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/credentialprovider/plugin" "k8s.io/kubernetes/pkg/features" @@ -551,60 +549,49 @@ func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool { return true } -func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool { +// computePodResizeAction determines the actions required (if any) to resize the given container. +// Returns whether to keep (true) or restart (false) the container. 
+func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) (keepContainer bool) { container := pod.Spec.Containers[containerIdx] - if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 { - return true - } // Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired) // with v1.Status.Resources / runtime.Status.Resources (last known actual). // Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources. // Skip if runtime containerID doesn't match pod.Status containerID (container is restarting) - apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name) - if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil || - kubeContainerStatus.State != kubecontainer.ContainerStateRunning || - kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID || - !cmp.Equal(container.Resources.Requests, apiContainerStatus.AllocatedResources) { + if kubeContainerStatus.State != kubecontainer.ContainerStateRunning { return true } - desiredMemoryLimit := container.Resources.Limits.Memory().Value() - desiredCPULimit := container.Resources.Limits.Cpu().MilliValue() - desiredCPURequest := container.Resources.Requests.Cpu().MilliValue() - currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value() - currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue() - currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue() - // Runtime container status resources (from CRI), if set, supercedes v1(api) container status resrouces. - if kubeContainerStatus.Resources != nil { - if kubeContainerStatus.Resources.MemoryLimit != nil { - currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value() - } - if kubeContainerStatus.Resources.CPULimit != nil { - currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue() - } - if kubeContainerStatus.Resources.CPURequest != nil { - currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue() - } - } - - // Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during - // handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest. - if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest { + if kubeContainerStatus.Resources == nil { + // Not enough information to actuate a resize. 
+ klog.V(4).InfoS("Missing runtime resource information for container", "pod", klog.KObj(pod), "container", container.Name) return true } desiredResources := containerResources{ - memoryLimit: desiredMemoryLimit, - memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(), - cpuLimit: desiredCPULimit, - cpuRequest: desiredCPURequest, + memoryLimit: container.Resources.Limits.Memory().Value(), + memoryRequest: container.Resources.Requests.Memory().Value(), + cpuLimit: container.Resources.Limits.Cpu().MilliValue(), + cpuRequest: container.Resources.Requests.Cpu().MilliValue(), } - currentResources := containerResources{ - memoryLimit: currentMemoryLimit, - memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(), - cpuLimit: currentCPULimit, - cpuRequest: currentCPURequest, + + // Default current values to the desired values so that a resize isn't triggered for missing values. + currentResources := desiredResources + if kubeContainerStatus.Resources.MemoryLimit != nil { + currentResources.memoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value() + } + if kubeContainerStatus.Resources.CPULimit != nil { + currentResources.cpuLimit = kubeContainerStatus.Resources.CPULimit.MilliValue() + } + if kubeContainerStatus.Resources.CPURequest != nil { + currentResources.cpuRequest = kubeContainerStatus.Resources.CPURequest.MilliValue() + } + // Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during + // handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest. + + if currentResources == desiredResources { + // No resize required. + return true } resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy) @@ -637,9 +624,9 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe changes.ContainersToUpdate[rName][0] = cUpdateInfo } } - resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) - resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit) - resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest) + resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit) + resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit) + resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest) if restartCPULim || restartCPUReq || restartMemLim { // resize policy requires this container to restart changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{ @@ -652,12 +639,12 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe return false } else { if resizeMemLim { - markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) + markContainerForUpdate(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit) } if resizeCPULim { - markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit) + markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit) } else if resizeCPUReq { - markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest) + markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest) } } return true @@ 
-942,10 +929,6 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod * if isInPlacePodVerticalScalingAllowed(pod) { changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo) - latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace) - if err == nil { - podStatus = latestPodStatus - } } // Number of running containers to keep. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index 0c241952f76..d2982e4fa02 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -52,6 +52,7 @@ import ( imagetypes "k8s.io/kubernetes/pkg/kubelet/images" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/utils/ptr" ) var ( @@ -2174,7 +2175,7 @@ func makeBasePodAndStatusWithInitAndEphemeralContainers() (*v1.Pod, *kubecontain func TestComputePodActionsForPodResize(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) - fakeRuntime, _, m, err := createTestRuntimeManager() + _, _, m, err := createTestRuntimeManager() m.machineInfo.MemoryCapacity = 17179860387 // 16GB assert.NoError(t, err) @@ -2188,18 +2189,19 @@ func TestComputePodActionsForPodResize(t *testing.T) { memPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceMemory, RestartPolicy: v1.RestartContainer} for desc, test := range map[string]struct { - podResizePolicyFn func(*v1.Pod) - mutatePodFn func(*v1.Pod) + setupFn func(*v1.Pod, *kubecontainer.PodStatus) getExpectedPodActionsFn func(*v1.Pod, *kubecontainer.PodStatus) *podActions }{ "Update container CPU and memory resources": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + MemoryLimit: ptr.To(mem200M.DeepCopy()), } } }, @@ -2244,13 +2246,15 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU resources": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + MemoryLimit: 
ptr.To(mem100M.DeepCopy()), } } }, @@ -2281,13 +2285,15 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container memory resources": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[2] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2318,12 +2324,15 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Nothing when spec.Resources and status.Resources are equal": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, } - pod.Status.ContainerStatuses[1].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + } } }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { @@ -2337,16 +2346,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU and memory resources with Restart policy for CPU": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[0].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[0].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[0] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[0].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2368,16 +2377,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU and memory resources with Restart policy for memory": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := 
&pod.Spec.Containers[2] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2399,16 +2408,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container memory resources with Restart policy for CPU": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2439,16 +2448,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU resources with Restart policy for memory": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[2] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2479,41 +2488,28 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, } { - pod, kps := makeBasePodAndStatus() - for idx := range pod.Spec.Containers { - // default resize policy when pod resize feature is enabled - pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, 
diff --git a/pkg/kubelet/qos/policy.go b/pkg/kubelet/qos/policy.go
index 7117be21255..93d0934c280 100644
--- a/pkg/kubelet/qos/policy.go
+++ b/pkg/kubelet/qos/policy.go
@@ -18,10 +18,7 @@ package qos
 
 import (
 	v1 "k8s.io/api/core/v1"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
-	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/types"
 )
 
@@ -63,11 +60,6 @@ func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapa
 	// targets for OOM kills.
 	// Note that this is a heuristic, it won't work if a container has many small processes.
 	memoryRequest := container.Resources.Requests.Memory().Value()
-	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
-		if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
-			memoryRequest = cs.AllocatedResources.Memory().Value()
-		}
-	}
 	oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity
 	// A guaranteed pod using 100% of memory can have an OOM score of 10. Ensure
 	// that burstable pods have a higher OOM score adjustment.
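The qos/policy.go hunk drops the feature-gated AllocatedResources override, so GetContainerOOMScoreAdjust once again derives the score from the spec's declared memory request; the surviving line is the linear mapping oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity. A small illustrative sketch of just that arithmetic follows (the helper name and the example values are invented for this note, and the real function additionally clamps the result and special-cases guaranteed, best-effort, and critical pods).

package example

import "fmt"

// oomScoreAdjust mirrors the surviving line in GetContainerOOMScoreAdjust:
// a container requesting all of node memory lands near 0 (rarely OOM-killed),
// one requesting almost nothing lands near 1000 (killed first).
func oomScoreAdjust(memoryRequestBytes, memoryCapacityBytes int64) int64 {
	return 1000 - (1000*memoryRequestBytes)/memoryCapacityBytes
}

func main() {
	capacity := int64(8 << 30) // assume an 8Gi node for illustration
	for _, req := range []int64{256 << 20, 2 << 30, 8 << 30} {
		fmt.Printf("request=%dMi adjust=%d\n", req>>20, oomScoreAdjust(req, capacity))
	}
}

On the assumed 8Gi node this prints 969 for a 256Mi request, 750 for 2Gi, and 0 for a container requesting all of node memory, so larger requests make a container a less likely OOM-kill target.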
diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go
index c421883e176..50e91b5e2b5 100644
--- a/pkg/kubelet/status/fake_status_manager.go
+++ b/pkg/kubelet/status/fake_status_manager.go
@@ -69,8 +69,12 @@ func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerNam
 }
 
 func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
-	klog.InfoS("GetPodResizeStatus()")
-	return "", false
+	return m.state.GetPodResizeStatus(podUID)
+}
+
+func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
+	allocs := m.state.GetPodResourceAllocation()
+	return updatePodFromAllocation(pod, allocs)
 }
 
 func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
@@ -86,8 +90,7 @@ func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
 }
 
 func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error {
-	klog.InfoS("SetPodResizeStatus()")
-	return nil
+	return m.state.SetPodResizeStatus(string(podUID), resizeStatus)
 }
 
 // NewFakeManager creates empty/fake memory manager
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index 1e23838b9cf..818b638afa7 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -143,17 +143,27 @@ type Manager interface {
 	// the provided podUIDs.
 	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
 
-	// GetContainerResourceAllocation returns checkpointed AllocatedResources value for the container
-	GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
-
 	// GetPodResizeStatus returns checkpointed PodStatus.Resize value
 	GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
 
-	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
-	SetPodAllocation(pod *v1.Pod) error
-
 	// SetPodResizeStatus checkpoints the last resizing decision for the pod.
 	SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error
+
+	allocationManager
+}
+
+// TODO(tallclair): Refactor allocation state handling out of the status manager.
+type allocationManager interface {
+	// GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container
+	GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
+
+	// UpdatePodFromAllocation overwrites the pod spec with the allocation.
+	// This function does a deep copy only if updates are needed.
+	// Returns the updated (or original) pod, and whether there was an allocation stored.
+	UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
+
+	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
+	SetPodAllocation(pod *v1.Pod) error
 }
 
 const syncPeriod = 10 * time.Second
@@ -242,6 +252,39 @@ func (m *manager) GetContainerResourceAllocation(podUID string, containerName st
 	return m.state.GetContainerResourceAllocation(podUID, containerName)
 }
 
+// UpdatePodFromAllocation overwrites the pod spec with the allocation.
+// This function does a deep copy only if updates are needed.
+func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
+	m.podStatusesLock.RLock()
+	defer m.podStatusesLock.RUnlock()
+	// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
+	allocs := m.state.GetPodResourceAllocation()
+	return updatePodFromAllocation(pod, allocs)
+}
+
+func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
+	allocated, found := allocs[string(pod.UID)]
+	if !found {
+		return pod, false
+	}
+
+	updated := false
+	for i, c := range pod.Spec.Containers {
+		if cAlloc, ok := allocated[c.Name]; ok {
+			if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
+				// Allocation differs from pod spec, update
+				if !updated {
+					// If this is the first update, copy the pod
+					pod = pod.DeepCopy()
+					updated = true
+				}
+				pod.Spec.Containers[i].Resources = cAlloc
+			}
+		}
+	}
+	return pod, updated
+}
+
 // GetPodResizeStatus returns the last checkpointed ResizeStaus value
 // If checkpoint manager has not been initialized, it returns nil, false
 func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
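The new updatePodFromAllocation above is copy-on-write: it walks the containers and only deep-copies the pod the first time a checkpointed allocation differs from the spec, so the steady-state path hands back the original pointer (which is what assert.Same checks in the test added below). A reduced sketch of the same pattern with plain maps follows; the types here are illustrative stand-ins, not the kubelet's state package.

package example

import "fmt"

// resources stands in for v1.ResourceRequirements in this sketch.
type resources struct{ cpuMilli, memBytes int64 }

type container struct {
	Name      string
	Resources resources
}

type pod struct{ Containers []container }

func (p *pod) deepCopy() *pod {
	return &pod{Containers: append([]container(nil), p.Containers...)}
}

// updateFromAllocation overwrites container resources from alloc,
// copying the pod only when the first difference is found, and
// reports whether anything was overwritten.
func updateFromAllocation(p *pod, alloc map[string]resources) (*pod, bool) {
	updated := false
	for i, c := range p.Containers {
		if a, ok := alloc[c.Name]; ok && a != c.Resources {
			if !updated {
				p = p.deepCopy() // copy-on-write: first change triggers the copy
				updated = true
			}
			p.Containers[i].Resources = a
		}
	}
	return p, updated
}

func main() {
	base := &pod{Containers: []container{{Name: "c1", Resources: resources{cpuMilli: 100, memBytes: 200 << 20}}}}

	same, updated := updateFromAllocation(base, map[string]resources{"c1": base.Containers[0].Resources})
	fmt.Println(same == base, updated) // true false: no change, no copy

	grown, updated := updateFromAllocation(base, map[string]resources{"c1": {cpuMilli: 200, memBytes: 200 << 20}})
	fmt.Println(grown == base, updated, base.Containers[0].Resources.cpuMilli) // false true 100: copied, original untouched
}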
allocation", + pod: pod, + allocs: state.PodResourceAllocation{ + string(pod.UID): map[string]v1.ResourceRequirements{ + "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), + }, + }, + expectUpdate: false, + }, { + name: "resized container", + pod: pod, + allocs: state.PodResourceAllocation{ + string(pod.UID): map[string]v1.ResourceRequirements{ + "c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(), + "c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(), + }, + }, + expectUpdate: true, + expectPod: resizedPod, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pod := test.pod.DeepCopy() + allocatedPod, updated := updatePodFromAllocation(pod, test.allocs) + + if test.expectUpdate { + assert.True(t, updated, "updated") + assert.Equal(t, test.expectPod, allocatedPod) + assert.NotEqual(t, pod, allocatedPod) + } else { + assert.False(t, updated, "updated") + assert.Same(t, pod, allocatedPod) + } + }) + } +} + func statusEqual(left, right v1.PodStatus) bool { left.Conditions = nil right.Conditions = nil