Set pod watch conditions for resize

Tim Allclair 2024-11-03 13:16:27 -08:00
parent f4d36dd402
commit da9c2c553b
8 changed files with 36 additions and 34 deletions

@@ -66,6 +66,9 @@ type RuntimeHelper interface {
     // UnprepareDynamicResources unprepares resources for a a pod.
     UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error
+
+    // SetPodWatchCondition flags a pod to be inspected until the condition is met.
+    SetPodWatchCondition(types.UID, string, func(*PodStatus) bool)
 }
 
 // ShouldContainerBeRestarted checks whether a container needs to be restarted.
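
For illustration, a minimal sketch of how a caller holding a RuntimeHelper could register such a condition; the condition key and body below are hypothetical, not part of this commit:

package example

import (
    v1 "k8s.io/api/core/v1"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// watchUntilPodHasIP flags the pod for reinspection until the runtime reports
// at least one pod IP. The key "example:has-ip" is illustrative only.
func watchUntilPodHasIP(helper kubecontainer.RuntimeHelper, pod *v1.Pod) {
    helper.SetPodWatchCondition(pod.UID, "example:has-ip", func(status *kubecontainer.PodStatus) bool {
        return len(status.IPs) > 0
    })
}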

@@ -114,3 +114,7 @@ func (f *FakeRuntimeHelper) PrepareDynamicResources(ctx context.Context, pod *v1
 func (f *FakeRuntimeHelper) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
     return nil
 }
+
+func (f *FakeRuntimeHelper) SetPodWatchCondition(_ kubetypes.UID, _ string, _ func(*kubecontainer.PodStatus) bool) {
+    // Not implemented.
+}
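
The fake is a no-op; a test that needs to assert a condition was registered could wrap it with something like the following hypothetical double (not part of the commit):

package example

import (
    kubetypes "k8s.io/apimachinery/pkg/types"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
)

// recordingRuntimeHelper embeds the no-op FakeRuntimeHelper and records every
// watch-condition registration so a test can assert that one was set.
type recordingRuntimeHelper struct {
    containertest.FakeRuntimeHelper
    conditions map[string]func(*kubecontainer.PodStatus) bool
}

func (r *recordingRuntimeHelper) SetPodWatchCondition(uid kubetypes.UID, key string, c func(*kubecontainer.PodStatus) bool) {
    if r.conditions == nil {
        r.conditions = make(map[string]func(*kubecontainer.PodStatus) bool)
    }
    r.conditions[string(uid)+"/"+key] = c
}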

@@ -1980,17 +1980,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
         return false, nil
     }
 
-    if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) {
-        // While resize is in progress, periodically request the latest status from the runtime via
-        // the PLEG. This is necessary since ordinarily pod status is only fetched when a container
-        // undergoes a state transition.
-        runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
-        if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
-            klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod))
-            return false, err
-        }
-    }
-
     return false, nil
 }
@@ -3097,3 +3086,7 @@ func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
         kl.tryReconcileMirrorPods(staticPod, mirrorPod)
     }
 }
+
+func (kl *Kubelet) SetPodWatchCondition(podUID types.UID, conditionKey string, condition pleg.WatchCondition) {
+    kl.pleg.SetPodWatchCondition(podUID, conditionKey, condition)
+}

@@ -57,6 +57,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/logs"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
+    "k8s.io/kubernetes/pkg/kubelet/pleg"
     proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
     "k8s.io/kubernetes/pkg/kubelet/runtimeclass"
     "k8s.io/kubernetes/pkg/kubelet/sysctl"
@@ -797,6 +798,22 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res
             "pod", format.Pod(pod), "resourceName", resourceName)
         return err
     }
+    resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName)
+    resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool {
+        if status.Resources == nil {
+            return false
+        }
+        switch resourceName {
+        case v1.ResourceMemory:
+            return status.Resources.MemoryLimit.Equal(*container.Resources.Limits.Memory())
+        case v1.ResourceCPU:
+            return status.Resources.CPURequest.Equal(*container.Resources.Requests.Cpu()) &&
+                status.Resources.CPULimit.Equal(*container.Resources.Limits.Cpu())
+        default:
+            return true // Shouldn't happen.
+        }
+    })
+    m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition)
     // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime.
     // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources.
     // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted.
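
As a concrete illustration of when such a condition resolves, here is a small self-contained sketch (the container name "app" and the values are made up) that evaluates the memory branch of the check above against a runtime-reported status:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

func main() {
    // Desired memory limit from the resized container spec (illustrative value).
    desired := resource.MustParse("256Mi")

    // Container status as the runtime might report it after actuating the resize.
    status := &kubecontainer.Status{
        Name: "app",
        Resources: &kubecontainer.ContainerResources{
            MemoryLimit: resource.NewQuantity(256*1024*1024, resource.BinarySI),
        },
    }

    // Same shape as the v1.ResourceMemory case in the watch condition above.
    done := status.Resources != nil && status.Resources.MemoryLimit.Equal(desired)
    fmt.Println("resize observed by the runtime:", done) // prints true
}

Quantity.Equal compares the parsed values, so "256Mi" and the equivalent byte count match regardless of how each side was constructed.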

@@ -427,10 +427,6 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon
     metrics.EventedPLEGConnLatency.Observe(duration.Seconds())
 }
 
-func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-    return fmt.Errorf("not implemented"), false
-}
-
 func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
     e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition)
 }

@@ -305,13 +305,11 @@ func (g *GenericPLEG) Relist() {
             needsReinspection[pid] = pod
             continue
-        } else {
-            if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+        } else if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
             if !updated {
                 continue
             }
-            }
         }
 
         var completedConditions []versionedWatchCondition
         for _, condition := range watchConditions {
@@ -483,15 +481,6 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
     return status, g.cache.Set(pod.ID, status, err, timestamp), err
 }
 
-func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
-    ctx := context.Background()
-    if pod == nil {
-        return fmt.Errorf("pod cannot be nil"), false
-    }
-    _, updated, err := g.updateCache(ctx, pod, pid)
-    return err, updated
-}
-
 func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
     g.watchConditionsLock.Lock()
     defer g.watchConditionsLock.Unlock()
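
To illustrate the keyed, per-pod semantics described by the interface comment (setting the same key again replaces the earlier condition, and a met condition is dropped), here is a self-contained toy stand-in for the GenericPLEG bookkeeping; it is a sketch, not the committed implementation, and the real code additionally guards its map with watchConditionsLock:

package example

import (
    "k8s.io/apimachinery/pkg/types"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// WatchCondition mirrors the alias defined in the pleg package.
type WatchCondition = func(*kubecontainer.PodStatus) bool

// conditionStore keys conditions per pod UID and per condition key, so
// re-registering a key replaces the previous condition instead of duplicating it.
type conditionStore struct {
    conditions map[types.UID]map[string]WatchCondition
}

func (s *conditionStore) set(podUID types.UID, key string, condition WatchCondition) {
    if s.conditions == nil {
        s.conditions = make(map[types.UID]map[string]WatchCondition)
    }
    if s.conditions[podUID] == nil {
        s.conditions[podUID] = make(map[string]WatchCondition)
    }
    s.conditions[podUID][key] = condition
}

// evaluate runs the pod's conditions against the latest status and removes the
// ones that report completion, mirroring what the Relist loop does with its
// completedConditions slice.
func (s *conditionStore) evaluate(podUID types.UID, status *kubecontainer.PodStatus) {
    for key, condition := range s.conditions[podUID] {
        if condition(status) {
            delete(s.conditions[podUID], key)
        }
    }
}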

@@ -766,7 +766,8 @@ func TestWatchConditions(t *testing.T) {
             },
         },
     }}
-    initialPods := append(pods, &containertest.FakePod{Pod: &kubecontainer.Pod{
+    initialPods := pods
+    initialPods = append(initialPods, &containertest.FakePod{Pod: &kubecontainer.Pod{
         Name: "terminated-pod",
         ID: "terminated",
         Sandboxes: []*kubecontainer.Container{
@@ -813,8 +814,8 @@ func TestWatchConditions(t *testing.T) {
         "updating": {version: 1},
     },
 }, {
-    name: "non-existant pod",
-    podUID: "non-existant",
+    name: "non-existent pod",
+    podUID: "non-existent",
     watchConditions: map[string]WatchCondition{
         "watching": neverComplete,
     },

@@ -66,7 +66,6 @@ type PodLifecycleEventGenerator interface {
     Start()
     Watch() chan *PodLifecycleEvent
     Healthy() (bool, error)
-    UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
     // SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
     // condition is met. The condition is keyed so it can be updated before the condition
     // is met.
@@ -83,7 +82,7 @@ type podLifecycleEventGeneratorHandler interface {
 }
 
 // WatchCondition takes the latest PodStatus, and returns whether the condition is met.
-type WatchCondition func(*kubecontainer.PodStatus) bool
+type WatchCondition = func(*kubecontainer.PodStatus) bool
 
 // RunningContainerWatchCondition wraps a condition on the container status to make a pod
 // WatchCondition. If the container is no longer running, the condition is implicitly cleared.
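
Based on that comment, a plausible shape for the wrapper is sketched below; this is an assumption about the implementation, not the committed code. The container-level check only runs while the named container is running, and once the container is gone the pod-level condition returns true so the watch is cleared.

package example

import (
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// runningContainerWatchCondition wraps a container-level condition into a
// pod-level one, as described by the RunningContainerWatchCondition comment.
func runningContainerWatchCondition(containerName string, condition func(*kubecontainer.Status) bool) func(*kubecontainer.PodStatus) bool {
    return func(podStatus *kubecontainer.PodStatus) bool {
        for _, c := range podStatus.ContainerStatuses {
            if c.Name != containerName {
                continue
            }
            if c.State != kubecontainer.ContainerStateRunning {
                return true // Container stopped or was replaced: clear the watch.
            }
            return condition(c)
        }
        return true // Container no longer present: clear the watch.
    }
}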