Merge pull request #128269 from tallclair/allocated

[FG:InPlacePodVerticalScaling] Rework handling of allocated resources
This commit is contained in:
Kubernetes Prow Robot 2024-10-25 23:24:52 +01:00 committed by GitHub
commit b3cf9c6e5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 616 additions and 672 deletions

View File

@ -26,13 +26,10 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1resource "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
volumeutils "k8s.io/kubernetes/pkg/volume/util"
@ -1252,12 +1249,6 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats
for _, container := range containers {
if container.Name == containerStats.Name {
requests := container.Resources.Requests[resourceToReclaim]
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) &&
(resourceToReclaim == v1.ResourceMemory || resourceToReclaim == v1.ResourceCPU) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
requests = cs.AllocatedResources[resourceToReclaim]
}
}
var usage *resource.Quantity
switch resourceToReclaim {
case v1.ResourceEphemeralStorage:

View File

@ -31,11 +31,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/features"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
@ -3366,56 +3363,6 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool {
return true
}
func TestEvictonMessageWithResourceResize(t *testing.T) {
testpod := newPod("testpod", 1, []v1.Container{
newContainer("testcontainer", newResourceList("", "200Mi", ""), newResourceList("", "", "")),
}, nil)
testpod.Status = v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Name: "testcontainer",
AllocatedResources: newResourceList("", "100Mi", ""),
},
},
}
testpodMemory := resource.MustParse("150Mi")
testpodStats := newPodMemoryStats(testpod, testpodMemory)
testpodMemoryBytes := uint64(testpodMemory.Value())
testpodStats.Containers = []statsapi.ContainerStats{
{
Name: "testcontainer",
Memory: &statsapi.MemoryStats{
WorkingSetBytes: &testpodMemoryBytes,
},
},
}
stats := map[*v1.Pod]statsapi.PodStats{
testpod: testpodStats,
}
statsFn := func(pod *v1.Pod) (statsapi.PodStats, bool) {
result, found := stats[pod]
return result, found
}
threshold := []evictionapi.Threshold{}
observations := signalObservations{}
for _, enabled := range []bool{true, false} {
t.Run(fmt.Sprintf("InPlacePodVerticalScaling enabled=%v", enabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, enabled)
msg, _ := evictionMessage(v1.ResourceMemory, testpod, statsFn, threshold, observations)
if enabled {
if !strings.Contains(msg, "testcontainer was using 150Mi, request is 100Mi") {
t.Errorf("Expected 'exceeds memory' eviction message was not found.")
}
} else {
if strings.Contains(msg, "which exceeds its request") {
t.Errorf("Found 'exceeds memory' eviction message which was not expected.")
}
}
})
}
}
func TestStatsNotFoundForPod(t *testing.T) {
pod1 := newPod("fake-pod1", defaultPriority, []v1.Container{
newContainer("fake-container1", newResourceList("", "", ""), newResourceList("", "", "")),

View File

@ -27,13 +27,13 @@ import (
"os"
"path/filepath"
sysruntime "runtime"
"slices"
"sort"
"sync"
"sync/atomic"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp"
"github.com/opencontainers/selinux/go-selinux"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@ -1581,7 +1581,8 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod)
// Eviction decisions are based on the allocated (rather than desired) pod resources.
kl.evictionManager.Start(kl.StatsProvider, kl.getAllocatedPods, kl.PodIsFinished, evictionMonitoringPeriod)
// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
@ -1806,6 +1807,20 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
}
// handlePodResourcesResize updates the pod to use the allocated resources. This should come
// before the main business logic of SyncPod, so that a consistent view of the pod is used
// across the sync loop.
if kuberuntime.IsInPlacePodVerticalScalingAllowed(pod) {
// Handle pod resize here instead of doing it in HandlePodUpdates because
// this conveniently retries any Deferred resize requests
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
pod, err = kl.handlePodResourcesResize(pod)
if err != nil {
return false, err
}
}
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
@ -1960,16 +1975,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Ensure the pod is being probed
kl.probeManager.AddPod(pod)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Handle pod resize here instead of doing it in HandlePodUpdates because
// this conveniently retries any Deferred resize requests
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
if kl.podWorkers.CouldHaveRunningContainers(pod.UID) && !kubetypes.IsStaticPod(pod) {
pod = kl.handlePodResourcesResize(pod)
}
}
// TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker.
// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel,
// any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling
@ -1992,8 +1997,10 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
return false, nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) {
// While resize is in progress, periodically call PLEG to update pod cache
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) {
// While resize is in progress, periodically request the latest status from the runtime via
// the PLEG. This is necessary since ordinarily pod status is only fetched when a container
// undergoes a state transition.
runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod))
@ -2304,23 +2311,14 @@ func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
// allocatedPods should represent the pods that have already been admitted, along with their
// admitted (allocated) resources.
func (kl *Kubelet) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
// TODO: move out of disk check into a pod admitter
// TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Use allocated resources values from checkpoint store (source of truth) to determine fit
otherPods := make([]*v1.Pod, 0, len(pods))
for _, p := range pods {
op := p.DeepCopy()
kl.updateContainerResourceAllocation(op)
otherPods = append(otherPods, op)
}
attrs.OtherPods = otherPods
}
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: allocatedPods}
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
@ -2560,7 +2558,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
defer kl.podResizeMutex.Unlock()
}
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
@ -2590,29 +2587,30 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
// we simply avoid doing any work.
// We also do not try to admit the pod that is already in terminated state.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
// We failed pods that we rejected, so activePods include all admitted
// We failed pods that we rejected, so allocatedPods include all admitted
// pods that are alive.
activePods := kl.filterOutInactivePods(existingPods)
allocatedPods := kl.getAllocatedPods()
// Filter out the pod being evaluated.
allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID })
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// To handle kubelet restarts, test pod admissibility using AllocatedResources values
// (for cpu & memory) from checkpoint store. If found, that is the source of truth.
podCopy := pod.DeepCopy()
kl.updateContainerResourceAllocation(podCopy)
allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok {
if ok, reason, message := kl.canAdmitPod(allocatedPods, allocatedPod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
// For new pod, checkpoint the resource values at which the Pod has been admitted
if err := kl.statusManager.SetPodAllocation(podCopy); err != nil {
if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
}
} else {
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
if ok, reason, message := kl.canAdmitPod(allocatedPods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
@ -2627,22 +2625,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
}
}
// updateContainerResourceAllocation updates AllocatedResources values
// (for cpu & memory) from checkpoint store
func (kl *Kubelet) updateContainerResourceAllocation(pod *v1.Pod) {
for _, c := range pod.Spec.Containers {
allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
if c.Resources.Requests != nil && found {
if _, ok := allocatedResources[v1.ResourceCPU]; ok {
c.Resources.Requests[v1.ResourceCPU] = allocatedResources[v1.ResourceCPU]
}
if _, ok := allocatedResources[v1.ResourceMemory]; ok {
c.Resources.Requests[v1.ResourceMemory] = allocatedResources[v1.ResourceMemory]
}
}
}
}
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
@ -2772,117 +2754,87 @@ func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
}
}
func isPodResizeInProgress(pod *v1.Pod, podStatus *v1.PodStatus) bool {
func isPodResizeInProgress(pod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
for _, c := range pod.Spec.Containers {
if cs, ok := podutil.GetContainerStatus(podStatus.ContainerStatuses, c.Name); ok {
if cs.Resources == nil {
if cs := podStatus.FindContainerStatusByName(c.Name); cs != nil {
if cs.State != kubecontainer.ContainerStateRunning || cs.Resources == nil {
continue
}
if !cmp.Equal(c.Resources.Limits, cs.Resources.Limits) || !cmp.Equal(cs.AllocatedResources, cs.Resources.Requests) {
return true
if c.Resources.Requests != nil {
if cs.Resources.CPURequest != nil && !cs.Resources.CPURequest.Equal(*c.Resources.Requests.Cpu()) {
return true
}
}
if c.Resources.Limits != nil {
if cs.Resources.CPULimit != nil && !cs.Resources.CPULimit.Equal(*c.Resources.Limits.Cpu()) {
return true
}
if cs.Resources.MemoryLimit != nil && !cs.Resources.MemoryLimit.Equal(*c.Resources.Limits.Memory()) {
return true
}
}
}
}
return false
}
func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) {
var otherActivePods []*v1.Pod
// canResizePod determines if the requested resize is currently feasible.
// pod should hold the desired (pre-allocated) spec.
// Returns true if the resize can proceed.
func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) {
node, err := kl.getNodeAnyWay()
if err != nil {
klog.ErrorS(err, "getNodeAnyway function failed")
return false, nil, ""
return false, ""
}
podCopy := pod.DeepCopy()
cpuAvailable := node.Status.Allocatable.Cpu().MilliValue()
memAvailable := node.Status.Allocatable.Memory().Value()
cpuRequests := resource.GetResourceRequest(podCopy, v1.ResourceCPU)
memRequests := resource.GetResourceRequest(podCopy, v1.ResourceMemory)
cpuRequests := resource.GetResourceRequest(pod, v1.ResourceCPU)
memRequests := resource.GetResourceRequest(pod, v1.ResourceMemory)
if cpuRequests > cpuAvailable || memRequests > memAvailable {
klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", podCopy.Name)
return false, podCopy, v1.PodResizeStatusInfeasible
klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", klog.KObj(pod))
return false, v1.PodResizeStatusInfeasible
}
// Treat the existing pod needing resize as a new pod with desired resources seeking admit.
// If desired resources don't fit, pod continues to run with currently allocated resources.
activePods := kl.GetActivePods()
for _, p := range activePods {
if p.UID != pod.UID {
otherActivePods = append(otherActivePods, p)
}
}
allocatedPods := kl.getAllocatedPods()
allocatedPods = slices.DeleteFunc(allocatedPods, func(p *v1.Pod) bool { return p.UID == pod.UID })
if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, podCopy); !ok {
if ok, failReason, failMessage := kl.canAdmitPod(allocatedPods, pod); !ok {
// Log reason and return. Let the next sync iteration retry the resize
klog.V(3).InfoS("Resize cannot be accommodated", "pod", podCopy.Name, "reason", failReason, "message", failMessage)
return false, podCopy, v1.PodResizeStatusDeferred
klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage)
return false, v1.PodResizeStatusDeferred
}
for _, container := range podCopy.Spec.Containers {
idx, found := podutil.GetIndexOfContainerStatus(podCopy.Status.ContainerStatuses, container.Name)
if found {
for rName, rQuantity := range container.Resources.Requests {
podCopy.Status.ContainerStatuses[idx].AllocatedResources[rName] = rQuantity
}
}
}
return true, podCopy, v1.PodResizeStatusInProgress
return true, v1.PodResizeStatusInProgress
}
func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) *v1.Pod {
if pod.Status.Phase != v1.PodRunning {
return pod
}
podResized := false
for _, container := range pod.Spec.Containers {
if len(container.Resources.Requests) == 0 {
continue
}
containerStatus, found := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
if !found {
klog.V(5).InfoS("ContainerStatus not found", "pod", pod.Name, "container", container.Name)
break
}
if len(containerStatus.AllocatedResources) != len(container.Resources.Requests) {
klog.V(5).InfoS("ContainerStatus.AllocatedResources length mismatch", "pod", pod.Name, "container", container.Name)
break
}
if !cmp.Equal(container.Resources.Requests, containerStatus.AllocatedResources) {
podResized = true
break
}
}
if !podResized {
return pod
func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) {
allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
if !updated {
// Pod is not resizing, nothing more to do here.
return allocatedPod, nil
}
kl.podResizeMutex.Lock()
defer kl.podResizeMutex.Unlock()
fit, updatedPod, resizeStatus := kl.canResizePod(pod)
if updatedPod == nil {
return pod
}
fit, resizeStatus := kl.canResizePod(pod)
if fit {
// Update pod resource allocation checkpoint
if err := kl.statusManager.SetPodAllocation(updatedPod); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(updatedPod))
return pod
if err := kl.statusManager.SetPodAllocation(pod); err != nil {
return nil, err
}
allocatedPod = pod
}
if resizeStatus != "" {
// Save resize decision to checkpoint
if err := kl.statusManager.SetPodResizeStatus(updatedPod.UID, resizeStatus); err != nil {
if err := kl.statusManager.SetPodResizeStatus(allocatedPod.UID, resizeStatus); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(updatedPod))
return pod
klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(allocatedPod))
}
updatedPod.Status.Resize = resizeStatus
}
kl.podManager.UpdatePod(updatedPod)
kl.statusManager.SetPodStatus(updatedPod, updatedPod.Status)
return updatedPod
return allocatedPod, nil
}
// LatestLoopEntryTime returns the last time in the sync loop monitor.

