Rework allocated resources handling

Tim Allclair 2024-10-24 00:15:43 -07:00
parent 53aa727708
commit 321eff34f7
12 changed files with 526 additions and 570 deletions

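Most of the hunks below replace ad-hoc copying of checkpointed container resources (the removed updateContainerResourceAllocation helper and the AllocatedResources plumbing through v1.PodStatus) with a single status-manager call, UpdatePodFromAllocation, which returns the pod at its allocated resources together with a flag indicating whether the spec differs from that allocation. The following is a minimal, self-contained sketch of that pattern, not the real status manager: allocationStore, its map layout, and the equality check are illustrative stand-ins assumed for this example.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/resource"
)

// allocationStore is a hypothetical stand-in for the kubelet status manager's
// checkpoint of per-container allocations, keyed by pod UID and container name.
type allocationStore map[string]map[string]v1.ResourceRequirements

// UpdatePodFromAllocation sketches the helper the kubelet is switched to here:
// if any container spec differs from its checkpointed allocation, return a deep
// copy of the pod with the allocated resources applied and true; otherwise
// return the original pod and false.
func (s allocationStore) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
	allocs, ok := s[string(pod.UID)]
	if !ok {
		return pod, false
	}
	var updated *v1.Pod
	for i, c := range pod.Spec.Containers {
		alloc, found := allocs[c.Name]
		if !found || apiequality.Semantic.DeepEqual(c.Resources, alloc) {
			continue
		}
		if updated == nil {
			updated = pod.DeepCopy()
		}
		updated.Spec.Containers[i].Resources = alloc
	}
	if updated == nil {
		return pod, false
	}
	return updated, true
}

func main() {
	pod := &v1.Pod{}
	pod.UID = "uid-1"
	pod.Spec.Containers = []v1.Container{{
		Name: "app",
		Resources: v1.ResourceRequirements{
			// Desired (resized) request from the API.
			Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m")},
		},
	}}
	store := allocationStore{"uid-1": {"app": {
		// Last admitted allocation from the checkpoint.
		Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m")},
	}}}

	allocated, resizing := store.UpdatePodFromAllocation(pod)
	// Prints "true 250m": the caller keeps working with the allocated pod until
	// the resize is admitted, as handlePodResourcesResize does below.
	fmt.Println(resizing, allocated.Spec.Containers[0].Resources.Requests.Cpu().String())
}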
View File

@@ -26,13 +26,10 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1helpers "k8s.io/component-helpers/scheduling/corev1" corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1resource "k8s.io/kubernetes/pkg/api/v1/resource" v1resource "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
volumeutils "k8s.io/kubernetes/pkg/volume/util" volumeutils "k8s.io/kubernetes/pkg/volume/util"
@@ -1252,12 +1249,6 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats
for _, container := range containers { for _, container := range containers {
if container.Name == containerStats.Name { if container.Name == containerStats.Name {
requests := container.Resources.Requests[resourceToReclaim] requests := container.Resources.Requests[resourceToReclaim]
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) &&
(resourceToReclaim == v1.ResourceMemory || resourceToReclaim == v1.ResourceCPU) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
requests = cs.AllocatedResources[resourceToReclaim]
}
}
var usage *resource.Quantity var usage *resource.Quantity
switch resourceToReclaim { switch resourceToReclaim {
case v1.ResourceEphemeralStorage: case v1.ResourceEphemeralStorage:

View File

@@ -31,11 +31,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/features"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
) )
@@ -3366,56 +3363,6 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool {
return true return true
} }
func TestEvictonMessageWithResourceResize(t *testing.T) {
testpod := newPod("testpod", 1, []v1.Container{
newContainer("testcontainer", newResourceList("", "200Mi", ""), newResourceList("", "", "")),
}, nil)
testpod.Status = v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "testcontainer",
AllocatedResources: newResourceList("", "100Mi", ""),
},
},
}
testpodMemory := resource.MustParse("150Mi")
testpodStats := newPodMemoryStats(testpod, testpodMemory)
testpodMemoryBytes := uint64(testpodMemory.Value())
testpodStats.Containers = []statsapi.ContainerStats{
{
Name: "testcontainer",
Memory: &statsapi.MemoryStats{
WorkingSetBytes: &testpodMemoryBytes,
},
},
}
stats := map[*v1.Pod]statsapi.PodStats{
testpod: testpodStats,
}
statsFn := func(pod *v1.Pod) (statsapi.PodStats, bool) {
result, found := stats[pod]
return result, found
}
threshold := []evictionapi.Threshold{}
observations := signalObservations{}
for _, enabled := range []bool{true, false} {
t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled)
msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn, threshold, observations)
if enabled {
if !strings.Contains(msg, "testcontainer was using 150Mi, request is 100Mi") {
t.Errorf("Expected 'exceeds memory' eviction message was not found.")
}
} else {
if strings.Contains(msg, "which exceeds its request") {
t.Errorf("Found 'exceeds memory' eviction message which was not expected.")
}
}
})
}
}
func TestStatsNotFoundForPod(t *testing.T) { func TestStatsNotFoundForPod(t *testing.T) {
pod1 := newPod("fake-pod1", defaultPriority, []v1.Container{ pod1 := newPod("fake-pod1", defaultPriority, []v1.Container{
newContainer("fake-container1", newResourceList("", "", ""), newResourceList("", "", "")), newContainer("fake-container1", newResourceList("", "", ""), newResourceList("", "", "")),

View File

@@ -33,7 +33,6 @@ import (
"time" "time"
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp"
"github.com/opencontainers/selinux/go-selinux" "github.com/opencontainers/selinux/go-selinux"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
@@ -1568,7 +1567,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1) os.Exit(1)
} }
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod) kl.evictionManager.Start(kl.StatsProvider, kl.getAllocatedPods, kl.PodIsFinished, evictionMonitoringPeriod)
// container log manager must start after container runtime is up to retrieve information from container runtime // container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation. // and inform container to reopen log file after log rotation.
@@ -1789,6 +1788,17 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
} }
} }
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && !kubetypes.IsStaticPod(pod) {
// Handle pod resize here instead of doing it in HandlePodUpdates because
// this conveniently retries any Deferred resize requests
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
pod, err = kl.handlePodResourcesResize(pod)
if err != nil {
return false, err
}
}
// Generate final API pod status with pod and status manager status // Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false) apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
@@ -1943,16 +1953,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Ensure the pod is being probed // Ensure the pod is being probed
kl.probeManager.AddPod(pod) kl.probeManager.AddPod(pod)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Handle pod resize here instead of doing it in HandlePodUpdates because
// this conveniently retries any Deferred resize requests
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
if kl.podWorkers.CouldHaveRunningContainers(pod.UID) && !kubetypes.IsStaticPod(pod) {
pod = kl.handlePodResourcesResize(pod)
}
}
// TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker. // TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker.
// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel, // Currently, using cancellation from that context causes test failures. To remove this WithoutCancel,
// any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling // any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling
@@ -1975,7 +1975,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
return false, nil return false, nil
} }
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) {
// While resize is in progress, periodically call PLEG to update pod cache // While resize is in progress, periodically call PLEG to update pod cache
runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
@@ -2296,9 +2296,7 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin
// Use allocated resources values from checkpoint store (source of truth) to determine fit // Use allocated resources values from checkpoint store (source of truth) to determine fit
otherPods := make([]*v1.Pod, 0, len(pods)) otherPods := make([]*v1.Pod, 0, len(pods))
for _, p := range pods { for _, p := range pods {
op := p.DeepCopy() op, _ := kl.statusManager.UpdatePodFromAllocation(p)
kl.updateContainerResourceAllocation(op)
otherPods = append(otherPods, op) otherPods = append(otherPods, op)
} }
attrs.OtherPods = otherPods attrs.OtherPods = otherPods
@@ -2579,16 +2577,15 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// To handle kubelet restarts, test pod admissibility using AllocatedResources values // To handle kubelet restarts, test pod admissibility using AllocatedResources values
// (for cpu & memory) from checkpoint store. If found, that is the source of truth. // (for cpu & memory) from checkpoint store. If found, that is the source of truth.
podCopy := pod.DeepCopy() allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod)
kl.updateContainerResourceAllocation(podCopy)
// Check if we can admit the pod; if not, reject it. // Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok { if ok, reason, message := kl.canAdmitPod(activePods, allocatedPod); !ok {
kl.rejectPod(pod, reason, message) kl.rejectPod(pod, reason, message)
continue continue
} }
// For new pod, checkpoint the resource values at which the Pod has been admitted // For new pod, checkpoint the resource values at which the Pod has been admitted
if err := kl.statusManager.SetPodAllocation(podCopy); err != nil { if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod)) klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
} }
@@ -2609,17 +2606,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
} }
} }
// updateContainerResourceAllocation updates AllocatedResources values
// (for cpu & memory) from checkpoint store
func (kl *Kubelet) updateContainerResourceAllocation(pod *v1.Pod) {
for i, c := range pod.Spec.Containers {
allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
if found {
pod.Spec.Containers[i].Resources = allocatedResources
}
}
}
// HandlePodUpdates is the callback in the SyncHandler interface for pods // HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source. // being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
@@ -2749,36 +2735,47 @@ func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
} }
} }
func isPodResizeInProgress(pod *v1.Pod, podStatus *v1.PodStatus) bool { func isPodResizeInProgress(pod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
for _, c := range pod.Spec.Containers { for _, c := range pod.Spec.Containers {
if cs, ok := podutil.GetContainerStatus(podStatus.ContainerStatuses, c.Name); ok { if cs := podStatus.FindContainerStatusByName(c.Name); cs != nil {
if cs.Resources == nil { if cs.State != kubecontainer.ContainerStateRunning || cs.Resources == nil {
continue continue
} }
if !cmp.Equal(c.Resources.Limits, cs.Resources.Limits) || !cmp.Equal(cs.AllocatedResources, cs.Resources.Requests) { if c.Resources.Requests != nil {
return true if cs.Resources.CPURequest != nil && !cs.Resources.CPURequest.Equal(*c.Resources.Requests.Cpu()) {
return true
}
}
if c.Resources.Limits != nil {
if cs.Resources.CPULimit != nil && !cs.Resources.CPULimit.Equal(*c.Resources.Limits.Cpu()) {
return true
}
if cs.Resources.MemoryLimit != nil && !cs.Resources.MemoryLimit.Equal(*c.Resources.Limits.Memory()) {
return true
}
} }
} }
} }
return false return false
} }
func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) { // canResizePod determines if the requested resize is currently feasible.
// Returns true if the resize can proceed.
func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) {
var otherActivePods []*v1.Pod var otherActivePods []*v1.Pod
node, err := kl.getNodeAnyWay() node, err := kl.getNodeAnyWay()
if err != nil { if err != nil {
klog.ErrorS(err, "getNodeAnyway function failed") klog.ErrorS(err, "getNodeAnyway function failed")
return false, nil, "" return false, ""
} }
podCopy := pod.DeepCopy()
cpuAvailable := node.Status.Allocatable.Cpu().MilliValue() cpuAvailable := node.Status.Allocatable.Cpu().MilliValue()
memAvailable := node.Status.Allocatable.Memory().Value() memAvailable := node.Status.Allocatable.Memory().Value()
cpuRequests := resource.GetResourceRequest(podCopy, v1.ResourceCPU) cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
memRequests := resource.GetResourceRequest(podCopy, v1.ResourceMemory) memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)
if cpuRequests > cpuAvailable || memRequests > memAvailable { if cpuRequests > cpuAvailable || memRequests > memAvailable {
klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", podCopy.Name) klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", klog.KObj(pod))
return false, podCopy, v1.PodResizeStatusInfeasible return false, v1.PodResizeStatusInfeasible
} }
// Treat the existing pod needing resize as a new pod with desired resources seeking admit. // Treat the existing pod needing resize as a new pod with desired resources seeking admit.
@@ -2790,76 +2787,42 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus)
} }
} }
if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, podCopy); !ok { if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, pod); !ok {
// Log reason and return. Let the next sync iteration retry the resize // Log reason and return. Let the next sync iteration retry the resize
klog.V(3).InfoS("Resize cannot be accommodated", "pod", podCopy.Name, "reason", failReason, "message", failMessage) klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage)
return false, podCopy, v1.PodResizeStatusDeferred return false, v1.PodResizeStatusDeferred
} }
for _, container := range podCopy.Spec.Containers { return true, v1.PodResizeStatusInProgress
idx, found := podutil.GetIndexOfContainerStatus(podCopy.Status.ContainerStatuses, container.Name)
if found {
for rName, rQuantity := range container.Resources.Requests {
podCopy.Status.ContainerStatuses[idx].AllocatedResources[rName] = rQuantity
}
}
}
return true, podCopy, v1.PodResizeStatusInProgress
} }
func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) *v1.Pod { func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) {
if pod.Status.Phase != v1.PodRunning { allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
return pod if !updated {
} // Pod is not resizing, nothing more to do here.
podResized := false return pod, nil
for _, container := range pod.Spec.Containers {
if len(container.Resources.Requests) == 0 {
continue
}
containerStatus, found := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
if !found {
klog.V(5).InfoS("ContainerStatus not found", "pod", pod.Name, "container", container.Name)
break
}
if len(containerStatus.AllocatedResources) != len(container.Resources.Requests) {
klog.V(5).InfoS("ContainerStatus.AllocatedResources length mismatch", "pod", pod.Name, "container", container.Name)
break
}
if !cmp.Equal(container.Resources.Requests, containerStatus.AllocatedResources) {
podResized = true
break
}
}
if !podResized {
return pod
} }
kl.podResizeMutex.Lock() kl.podResizeMutex.Lock()
defer kl.podResizeMutex.Unlock() defer kl.podResizeMutex.Unlock()
fit, updatedPod, resizeStatus := kl.canResizePod(pod) fit, resizeStatus := kl.canResizePod(pod)
if updatedPod == nil {
return pod
}
if fit { if fit {
// Update pod resource allocation checkpoint // Update pod resource allocation checkpoint
if err := kl.statusManager.SetPodAllocation(updatedPod); err != nil { if err := kl.statusManager.SetPodAllocation(pod); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate return nil, err
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(updatedPod))
return pod
} }
} else {
// If resize isn't immediately feasible, proceed with the allocated pod.
pod = allocatedPod
} }
if resizeStatus != "" { if resizeStatus != "" {
// Save resize decision to checkpoint // Save resize decision to checkpoint
if err := kl.statusManager.SetPodResizeStatus(updatedPod.UID, resizeStatus); err != nil { if err := kl.statusManager.SetPodResizeStatus(pod.UID, resizeStatus); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(updatedPod)) klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(pod))
return pod
} }
updatedPod.Status.Resize = resizeStatus
} }
kl.podManager.UpdatePod(updatedPod) return pod, nil
kl.statusManager.SetPodStatus(updatedPod, updatedPod.Status)
return updatedPod
} }
// LatestLoopEntryTime returns the last time in the sync loop monitor. // LatestLoopEntryTime returns the last time in the sync loop monitor.

View File

@@ -207,6 +207,21 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod {
return activePods return activePods
} }
// getAllocatedPods returns the active pods (see GetActivePods), but updates the pods to their
// allocated state.
func (kl *Kubelet) getAllocatedPods() []*v1.Pod {
activePods := kl.GetActivePods()
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return activePods
}
allocatedPods := make([]*v1.Pod, len(activePods))
for i, pod := range activePods {
allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod)
}
return allocatedPods
}
// makeBlockVolumes maps the raw block devices specified in the path of the container // makeBlockVolumes maps the raw block devices specified in the path of the container
// Experimental // Experimental
func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumepathhandler.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) { func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumepathhandler.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) {
@@ -2081,13 +2096,14 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
} }
} }
container := kubecontainer.GetContainerSpec(pod, cName) container := kubecontainer.GetContainerSpec(pod, cName)
// AllocatedResources values come from checkpoint. It is the source-of-truth.
// Always set the status to the latest allocated resources, even if it differs from the
// allocation used by the current sync loop.
alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName) alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
if found { if found {
status.AllocatedResources = alloc.Requests status.AllocatedResources = alloc.Requests
} else if !(container.Resources.Requests == nil && container.Resources.Limits == nil) { } else if !(container.Resources.Requests == nil && container.Resources.Limits == nil) {
// Log error and fallback to AllocatedResources in oldStatus if it exists // This case is expected for ephemeral containers.
klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName)
if oldStatusFound { if oldStatusFound {
status.AllocatedResources = oldStatus.AllocatedResources status.AllocatedResources = oldStatus.AllocatedResources
} }
@@ -2108,46 +2124,46 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
} }
// Convert Limits // Convert Limits
if container.Resources.Limits != nil { if alloc.Limits != nil {
limits = make(v1.ResourceList) limits = make(v1.ResourceList)
if cStatus.Resources != nil && cStatus.Resources.CPULimit != nil { if cStatus.Resources != nil && cStatus.Resources.CPULimit != nil {
limits[v1.ResourceCPU] = cStatus.Resources.CPULimit.DeepCopy() limits[v1.ResourceCPU] = cStatus.Resources.CPULimit.DeepCopy()
} else { } else {
determineResource(v1.ResourceCPU, container.Resources.Limits, oldStatus.Resources.Limits, limits) determineResource(v1.ResourceCPU, alloc.Limits, oldStatus.Resources.Limits, limits)
} }
if cStatus.Resources != nil && cStatus.Resources.MemoryLimit != nil { if cStatus.Resources != nil && cStatus.Resources.MemoryLimit != nil {
limits[v1.ResourceMemory] = cStatus.Resources.MemoryLimit.DeepCopy() limits[v1.ResourceMemory] = cStatus.Resources.MemoryLimit.DeepCopy()
} else { } else {
determineResource(v1.ResourceMemory, container.Resources.Limits, oldStatus.Resources.Limits, limits) determineResource(v1.ResourceMemory, alloc.Limits, oldStatus.Resources.Limits, limits)
} }
if ephemeralStorage, found := container.Resources.Limits[v1.ResourceEphemeralStorage]; found { if ephemeralStorage, found := alloc.Limits[v1.ResourceEphemeralStorage]; found {
limits[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy() limits[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy()
} }
if storage, found := container.Resources.Limits[v1.ResourceStorage]; found { if storage, found := alloc.Limits[v1.ResourceStorage]; found {
limits[v1.ResourceStorage] = storage.DeepCopy() limits[v1.ResourceStorage] = storage.DeepCopy()
} }
convertCustomResources(container.Resources.Limits, limits) convertCustomResources(alloc.Limits, limits)
} }
// Convert Requests // Convert Requests
if status.AllocatedResources != nil { if alloc.Requests != nil {
requests = make(v1.ResourceList) requests = make(v1.ResourceList)
if cStatus.Resources != nil && cStatus.Resources.CPURequest != nil { if cStatus.Resources != nil && cStatus.Resources.CPURequest != nil {
requests[v1.ResourceCPU] = cStatus.Resources.CPURequest.DeepCopy() requests[v1.ResourceCPU] = cStatus.Resources.CPURequest.DeepCopy()
} else { } else {
determineResource(v1.ResourceCPU, status.AllocatedResources, oldStatus.Resources.Requests, requests) determineResource(v1.ResourceCPU, alloc.Requests, oldStatus.Resources.Requests, requests)
} }
if memory, found := status.AllocatedResources[v1.ResourceMemory]; found { if memory, found := alloc.Requests[v1.ResourceMemory]; found {
requests[v1.ResourceMemory] = memory.DeepCopy() requests[v1.ResourceMemory] = memory.DeepCopy()
} }
if ephemeralStorage, found := status.AllocatedResources[v1.ResourceEphemeralStorage]; found { if ephemeralStorage, found := alloc.Requests[v1.ResourceEphemeralStorage]; found {
requests[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy() requests[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy()
} }
if storage, found := status.AllocatedResources[v1.ResourceStorage]; found { if storage, found := alloc.Requests[v1.ResourceStorage]; found {
requests[v1.ResourceStorage] = storage.DeepCopy() requests[v1.ResourceStorage] = storage.DeepCopy()
} }
convertCustomResources(status.AllocatedResources, requests) convertCustomResources(alloc.Requests, requests)
} }
resources := &v1.ResourceRequirements{ resources := &v1.ResourceRequirements{

View File

@@ -2391,6 +2391,13 @@ func TestPodResourceAllocationReset(t *testing.T) {
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.statusManager = status.NewFakeManager() kubelet.statusManager = status.NewFakeManager()
// fakePodWorkers trigger syncPodFn synchronously on update, but entering
// kubelet.SyncPod while holding the podResizeMutex can lead to deadlock.
kubelet.podWorkers.(*fakePodWorkers).syncPodFn =
func(_ context.Context, _ kubetypes.SyncPodType, _, _ *v1.Pod, _ *kubecontainer.PodStatus) (bool, error) {
return false, nil
}
nodes := []*v1.Node{ nodes := []*v1.Node{
{ {
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
@@ -2575,7 +2582,6 @@ func TestHandlePodResourcesResize(t *testing.T) {
testKubelet := newTestKubelet(t, false) testKubelet := newTestKubelet(t, false)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.statusManager = status.NewFakeManager()
cpu500m := resource.MustParse("500m") cpu500m := resource.MustParse("500m")
cpu1000m := resource.MustParse("1") cpu1000m := resource.MustParse("1")
@@ -2717,14 +2723,24 @@
} }
for _, tt := range tests { for _, tt := range tests {
tt.pod.Spec.Containers[0].Resources.Requests = tt.newRequests t.Run(tt.name, func(t *testing.T) {
tt.pod.Status.ContainerStatuses[0].AllocatedResources = v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M} kubelet.statusManager = status.NewFakeManager()
kubelet.handlePodResourcesResize(tt.pod) require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod))
updatedPod, found := kubelet.podManager.GetPodByName(tt.pod.Namespace, tt.pod.Name)
assert.True(t, found, "expected to find pod %s", tt.pod.Name) pod := tt.pod.DeepCopy()
assert.Equal(t, tt.expectedAllocations, updatedPod.Status.ContainerStatuses[0].AllocatedResources, tt.name) pod.Spec.Containers[0].Resources.Requests = tt.newRequests
assert.Equal(t, tt.expectedResize, updatedPod.Status.Resize, tt.name) updatedPod, err := kubelet.handlePodResourcesResize(pod)
testKubelet.fakeKubeClient.ClearActions() require.NoError(t, err)
assert.Equal(t, tt.expectedAllocations, updatedPod.Spec.Containers[0].Resources.Requests, "updated pod spec resources")
alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(pod.UID), pod.Spec.Containers[0].Name)
require.True(t, found, "container allocation")
assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation")
resizeStatus, found := kubelet.statusManager.GetPodResizeStatus(string(pod.UID))
require.True(t, found, "pod resize status")
assert.Equal(t, tt.expectedResize, resizeStatus)
})
} }
} }
@@ -3276,3 +3292,132 @@ func TestSyncPodSpans(t *testing.T) {
assert.Equalf(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), "runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()) assert.Equalf(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), "runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID())
} }
} }
func TestIsPodResizeInProgress(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345",
Name: "test",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "c1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
},
},
}, {
Name: "c2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
}},
},
}
steadyStateC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(400, resource.DecimalSI),
},
}
resizeMemC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(800, resource.DecimalSI),
},
}
resizeCPUReqC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(200, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(400, resource.DecimalSI),
},
}
resizeCPULimitC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(600, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(400, resource.DecimalSI),
},
}
steadyStateC2Status := &kubecontainer.Status{
Name: "c2",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(500, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(700, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(800, resource.DecimalSI),
},
}
mkPodStatus := func(containerStatuses ...*kubecontainer.Status) *kubecontainer.PodStatus {
return &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
ContainerStatuses: containerStatuses,
}
}
tests := []struct {
name string
status *kubecontainer.PodStatus
expectResize bool
}{{
name: "steady state",
status: mkPodStatus(steadyStateC1Status, steadyStateC2Status),
expectResize: false,
}, {
name: "terminated container",
status: mkPodStatus(&kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateExited,
Resources: resizeMemC1Status.Resources,
}, steadyStateC2Status),
expectResize: false,
}, {
name: "missing container",
status: mkPodStatus(steadyStateC2Status),
expectResize: false,
}, {
name: "resizing memory limit",
status: mkPodStatus(resizeMemC1Status, steadyStateC2Status),
expectResize: true,
}, {
name: "resizing cpu request",
status: mkPodStatus(resizeCPUReqC1Status, steadyStateC2Status),
expectResize: true,
}, {
name: "resizing cpu limit",
status: mkPodStatus(resizeCPULimitC1Status, steadyStateC2Status),
expectResize: true,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expectResize, isPodResizeInProgress(pod, test.status))
})
}
}

View File

@@ -805,291 +805,67 @@ func TestGenerateLinuxContainerResources(t *testing.T) {
}, },
}, },
}, },
Status: v1.PodStatus{},
} }
for _, tc := range []struct { for _, tc := range []struct {
name string name string
scalingFg bool
limits v1.ResourceList limits v1.ResourceList
requests v1.ResourceList requests v1.ResourceList
cStatus []v1.ContainerStatus
expected *runtimeapi.LinuxContainerResources expected *runtimeapi.LinuxContainerResources
cgroupVersion CgroupVersion cgroupVersion CgroupVersion
}{ }{
{ {
"requests & limits, cpu & memory, guaranteed qos - no container status", "requests & limits, cpu & memory, guaranteed qos",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1, cgroupV1,
}, },
{ {
"requests & limits, cpu & memory, burstable qos - no container status", "requests & limits, cpu & memory, burstable qos",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970},
cgroupV1, cgroupV1,
}, },
{ {
"best-effort qos - no container status", "best-effort qos",
true,
nil, nil,
nil, nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000}, &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000},
cgroupV1, cgroupV1,
}, },
{ {
"requests & limits, cpu & memory, guaranteed qos - empty resources container status", "requests & limits, cpu & memory, guaranteed qos",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - empty resources container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999},
cgroupV1,
},
{
"best-effort qos - empty resources container status",
true,
nil,
nil,
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"best-effort qos - no container status",
false,
nil,
nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2, cgroupV2,
}, },
{ {
"requests & limits, cpu & memory, burstable qos - no container status", "requests & limits, cpu & memory, burstable qos",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")}, v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}}, &runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2, cgroupV2,
}, },
{ {
"best-effort qos - no container status", "best-effort qos",
true,
nil, nil,
nil, nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - empty resources container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - empty resources container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"best-effort qos - empty resources container status",
true,
nil,
nil,
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"best-effort qos - no container status",
false,
nil,
nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}}, &runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2, cgroupV2,
}, },
} { } {
t.Run(tc.name, func(t *testing.T) { t.Run(fmt.Sprintf("cgroup%s:%s", tc.cgroupVersion, tc.name), func(t *testing.T) {
defer setSwapControllerAvailableDuringTest(false)() defer setSwapControllerAvailableDuringTest(false)()
if tc.scalingFg {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
}
setCgroupVersionDuringTest(tc.cgroupVersion) setCgroupVersionDuringTest(tc.cgroupVersion)
pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests} pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests}
if len(tc.cStatus) > 0 {
pod.Status.ContainerStatuses = tc.cStatus
}
resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false) resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false)
tc.expected.HugepageLimits = resources.HugepageLimits tc.expected.HugepageLimits = resources.HugepageLimits
if !cmp.Equal(resources, tc.expected) { assert.Equal(t, tc.expected, resources)
t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources)
}
}) })
} }
} }

View File

@@ -26,7 +26,6 @@ import (
"time" "time"
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
grpcstatus "google.golang.org/grpc/status" grpcstatus "google.golang.org/grpc/status"
crierror "k8s.io/cri-api/pkg/errors" crierror "k8s.io/cri-api/pkg/errors"
@@ -47,7 +46,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/credentialprovider/plugin" "k8s.io/kubernetes/pkg/credentialprovider/plugin"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
@@ -551,60 +549,49 @@ func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
return true return true
} }
func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool { // computePodResizeAction determines the actions required (if any) to resize the given container.
// Returns whether to keep (true) or restart (false) the container.
func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) (keepContainer bool) {
container := pod.Spec.Containers[containerIdx] container := pod.Spec.Containers[containerIdx]
if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 {
return true
}
// Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired) // Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired)
// with v1.Status.Resources / runtime.Status.Resources (last known actual). // with v1.Status.Resources / runtime.Status.Resources (last known actual).
// Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources. // Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources.
// Skip if runtime containerID doesn't match pod.Status containerID (container is restarting) // Skip if runtime containerID doesn't match pod.Status containerID (container is restarting)
apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name) if kubeContainerStatus.State != kubecontainer.ContainerStateRunning {
if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil ||
kubeContainerStatus.State != kubecontainer.ContainerStateRunning ||
kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID ||
!cmp.Equal(container.Resources.Requests, apiContainerStatus.AllocatedResources) {
return true return true
} }
desiredMemoryLimit := container.Resources.Limits.Memory().Value() if kubeContainerStatus.Resources == nil {
desiredCPULimit := container.Resources.Limits.Cpu().MilliValue() // Not enough information to actuate a resize.
desiredCPURequest := container.Resources.Requests.Cpu().MilliValue() klog.V(4).InfoS("Missing runtime resource information for container", "pod", klog.KObj(pod), "container", container.Name)
currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value()
currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue()
currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue()
// Runtime container status resources (from CRI), if set, supercedes v1(api) container status resrouces.
if kubeContainerStatus.Resources != nil {
if kubeContainerStatus.Resources.MemoryLimit != nil {
currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
}
if kubeContainerStatus.Resources.CPULimit != nil {
currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
}
if kubeContainerStatus.Resources.CPURequest != nil {
currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
}
}
// Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during
// handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest.
if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest {
return true return true
} }
desiredResources := containerResources{ desiredResources := containerResources{
memoryLimit: desiredMemoryLimit, memoryLimit: container.Resources.Limits.Memory().Value(),
memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(), memoryRequest: container.Resources.Requests.Memory().Value(),
cpuLimit: desiredCPULimit, cpuLimit: container.Resources.Limits.Cpu().MilliValue(),
cpuRequest: desiredCPURequest, cpuRequest: container.Resources.Requests.Cpu().MilliValue(),
} }
currentResources := containerResources{
memoryLimit: currentMemoryLimit, // Default current values to the desired values so that a resize isn't triggered for missing values.
memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(), currentResources := desiredResources
cpuLimit: currentCPULimit, if kubeContainerStatus.Resources.MemoryLimit != nil {
cpuRequest: currentCPURequest, currentResources.memoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
}
if kubeContainerStatus.Resources.CPULimit != nil {
currentResources.cpuLimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
}
if kubeContainerStatus.Resources.CPURequest != nil {
currentResources.cpuRequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
}
// Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during
// handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest.
if currentResources == desiredResources {
// No resize required.
return true
} }
resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy) resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy)
@@ -637,9 +624,9 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe
changes.ContainersToUpdate[rName][0] = cUpdateInfo changes.ContainersToUpdate[rName][0] = cUpdateInfo
} }
} }
resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit)
resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit) resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit)
resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest) resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest)
if restartCPULim || restartCPUReq || restartMemLim { if restartCPULim || restartCPUReq || restartMemLim {
// resize policy requires this container to restart // resize policy requires this container to restart
changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{ changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
@@ -652,12 +639,12 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe
return false return false
} else { } else {
if resizeMemLim { if resizeMemLim {
markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) markContainerForUpdate(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit)
} }
if resizeCPULim { if resizeCPULim {
markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit) markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit)
} else if resizeCPUReq { } else if resizeCPUReq {
markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest) markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest)
} }
} }
return true return true
@@ -942,10 +929,6 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
if isInPlacePodVerticalScalingAllowed(pod) { if isInPlacePodVerticalScalingAllowed(pod) {
changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo) changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo)
latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace)
if err == nil {
podStatus = latestPodStatus
}
} }
// Number of running containers to keep. // Number of running containers to keep.

View File

@@ -52,6 +52,7 @@ import (
imagetypes "k8s.io/kubernetes/pkg/kubelet/images" imagetypes "k8s.io/kubernetes/pkg/kubelet/images"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/ptr"
) )
var ( var (
@@ -2174,7 +2175,7 @@ func makeBasePodAndStatusWithInitAndEphemeralContainers() (*v1.Pod, *kubecontain
func TestComputePodActionsForPodResize(t *testing.T) { func TestComputePodActionsForPodResize(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
fakeRuntime, _, m, err := createTestRuntimeManager() _, _, m, err := createTestRuntimeManager()
m.machineInfo.MemoryCapacity = 17179860387 // 16GB m.machineInfo.MemoryCapacity = 17179860387 // 16GB
assert.NoError(t, err) assert.NoError(t, err)
@@ -2188,18 +2189,19 @@ func TestComputePodActionsForPodResize(t *testing.T) {
memPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceMemory, RestartPolicy: v1.RestartContainer} memPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceMemory, RestartPolicy: v1.RestartContainer}
for desc, test := range map[string]struct { for desc, test := range map[string]struct {
podResizePolicyFn func(*v1.Pod) setupFn func(*v1.Pod, *kubecontainer.PodStatus)
mutatePodFn func(*v1.Pod)
getExpectedPodActionsFn func(*v1.Pod, *kubecontainer.PodStatus) *podActions getExpectedPodActionsFn func(*v1.Pod, *kubecontainer.PodStatus) *podActions
}{ }{
"Update container CPU and memory resources": { "Update container CPU and memory resources": {
mutatePodFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ c := &pod.Spec.Containers[1]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, CPULimit: ptr.To(cpu200m.DeepCopy()),
MemoryLimit: ptr.To(mem200M.DeepCopy()),
} }
} }
}, },
@@ -2244,13 +2246,15 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Update container CPU resources": { "Update container CPU resources": {
mutatePodFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ c := &pod.Spec.Containers[1]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, CPULimit: ptr.To(cpu200m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
} }
} }
}, },
@@ -2281,13 +2285,15 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Update container memory resources": { "Update container memory resources": {
mutatePodFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[2].Resources = v1.ResourceRequirements{ c := &pod.Spec.Containers[2]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, CPULimit: ptr.To(cpu200m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
} }
} }
}, },
@@ -2318,12 +2324,15 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Nothing when spec.Resources and status.Resources are equal": { "Nothing when spec.Resources and status.Resources are equal": {
mutatePodFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{ c := &pod.Spec.Containers[1]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, Limits: v1.ResourceList{v1.ResourceCPU: cpu200m},
} }
pod.Status.ContainerStatuses[1].Resources = &v1.ResourceRequirements{ if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m}, cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu200m.DeepCopy()),
}
} }
}, },
getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions { getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions {
@@ -2337,16 +2346,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Update container CPU and memory resources with Restart policy for CPU": { "Update container CPU and memory resources with Restart policy for CPU": {
podResizePolicyFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[0].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} c := &pod.Spec.Containers[0]
}, c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
mutatePodFn: func(pod *v1.Pod) { c.Resources = v1.ResourceRequirements{
pod.Spec.Containers[0].Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[0].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
} }
} }
}, },
@@ -2368,16 +2377,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Update container CPU and memory resources with Restart policy for memory": { "Update container CPU and memory resources with Restart policy for memory": {
podResizePolicyFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} c := &pod.Spec.Containers[2]
}, c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
mutatePodFn: func(pod *v1.Pod) { c.Resources = v1.ResourceRequirements{
pod.Spec.Containers[2].Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
} }
} }
}, },
@@ -2399,16 +2408,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Update container memory resources with Restart policy for CPU": { "Update container memory resources with Restart policy for CPU": {
podResizePolicyFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[1].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired} c := &pod.Spec.Containers[1]
}, c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
mutatePodFn: func(pod *v1.Pod) { c.Resources = v1.ResourceRequirements{
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem200M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem200M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
} }
} }
}, },
@@ -2439,16 +2448,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
"Update container CPU resources with Restart policy for memory": { "Update container CPU resources with Restart policy for memory": {
podResizePolicyFn: func(pod *v1.Pod) { setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired} c := &pod.Spec.Containers[2]
}, c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
mutatePodFn: func(pod *v1.Pod) { c.Resources = v1.ResourceRequirements{
pod.Spec.Containers[2].Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M}, Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M},
} }
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found { if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{ cStatus.Resources = &kubecontainer.ContainerResources{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M}, CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
} }
} }
}, },
@@ -2479,41 +2488,28 @@ func TestComputePodActionsForPodResize(t *testing.T) {
}, },
}, },
} { } {
pod, kps := makeBasePodAndStatus() t.Run(desc, func(t *testing.T) {
for idx := range pod.Spec.Containers { pod, status := makeBasePodAndStatus()
// default resize policy when pod resize feature is enabled for idx := range pod.Spec.Containers {
pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired} // default resize policy when pod resize feature is enabled
} pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired}
if test.podResizePolicyFn != nil {
test.podResizePolicyFn(pod)
}
for idx := range pod.Spec.Containers {
// compute hash
if kcs := kps.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil {
kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx])
} }
} if test.setupFn != nil {
makeAndSetFakePod(t, m, fakeRuntime, pod) test.setupFn(pod, status)
ctx := context.Background() }
status, _ := m.GetPodStatus(ctx, kps.ID, pod.Name, pod.Namespace)
for idx := range pod.Spec.Containers { for idx := range pod.Spec.Containers {
if rcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); rcs != nil { // compute hash
if csIdx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[idx].Name); found { if kcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil {
pod.Status.ContainerStatuses[csIdx].ContainerID = rcs.ID.String() kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx])
} }
} }
}
for idx := range pod.Spec.Containers { ctx := context.Background()
if kcs := kps.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil { expectedActions := test.getExpectedPodActionsFn(pod, status)
kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx]) actions := m.computePodActions(ctx, pod, status)
} verifyActions(t, expectedActions, &actions, desc)
} })
if test.mutatePodFn != nil {
test.mutatePodFn(pod)
}
expectedActions := test.getExpectedPodActionsFn(pod, status)
actions := m.computePodActions(ctx, pod, status)
verifyActions(t, expectedActions, &actions, desc)
} }
} }
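A minimal sketch of the conversion the reworked test setup performs: spec-level limits become the runtime-level kubecontainer.ContainerResources attached via FindContainerStatusByName. Only the CPULimit and MemoryLimit fields that appear in the diff are assumed; the helper name and package are illustrative.

package sketch

import (
    v1 "k8s.io/api/core/v1"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    "k8s.io/utils/ptr"
)

// runtimeLimits converts a spec-level limit list into the runtime-status shape
// the reworked tests populate on the kubecontainer.PodStatus.
func runtimeLimits(limits v1.ResourceList) *kubecontainer.ContainerResources {
    res := &kubecontainer.ContainerResources{}
    if cpu, ok := limits[v1.ResourceCPU]; ok {
        res.CPULimit = ptr.To(cpu.DeepCopy())
    }
    if mem, ok := limits[v1.ResourceMemory]; ok {
        res.MemoryLimit = ptr.To(mem.DeepCopy())
    }
    return res
}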

View File

@@ -18,10 +18,7 @@ package qos
import ( import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/types"
) )
@@ -63,11 +60,6 @@ func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapa
// targets for OOM kills. // targets for OOM kills.
// Note that this is a heuristic, it won't work if a container has many small processes. // Note that this is a heuristic, it won't work if a container has many small processes.
memoryRequest := container.Resources.Requests.Memory().Value() memoryRequest := container.Resources.Requests.Memory().Value()
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
memoryRequest = cs.AllocatedResources.Memory().Value()
}
}
oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity
// A guaranteed pod using 100% of memory can have an OOM score of 10. Ensure // A guaranteed pod using 100% of memory can have an OOM score of 10. Ensure
// that burstable pods have a higher OOM score adjustment. // that burstable pods have a higher OOM score adjustment.

View File

@@ -69,8 +69,12 @@ func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerNam
} }
func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
klog.InfoS("GetPodResizeStatus()") return m.state.GetPodResizeStatus(podUID)
return "", false }
func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
} }
func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error { func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
@@ -86,8 +90,7 @@ func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
} }
func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error { func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error {
klog.InfoS("SetPodResizeStatus()") return m.state.SetPodResizeStatus(string(podUID), resizeStatus)
return nil
} }
// NewFakeManager creates empty/fake memory manager // NewFakeManager creates empty/fake memory manager
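A hedged, test-style sketch of what the state-backed fake now makes possible: a resize status written through the manager can be read back, where the old stub always returned an empty value. It assumes the fake satisfies the status.Manager interface shown in the next file; the helper name and package are illustrative.

package sketch

import (
    "testing"

    "github.com/stretchr/testify/require"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/kubernetes/pkg/kubelet/status"
)

// roundTripResizeStatus checks that a resize status survives a write/read
// cycle through any status.Manager, including the state-backed fake.
func roundTripResizeStatus(t *testing.T, m status.Manager, uid types.UID) {
    require.NoError(t, m.SetPodResizeStatus(uid, v1.PodResizeStatusInProgress))
    got, found := m.GetPodResizeStatus(string(uid))
    require.True(t, found)
    require.Equal(t, v1.PodResizeStatusInProgress, got)
}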

View File

@@ -143,17 +143,27 @@ type Manager interface {
// the provided podUIDs. // the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool) RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
// GetContainerResourceAllocation returns checkpointed AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
// GetPodResizeStatus returns checkpointed PodStatus.Resize value // GetPodResizeStatus returns checkpointed PodStatus.Resize value
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error
// SetPodResizeStatus checkpoints the last resizing decision for the pod. // SetPodResizeStatus checkpoints the last resizing decision for the pod.
SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error
allocationManager
}
// TODO(tallclair): Refactor allocation state handling out of the status manager.
type allocationManager interface {
// GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
// Returns the updated (or original) pod, and whether there was an allocation stored.
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error
} }
const syncPeriod = 10 * time.Second const syncPeriod = 10 * time.Second
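A hedged sketch of the intended call pattern for the new allocationManager methods; the caller below is hypothetical and not part of this commit: swap the apiserver's pod spec for the checkpointed allocation before computing sync actions.

package sketch

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/kubelet/status"
)

// podForSync returns the pod the sync loop should act on: the checkpointed
// allocation if one exists, otherwise the apiserver's pod unchanged.
func podForSync(m status.Manager, apiPod *v1.Pod) *v1.Pod {
    allocatedPod, updated := m.UpdatePodFromAllocation(apiPod)
    if updated {
        klog.V(4).InfoS("Using allocated resources for sync", "pod", klog.KObj(apiPod))
    }
    return allocatedPod
}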
@@ -242,6 +252,39 @@ func (m *manager) GetContainerResourceAllocation(podUID string, containerName st
return m.state.GetContainerResourceAllocation(podUID, containerName) return m.state.GetContainerResourceAllocation(podUID, containerName)
} }
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
}
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
allocated, found := allocs[string(pod.UID)]
if !found {
return pod, false
}
updated := false
for i, c := range pod.Spec.Containers {
if cAlloc, ok := allocated[c.Name]; ok {
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
// Allocation differs from pod spec, update
if !updated {
// If this is the first update, copy the pod
pod = pod.DeepCopy()
updated = true
}
pod.Spec.Containers[i].Resources = cAlloc
}
}
}
return pod, updated
}
// GetPodResizeStatus returns the last checkpointed ResizeStatus value // GetPodResizeStatus returns the last checkpointed ResizeStatus value
// If checkpoint manager has not been initialized, it returns nil, false // If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) { func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
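For reference, a sketch of the allocation map shape updatePodFromAllocation indexes (pod UID to container name to allocated requirements), as inferred from the state package usage in the test below; the helper itself is illustrative.

package sketch

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/kubernetes/pkg/kubelet/status/state"
)

// allocationFor builds a single-pod, single-container allocation entry in the
// shape updatePodFromAllocation looks up by string(pod.UID) and container name.
func allocationFor(podUID, containerName string, cpuMilli, memBytes int64) state.PodResourceAllocation {
    return state.PodResourceAllocation{
        podUID: {
            containerName: {
                Requests: v1.ResourceList{
                    v1.ResourceCPU:    *resource.NewMilliQuantity(cpuMilli, resource.DecimalSI),
                    v1.ResourceMemory: *resource.NewQuantity(memBytes, resource.DecimalSI),
                },
            },
        },
    }
}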

View File

@@ -32,6 +32,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@@ -42,6 +43,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status/state"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
@@ -2034,6 +2036,105 @@ func TestMergePodStatus(t *testing.T) {
} }
func TestUpdatePodFromAllocation(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345",
Name: "test",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "c1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
},
},
}, {
Name: "c2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
}},
},
}
resizedPod := pod.DeepCopy()
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
tests := []struct {
name string
pod *v1.Pod
allocs state.PodResourceAllocation
expectPod *v1.Pod
expectUpdate bool
}{{
name: "steady state",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "no allocations",
pod: pod,
allocs: state.PodResourceAllocation{},
expectUpdate: false,
}, {
name: "missing container allocation",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "resized container",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: true,
expectPod: resizedPod,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pod := test.pod.DeepCopy()
allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
if test.expectUpdate {
assert.True(t, updated, "updated")
assert.Equal(t, test.expectPod, allocatedPod)
assert.NotEqual(t, pod, allocatedPod)
} else {
assert.False(t, updated, "updated")
assert.Same(t, pod, allocatedPod)
}
})
}
}
func statusEqual(left, right v1.PodStatus) bool { func statusEqual(left, right v1.PodStatus) bool {
left.Conditions = nil left.Conditions = nil
right.Conditions = nil right.Conditions = nil