diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go
index 580ee34893a..fb43305faf2 100644
--- a/pkg/kubelet/container/helpers.go
+++ b/pkg/kubelet/container/helpers.go
@@ -66,6 +66,9 @@ type RuntimeHelper interface {
 
 	// UnprepareDynamicResources unprepares resources for a pod.
 	UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error
+
+	// SetPodWatchCondition flags a pod to be inspected until the condition is met.
+	SetPodWatchCondition(types.UID, string, func(*PodStatus) bool)
 }
 
 // ShouldContainerBeRestarted checks whether a container needs to be restarted.
diff --git a/pkg/kubelet/container/testing/fake_runtime_helper.go b/pkg/kubelet/container/testing/fake_runtime_helper.go
index b56ea1f2000..47c6bb89edf 100644
--- a/pkg/kubelet/container/testing/fake_runtime_helper.go
+++ b/pkg/kubelet/container/testing/fake_runtime_helper.go
@@ -114,3 +114,7 @@ func (f *FakeRuntimeHelper) PrepareDynamicResources(ctx context.Context, pod *v1
 func (f *FakeRuntimeHelper) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
 	return nil
 }
+
+func (f *FakeRuntimeHelper) SetPodWatchCondition(_ kubetypes.UID, _ string, _ func(*kubecontainer.PodStatus) bool) {
+	// Not implemented.
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 56cff502cec..7ce055a17d9 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1980,17 +1980,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
 		return false, nil
 	}
 
-	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) {
-		// While resize is in progress, periodically request the latest status from the runtime via
-		// the PLEG. This is necessary since ordinarily pod status is only fetched when a container
-		// undergoes a state transition.
-		runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
-		if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
-			klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod))
-			return false, err
-		}
-	}
-
 	return false, nil
 }
 
@@ -3097,3 +3086,7 @@ func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
 		kl.tryReconcileMirrorPods(staticPod, mirrorPod)
 	}
 }
+
+func (kl *Kubelet) SetPodWatchCondition(podUID types.UID, conditionKey string, condition pleg.WatchCondition) {
+	kl.pleg.SetPodWatchCondition(podUID, conditionKey, condition)
+}
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
index f523b155d2b..ad509c9435c 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -57,6 +57,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
+	"k8s.io/kubernetes/pkg/kubelet/pleg"
 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
 	"k8s.io/kubernetes/pkg/kubelet/sysctl"
@@ -797,6 +798,22 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res
 			"pod", format.Pod(pod), "resourceName", resourceName)
 		return err
 	}
+	resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName)
+	resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool {
+		if status.Resources == nil {
+			return false
+		}
+		switch resourceName {
+		case v1.ResourceMemory:
+			return status.Resources.MemoryLimit.Equal(*container.Resources.Limits.Memory())
+		case v1.ResourceCPU:
+			return status.Resources.CPURequest.Equal(*container.Resources.Requests.Cpu()) &&
+				status.Resources.CPULimit.Equal(*container.Resources.Limits.Cpu())
+		default:
+			return true // Shouldn't happen.
+		}
+	})
+	m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition)
 	// If UpdateContainerResources is error-free, it means the desired values for 'resourceName' were accepted by the runtime.
 	// So we update currentContainerResources for 'resourceName', which is our view of the most recently configured resources.
 	// Note: We can't rely on GetPodStatus as the runtime may lag in actuating the resource values it just accepted.
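Note (not part of the patch): the resize condition registered above compares the runtime-reported values against the container spec using resource.Quantity equality. A minimal, self-contained sketch of that comparison, with made-up quantities standing in for status.Resources and the desired spec values:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Stand-ins for the runtime-reported status (status.Resources.CPULimit,
	// status.Resources.MemoryLimit) and the desired values from the pod spec.
	actualCPULimit := resource.MustParse("500m")
	desiredCPULimit := resource.MustParse("500m")
	actualMemLimit := resource.MustParse("128Mi")
	desiredMemLimit := resource.MustParse("256Mi")

	// The "<container>:resize:cpu" condition would be met here: the runtime
	// reports the desired value, so the watch is cleared.
	fmt.Println("cpu resize done:", actualCPULimit.Equal(desiredCPULimit)) // true

	// The "<container>:resize:memory" condition is not met yet, so the pod
	// stays on the PLEG's reinspection list.
	fmt.Println("memory resize done:", actualMemLimit.Equal(desiredMemLimit)) // false
}

Keying the condition as "<container>:resize:<resource>" means a later updatePodContainerResources call can replace a stale condition for the same resource before it is met.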
diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go
index 56844bc7533..39e13b55223 100644
--- a/pkg/kubelet/pleg/evented.go
+++ b/pkg/kubelet/pleg/evented.go
@@ -427,10 +427,6 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon
 	metrics.EventedPLEGConnLatency.Observe(duration.Seconds())
 }
 
-func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-	return fmt.Errorf("not implemented"), false
-}
-
 func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
 	e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition)
 }
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index e84d3b99cb2..de5ee494230 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -305,11 +305,9 @@ func (g *GenericPLEG) Relist() {
 			needsReinspection[pid] = pod
 
 			continue
-		} else {
-			if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
-				if !updated {
-					continue
-				}
+		} else if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+			if !updated {
+				continue
 			}
 		}
 
@@ -483,15 +481,6 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
 	return status, g.cache.Set(pod.ID, status, err, timestamp), err
 }
 
-func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-	ctx := context.Background()
-	if pod == nil {
-		return fmt.Errorf("pod cannot be nil"), false
-	}
-	_, updated, err := g.updateCache(ctx, pod, pid)
-	return err, updated
-}
-
 func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
 	g.watchConditionsLock.Lock()
 	defer g.watchConditionsLock.Unlock()
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index 3eb94f0251e..4d7ac651745 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -766,7 +766,8 @@ func TestWatchConditions(t *testing.T) {
 			},
 		},
 	}}
-	initialPods := append(pods, &containertest.FakePod{Pod: &kubecontainer.Pod{
+	initialPods := pods
+	initialPods = append(initialPods, &containertest.FakePod{Pod: &kubecontainer.Pod{
 		Name: "terminated-pod",
 		ID:   "terminated",
 		Sandboxes: []*kubecontainer.Container{
@@ -813,8 +814,8 @@ func TestWatchConditions(t *testing.T) {
 			"updating": {version: 1},
 		},
 	}, {
-		name:   "non-existant pod",
-		podUID: "non-existant",
+		name:   "non-existent pod",
+		podUID: "non-existent",
 		watchConditions: map[string]WatchCondition{
 			"watching": neverComplete,
 		},
diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go
index 6edc941dfdc..17e7ec81543 100644
--- a/pkg/kubelet/pleg/pleg.go
+++ b/pkg/kubelet/pleg/pleg.go
@@ -66,7 +66,6 @@ type PodLifecycleEventGenerator interface {
 	Start()
 	Watch() chan *PodLifecycleEvent
 	Healthy() (bool, error)
-	UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
 	// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
 	// condition is met. The condition is keyed so it can be updated before the condition
 	// is met.
@@ -83,7 +82,7 @@ type podLifecycleEventGeneratorHandler interface {
 }
 
 // WatchCondition takes the latest PodStatus, and returns whether the condition is met.
-type WatchCondition func(*kubecontainer.PodStatus) bool
+type WatchCondition = func(*kubecontainer.PodStatus) bool
 
 // RunningContainerWatchCondition wraps a condition on the container status to make a pod
 // WatchCondition. If the container is no longer running, the condition is implicitly cleared.
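Note (not part of the patch): a self-contained sketch of the keyed watch-condition bookkeeping this change introduces. The watcher type and relistPod function are illustrative stand-ins, not kubelet code; in the real change the condition map lives in GenericPLEG and is evaluated during Relist:

package main

import "fmt"

// PodStatus stands in for kubecontainer.PodStatus in this sketch.
type PodStatus struct{ CPULimitActuated bool }

// WatchCondition mirrors the alias added in pkg/kubelet/pleg/pleg.go:
// it takes the latest status and reports whether the condition is met.
type WatchCondition = func(*PodStatus) bool

// watcher is an illustrative stand-in for the PLEG's per-pod, per-key
// condition map. Setting an existing key replaces the condition, which is
// why conditions are keyed.
type watcher struct {
	conditions map[string]map[string]WatchCondition // podUID -> key -> condition
}

func (w *watcher) SetPodWatchCondition(podUID, key string, cond WatchCondition) {
	if w.conditions[podUID] == nil {
		w.conditions[podUID] = map[string]WatchCondition{}
	}
	w.conditions[podUID][key] = cond
}

// relistPod mimics one relist pass for a single pod: conditions that report
// true are dropped; while any remain, the pod keeps being reinspected.
func (w *watcher) relistPod(podUID string, status *PodStatus) {
	for key, cond := range w.conditions[podUID] {
		if cond(status) {
			delete(w.conditions[podUID], key)
			fmt.Printf("condition %q met; watch cleared\n", key)
		}
	}
}

func main() {
	w := &watcher{conditions: map[string]map[string]WatchCondition{}}
	w.SetPodWatchCondition("pod-1", "ctr:resize:cpu", func(s *PodStatus) bool {
		return s.CPULimitActuated // stand-in check; the real condition compares resources
	})
	w.relistPod("pod-1", &PodStatus{CPULimitActuated: false}) // not met; kept
	w.relistPod("pod-1", &PodStatus{CPULimitActuated: true})  // met; cleared
}

Because met conditions are dropped and pods with outstanding conditions are reinspected on every relist, an in-progress resize no longer needs the UpdateCache path removed from SyncPod above.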