View File

@ -207,6 +207,21 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod {
return activePods
}
// getAllocatedPods returns the active pods (see GetActivePods), but updates the pods to their
// allocated state.
func (kl *Kubelet) getAllocatedPods() []*v1.Pod {
activePods := kl.GetActivePods()
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return activePods
}
allocatedPods := make([]*v1.Pod, len(activePods))
for i, pod := range activePods {
allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod)
}
return allocatedPods
}
// makeBlockVolumes maps the raw block devices specified in the path of the container
// Experimental
func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumepathhandler.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) {
@ -2081,12 +2096,14 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
}
}
container := kubecontainer.GetContainerSpec(pod, cName)
// AllocatedResources values come from checkpoint. It is the source-of-truth.
found := false
status.AllocatedResources, found = kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
if !(container.Resources.Requests == nil && container.Resources.Limits == nil) && !found {
// Log error and fallback to AllocatedResources in oldStatus if it exists
klog.ErrorS(nil, "resource allocation not found in checkpoint store", "pod", pod.Name, "container", cName)
// Always set the status to the latest allocated resources, even if it differs from the
// allocation used by the current sync loop.
alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
if found {
status.AllocatedResources = alloc.Requests
} else if !(container.Resources.Requests == nil && container.Resources.Limits == nil) {
// This case is expected for ephemeral containers.
if oldStatusFound {
status.AllocatedResources = oldStatus.AllocatedResources
}
@ -2107,46 +2124,46 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
}
// Convert Limits
if container.Resources.Limits != nil {
if alloc.Limits != nil {
limits = make(v1.ResourceList)
if cStatus.Resources != nil && cStatus.Resources.CPULimit != nil {
limits[v1.ResourceCPU] = cStatus.Resources.CPULimit.DeepCopy()
} else {
determineResource(v1.ResourceCPU, container.Resources.Limits, oldStatus.Resources.Limits, limits)
determineResource(v1.ResourceCPU, alloc.Limits, oldStatus.Resources.Limits, limits)
}
if cStatus.Resources != nil && cStatus.Resources.MemoryLimit != nil {
limits[v1.ResourceMemory] = cStatus.Resources.MemoryLimit.DeepCopy()
} else {
determineResource(v1.ResourceMemory, container.Resources.Limits, oldStatus.Resources.Limits, limits)
determineResource(v1.ResourceMemory, alloc.Limits, oldStatus.Resources.Limits, limits)
}
if ephemeralStorage, found := container.Resources.Limits[v1.ResourceEphemeralStorage]; found {
if ephemeralStorage, found := alloc.Limits[v1.ResourceEphemeralStorage]; found {
limits[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy()
}
if storage, found := container.Resources.Limits[v1.ResourceStorage]; found {
if storage, found := alloc.Limits[v1.ResourceStorage]; found {
limits[v1.ResourceStorage] = storage.DeepCopy()
}
convertCustomResources(container.Resources.Limits, limits)
convertCustomResources(alloc.Limits, limits)
}
// Convert Requests
if status.AllocatedResources != nil {
if alloc.Requests != nil {
requests = make(v1.ResourceList)
if cStatus.Resources != nil && cStatus.Resources.CPURequest != nil {
requests[v1.ResourceCPU] = cStatus.Resources.CPURequest.DeepCopy()
} else {
determineResource(v1.ResourceCPU, status.AllocatedResources, oldStatus.Resources.Requests, requests)
determineResource(v1.ResourceCPU, alloc.Requests, oldStatus.Resources.Requests, requests)
}
if memory, found := status.AllocatedResources[v1.ResourceMemory]; found {
if memory, found := alloc.Requests[v1.ResourceMemory]; found {
requests[v1.ResourceMemory] = memory.DeepCopy()
}
if ephemeralStorage, found := status.AllocatedResources[v1.ResourceEphemeralStorage]; found {
if ephemeralStorage, found := alloc.Requests[v1.ResourceEphemeralStorage]; found {
requests[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy()
}
if storage, found := status.AllocatedResources[v1.ResourceStorage]; found {
if storage, found := alloc.Requests[v1.ResourceStorage]; found {
requests[v1.ResourceStorage] = storage.DeepCopy()
}
convertCustomResources(status.AllocatedResources, requests)
convertCustomResources(alloc.Requests, requests)
}
resources := &v1.ResourceRequirements{

View File

@ -2391,6 +2391,13 @@ func TestPodResourceAllocationReset(t *testing.T) {
kubelet := testKubelet.kubelet
kubelet.statusManager = status.NewFakeManager()
// fakePodWorkers trigger syncPodFn synchronously on update, but entering
// kubelet.SyncPod while holding the podResizeMutex can lead to deadlock.
kubelet.podWorkers.(*fakePodWorkers).syncPodFn =
func(_ context.Context, _ kubetypes.SyncPodType, _, _ *v1.Pod, _ *kubecontainer.PodStatus) (bool, error) {
return false, nil
}
nodes := []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
@ -2447,8 +2454,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Having both memory and cpu, resource allocation not exists",
pod: podWithUIDNameNsSpec("1", "pod1", "foo", *cpu500mMem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"1": map[string]v1.ResourceList{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests,
"1": map[string]v1.ResourceRequirements{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources,
},
},
},
@ -2457,8 +2464,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"2": map[string]v1.ResourceList{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests,
"2": map[string]v1.ResourceRequirements{
cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources,
},
},
},
@ -2467,8 +2474,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu500mMem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu800mMem800MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"3": map[string]v1.ResourceList{
cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources.Requests,
"3": map[string]v1.ResourceRequirements{
cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources,
},
},
},
@ -2476,8 +2483,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Only has cpu, resource allocation not exists",
pod: podWithUIDNameNsSpec("4", "pod5", "foo", *cpu500mPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"4": map[string]v1.ResourceList{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests,
"4": map[string]v1.ResourceRequirements{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources,
},
},
},
@ -2486,8 +2493,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"5": map[string]v1.ResourceList{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests,
"5": map[string]v1.ResourceRequirements{
cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources,
},
},
},
@ -2496,8 +2503,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu500mPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu800mPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"6": map[string]v1.ResourceList{
cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources.Requests,
"6": map[string]v1.ResourceRequirements{
cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources,
},
},
},
@ -2505,8 +2512,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "Only has memory, resource allocation not exists",
pod: podWithUIDNameNsSpec("7", "pod7", "foo", *mem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"7": map[string]v1.ResourceList{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests,
"7": map[string]v1.ResourceRequirements{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources,
},
},
},
@ -2515,8 +2522,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"8": map[string]v1.ResourceList{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests,
"8": map[string]v1.ResourceRequirements{
mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources,
},
},
},
@ -2525,8 +2532,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("9", "pod9", "foo", *mem500MPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("9", "pod9", "foo", *mem800MPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"9": map[string]v1.ResourceList{
mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources.Requests,
"9": map[string]v1.ResourceRequirements{
mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources,
},
},
},
@ -2534,8 +2541,8 @@ func TestPodResourceAllocationReset(t *testing.T) {
name: "No CPU and memory, resource allocation not exists",
pod: podWithUIDNameNsSpec("10", "pod10", "foo", *emptyPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"10": map[string]v1.ResourceList{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests,
"10": map[string]v1.ResourceRequirements{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources,
},
},
},
@ -2544,27 +2551,29 @@ func TestPodResourceAllocationReset(t *testing.T) {
pod: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec),
existingPodAllocation: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec),
expectedPodResourceAllocation: state.PodResourceAllocation{
"11": map[string]v1.ResourceList{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests,
"11": map[string]v1.ResourceRequirements{
emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources,
},
},
},
}
for _, tc := range tests {
if tc.existingPodAllocation != nil {
// when kubelet restarts, AllocatedResources has already existed before adding pod
err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
if err != nil {
t.Fatalf("failed to set pod allocation: %v", err)
t.Run(tc.name, func(t *testing.T) {
if tc.existingPodAllocation != nil {
// when kubelet restarts, AllocatedResources has already existed before adding pod
err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
if err != nil {
t.Fatalf("failed to set pod allocation: %v", err)
}
}
}
kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
if !found {
t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
}
assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name)
allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
if !found {
t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
}
assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name)
})
}
}
@ -2573,7 +2582,6 @@ func TestHandlePodResourcesResize(t *testing.T) {
testKubelet := newTestKubelet(t, false)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.statusManager = status.NewFakeManager()
cpu500m := resource.MustParse("500m")
cpu1000m := resource.MustParse("1")
@ -2715,14 +2723,24 @@ func TestHandlePodResourcesResize(t *testing.T) {
}
for _, tt := range tests {
tt.pod.Spec.Containers[0].Resources.Requests = tt.newRequests
tt.pod.Status.ContainerStatuses[0].AllocatedResources = v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M}
kubelet.handlePodResourcesResize(tt.pod)
updatedPod, found := kubelet.podManager.GetPodByName(tt.pod.Namespace, tt.pod.Name)
assert.True(t, found, "expected to find pod %s", tt.pod.Name)
assert.Equal(t, tt.expectedAllocations, updatedPod.Status.ContainerStatuses[0].AllocatedResources, tt.name)
assert.Equal(t, tt.expectedResize, updatedPod.Status.Resize, tt.name)
testKubelet.fakeKubeClient.ClearActions()
t.Run(tt.name, func(t *testing.T) {
kubelet.statusManager = status.NewFakeManager()
require.NoError(t, kubelet.statusManager.SetPodAllocation(tt.pod))
pod := tt.pod.DeepCopy()
pod.Spec.Containers[0].Resources.Requests = tt.newRequests
updatedPod, err := kubelet.handlePodResourcesResize(pod)
require.NoError(t, err)
assert.Equal(t, tt.expectedAllocations, updatedPod.Spec.Containers[0].Resources.Requests, "updated pod spec resources")
alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(pod.UID), pod.Spec.Containers[0].Name)
require.True(t, found, "container allocation")
assert.Equal(t, tt.expectedAllocations, alloc.Requests, "stored container allocation")
resizeStatus, found := kubelet.statusManager.GetPodResizeStatus(string(pod.UID))
require.True(t, found, "pod resize status")
assert.Equal(t, tt.expectedResize, resizeStatus)
})
}
}
@ -3274,3 +3292,132 @@ func TestSyncPodSpans(t *testing.T) {
assert.Equalf(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), "runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID())
}
}
func TestIsPodResizeInProgress(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345",
Name: "test",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "c1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
},
},
}, {
Name: "c2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
}},
},
}
steadyStateC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(400, resource.DecimalSI),
},
}
resizeMemC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(800, resource.DecimalSI),
},
}
resizeCPUReqC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(200, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(300, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(400, resource.DecimalSI),
},
}
resizeCPULimitC1Status := &kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(100, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(600, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(400, resource.DecimalSI),
},
}
steadyStateC2Status := &kubecontainer.Status{
Name: "c2",
State: kubecontainer.ContainerStateRunning,
Resources: &kubecontainer.ContainerResources{
CPURequest: resource.NewMilliQuantity(500, resource.DecimalSI),
CPULimit: resource.NewMilliQuantity(700, resource.DecimalSI),
MemoryLimit: resource.NewQuantity(800, resource.DecimalSI),
},
}
mkPodStatus := func(containerStatuses ...*kubecontainer.Status) *kubecontainer.PodStatus {
return &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
ContainerStatuses: containerStatuses,
}
}
tests := []struct {
name string
status *kubecontainer.PodStatus
expectResize bool
}{{
name: "steady state",
status: mkPodStatus(steadyStateC1Status, steadyStateC2Status),
expectResize: false,
}, {
name: "terminated container",
status: mkPodStatus(&kubecontainer.Status{
Name: "c1",
State: kubecontainer.ContainerStateExited,
Resources: resizeMemC1Status.Resources,
}, steadyStateC2Status),
expectResize: false,
}, {
name: "missing container",
status: mkPodStatus(steadyStateC2Status),
expectResize: false,
}, {
name: "resizing memory limit",
status: mkPodStatus(resizeMemC1Status, steadyStateC2Status),
expectResize: true,
}, {
name: "resizing cpu request",
status: mkPodStatus(resizeCPUReqC1Status, steadyStateC2Status),
expectResize: true,
}, {
name: "resizing cpu limit",
status: mkPodStatus(resizeCPULimitC1Status, steadyStateC2Status),
expectResize: true,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expectResize, isPodResizeInProgress(pod, test.status))
})
}
}

