diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 4e684a5b902..0832fb3c43b 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -26,13 +26,10 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - utilfeature "k8s.io/apiserver/pkg/util/feature" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1resource "k8s.io/kubernetes/pkg/api/v1/resource" - "k8s.io/kubernetes/pkg/features" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" volumeutils "k8s.io/kubernetes/pkg/volume/util" @@ -1252,12 +1249,6 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats for _, container := range containers { if container.Name == containerStats.Name { requests := container.Resources.Requests[resourceToReclaim] - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && - (resourceToReclaim == v1.ResourceMemory || resourceToReclaim == v1.ResourceCPU) { - if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { - requests = cs.AllocatedResources[resourceToReclaim] - } - } var usage *resource.Quantity switch resourceToReclaim { case v1.ResourceEphemeralStorage: diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index cff0e866ac7..604614992db 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -31,11 +31,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" - "k8s.io/kubernetes/pkg/features" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -3366,56 +3363,6 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool { return true } -func TestEvictonMessageWithResourceResize(t *testing.T) { - testpod := newPod("testpod", 1, []v1.Container{ - newContainer("testcontainer", newResourceList("", "200Mi", ""), newResourceList("", "", "")), - }, nil) - testpod.Status = v1.PodStatus{ - ContainerStatuses: []v1.ContainerStatus{ - { - Name: "testcontainer", - AllocatedResources: newResourceList("", "100Mi", ""), - }, - }, - } - testpodMemory := resource.MustParse("150Mi") - testpodStats := newPodMemoryStats(testpod, testpodMemory) - testpodMemoryBytes := uint64(testpodMemory.Value()) - testpodStats.Containers = []statsapi.ContainerStats{ - { - Name: "testcontainer", - Memory: &statsapi.MemoryStats{ - WorkingSetBytes: &testpodMemoryBytes, - }, - }, - } - stats := map[*v1.Pod]statsapi.PodStats{ - testpod: testpodStats, - } - statsFn := func(pod *v1.Pod) (statsapi.PodStats, bool) { - result, found := stats[pod] - return result, found - } - threshold := []evictionapi.Threshold{} - observations := signalObservations{} - - for _, enabled := range []bool{true, false} { - t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled) - msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn, threshold, observations) - if enabled { - if !strings.Contains(msg, 
"testcontainer was using 150Mi, request is 100Mi") { - t.Errorf("Expected 'exceeds memory' eviction message was not found.") - } - } else { - if strings.Contains(msg, "which exceeds its request") { - t.Errorf("Found 'exceeds memory' eviction message which was not expected.") - } - } - }) - } -} - func TestStatsNotFoundForPod(t *testing.T) { pod1 := newPod("fake-pod1", defaultPriority, []v1.Container{ newContainer("fake-container1", newResourceList("", "", ""), newResourceList("", "", "")), diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c37d8e88ec7..5c07e4367af 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -27,13 +27,13 @@ import ( "os" "path/filepath" sysruntime "runtime" + "slices" "sort" "sync" "sync/atomic" "time" cadvisorapi "github.com/google/cadvisor/info/v1" - "github.com/google/go-cmp/cmp" "github.com/opencontainers/selinux/go-selinux" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -1581,7 +1581,8 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { os.Exit(1) } // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs - kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod) + // Eviction decisions are based on the allocated (rather than desired) pod resources. + kl.evictionManager.Start(kl.StatsProvider, kl.getAllocatedPods, kl.PodIsFinished, evictionMonitoringPeriod) // container log manager must start after container runtime is up to retrieve information from container runtime // and inform container to reopen log file after log rotation. @@ -1806,6 +1807,20 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType } } + // handlePodResourcesResize updates the pod to use the allocated resources. This should come + // before the main business logic of SyncPod, so that a consistent view of the pod is used + // across the sync loop. + if kuberuntime.IsInPlacePodVerticalScalingAllowed(pod) { + // Handle pod resize here instead of doing it in HandlePodUpdates because + // this conveniently retries any Deferred resize requests + // TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan + // See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060 + pod, err = kl.handlePodResourcesResize(pod) + if err != nil { + return false, err + } + } + // Generate final API pod status with pod and status manager status apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false) // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) @@ -1960,16 +1975,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // Ensure the pod is being probed kl.probeManager.AddPod(pod) - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - // Handle pod resize here instead of doing it in HandlePodUpdates because - // this conveniently retries any Deferred resize requests - // TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan - // See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060 - if kl.podWorkers.CouldHaveRunningContainers(pod.UID) && !kubetypes.IsStaticPod(pod) { - pod = kl.handlePodResourcesResize(pod) - } - } - // TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker. 
// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel, // any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling @@ -1992,8 +1997,10 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType return false, nil } - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) { - // While resize is in progress, periodically call PLEG to update pod cache + if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) { + // While resize is in progress, periodically request the latest status from the runtime via + // the PLEG. This is necessary since ordinarily pod status is only fetched when a container + // undergoes a state transition. runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod)) @@ -2304,23 +2311,14 @@ func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { // The function returns a boolean value indicating whether the pod // can be admitted, a brief single-word reason and a message explaining why // the pod cannot be admitted. -func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) { +// allocatedPods should represent the pods that have already been admitted, along with their +// admitted (allocated) resources. +func (kl *Kubelet) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) { // the kubelet will invoke each pod admit handler in sequence // if any handler rejects, the pod is rejected. // TODO: move out of disk check into a pod admitter // TODO: out of resource eviction should have a pod admitter call-out - attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods} - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - // Use allocated resources values from checkpoint store (source of truth) to determine fit - otherPods := make([]*v1.Pod, 0, len(pods)) - for _, p := range pods { - op := p.DeepCopy() - kl.updateContainerResourceAllocation(op) - - otherPods = append(otherPods, op) - } - attrs.OtherPods = otherPods - } + attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods} for _, podAdmitHandler := range kl.admitHandlers { if result := podAdmitHandler.Admit(attrs); !result.Admit { @@ -2560,7 +2558,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { defer kl.podResizeMutex.Unlock() } for _, pod := range pods { - existingPods := kl.podManager.GetPods() // Always add the pod to the pod manager. Kubelet relies on the pod // manager as the source of truth for the desired state. If a pod does // not exist in the pod manager, it means that it has been deleted in @@ -2590,29 +2587,30 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // we simply avoid doing any work. // We also do not try to admit the pod that is already in terminated state. if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) { - // We failed pods that we rejected, so activePods include all admitted + // We failed pods that we rejected, so allocatedPods include all admitted // pods that are alive. 
- activePods := kl.filterOutInactivePods(existingPods) + allocatedPods := kl.getAllocatedPods() + // Filter out the pod being evaluated. + allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID }) if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { // To handle kubelet restarts, test pod admissibility using AllocatedResources values // (for cpu & memory) from checkpoint store. If found, that is the source of truth. - podCopy := pod.DeepCopy() - kl.updateContainerResourceAllocation(podCopy) + allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod) // Check if we can admit the pod; if not, reject it. - if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok { + if ok, reason, message := kl.canAdmitPod(allocatedPods, allocatedPod); !ok { kl.rejectPod(pod, reason, message) continue } // For new pod, checkpoint the resource values at which the Pod has been admitted - if err := kl.statusManager.SetPodAllocation(podCopy); err != nil { + if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil { //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod)) } } else { // Check if we can admit the pod; if not, reject it. - if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { + if ok, reason, message := kl.canAdmitPod(allocatedPods, pod); !ok { kl.rejectPod(pod, reason, message) continue } @@ -2627,22 +2625,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } -// updateContainerResourceAllocation updates AllocatedResources values -// (for cpu & memory) from checkpoint store -func (kl *Kubelet) updateContainerResourceAllocation(pod *v1.Pod) { - for _, c := range pod.Spec.Containers { - allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name) - if c.Resources.Requests != nil && found { - if _, ok := allocatedResources[v1.ResourceCPU]; ok { - c.Resources.Requests[v1.ResourceCPU] = allocatedResources[v1.ResourceCPU] - } - if _, ok := allocatedResources[v1.ResourceMemory]; ok { - c.Resources.Requests[v1.ResourceMemory] = allocatedResources[v1.ResourceMemory] - } - } - } -} - // HandlePodUpdates is the callback in the SyncHandler interface for pods // being updated from a config source. 
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { @@ -2772,117 +2754,87 @@ func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { } } -func isPodResizeInProgress(pod *v1.Pod, podStatus *v1.PodStatus) bool { +func isPodResizeInProgress(pod *v1.Pod, podStatus *kubecontainer.PodStatus) bool { for _, c := range pod.Spec.Containers { - if cs, ok := podutil.GetContainerStatus(podStatus.ContainerStatuses, c.Name); ok { - if cs.Resources == nil { + if cs := podStatus.FindContainerStatusByName(c.Name); cs != nil { + if cs.State != kubecontainer.ContainerStateRunning || cs.Resources == nil { continue } - if !cmp.Equal(c.Resources.Limits, cs.Resources.Limits) || !cmp.Equal(cs.AllocatedResources, cs.Resources.Requests) { - return true + if c.Resources.Requests != nil { + if cs.Resources.CPURequest != nil && !cs.Resources.CPURequest.Equal(*c.Resources.Requests.Cpu()) { + return true + } + } + if c.Resources.Limits != nil { + if cs.Resources.CPULimit != nil && !cs.Resources.CPULimit.Equal(*c.Resources.Limits.Cpu()) { + return true + } + if cs.Resources.MemoryLimit != nil && !cs.Resources.MemoryLimit.Equal(*c.Resources.Limits.Memory()) { + return true + } } } } return false } -func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) { - var otherActivePods []*v1.Pod - +// canResizePod determines if the requested resize is currently feasible. +// pod should hold the desired (pre-allocated) spec. +// Returns true if the resize can proceed. +func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) { node, err := kl.getNodeAnyWay() if err != nil { klog.ErrorS(err, "getNodeAnyway function failed") - return false, nil, "" + return false, "" } - podCopy := pod.DeepCopy() cpuAvailable := node.Status.Allocatable.Cpu().MilliValue() memAvailable := node.Status.Allocatable.Memory().Value() - cpuRequests := resource.GetResourceRequest(podCopy, v1.ResourceCPU) - memRequests := resource.GetResourceRequest(podCopy, v1.ResourceMemory) + cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU) + memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory) if cpuRequests > cpuAvailable || memRequests > memAvailable { - klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", podCopy.Name) - return false, podCopy, v1.PodResizeStatusInfeasible + klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", klog.KObj(pod)) + return false, v1.PodResizeStatusInfeasible } // Treat the existing pod needing resize as a new pod with desired resources seeking admit. // If desired resources don't fit, pod continues to run with currently allocated resources. - activePods := kl.GetActivePods() - for _, p := range activePods { - if p.UID != pod.UID { - otherActivePods = append(otherActivePods, p) - } - } + allocatedPods := kl.getAllocatedPods() + allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID }) - if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, podCopy); !ok { + if ok, failReason, failMessage := kl.canAdmitPod(allocatedPods, pod); !ok { // Log reason and return. 
Let the next sync iteration retry the resize - klog.V(3).InfoS("Resize cannot be accommodated", "pod", podCopy.Name, "reason", failReason, "message", failMessage) - return false, podCopy, v1.PodResizeStatusDeferred + klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage) + return false, v1.PodResizeStatusDeferred } - for _, container := range podCopy.Spec.Containers { - idx, found := podutil.GetIndexOfContainerStatus(podCopy.Status.ContainerStatuses, container.Name) - if found { - for rName, rQuantity := range container.Resources.Requests { - podCopy.Status.ContainerStatuses[idx].AllocatedResources[rName] = rQuantity - } - } - } - return true, podCopy, v1.PodResizeStatusInProgress + return true, v1.PodResizeStatusInProgress } -func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) *v1.Pod { - if pod.Status.Phase != v1.PodRunning { - return pod - } - podResized := false - for _, container := range pod.Spec.Containers { - if len(container.Resources.Requests) == 0 { - continue - } - containerStatus, found := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name) - if !found { - klog.V(5).InfoS("ContainerStatus not found", "pod", pod.Name, "container", container.Name) - break - } - if len(containerStatus.AllocatedResources) != len(container.Resources.Requests) { - klog.V(5).InfoS("ContainerStatus.AllocatedResources length mismatch", "pod", pod.Name, "container", container.Name) - break - } - if !cmp.Equal(container.Resources.Requests, containerStatus.AllocatedResources) { - podResized = true - break - } - } - if !podResized { - return pod +func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) { + allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod) + if !updated { + // Pod is not resizing, nothing more to do here. + return allocatedPod, nil } kl.podResizeMutex.Lock() defer kl.podResizeMutex.Unlock() - fit, updatedPod, resizeStatus := kl.canResizePod(pod) - if updatedPod == nil { - return pod - } + fit, resizeStatus := kl.canResizePod(pod) if fit { // Update pod resource allocation checkpoint - if err := kl.statusManager.SetPodAllocation(updatedPod); err != nil { - //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate - klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(updatedPod)) - return pod + if err := kl.statusManager.SetPodAllocation(pod); err != nil { + return nil, err } + allocatedPod = pod } if resizeStatus != "" { // Save resize decision to checkpoint - if err := kl.statusManager.SetPodResizeStatus(updatedPod.UID, resizeStatus); err != nil { + if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, resizeStatus); err != nil { //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate - klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(updatedPod)) - return pod + klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(allocatedPod)) } - updatedPod.Status.Resize = resizeStatus } - kl.podManager.UpdatePod(updatedPod) - kl.statusManager.SetPodStatus(updatedPod, updatedPod.Status) - return updatedPod + return allocatedPod, nil } // LatestLoopEntryTime returns the last time in the sync loop monitor. 
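The kubelet.go changes above all follow one pattern: admission (`canAdmitPod`), eviction (`getAllocatedPods`), and resize decisions now operate on the *allocated* view of each pod — the resources checkpointed by the status manager — rather than the desired spec, with `statusManager.UpdatePodFromAllocation` doing the substitution. The sketch below illustrates that substitution in isolation; it is a minimal, hypothetical stand-in (the `updatePodFromAllocation` helper and the plain map keyed by container name are illustrative only — the real status manager is keyed by pod UID and backed by the state checkpoint), not the actual kubelet implementation.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/resource"
)

// updatePodFromAllocation returns a copy of pod whose container resources are
// replaced by the checkpointed allocation, plus a flag reporting whether any
// container actually differed (i.e. a resize is still pending). This mirrors
// the role UpdatePodFromAllocation plays in the diff, under the assumption
// that a simple in-memory map stands in for the checkpoint store.
func updatePodFromAllocation(pod *v1.Pod, alloc map[string]v1.ResourceRequirements) (*v1.Pod, bool) {
	out := pod.DeepCopy()
	updated := false
	for i := range out.Spec.Containers {
		c := &out.Spec.Containers[i]
		a, ok := alloc[c.Name]
		if !ok {
			// No allocation recorded yet; keep the desired resources.
			continue
		}
		if !apiequality.Semantic.DeepEqual(c.Resources, a) {
			c.Resources = a
			updated = true
		}
	}
	return out, updated
}

func main() {
	// Desired spec asks for 500m CPU...
	desired := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "c1",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m")},
				},
			}},
		},
	}
	// ...but the allocation recorded at admission time is still 250m.
	alloc := map[string]v1.ResourceRequirements{
		"c1": {Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m")}},
	}
	allocated, resizePending := updatePodFromAllocation(desired, alloc)
	fmt.Println(resizePending, allocated.Spec.Containers[0].Resources.Requests.Cpu())
	// Output: true 250m — decisions are made against 250m until the resize is admitted.
}
```

Because `SyncPod` now performs this substitution up front (and `canResizePod` is handed the desired spec explicitly), the later kuberuntime changes can compare the pod spec directly against the runtime-reported container resources instead of consulting `pod.Status.ContainerStatuses`.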
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index ff872985ce5..6072a48027c 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -207,6 +207,21 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod { return activePods } +// getAllocatedPods returns the active pods (see GetActivePods), but updates the pods to their +// allocated state. +func (kl *Kubelet) getAllocatedPods() []*v1.Pod { + activePods := kl.GetActivePods() + if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { + return activePods + } + + allocatedPods := make([]*v1.Pod, len(activePods)) + for i, pod := range activePods { + allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod) + } + return allocatedPods +} + // makeBlockVolumes maps the raw block devices specified in the path of the container // Experimental func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumepathhandler.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) { @@ -2081,12 +2096,14 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon } } container := kubecontainer.GetContainerSpec(pod, cName) - // AllocatedResources values come from checkpoint. It is the source-of-truth. - found := false - status.AllocatedResources, found = kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName) - if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found { - // Log error and fallback to AllocatedResources in oldStatus if it exists - klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName) + + // Always set the status to the latest allocated resources, even if it differs from the + // allocation used by the current sync loop. + alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName) + if found { + status.AllocatedResources = alloc.Requests + } else if !(container.Resources.Requests == nil && container.Resources.Limits == nil) { + // This case is expected for ephemeral containers. 
if oldStatusFound { status.AllocatedResources = oldStatus.AllocatedResources } @@ -2107,46 +2124,46 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon } // Convert Limits - if container.Resources.Limits != nil { + if alloc.Limits != nil { limits = make(v1.ResourceList) if cStatus.Resources != nil && cStatus.Resources.CPULimit != nil { limits[v1.ResourceCPU] = cStatus.Resources.CPULimit.DeepCopy() } else { - determineResource(v1.ResourceCPU, container.Resources.Limits, oldStatus.Resources.Limits, limits) + determineResource(v1.ResourceCPU, alloc.Limits, oldStatus.Resources.Limits, limits) } if cStatus.Resources != nil && cStatus.Resources.MemoryLimit != nil { limits[v1.ResourceMemory] = cStatus.Resources.MemoryLimit.DeepCopy() } else { - determineResource(v1.ResourceMemory, container.Resources.Limits, oldStatus.Resources.Limits, limits) + determineResource(v1.ResourceMemory, alloc.Limits, oldStatus.Resources.Limits, limits) } - if ephemeralStorage, found := container.Resources.Limits[v1.ResourceEphemeralStorage]; found { + if ephemeralStorage, found := alloc.Limits[v1.ResourceEphemeralStorage]; found { limits[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy() } - if storage, found := container.Resources.Limits[v1.ResourceStorage]; found { + if storage, found := alloc.Limits[v1.ResourceStorage]; found { limits[v1.ResourceStorage] = storage.DeepCopy() } - convertCustomResources(container.Resources.Limits, limits) + convertCustomResources(alloc.Limits, limits) } // Convert Requests - if status.AllocatedResources != nil { + if alloc.Requests != nil { requests = make(v1.ResourceList) if cStatus.Resources != nil && cStatus.Resources.CPURequest != nil { requests[v1.ResourceCPU] = cStatus.Resources.CPURequest.DeepCopy() } else { - determineResource(v1.ResourceCPU, status.AllocatedResources, oldStatus.Resources.Requests, requests) + determineResource(v1.ResourceCPU, alloc.Requests, oldStatus.Resources.Requests, requests) } - if memory, found := status.AllocatedResources[v1.ResourceMemory]; found { + if memory, found := alloc.Requests[v1.ResourceMemory]; found { requests[v1.ResourceMemory] = memory.DeepCopy() } - if ephemeralStorage, found := status.AllocatedResources[v1.ResourceEphemeralStorage]; found { + if ephemeralStorage, found := alloc.Requests[v1.ResourceEphemeralStorage]; found { requests[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy() } - if storage, found := status.AllocatedResources[v1.ResourceStorage]; found { + if storage, found := alloc.Requests[v1.ResourceStorage]; found { requests[v1.ResourceStorage] = storage.DeepCopy() } - convertCustomResources(status.AllocatedResources, requests) + convertCustomResources(alloc.Requests, requests) } resources := &v1.ResourceRequirements{ diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index c8d493b973b..70e05796b21 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2391,6 +2391,13 @@ func TestPodResourceAllocationReset(t *testing.T) { kubelet := testKubelet.kubelet kubelet.statusManager = status.NewFakeManager() + // fakePodWorkers trigger syncPodFn synchronously on update, but entering + // kubelet.SyncPod while holding the podResizeMutex can lead to deadlock. 
+ kubelet.podWorkers.(*fakePodWorkers).syncPodFn = + func(_ context.Context, _ kubetypes.SyncPodType, _, _ *v1.Pod, _ *kubecontainer.PodStatus) (bool, error) { + return false, nil + } + nodes := []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, @@ -2447,8 +2454,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Having both memory and cpu, resource allocation not exists", pod: podWithUIDNameNsSpec("1", "pod1", "foo", *cpu500mMem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "1": map[string]v1.ResourceList{ - cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests, + "1": map[string]v1.ResourceRequirements{ + cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2457,8 +2464,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "2": map[string]v1.ResourceList{ - cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests, + "2": map[string]v1.ResourceRequirements{ + cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2467,8 +2474,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu500mMem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu800mMem800MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "3": map[string]v1.ResourceList{ - cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources.Requests, + "3": map[string]v1.ResourceRequirements{ + cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources, }, }, }, @@ -2476,8 +2483,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "Only has cpu, resource allocation not exists", pod: podWithUIDNameNsSpec("4", "pod5", "foo", *cpu500mPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "4": map[string]v1.ResourceList{ - cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests, + "4": map[string]v1.ResourceRequirements{ + cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, }, }, }, @@ -2486,8 +2493,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec), existingPodAllocation: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "5": map[string]v1.ResourceList{ - cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests, + "5": map[string]v1.ResourceRequirements{ + cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources, }, }, }, @@ -2496,8 +2503,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu500mPodSpec), existingPodAllocation: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu800mPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "6": map[string]v1.ResourceList{ - cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources.Requests, + "6": map[string]v1.ResourceRequirements{ + cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources, }, }, }, @@ -2505,8 +2512,8 @@ func 
TestPodResourceAllocationReset(t *testing.T) { name: "Only has memory, resource allocation not exists", pod: podWithUIDNameNsSpec("7", "pod7", "foo", *mem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "7": map[string]v1.ResourceList{ - mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests, + "7": map[string]v1.ResourceRequirements{ + mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2515,8 +2522,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "8": map[string]v1.ResourceList{ - mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests, + "8": map[string]v1.ResourceRequirements{ + mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources, }, }, }, @@ -2525,8 +2532,8 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("9", "pod9", "foo", *mem500MPodSpec), existingPodAllocation: podWithUIDNameNsSpec("9", "pod9", "foo", *mem800MPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "9": map[string]v1.ResourceList{ - mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources.Requests, + "9": map[string]v1.ResourceRequirements{ + mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources, }, }, }, @@ -2534,8 +2541,8 @@ func TestPodResourceAllocationReset(t *testing.T) { name: "No CPU and memory, resource allocation not exists", pod: podWithUIDNameNsSpec("10", "pod10", "foo", *emptyPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "10": map[string]v1.ResourceList{ - emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests, + "10": map[string]v1.ResourceRequirements{ + emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, }, }, }, @@ -2544,27 +2551,29 @@ func TestPodResourceAllocationReset(t *testing.T) { pod: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec), existingPodAllocation: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec), expectedPodResourceAllocation: state.PodResourceAllocation{ - "11": map[string]v1.ResourceList{ - emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests, + "11": map[string]v1.ResourceRequirements{ + emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources, }, }, }, } for _, tc := range tests { - if tc.existingPodAllocation != nil { - // when kubelet restarts, AllocatedResources has already existed before adding pod - err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation) - if err != nil { - t.Fatalf("failed to set pod allocation: %v", err) + t.Run(tc.name, func(t *testing.T) { + if tc.existingPodAllocation != nil { + // when kubelet restarts, AllocatedResources has already existed before adding pod + err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation) + if err != nil { + t.Fatalf("failed to set pod allocation: %v", err) + } } - } - kubelet.HandlePodAdditions([]*v1.Pod{tc.pod}) + kubelet.HandlePodAdditions([]*v1.Pod{tc.pod}) - allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name) - if !found { - t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) - } - 
assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) + allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name) + if !found { + t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name) + } + assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name) + }) } } @@ -2573,7 +2582,6 @@ func TestHandlePodResourcesResize(t *testing.T) { testKubelet := newTestKubelet(t, false) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet - kubelet.statusManager = status.NewFakeManager() cpu500m := resource.MustParse("500m") cpu1000m := resource.MustParse("1") @@ -2715,14 +2723,24 @@ func TestHandlePodResourcesResize(t *testing.T) { } for _, tt := range tests { - tt.pod.Spec.Containers[0].Resources.Requests = tt.newRequests - tt.pod.Status.ContainerStatuses[0].AllocatedResources = v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M} - kubelet.handlePodResourcesResize(tt.pod) - updatedPod, found := kubelet.podManager.GetPodByName(tt.pod.Namespace, tt.pod.Name) - assert.True(t, found, "expected to find pod %s", tt.pod.Name) - assert.Equal(t, tt.expectedAllocations, updatedPod.Status.ContainerStatuses[0].AllocatedResources, tt.name) - assert.Equal(t, tt.expectedResize, updatedPod.Status.Resize, tt.name) - testKubelet.fakeKubeClient.ClearActions() + t.Run(tt.name, func(t *testing.T) { + kubelet.statusManager = status.NewFakeManager() + require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod)) + + pod := tt.pod.DeepCopy() + pod.Spec.Containers[0].Resources.Requests = tt.newRequests + updatedPod, err := kubelet.handlePodResourcesResize(pod) + require.NoError(t, err) + assert.Equal(t, tt.expectedAllocations, updatedPod.Spec.Containers[0].Resources.Requests, "updated pod spec resources") + + alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(pod.UID), pod.Spec.Containers[0].Name) + require.True(t, found, "container allocation") + assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation") + + resizeStatus, found := kubelet.statusManager.GetPodResizeStatus(string(pod.UID)) + require.True(t, found, "pod resize status") + assert.Equal(t, tt.expectedResize, resizeStatus) + }) } } @@ -3274,3 +3292,132 @@ func TestSyncPodSpans(t *testing.T) { assert.Equalf(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), "runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()) } } + +func TestIsPodResizeInProgress(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "12345", + Name: "test", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "c1", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI), + }, + }, + }, { + Name: "c2", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI), + }, + Limits: 
v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI), + }, + }, + }}, + }, + } + steadyStateC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(400, resource.DecimalSI), + }, + } + resizeMemC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(800, resource.DecimalSI), + }, + } + resizeCPUReqC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(200, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(400, resource.DecimalSI), + }, + } + resizeCPULimitC1Status := &kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(600, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(400, resource.DecimalSI), + }, + } + steadyStateC2Status := &kubecontainer.Status{ + Name: "c2", + State: kubecontainer.ContainerStateRunning, + Resources: &kubecontainer.ContainerResources{ + CPURequest: resource.NewMilliQuantity(500, resource.DecimalSI), + CPULimit: resource.NewMilliQuantity(700, resource.DecimalSI), + MemoryLimit: resource.NewQuantity(800, resource.DecimalSI), + }, + } + mkPodStatus := func(containerStatuses ...*kubecontainer.Status) *kubecontainer.PodStatus { + return &kubecontainer.PodStatus{ + ID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + ContainerStatuses: containerStatuses, + } + } + tests := []struct { + name string + status *kubecontainer.PodStatus + expectResize bool + }{{ + name: "steady state", + status: mkPodStatus(steadyStateC1Status, steadyStateC2Status), + expectResize: false, + }, { + name: "terminated container", + status: mkPodStatus(&kubecontainer.Status{ + Name: "c1", + State: kubecontainer.ContainerStateExited, + Resources: resizeMemC1Status.Resources, + }, steadyStateC2Status), + expectResize: false, + }, { + name: "missing container", + status: mkPodStatus(steadyStateC2Status), + expectResize: false, + }, { + name: "resizing memory limit", + status: mkPodStatus(resizeMemC1Status, steadyStateC2Status), + expectResize: true, + }, { + name: "resizing cpu request", + status: mkPodStatus(resizeCPUReqC1Status, steadyStateC2Status), + expectResize: true, + }, { + name: "resizing cpu limit", + status: mkPodStatus(resizeCPULimitC1Status, steadyStateC2Status), + expectResize: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expectResize, isPodResizeInProgress(pod, test.status)) + }) + } +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go index 2cbf83bb621..37f0cf3562c 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go +++ 
b/pkg/kubelet/kuberuntime/kuberuntime_container_linux_test.go @@ -805,291 +805,67 @@ func TestGenerateLinuxContainerResources(t *testing.T) { }, }, }, - Status: v1.PodStatus{}, } for _, tc := range []struct { name string - scalingFg bool limits v1.ResourceList requests v1.ResourceList - cStatus []v1.ContainerStatus expected *runtimeapi.LinuxContainerResources cgroupVersion CgroupVersion }{ { - "requests & limits, cpu & memory, guaranteed qos - no container status", - true, + "requests & limits, cpu & memory, guaranteed qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, cgroupV1, }, { - "requests & limits, cpu & memory, burstable qos - no container status", - true, + "requests & limits, cpu & memory, burstable qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, cgroupV1, }, { - "best-effort qos - no container status", - true, + "best-effort qos", nil, nil, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, cgroupV1, }, { - "requests & limits, cpu & memory, guaranteed qos - empty resources container status", - true, + "requests & limits, cpu & memory, guaranteed qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, burstable qos - empty resources container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999}, - cgroupV1, - }, - { - "best-effort qos - empty resources container status", - true, - nil, - nil, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - true, - 
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - no container status", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, - cgroupV1, - }, - { - "best-effort qos - no container status", - false, - nil, - nil, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, - cgroupV1, - }, - { - "requests & limits, cpu & memory, guaranteed qos - no container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, cgroupV2, }, { - "requests & limits, cpu & memory, burstable qos - no container status", - true, + "requests & limits, cpu & memory, burstable qos", v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, cgroupV2, }, { - "best-effort qos - no container status", - true, + 
"best-effort qos", nil, nil, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - empty resources container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, burstable qos - empty resources container status", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "best-effort qos - empty resources container status", - true, - nil, - nil, - []v1.ContainerStatus{{Name: "c1"}}, - &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - true, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - no container status", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{}, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, burstable qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: 
resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources", - false, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - []v1.ContainerStatus{ - { - Name: "c1", - AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")}, - }, - }, - &runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, - cgroupV2, - }, - { - "best-effort qos - no container status", - false, - nil, - nil, - []v1.ContainerStatus{}, &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, cgroupV2, }, } { - t.Run(tc.name, func(t *testing.T) { + t.Run(fmt.Sprintf("cgroup%s:%s", tc.cgroupVersion, tc.name), func(t *testing.T) { defer setSwapControllerAvailableDuringTest(false)() - if tc.scalingFg { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) - } setCgroupVersionDuringTest(tc.cgroupVersion) pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests} - if len(tc.cStatus) > 0 { - pod.Status.ContainerStatuses = tc.cStatus - } + resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false) tc.expected.HugepageLimits = resources.HugepageLimits - if !cmp.Equal(resources, tc.expected) { - t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources) - } + assert.Equal(t, tc.expected, resources) }) } } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 4ef341f7ab5..4b87d452049 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -26,7 +26,6 @@ import ( "time" cadvisorapi "github.com/google/cadvisor/info/v1" - "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/trace" grpcstatus "google.golang.org/grpc/status" crierror "k8s.io/cri-api/pkg/errors" @@ -47,7 +46,6 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/kubernetes/pkg/api/legacyscheme" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/credentialprovider/plugin" "k8s.io/kubernetes/pkg/features" @@ -541,7 +539,7 @@ func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) boo return cStatus.State == kubecontainer.ContainerStateExited && cStatus.ExitCode == 0 } -func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool { +func IsInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool { if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { return false } @@ -551,60 +549,49 @@ func 
isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool { return true } -func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool { +// computePodResizeAction determines the actions required (if any) to resize the given container. +// Returns whether to keep (true) or restart (false) the container. +func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) (keepContainer bool) { container := pod.Spec.Containers[containerIdx] - if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 { - return true - } // Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired) // with v1.Status.Resources / runtime.Status.Resources (last known actual). // Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources. // Skip if runtime containerID doesn't match pod.Status containerID (container is restarting) - apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name) - if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil || - kubeContainerStatus.State != kubecontainer.ContainerStateRunning || - kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID || - !cmp.Equal(container.Resources.Requests, apiContainerStatus.AllocatedResources) { + if kubeContainerStatus.State != kubecontainer.ContainerStateRunning { return true } - desiredMemoryLimit := container.Resources.Limits.Memory().Value() - desiredCPULimit := container.Resources.Limits.Cpu().MilliValue() - desiredCPURequest := container.Resources.Requests.Cpu().MilliValue() - currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value() - currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue() - currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue() - // Runtime container status resources (from CRI), if set, supercedes v1(api) container status resrouces. - if kubeContainerStatus.Resources != nil { - if kubeContainerStatus.Resources.MemoryLimit != nil { - currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value() - } - if kubeContainerStatus.Resources.CPULimit != nil { - currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue() - } - if kubeContainerStatus.Resources.CPURequest != nil { - currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue() - } - } - - // Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during - // handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest. - if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest { + if kubeContainerStatus.Resources == nil { + // Not enough information to actuate a resize. 
+ klog.V(4).InfoS("Missing runtime resource information for container", "pod", klog.KObj(pod), "container", container.Name) return true } desiredResources := containerResources{ - memoryLimit: desiredMemoryLimit, - memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(), - cpuLimit: desiredCPULimit, - cpuRequest: desiredCPURequest, + memoryLimit: container.Resources.Limits.Memory().Value(), + memoryRequest: container.Resources.Requests.Memory().Value(), + cpuLimit: container.Resources.Limits.Cpu().MilliValue(), + cpuRequest: container.Resources.Requests.Cpu().MilliValue(), } - currentResources := containerResources{ - memoryLimit: currentMemoryLimit, - memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(), - cpuLimit: currentCPULimit, - cpuRequest: currentCPURequest, + + // Default current values to the desired values so that a resize isn't triggered for missing values. + currentResources := desiredResources + if kubeContainerStatus.Resources.MemoryLimit != nil { + currentResources.memoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value() + } + if kubeContainerStatus.Resources.CPULimit != nil { + currentResources.cpuLimit = kubeContainerStatus.Resources.CPULimit.MilliValue() + } + if kubeContainerStatus.Resources.CPURequest != nil { + currentResources.cpuRequest = kubeContainerStatus.Resources.CPURequest.MilliValue() + } + // Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during + // handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest. + + if currentResources == desiredResources { + // No resize required. + return true } resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy) @@ -637,9 +624,9 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe changes.ContainersToUpdate[rName][0] = cUpdateInfo } } - resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) - resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit) - resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest) + resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit) + resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit) + resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest) if restartCPULim || restartCPUReq || restartMemLim { // resize policy requires this container to restart changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{ @@ -652,12 +639,12 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe return false } else { if resizeMemLim { - markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) + markContainerForUpdate(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit) } if resizeCPULim { - markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit) + markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit) } else if resizeCPUReq { - markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest) + markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest) } } return true @@ 
-940,12 +927,8 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod * } } - if isInPlacePodVerticalScalingAllowed(pod) { + if IsInPlacePodVerticalScalingAllowed(pod) { changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo) - latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace) - if err == nil { - podStatus = latestPodStatus - } } // Number of running containers to keep. @@ -1002,7 +985,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod * // If the container failed the startup probe, we should kill it. message = fmt.Sprintf("Container %s failed startup probe", container.Name) reason = reasonStartupProbe - } else if isInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) { + } else if IsInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) { // computePodResizeAction updates 'changes' if resize policy requires restarting this container continue } else { @@ -1319,7 +1302,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po } // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources - if isInPlacePodVerticalScalingAllowed(pod) { + if IsInPlacePodVerticalScalingAllowed(pod) { if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources { m.doPodResizeAction(pod, podStatus, podContainerChanges, result) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index 0c241952f76..d2982e4fa02 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -52,6 +52,7 @@ import ( imagetypes "k8s.io/kubernetes/pkg/kubelet/images" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/utils/ptr" ) var ( @@ -2174,7 +2175,7 @@ func makeBasePodAndStatusWithInitAndEphemeralContainers() (*v1.Pod, *kubecontain func TestComputePodActionsForPodResize(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) - fakeRuntime, _, m, err := createTestRuntimeManager() + _, _, m, err := createTestRuntimeManager() m.machineInfo.MemoryCapacity = 17179860387 // 16GB assert.NoError(t, err) @@ -2188,18 +2189,19 @@ func TestComputePodActionsForPodResize(t *testing.T) { memPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceMemory, RestartPolicy: v1.RestartContainer} for desc, test := range map[string]struct { - podResizePolicyFn func(*v1.Pod) - mutatePodFn func(*v1.Pod) + setupFn func(*v1.Pod, *kubecontainer.PodStatus) getExpectedPodActionsFn func(*v1.Pod, *kubecontainer.PodStatus) *podActions }{ "Update container CPU and memory resources": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, 
v1.ResourceMemory: mem200M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + MemoryLimit: ptr.To(mem200M.DeepCopy()), } } }, @@ -2244,13 +2246,15 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU resources": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2281,13 +2285,15 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container memory resources": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[2] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2318,12 +2324,15 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Nothing when spec.Resources and status.Resources are equal": { - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, } - pod.Status.ContainerStatuses[1].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu200m.DeepCopy()), + } } }, getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { @@ -2337,16 +2346,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU and memory resources with Restart policy for CPU": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[0].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[0].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[0] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, 
memPolicyRestartNotRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[0].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2368,16 +2377,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU and memory resources with Restart policy for memory": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[2] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2399,16 +2408,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container memory resources with Restart policy for CPU": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ + setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[1] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem200M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2439,16 +2448,16 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, "Update container CPU resources with Restart policy for memory": { - podResizePolicyFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} - }, - mutatePodFn: func(pod *v1.Pod) { - pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ + setupFn: 
func(pod *v1.Pod, status *kubecontainer.PodStatus) { + c := &pod.Spec.Containers[2] + c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} + c.Resources = v1.ResourceRequirements{ Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, } - if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { - pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, + if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil { + cStatus.Resources = &kubecontainer.ContainerResources{ + CPULimit: ptr.To(cpu100m.DeepCopy()), + MemoryLimit: ptr.To(mem100M.DeepCopy()), } } }, @@ -2479,41 +2488,28 @@ func TestComputePodActionsForPodResize(t *testing.T) { }, }, } { - pod, kps := makeBasePodAndStatus() - for idx := range pod.Spec.Containers { - // default resize policy when pod resize feature is enabled - pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired} - } - if test.podResizePolicyFn != nil { - test.podResizePolicyFn(pod) - } - for idx := range pod.Spec.Containers { - // compute hash - if kcs := kps.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil { - kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx]) + t.Run(desc, func(t *testing.T) { + pod, status := makeBasePodAndStatus() + for idx := range pod.Spec.Containers { + // default resize policy when pod resize feature is enabled + pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired} } - } - makeAndSetFakePod(t, m, fakeRuntime, pod) - ctx := context.Background() - status, _ := m.GetPodStatus(ctx, kps.ID, pod.Name, pod.Namespace) - for idx := range pod.Spec.Containers { - if rcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); rcs != nil { - if csIdx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[idx].Name); found { - pod.Status.ContainerStatuses[csIdx].ContainerID = rcs.ID.String() + if test.setupFn != nil { + test.setupFn(pod, status) + } + + for idx := range pod.Spec.Containers { + // compute hash + if kcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil { + kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx]) } } - } - for idx := range pod.Spec.Containers { - if kcs := kps.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil { - kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx]) - } - } - if test.mutatePodFn != nil { - test.mutatePodFn(pod) - } - expectedActions := test.getExpectedPodActionsFn(pod, status) - actions := m.computePodActions(ctx, pod, status) - verifyActions(t, expectedActions, &actions, desc) + + ctx := context.Background() + expectedActions := test.getExpectedPodActionsFn(pod, status) + actions := m.computePodActions(ctx, pod, status) + verifyActions(t, expectedActions, &actions, desc) + }) } } diff --git a/pkg/kubelet/qos/policy.go b/pkg/kubelet/qos/policy.go index 7117be21255..492bb9d2b8a 100644 --- a/pkg/kubelet/qos/policy.go +++ b/pkg/kubelet/qos/policy.go @@ -18,10 +18,7 @@ package qos import ( v1 "k8s.io/api/core/v1" - utilfeature "k8s.io/apiserver/pkg/util/feature" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" - 
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -40,6 +37,8 @@ const ( // multiplied by 10 (barring exceptional cases) + a configurable quantity which is between -1000 // and 1000. Containers with higher OOM scores are killed if the system runs out of memory. // See https://lwn.net/Articles/391222/ for more information. +// OOMScoreAdjust should be calculated based on the allocated resources, so the pod argument should +// contain the allocated resources in the spec. func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapacity int64) int { if types.IsNodeCriticalPod(pod) { // Only node critical pod should be the last to get killed. @@ -63,11 +62,6 @@ func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapa // targets for OOM kills. // Note that this is a heuristic, it won't work if a container has many small processes. memoryRequest := container.Resources.Requests.Memory().Value() - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { - if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok { - memoryRequest = cs.AllocatedResources.Memory().Value() - } - } oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity // A guaranteed pod using 100% of memory can have an OOM score of 10. Ensure // that burstable pods have a higher OOM score adjustment. diff --git a/pkg/kubelet/status/fake_status_manager.go b/pkg/kubelet/status/fake_status_manager.go index 42cd611984e..ebb3970c6e4 100644 --- a/pkg/kubelet/status/fake_status_manager.go +++ b/pkg/kubelet/status/fake_status_manager.go @@ -63,31 +63,31 @@ func (m *fakeManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { return } -func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { klog.InfoS("GetContainerResourceAllocation()") return m.state.GetContainerResourceAllocation(podUID, containerName) } func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { - klog.InfoS("GetPodResizeStatus()") - return "", false + return m.state.GetPodResizeStatus(podUID) +} + +func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { + allocs := m.state.GetPodResourceAllocation() + return updatePodFromAllocation(pod, allocs) } func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error { klog.InfoS("SetPodAllocation()") for _, container := range pod.Spec.Containers { - var alloc v1.ResourceList - if container.Resources.Requests != nil { - alloc = container.Resources.Requests.DeepCopy() - } + alloc := *container.Resources.DeepCopy() m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc) } return nil } func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { - klog.InfoS("SetPodResizeStatus()") - return nil + return m.state.SetPodResizeStatus(string(podUID), resizeStatus) } // NewFakeManager creates empty/fake memory manager diff --git a/pkg/kubelet/status/state/checkpoint.go b/pkg/kubelet/status/state/checkpoint.go index 6cad6361e28..525bf675860 100644 --- a/pkg/kubelet/status/state/checkpoint.go +++ b/pkg/kubelet/status/state/checkpoint.go @@ -19,7 +19,7 @@ package state import ( "encoding/json" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" 
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) @@ -28,16 +28,16 @@ var _ checkpointmanager.Checkpoint = &PodResourceAllocationCheckpoint{} // PodResourceAllocationCheckpoint is used to store resources allocated to a pod in checkpoint type PodResourceAllocationCheckpoint struct { - AllocationEntries map[string]map[string]v1.ResourceList `json:"allocationEntries,omitempty"` - ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"` - Checksum checksum.Checksum `json:"checksum"` + AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` + ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"` + Checksum checksum.Checksum `json:"checksum"` } // NewPodResourceAllocationCheckpoint returns an instance of Checkpoint func NewPodResourceAllocationCheckpoint() *PodResourceAllocationCheckpoint { //lint:ignore unexported-type-in-api user-facing error message return &PodResourceAllocationCheckpoint{ - AllocationEntries: make(map[string]map[string]v1.ResourceList), + AllocationEntries: make(map[string]map[string]v1.ResourceRequirements), ResizeStatusEntries: make(map[string]v1.PodResizeStatus), } } diff --git a/pkg/kubelet/status/state/state.go b/pkg/kubelet/status/state/state.go index 2fdbe8a4474..4b85f164ab8 100644 --- a/pkg/kubelet/status/state/state.go +++ b/pkg/kubelet/status/state/state.go @@ -17,11 +17,11 @@ limitations under the License. package state import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) // PodResourceAllocation type is used in tracking resources allocated to pod's containers -type PodResourceAllocation map[string]map[string]v1.ResourceList +type PodResourceAllocation map[string]map[string]v1.ResourceRequirements // PodResizeStatus type is used in tracking the last resize decision for pod type PodResizeStatus map[string]v1.PodResizeStatus @@ -30,9 +30,9 @@ type PodResizeStatus map[string]v1.PodResizeStatus func (pr PodResourceAllocation) Clone() PodResourceAllocation { prCopy := make(PodResourceAllocation) for pod := range pr { - prCopy[pod] = make(map[string]v1.ResourceList) + prCopy[pod] = make(map[string]v1.ResourceRequirements) for container, alloc := range pr[pod] { - prCopy[pod][container] = alloc.DeepCopy() + prCopy[pod][container] = *alloc.DeepCopy() } } return prCopy @@ -40,14 +40,14 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation { // Reader interface used to read current pod resource allocation state type Reader interface { - GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) + GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) GetPodResourceAllocation() PodResourceAllocation GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) GetResizeStatus() PodResizeStatus } type writer interface { - SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error + SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error SetPodResourceAllocation(PodResourceAllocation) error SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error SetResizeStatus(PodResizeStatus) error diff --git a/pkg/kubelet/status/state/state_checkpoint.go b/pkg/kubelet/status/state/state_checkpoint.go index f0f5b2b7696..3230b2adcc4 100644 --- a/pkg/kubelet/status/state/state_checkpoint.go +++ b/pkg/kubelet/status/state/state_checkpoint.go @@ -21,7 +21,7 @@ import ( "path" 
"sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" @@ -82,7 +82,7 @@ func (sc *stateCheckpoint) storeState() error { podAllocation := sc.cache.GetPodResourceAllocation() for pod := range podAllocation { - checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceList) + checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceRequirements) for container, alloc := range podAllocation[pod] { checkpoint.AllocationEntries[pod][container] = alloc } @@ -103,7 +103,7 @@ func (sc *stateCheckpoint) storeState() error { } // GetContainerResourceAllocation returns current resources allocated to a pod's container -func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { sc.mux.RLock() defer sc.mux.RUnlock() return sc.cache.GetContainerResourceAllocation(podUID, containerName) @@ -131,7 +131,7 @@ func (sc *stateCheckpoint) GetResizeStatus() PodResizeStatus { } // SetContainerResourceAllocation sets resources allocated to a pod's container -func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error { +func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { sc.mux.Lock() defer sc.mux.Unlock() sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc) @@ -185,8 +185,8 @@ func NewNoopStateCheckpoint() State { return &noopStateCheckpoint{} } -func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ string, _ string) (v1.ResourceList, bool) { - return nil, false +func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ string, _ string) (v1.ResourceRequirements, bool) { + return v1.ResourceRequirements{}, false } func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation { @@ -201,7 +201,7 @@ func (sc *noopStateCheckpoint) GetResizeStatus() PodResizeStatus { return nil } -func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceList) error { +func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error { return nil } diff --git a/pkg/kubelet/status/state/state_mem.go b/pkg/kubelet/status/state/state_mem.go index 6a4047b1739..8e90c06f554 100644 --- a/pkg/kubelet/status/state/state_mem.go +++ b/pkg/kubelet/status/state/state_mem.go @@ -19,7 +19,7 @@ package state import ( "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" ) @@ -40,12 +40,12 @@ func NewStateMemory() State { } } -func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { s.RLock() defer s.RUnlock() alloc, ok := s.podAllocation[podUID][containerName] - return alloc.DeepCopy(), ok + return *alloc.DeepCopy(), ok } func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation { @@ -72,12 +72,12 @@ func (s *stateMemory) GetResizeStatus() PodResizeStatus { return prs } -func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error { +func (s *stateMemory) 
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error { s.Lock() defer s.Unlock() if _, ok := s.podAllocation[podUID]; !ok { - s.podAllocation[podUID] = make(map[string]v1.ResourceList) + s.podAllocation[podUID] = make(map[string]v1.ResourceRequirements) } s.podAllocation[podUID][containerName] = alloc diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index c7cb740586e..818b638afa7 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -143,17 +143,27 @@ type Manager interface { // the provided podUIDs. RemoveOrphanedStatuses(podUIDs map[types.UID]bool) - // GetContainerResourceAllocation returns checkpointed AllocatedResources value for the container - GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) - // GetPodResizeStatus returns checkpointed PodStatus.Resize value GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) - // SetPodAllocation checkpoints the resources allocated to a pod's containers. - SetPodAllocation(pod *v1.Pod) error - // SetPodResizeStatus checkpoints the last resizing decision for the pod. SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error + + allocationManager +} + +// TODO(tallclair): Refactor allocation state handling out of the status manager. +type allocationManager interface { + // GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container + GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) + + // UpdatePodFromAllocation overwrites the pod spec with the allocation. + // This function does a deep copy only if updates are needed. + // Returns the updated (or original) pod, and whether there was an allocation stored. + UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) + + // SetPodAllocation checkpoints the resources allocated to a pod's containers. + SetPodAllocation(pod *v1.Pod) error } const syncPeriod = 10 * time.Second @@ -236,12 +246,45 @@ func (m *manager) Start() { // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values // If checkpoint manager has not been initialized, it returns nil, false -func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) { +func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() return m.state.GetContainerResourceAllocation(podUID, containerName) } +// UpdatePodFromAllocation overwrites the pod spec with the allocation. +// This function does a deep copy only if updates are needed. +func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { + m.podStatusesLock.RLock() + defer m.podStatusesLock.RUnlock() + // TODO(tallclair): This clones the whole cache, but we only need 1 pod. 
+ allocs := m.state.GetPodResourceAllocation() + return updatePodFromAllocation(pod, allocs) +} + +func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) { + allocated, found := allocs[string(pod.UID)] + if !found { + return pod, false + } + + updated := false + for i, c := range pod.Spec.Containers { + if cAlloc, ok := allocated[c.Name]; ok { + if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) { + // Allocation differs from pod spec, update + if !updated { + // If this is the first update, copy the pod + pod = pod.DeepCopy() + updated = true + } + pod.Spec.Containers[i].Resources = cAlloc + } + } + } + return pod, updated +} + // GetPodResizeStatus returns the last checkpointed ResizeStaus value // If checkpoint manager has not been initialized, it returns nil, false func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { @@ -255,10 +298,7 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error { m.podStatusesLock.RLock() defer m.podStatusesLock.RUnlock() for _, container := range pod.Spec.Containers { - var alloc v1.ResourceList - if container.Resources.Requests != nil { - alloc = container.Resources.Requests.DeepCopy() - } + alloc := *container.Resources.DeepCopy() if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil { return err } diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 4e6b9d47010..19cef9fb9ce 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -32,6 +32,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -42,6 +43,7 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" + "k8s.io/kubernetes/pkg/kubelet/status/state" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util" @@ -2034,6 +2036,105 @@ func TestMergePodStatus(t *testing.T) { } +func TestUpdatePodFromAllocation(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "12345", + Name: "test", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "c1", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI), + }, + }, + }, { + Name: "c2", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI), + }, + }, + }}, + }, + } + + resizedPod := pod.DeepCopy() + resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI) + + tests := []struct { + name string + pod *v1.Pod + allocs 
state.PodResourceAllocation + expectPod *v1.Pod + expectUpdate bool + }{{ + name: "steady state", + pod: pod, + allocs: state.PodResourceAllocation{ + string(pod.UID): map[string]v1.ResourceRequirements{ + "c1": *pod.Spec.Containers[0].Resources.DeepCopy(), + "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), + }, + }, + expectUpdate: false, + }, { + name: "no allocations", + pod: pod, + allocs: state.PodResourceAllocation{}, + expectUpdate: false, + }, { + name: "missing container allocation", + pod: pod, + allocs: state.PodResourceAllocation{ + string(pod.UID): map[string]v1.ResourceRequirements{ + "c2": *pod.Spec.Containers[1].Resources.DeepCopy(), + }, + }, + expectUpdate: false, + }, { + name: "resized container", + pod: pod, + allocs: state.PodResourceAllocation{ + string(pod.UID): map[string]v1.ResourceRequirements{ + "c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(), + "c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(), + }, + }, + expectUpdate: true, + expectPod: resizedPod, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pod := test.pod.DeepCopy() + allocatedPod, updated := updatePodFromAllocation(pod, test.allocs) + + if test.expectUpdate { + assert.True(t, updated, "updated") + assert.Equal(t, test.expectPod, allocatedPod) + assert.NotEqual(t, pod, allocatedPod) + } else { + assert.False(t, updated, "updated") + assert.Same(t, pod, allocatedPod) + } + }) + } +} + func statusEqual(left, right v1.PodStatus) bool { left.Conditions = nil right.Conditions = nil