View File

@ -805,291 +805,67 @@ func TestGenerateLinuxContainerResources(t *testing.T) {
},
},
},
Status: v1.PodStatus{},
}
for _, tc := range []struct {
name string
scalingFg bool
limits v1.ResourceList
requests v1.ResourceList
cStatus []v1.ContainerStatus
expected *runtimeapi.LinuxContainerResources
cgroupVersion CgroupVersion
}{
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
true,
"requests & limits, cpu & memory, guaranteed qos",
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - no container status",
true,
"requests & limits, cpu & memory, burstable qos",
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970},
cgroupV1,
},
{
"best-effort qos - no container status",
true,
"best-effort qos",
nil,
nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - empty resources container status",
true,
"requests & limits, cpu & memory, guaranteed qos",
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - empty resources container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999},
cgroupV1,
},
{
"best-effort qos - empty resources container status",
true,
nil,
nil,
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997},
cgroupV1,
},
{
"best-effort qos - no container status",
false,
nil,
nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000},
cgroupV1,
},
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - no container status",
true,
"requests & limits, cpu & memory, burstable qos",
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"best-effort qos - no container status",
true,
"best-effort qos",
nil,
nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - empty resources container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - empty resources container status",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 999, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"best-effort qos - empty resources container status",
true,
nil,
nil,
[]v1.ContainerStatus{{Name: "c1"}},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
true,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - no container status",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, burstable qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("750Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 256, MemoryLimitInBytes: 786432000, OomScoreAdj: 970, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"requests & limits, cpu & memory, guaranteed qos - container status with allocatedResources",
false,
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
[]v1.ContainerStatus{
{
Name: "c1",
AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m"), v1.ResourceMemory: resource.MustParse("500Mi")},
},
},
&runtimeapi.LinuxContainerResources{CpuShares: 204, MemoryLimitInBytes: 524288000, OomScoreAdj: -997, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
{
"best-effort qos - no container status",
false,
nil,
nil,
[]v1.ContainerStatus{},
&runtimeapi.LinuxContainerResources{CpuShares: 2, OomScoreAdj: 1000, Unified: map[string]string{"memory.oom.group": "1"}},
cgroupV2,
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Run(fmt.Sprintf("cgroup%s:%s", tc.cgroupVersion, tc.name), func(t *testing.T) {
defer setSwapControllerAvailableDuringTest(false)()
if tc.scalingFg {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
}
setCgroupVersionDuringTest(tc.cgroupVersion)
pod.Spec.Containers[0].Resources = v1.ResourceRequirements{Limits: tc.limits, Requests: tc.requests}
if len(tc.cStatus) > 0 {
pod.Status.ContainerStatuses = tc.cStatus
}
resources := m.generateLinuxContainerResources(pod, &pod.Spec.Containers[0], false)
tc.expected.HugepageLimits = resources.HugepageLimits
if !cmp.Equal(resources, tc.expected) {
t.Errorf("Test %s: expected resources %+v, but got %+v", tc.name, tc.expected, resources)
}
assert.Equal(t, tc.expected, resources)
})
}
}

View File

@ -26,7 +26,6 @@ import (
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel/trace"
grpcstatus "google.golang.org/grpc/status"
crierror "k8s.io/cri-api/pkg/errors"
@ -47,7 +46,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/credentialprovider/plugin"
"k8s.io/kubernetes/pkg/features"
@ -541,7 +539,7 @@ func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) boo
return cStatus.State == kubecontainer.ContainerStateExited && cStatus.ExitCode == 0
}
func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
func IsInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return false
}
@ -551,60 +549,49 @@ func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
return true
}
func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool {
// computePodResizeAction determines the actions required (if any) to resize the given container.
// Returns whether to keep (true) or restart (false) the container.
func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) (keepContainer bool) {
container := pod.Spec.Containers[containerIdx]
if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 {
return true
}
// Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired)
// with v1.Status.Resources / runtime.Status.Resources (last known actual).
// Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources.
// Skip if runtime containerID doesn't match pod.Status containerID (container is restarting)
apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil ||
kubeContainerStatus.State != kubecontainer.ContainerStateRunning ||
kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID ||
!cmp.Equal(container.Resources.Requests, apiContainerStatus.AllocatedResources) {
if kubeContainerStatus.State != kubecontainer.ContainerStateRunning {
return true
}
desiredMemoryLimit := container.Resources.Limits.Memory().Value()
desiredCPULimit := container.Resources.Limits.Cpu().MilliValue()
desiredCPURequest := container.Resources.Requests.Cpu().MilliValue()
currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value()
currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue()
currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue()
// Runtime container status resources (from CRI), if set, supercedes v1(api) container status resrouces.
if kubeContainerStatus.Resources != nil {
if kubeContainerStatus.Resources.MemoryLimit != nil {
currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
}
if kubeContainerStatus.Resources.CPULimit != nil {
currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
}
if kubeContainerStatus.Resources.CPURequest != nil {
currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
}
}
// Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during
// handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest.
if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest {
if kubeContainerStatus.Resources == nil {
// Not enough information to actuate a resize.
klog.V(4).InfoS("Missing runtime resource information for container", "pod", klog.KObj(pod), "container", container.Name)
return true
}
desiredResources := containerResources{
memoryLimit: desiredMemoryLimit,
memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(),
cpuLimit: desiredCPULimit,
cpuRequest: desiredCPURequest,
memoryLimit: container.Resources.Limits.Memory().Value(),
memoryRequest: container.Resources.Requests.Memory().Value(),
cpuLimit: container.Resources.Limits.Cpu().MilliValue(),
cpuRequest: container.Resources.Requests.Cpu().MilliValue(),
}
currentResources := containerResources{
memoryLimit: currentMemoryLimit,
memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(),
cpuLimit: currentCPULimit,
cpuRequest: currentCPURequest,
// Default current values to the desired values so that a resize isn't triggered for missing values.
currentResources := desiredResources
if kubeContainerStatus.Resources.MemoryLimit != nil {
currentResources.memoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
}
if kubeContainerStatus.Resources.CPULimit != nil {
currentResources.cpuLimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
}
if kubeContainerStatus.Resources.CPURequest != nil {
currentResources.cpuRequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
}
// Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during
// handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest.
if currentResources == desiredResources {
// No resize required.
return true
}
resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy)
@ -637,9 +624,9 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe
changes.ContainersToUpdate[rName][0] = cUpdateInfo
}
}
resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit)
resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit)
resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest)
if restartCPULim || restartCPUReq || restartMemLim {
// resize policy requires this container to restart
changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
@ -652,12 +639,12 @@ func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containe
return false
} else {
if resizeMemLim {
markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
markContainerForUpdate(v1.ResourceMemory, desiredResources.memoryLimit, currentResources.memoryLimit)
}
if resizeCPULim {
markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuLimit, currentResources.cpuLimit)
} else if resizeCPUReq {
markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
markContainerForUpdate(v1.ResourceCPU, desiredResources.cpuRequest, currentResources.cpuRequest)
}
}
return true
@ -940,12 +927,8 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
}
}
if isInPlacePodVerticalScalingAllowed(pod) {
if IsInPlacePodVerticalScalingAllowed(pod) {
changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo)
latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace)
if err == nil {
podStatus = latestPodStatus
}
}
// Number of running containers to keep.
@ -1002,7 +985,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
// If the container failed the startup probe, we should kill it.
message = fmt.Sprintf("Container %s failed startup probe", container.Name)
reason = reasonStartupProbe
} else if isInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) {
} else if IsInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) {
// computePodResizeAction updates 'changes' if resize policy requires restarting this container
continue
} else {
@ -1319,7 +1302,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
}
// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
if isInPlacePodVerticalScalingAllowed(pod) {
if IsInPlacePodVerticalScalingAllowed(pod) {
if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
}

View File

@ -52,6 +52,7 @@ import (
imagetypes "k8s.io/kubernetes/pkg/kubelet/images"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/ptr"
)
var (
@ -2174,7 +2175,7 @@ func makeBasePodAndStatusWithInitAndEphemeralContainers() (*v1.Pod, *kubecontain
func TestComputePodActionsForPodResize(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
fakeRuntime, _, m, err := createTestRuntimeManager()
_, _, m, err := createTestRuntimeManager()
m.machineInfo.MemoryCapacity = 17179860387 // 16GB
assert.NoError(t, err)
@ -2188,18 +2189,19 @@ func TestComputePodActionsForPodResize(t *testing.T) {
memPolicyRestartRequired := v1.ContainerResizePolicy{ResourceName: v1.ResourceMemory, RestartPolicy: v1.RestartContainer}
for desc, test := range map[string]struct {
podResizePolicyFn func(*v1.Pod)
mutatePodFn func(*v1.Pod)
setupFn func(*v1.Pod, *kubecontainer.PodStatus)
getExpectedPodActionsFn func(*v1.Pod, *kubecontainer.PodStatus) *podActions
}{
"Update container CPU and memory resources": {
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[1]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu200m.DeepCopy()),
MemoryLimit: ptr.To(mem200M.DeepCopy()),
}
}
},
@ -2244,13 +2246,15 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Update container CPU resources": {
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[1]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu200m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
}
}
},
@ -2281,13 +2285,15 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Update container memory resources": {
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[2].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[2]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu200m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
}
}
},
@ -2318,12 +2324,15 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Nothing when spec.Resources and status.Resources are equal": {
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[1]
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m},
}
pod.Status.ContainerStatuses[1].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu200m.DeepCopy()),
}
}
},
getExpectedPodActionsFn: func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *podActions {
@ -2337,16 +2346,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Update container CPU and memory resources with Restart policy for CPU": {
podResizePolicyFn: func(pod *v1.Pod) {
pod.Spec.Containers[0].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
},
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[0].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[0]
c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[0].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
}
}
},
@ -2368,16 +2377,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Update container CPU and memory resources with Restart policy for memory": {
podResizePolicyFn: func(pod *v1.Pod) {
pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
},
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[2].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[2]
c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem200M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
}
}
},
@ -2399,16 +2408,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Update container memory resources with Restart policy for CPU": {
podResizePolicyFn: func(pod *v1.Pod) {
pod.Spec.Containers[1].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
},
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[1].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[1]
c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartRequired, memPolicyRestartNotRequired}
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem200M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[1].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
}
}
},
@ -2439,16 +2448,16 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
"Update container CPU resources with Restart policy for memory": {
podResizePolicyFn: func(pod *v1.Pod) {
pod.Spec.Containers[2].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
},
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.Containers[2].Resources = v1.ResourceRequirements{
setupFn: func(pod *v1.Pod, status *kubecontainer.PodStatus) {
c := &pod.Spec.Containers[2]
c.ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartRequired}
c.Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu200m, v1.ResourceMemory: mem100M},
}
if idx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[2].Name); found {
pod.Status.ContainerStatuses[idx].Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: cpu100m, v1.ResourceMemory: mem100M},
if cStatus := status.FindContainerStatusByName(c.Name); cStatus != nil {
cStatus.Resources = &kubecontainer.ContainerResources{
CPULimit: ptr.To(cpu100m.DeepCopy()),
MemoryLimit: ptr.To(mem100M.DeepCopy()),
}
}
},
@ -2479,41 +2488,28 @@ func TestComputePodActionsForPodResize(t *testing.T) {
},
},
} {
pod, kps := makeBasePodAndStatus()
for idx := range pod.Spec.Containers {
// default resize policy when pod resize feature is enabled
pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired}
}
if test.podResizePolicyFn != nil {
test.podResizePolicyFn(pod)
}
for idx := range pod.Spec.Containers {
// compute hash
if kcs := kps.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil {
kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx])
t.Run(desc, func(t *testing.T) {
pod, status := makeBasePodAndStatus()
for idx := range pod.Spec.Containers {
// default resize policy when pod resize feature is enabled
pod.Spec.Containers[idx].ResizePolicy = []v1.ContainerResizePolicy{cpuPolicyRestartNotRequired, memPolicyRestartNotRequired}
}
}
makeAndSetFakePod(t, m, fakeRuntime, pod)
ctx := context.Background()
status, _ := m.GetPodStatus(ctx, kps.ID, pod.Name, pod.Namespace)
for idx := range pod.Spec.Containers {
if rcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); rcs != nil {
if csIdx, found := podutil.GetIndexOfContainerStatus(pod.Status.ContainerStatuses, pod.Spec.Containers[idx].Name); found {
pod.Status.ContainerStatuses[csIdx].ContainerID = rcs.ID.String()
if test.setupFn != nil {
test.setupFn(pod, status)
}
for idx := range pod.Spec.Containers {
// compute hash
if kcs := status.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil {
kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx])
}
}
}
for idx := range pod.Spec.Containers {
if kcs := kps.FindContainerStatusByName(pod.Spec.Containers[idx].Name); kcs != nil {
kcs.Hash = kubecontainer.HashContainer(&pod.Spec.Containers[idx])
}
}
if test.mutatePodFn != nil {
test.mutatePodFn(pod)
}
expectedActions := test.getExpectedPodActionsFn(pod, status)
actions := m.computePodActions(ctx, pod, status)
verifyActions(t, expectedActions, &actions, desc)
ctx := context.Background()
expectedActions := test.getExpectedPodActionsFn(pod, status)
actions := m.computePodActions(ctx, pod, status)
verifyActions(t, expectedActions, &actions, desc)
})
}
}

View File

@ -18,10 +18,7 @@ package qos
import (
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/types"
)
@ -40,6 +37,8 @@ const (
// multiplied by 10 (barring exceptional cases) + a configurable quantity which is between -1000
// and 1000. Containers with higher OOM scores are killed if the system runs out of memory.
// See https://lwn.net/Articles/391222/ for more information.
// OOMScoreAdjust should be calculated based on the allocated resources, so the pod argument should
// contain the allocated resources in the spec.
func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapacity int64) int {
if types.IsNodeCriticalPod(pod) {
// Only node critical pod should be the last to get killed.
@ -63,11 +62,6 @@ func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapa
// targets for OOM kills.
// Note that this is a heuristic, it won't work if a container has many small processes.
memoryRequest := container.Resources.Requests.Memory().Value()
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
memoryRequest = cs.AllocatedResources.Memory().Value()
}
}
oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity
// A guaranteed pod using 100% of memory can have an OOM score of 10. Ensure
// that burstable pods have a higher OOM score adjustment.

View File

@ -63,31 +63,31 @@ func (m *fakeManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
return
}
func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) {
func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
klog.InfoS("GetContainerResourceAllocation()")
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
func (m *fakeManager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
klog.InfoS("GetPodResizeStatus()")
return "", false
return m.state.GetPodResizeStatus(podUID)
}
func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
}
func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
klog.InfoS("SetPodAllocation()")
for _, container := range pod.Spec.Containers {
var alloc v1.ResourceList
if container.Resources.Requests != nil {
alloc = container.Resources.Requests.DeepCopy()
}
alloc := *container.Resources.DeepCopy()
m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc)
}
return nil
}
func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) error {
klog.InfoS("SetPodResizeStatus()")
return nil
return m.state.SetPodResizeStatus(string(podUID), resizeStatus)
}
// NewFakeManager creates empty/fake memory manager

View File

@ -19,7 +19,7 @@ package state
import (
"encoding/json"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
@ -28,16 +28,16 @@ var _ checkpointmanager.Checkpoint = &PodResourceAllocationCheckpoint{}
// PodResourceAllocationCheckpoint is used to store resources allocated to a pod in checkpoint
type PodResourceAllocationCheckpoint struct {
AllocationEntries map[string]map[string]v1.ResourceList `json:"allocationEntries,omitempty"`
ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// NewPodResourceAllocationCheckpoint returns an instance of Checkpoint
func NewPodResourceAllocationCheckpoint() *PodResourceAllocationCheckpoint {
//lint:ignore unexported-type-in-api user-facing error message
return &PodResourceAllocationCheckpoint{
AllocationEntries: make(map[string]map[string]v1.ResourceList),
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements),
ResizeStatusEntries: make(map[string]v1.PodResizeStatus),
}
}

View File

@ -17,11 +17,11 @@ limitations under the License.
package state
import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
// PodResourceAllocation type is used in tracking resources allocated to pod's containers
type PodResourceAllocation map[string]map[string]v1.ResourceList
type PodResourceAllocation map[string]map[string]v1.ResourceRequirements
// PodResizeStatus type is used in tracking the last resize decision for pod
type PodResizeStatus map[string]v1.PodResizeStatus
@ -30,9 +30,9 @@ type PodResizeStatus map[string]v1.PodResizeStatus
func (pr PodResourceAllocation) Clone() PodResourceAllocation {
prCopy := make(PodResourceAllocation)
for pod := range pr {
prCopy[pod] = make(map[string]v1.ResourceList)
prCopy[pod] = make(map[string]v1.ResourceRequirements)
for container, alloc := range pr[pod] {
prCopy[pod][container] = alloc.DeepCopy()
prCopy[pod][container] = *alloc.DeepCopy()
}
}
return prCopy
@ -40,14 +40,14 @@ func (pr PodResourceAllocation) Clone() PodResourceAllocation {
// Reader interface used to read current pod resource allocation state
type Reader interface {
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool)
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
GetPodResourceAllocation() PodResourceAllocation
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
GetResizeStatus() PodResizeStatus
}
type writer interface {
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(PodResourceAllocation) error
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) error
SetResizeStatus(PodResizeStatus) error

View File

@ -21,7 +21,7 @@ import (
"path"
"sync"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@ -82,7 +82,7 @@ func (sc *stateCheckpoint) storeState() error {
podAllocation := sc.cache.GetPodResourceAllocation()
for pod := range podAllocation {
checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceList)
checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceRequirements)
for container, alloc := range podAllocation[pod] {
checkpoint.AllocationEntries[pod][container] = alloc
}
@ -103,7 +103,7 @@ func (sc *stateCheckpoint) storeState() error {
}
// GetContainerResourceAllocation returns current resources allocated to a pod's container
func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) {
func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
sc.mux.RLock()
defer sc.mux.RUnlock()
return sc.cache.GetContainerResourceAllocation(podUID, containerName)
@ -131,7 +131,7 @@ func (sc *stateCheckpoint) GetResizeStatus() PodResizeStatus {
}
// SetContainerResourceAllocation sets resources allocated to a pod's container
func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error {
func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc)
@ -185,8 +185,8 @@ func NewNoopStateCheckpoint() State {
return &noopStateCheckpoint{}
}
func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ string, _ string) (v1.ResourceList, bool) {
return nil, false
func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ string, _ string) (v1.ResourceRequirements, bool) {
return v1.ResourceRequirements{}, false
}
func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
@ -201,7 +201,7 @@ func (sc *noopStateCheckpoint) GetResizeStatus() PodResizeStatus {
return nil
}
func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceList) error {
func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string, _ v1.ResourceRequirements) error {
return nil
}

View File

@ -19,7 +19,7 @@ package state
import (
"sync"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
@ -40,12 +40,12 @@ func NewStateMemory() State {
}
}
func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) {
func (s *stateMemory) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
s.RLock()
defer s.RUnlock()
alloc, ok := s.podAllocation[podUID][containerName]
return alloc.DeepCopy(), ok
return *alloc.DeepCopy(), ok
}
func (s *stateMemory) GetPodResourceAllocation() PodResourceAllocation {
@ -72,12 +72,12 @@ func (s *stateMemory) GetResizeStatus() PodResizeStatus {
return prs
}
func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceList) error {
func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error {
s.Lock()
defer s.Unlock()
if _, ok := s.podAllocation[podUID]; !ok {
s.podAllocation[podUID] = make(map[string]v1.ResourceList)
s.podAllocation[podUID] = make(map[string]v1.ResourceRequirements)
}
s.podAllocation[podUID][containerName] = alloc

View File

@ -143,17 +143,27 @@ type Manager interface {
// the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
// GetContainerResourceAllocation returns checkpointed AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool)
// GetPodResizeStatus returns checkpointed PodStatus.Resize value
GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error
// SetPodResizeStatus checkpoints the last resizing decision for the pod.
SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus) error
allocationManager
}
// TODO(tallclair): Refactor allocation state handling out of the status manager.
type allocationManager interface {
// GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
// Returns the updated (or original) pod, and whether there was an allocation stored.
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error
}
const syncPeriod = 10 * time.Second
@ -236,12 +246,45 @@ func (m *manager) Start() {
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceList, bool) {
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
}
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
allocated, found := allocs[string(pod.UID)]
if !found {
return pod, false
}
updated := false
for i, c := range pod.Spec.Containers {
if cAlloc, ok := allocated[c.Name]; ok {
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
// Allocation differs from pod spec, update
if !updated {
// If this is the first update, copy the pod
pod = pod.DeepCopy()
updated = true
}
pod.Spec.Containers[i].Resources = cAlloc
}
}
}
return pod, updated
}
// GetPodResizeStatus returns the last checkpointed ResizeStaus value
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetPodResizeStatus(podUID string) (v1.PodResizeStatus, bool) {
@ -255,10 +298,7 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
for _, container := range pod.Spec.Containers {
var alloc v1.ResourceList
if container.Resources.Requests != nil {
alloc = container.Resources.Requests.DeepCopy()
}
alloc := *container.Resources.DeepCopy()
if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
return err
}

View File

@ -32,6 +32,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -42,6 +43,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status/state"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
@ -2034,6 +2036,105 @@ func TestMergePodStatus(t *testing.T) {
}
func TestUpdatePodFromAllocation(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345",
Name: "test",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "c1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
},
},
}, {
Name: "c2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
}},
},
}
resizedPod := pod.DeepCopy()
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
tests := []struct {
name string
pod *v1.Pod
allocs state.PodResourceAllocation
expectPod *v1.Pod
expectUpdate bool
}{{
name: "steady state",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "no allocations",
pod: pod,
allocs: state.PodResourceAllocation{},
expectUpdate: false,
}, {
name: "missing container allocation",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "resized container",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: true,
expectPod: resizedPod,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pod := test.pod.DeepCopy()
allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
if test.expectUpdate {
assert.True(t, updated, "updated")
assert.Equal(t, test.expectPod, allocatedPod)
assert.NotEqual(t, pod, allocatedPod)
} else {
assert.False(t, updated, "updated")
assert.Same(t, pod, allocatedPod)
}
})
}
}
func statusEqual(left, right v1.PodStatus) bool {
left.Conditions = nil
right.Conditions = nil