From 07a9ab87bc879f7e426b0ab1f51c1f369bbf43ef Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Wed, 30 Oct 2024 11:58:17 -0700 Subject: [PATCH 1/6] Simplify PLEG relist loops --- pkg/kubelet/pleg/generic.go | 39 +++++++++++-------------------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 41acf15ab65..3eafb2550e8 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -253,27 +253,26 @@ func (g *GenericPLEG) Relist() { updateRunningPodAndContainerMetrics(pods) g.podRecords.setCurrent(pods) - // Compare the old and the current pods, and generate events. - eventsByPodID := map[types.UID][]*PodLifecycleEvent{} + needsReinspection := make(map[types.UID]*kubecontainer.Pod) + for pid := range g.podRecords { + // Compare the old and the current pods, and generate events. oldPod := g.podRecords.getOld(pid) pod := g.podRecords.getCurrent(pid) // Get all containers in the old and the new pod. allContainers := getContainersFromPods(oldPod, pod) + var events []*PodLifecycleEvent for _, container := range allContainers { - events := computeEvents(g.logger, oldPod, pod, &container.ID) - for _, e := range events { - updateEvents(eventsByPodID, e) - } + containerEvents := computeEvents(g.logger, oldPod, pod, &container.ID) + events = append(events, containerEvents...) } - } - needsReinspection := make(map[types.UID]*kubecontainer.Pod) + _, reinspect := g.podsToReinspect[pid] - // If there are events associated with a pod, we should update the - // podCache. - for pid, events := range eventsByPodID { - pod := g.podRecords.getCurrent(pid) + if len(events) == 0 && !reinspect { + // Nothing else needed for this pod. + continue + } // updateCache() will inspect the pod and update the cache. If an // error occurs during the inspection, we want PLEG to retry again @@ -293,10 +292,6 @@ func (g *GenericPLEG) Relist() { continue } else { - // this pod was in the list to reinspect and we did so because it had events, so remove it - // from the list (we don't want the reinspection code below to inspect it a second time in - // this relist execution) - delete(g.podsToReinspect, pid) if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) { if !updated { continue @@ -342,18 +337,6 @@ func (g *GenericPLEG) Relist() { } } - // reinspect any pods that failed inspection during the previous relist - if len(g.podsToReinspect) > 0 { - g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection") - for pid, pod := range g.podsToReinspect { - if err, _ := g.updateCache(ctx, pod, pid); err != nil { - // Rely on updateCache calling GetPodStatus to log the actual error. - g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name)) - needsReinspection[pid] = pod - } - } - } - // Update the cache timestamp. This needs to happen *after* // all pods have been properly updated in the cache. 
g.cache.UpdateTime(timestamp) From f4d36dd402048d42e2f10babe7a86850093014fa Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Wed, 30 Oct 2024 21:07:17 -0700 Subject: [PATCH 2/6] Add WatchCondition concept to the PLEG --- pkg/kubelet/pleg/evented.go | 4 + pkg/kubelet/pleg/generic.go | 127 ++++++++++++++++++++++---- pkg/kubelet/pleg/generic_test.go | 151 +++++++++++++++++++++++++++++++ pkg/kubelet/pleg/pleg.go | 20 ++++ 4 files changed, 284 insertions(+), 18 deletions(-) diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index f20f0a28007..56844bc7533 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -430,3 +430,7 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { return fmt.Errorf("not implemented"), false } + +func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { + e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition) +} diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 3eafb2550e8..e84d3b99cb2 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -80,6 +80,16 @@ type GenericPLEG struct { podCacheMutex sync.Mutex // logger is used for contextual logging logger klog.Logger + // watchConditions tracks pod watch conditions, guarded by watchConditionsLock + // watchConditions is a map of pod UID -> condition key -> condition + watchConditions map[types.UID]map[string]versionedWatchCondition + watchConditionsLock sync.Mutex +} + +type versionedWatchCondition struct { + key string + condition WatchCondition + version uint32 } // plegContainerState has a one-to-one mapping to the @@ -125,13 +135,14 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan panic("cache cannot be nil") } return &GenericPLEG{ - logger: logger, - relistDuration: relistDuration, - runtime: runtime, - eventChannel: eventChannel, - podRecords: make(podRecords), - cache: cache, - clock: clock, + logger: logger, + relistDuration: relistDuration, + runtime: runtime, + eventChannel: eventChannel, + podRecords: make(podRecords), + cache: cache, + clock: clock, + watchConditions: make(map[types.UID]map[string]versionedWatchCondition), } } @@ -252,6 +263,7 @@ func (g *GenericPLEG) Relist() { // update running pod and container count updateRunningPodAndContainerMetrics(pods) g.podRecords.setCurrent(pods) + g.cleanupOrphanedWatchConditions() needsReinspection := make(map[types.UID]*kubecontainer.Pod) @@ -267,9 +279,10 @@ func (g *GenericPLEG) Relist() { events = append(events, containerEvents...) } + watchConditions := g.getPodWatchConditions(pid) _, reinspect := g.podsToReinspect[pid] - if len(events) == 0 && !reinspect { + if len(events) == 0 && len(watchConditions) == 0 && !reinspect { // Nothing else needed for this pod. continue } @@ -283,7 +296,8 @@ func (g *GenericPLEG) Relist() { // inspecting the pod and getting the PodStatus to update the cache // serially may take a while. We should be aware of this and // parallelize if needed. - if err, updated := g.updateCache(ctx, pod, pid); err != nil { + status, updated, err := g.updateCache(ctx, pod, pid) + if err != nil { // Rely on updateCache calling GetPodStatus to log the actual error. 
g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name)) @@ -299,6 +313,14 @@ func (g *GenericPLEG) Relist() { } } + var completedConditions []versionedWatchCondition + for _, condition := range watchConditions { + if condition.condition(status) { + completedConditions = append(completedConditions, condition) + } + } + g.completeWatchConditions(pid, completedConditions) + // Update the internal storage and send out the events. g.podRecords.update(pid) @@ -320,8 +342,6 @@ func (g *GenericPLEG) Relist() { if events[i].Type == ContainerDied { // Fill up containerExitCode map for ContainerDied event when first time appeared if len(containerExitCode) == 0 && pod != nil { - // Get updated podStatus - status, err := g.cache.Get(pod.ID) if err == nil { for _, containerStatus := range status.ContainerStatuses { containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode @@ -410,13 +430,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) // updateCache tries to update the pod status in the kubelet cache and returns true if the // pod status was actually updated in the cache. It will return false if the pod status // was ignored by the cache. -func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) { +func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (*kubecontainer.PodStatus, bool, error) { if pod == nil { // The pod is missing in the current relist. This means that // the pod has no visible (active or inactive) containers. g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid)) g.cache.Delete(pid) - return nil, true + return nil, true, nil } g.podCacheMutex.Lock() @@ -460,7 +480,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p timestamp = status.TimeStamp } - return err, g.cache.Set(pod.ID, status, err, timestamp) + return status, g.cache.Set(pod.ID, status, err, timestamp), err } func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { @@ -468,14 +488,85 @@ func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, if pod == nil { return fmt.Errorf("pod cannot be nil"), false } - return g.updateCache(ctx, pod, pid) + _, updated, err := g.updateCache(ctx, pod, pid) + return err, updated } -func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) { - if e == nil { +func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + conditions, ok := g.watchConditions[podUID] + if !ok { + if condition == nil { + return // Condition isn't set, nothing to do. + } + conditions = make(map[string]versionedWatchCondition) + } + + versioned, found := conditions[conditionKey] + if found { + versioned.version++ + versioned.condition = condition + conditions[conditionKey] = versioned + } else if condition != nil { + conditions[conditionKey] = versionedWatchCondition{ + key: conditionKey, + condition: condition, + } + } + + g.watchConditions[podUID] = conditions +} + +// getPodWatchConditions returns a list of the active watch conditions for the pod. 
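+// The returned slice is a copy, so it can be evaluated without holding watchConditionsLock;
+// any condition updated in the meantime is reconciled by the version check in completeWatchConditions.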
+func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCondition { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + conditions, ok := g.watchConditions[podUID] + if !ok { + return nil + } + + filtered := make([]versionedWatchCondition, 0, len(conditions)) + for _, condition := range conditions { + filtered = append(filtered, condition) + } + return filtered +} + +// completeWatchConditions clears the completed watch conditions. +func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + conditions, ok := g.watchConditions[podUID] + if !ok { + // Pod was deleted, nothing to do. return } - eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e) + + for _, completed := range completedConditions { + condition := conditions[completed.key] + // Only clear the condition if it has not been updated. + if condition.version == completed.version { + delete(conditions, completed.key) + } + } + g.watchConditions[podUID] = conditions +} + +func (g *GenericPLEG) cleanupOrphanedWatchConditions() { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + for podUID := range g.watchConditions { + if g.podRecords.getCurrent(podUID) == nil { + // Pod was deleted, remove it from the watch conditions. + delete(g.watchConditions, podUID) + } + } } func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState { diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 16d5fad4d9e..3eb94f0251e 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -736,3 +736,154 @@ kubelet_running_pods 2 }) } } + +func TestWatchConditions(t *testing.T) { + pods := []*containertest.FakePod{{ + Pod: &kubecontainer.Pod{ + Name: "running-pod", + ID: "running", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + Containers: []*kubecontainer.Container{ + createTestContainer("c", kubecontainer.ContainerStateRunning), + }, + }, + }, { + Pod: &kubecontainer.Pod{ + Name: "terminating-pod", + ID: "terminating", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateExited), + }, + }, + }, { + Pod: &kubecontainer.Pod{ + Name: "reinspect-pod", + ID: "reinspect", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + }, + }} + initialPods := append(pods, &containertest.FakePod{Pod: &kubecontainer.Pod{ + Name: "terminated-pod", + ID: "terminated", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateExited), + }, + }}) + + alwaysComplete := func(_ *kubecontainer.PodStatus) bool { + return true + } + neverComplete := func(_ *kubecontainer.PodStatus) bool { + return false + } + + var pleg *GenericPLEG + var updatingCond WatchCondition + // updatingCond always completes, but updates the condition first. 
+ updatingCond = func(_ *kubecontainer.PodStatus) bool { + pleg.SetPodWatchCondition("running", "updating", updatingCond) + return true + } + + testCases := []struct { + name string + podUID types.UID + watchConditions map[string]WatchCondition + expectEvaluated bool // Whether the watch conditions should be evaluated + expectRemoved bool // Whether podUID should be present in the watch conditions map + expectWatchConditions map[string]versionedWatchCondition // The expected watch conditions for the podUIDa (only key & version checked) + }{{ + name: "no watch conditions", + podUID: "running", + }, { + name: "running pod with conditions", + podUID: "running", + watchConditions: map[string]WatchCondition{ + "completing": alwaysComplete, + "watching": neverComplete, + "updating": updatingCond, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 0}, + "updating": {version: 1}, + }, + }, { + name: "non-existant pod", + podUID: "non-existant", + watchConditions: map[string]WatchCondition{ + "watching": neverComplete, + }, + expectEvaluated: false, + expectRemoved: true, + }, { + name: "terminated pod", + podUID: "terminated", + watchConditions: map[string]WatchCondition{ + "watching": neverComplete, + }, + expectEvaluated: false, + expectRemoved: true, + }, { + name: "reinspecting pod", + podUID: "reinspect", + watchConditions: map[string]WatchCondition{ + "watching": neverComplete, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 0}, + }, + }} + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg = testPleg.pleg + runtime := testPleg.runtime + runtime.AllPodList = initialPods + pleg.Relist() // Setup initial pod records. + + runtime.AllPodList = pods // Doesn't have "terminated" pod. + pleg.podsToReinspect["reinspect"] = nil + + var evaluatedConditions []string + for key, condition := range test.watchConditions { + wrappedCondition := func(status *kubecontainer.PodStatus) bool { + if !test.expectEvaluated { + assert.Fail(t, "conditions should not be evaluated") + } else { + evaluatedConditions = append(evaluatedConditions, key) + } + return condition(status) + } + pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition) + } + pleg.Relist() + + if test.expectEvaluated { + assert.Len(t, evaluatedConditions, len(test.watchConditions), "all conditions should be evaluated") + } + + if test.expectRemoved { + assert.NotContains(t, pleg.watchConditions, test.podUID, "Pod should be removed from watch conditions") + } else { + actualConditions := pleg.watchConditions[test.podUID] + assert.Len(t, actualConditions, len(test.expectWatchConditions), "expected number of conditions") + for key, expected := range test.expectWatchConditions { + if !assert.Contains(t, actualConditions, key) { + continue + } + actual := actualConditions[key] + assert.Equal(t, key, actual.key) + assert.Equal(t, expected.version, actual.version) + } + } + + }) + } +} diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index 0a44745925b..6edc941dfdc 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -67,6 +67,10 @@ type PodLifecycleEventGenerator interface { Watch() chan *PodLifecycleEvent Healthy() (bool, error) UpdateCache(*kubecontainer.Pod, types.UID) (error, bool) + // SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch + // condition is met. 
The condition is keyed so it can be updated before the condition + // is met. + SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) } // podLifecycleEventGeneratorHandler contains functions that are useful for different PLEGs @@ -77,3 +81,19 @@ type podLifecycleEventGeneratorHandler interface { Update(relistDuration *RelistDuration) Relist() } + +// WatchCondition takes the latest PodStatus, and returns whether the condition is met. +type WatchCondition func(*kubecontainer.PodStatus) bool + +// RunningContainerWatchCondition wraps a condition on the container status to make a pod +// WatchCondition. If the container is no longer running, the condition is implicitly cleared. +func RunningContainerWatchCondition(containerName string, condition func(*kubecontainer.Status) bool) WatchCondition { + return func(podStatus *kubecontainer.PodStatus) bool { + status := podStatus.FindContainerStatusByName(containerName) + if status == nil || status.State != kubecontainer.ContainerStateRunning { + // Container isn't running. Consider the condition "completed" so it is cleared. + return true + } + return condition(status) + } +} From da9c2c553b53b305f67a73c1b33c3059b260a347 Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Sun, 3 Nov 2024 13:16:27 -0800 Subject: [PATCH 3/6] Set pod watch conditions for resize --- pkg/kubelet/container/helpers.go | 3 +++ .../container/testing/fake_runtime_helper.go | 4 ++++ pkg/kubelet/kubelet.go | 15 ++++----------- pkg/kubelet/kuberuntime/kuberuntime_manager.go | 17 +++++++++++++++++ pkg/kubelet/pleg/evented.go | 4 ---- pkg/kubelet/pleg/generic.go | 17 +++-------------- pkg/kubelet/pleg/generic_test.go | 7 ++++--- pkg/kubelet/pleg/pleg.go | 3 +-- 8 files changed, 36 insertions(+), 34 deletions(-) diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 580ee34893a..fb43305faf2 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -66,6 +66,9 @@ type RuntimeHelper interface { // UnprepareDynamicResources unprepares resources for a a pod. UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error + + // SetPodWatchCondition flags a pod to be inspected until the condition is met. + SetPodWatchCondition(types.UID, string, func(*PodStatus) bool) } // ShouldContainerBeRestarted checks whether a container needs to be restarted. diff --git a/pkg/kubelet/container/testing/fake_runtime_helper.go b/pkg/kubelet/container/testing/fake_runtime_helper.go index b56ea1f2000..47c6bb89edf 100644 --- a/pkg/kubelet/container/testing/fake_runtime_helper.go +++ b/pkg/kubelet/container/testing/fake_runtime_helper.go @@ -114,3 +114,7 @@ func (f *FakeRuntimeHelper) PrepareDynamicResources(ctx context.Context, pod *v1 func (f *FakeRuntimeHelper) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error { return nil } + +func (f *FakeRuntimeHelper) SetPodWatchCondition(_ kubetypes.UID, _ string, _ func(*kubecontainer.PodStatus) bool) { + // Not implemented. +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 56cff502cec..7ce055a17d9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1980,17 +1980,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType return false, nil } - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) { - // While resize is in progress, periodically request the latest status from the runtime via - // the PLEG. 
This is necessary since ordinarily pod status is only fetched when a container - // undergoes a state transition. - runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) - if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { - klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod)) - return false, err - } - } - return false, nil } @@ -3097,3 +3086,7 @@ func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) { kl.tryReconcileMirrorPods(staticPod, mirrorPod) } } + +func (kl *Kubelet) SetPodWatchCondition(podUID types.UID, conditionKey string, condition pleg.WatchCondition) { + kl.pleg.SetPodWatchCondition(podUID, conditionKey, condition) +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index f523b155d2b..ad509c9435c 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/kubernetes/pkg/kubelet/pleg" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/runtimeclass" "k8s.io/kubernetes/pkg/kubelet/sysctl" @@ -797,6 +798,22 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res "pod", format.Pod(pod), "resourceName", resourceName) return err } + resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName) + resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool { + if status.Resources == nil { + return false + } + switch resourceName { + case v1.ResourceMemory: + return status.Resources.MemoryLimit.Equal(*container.Resources.Limits.Memory()) + case v1.ResourceCPU: + return status.Resources.CPURequest.Equal(*container.Resources.Requests.Cpu()) && + status.Resources.CPULimit.Equal(*container.Resources.Limits.Cpu()) + default: + return true // Shouldn't happen. + } + }) + m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition) // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime. // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources. // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted. 
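For reference, a minimal sketch of the pattern the hunk above establishes, using only the APIs added in this series (RunningContainerWatchCondition and SetPodWatchCondition). The helper registerMemoryResizeWatch and the memory-only key are illustrative assumptions, not part of the patch:

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// registerMemoryResizeWatch (hypothetical helper) keeps the pod flagged for reinspection
// until the runtime reports the desired memory limit for the named container.
func registerMemoryResizeWatch(p pleg.PodLifecycleEventGenerator, podUID types.UID, containerName string, wantLimit resource.Quantity) {
	key := fmt.Sprintf("%s:resize:memory", containerName)
	cond := pleg.RunningContainerWatchCondition(containerName, func(status *kubecontainer.Status) bool {
		// Met once the runtime-reported memory limit matches the desired value.
		return status.Resources != nil &&
			status.Resources.MemoryLimit != nil &&
			status.Resources.MemoryLimit.Equal(wantLimit)
	})
	// Evaluated on every relist; cleared once it returns true.
	p.SetPodWatchCondition(podUID, key, cond)
}

Because RunningContainerWatchCondition also reports complete when the container is no longer running, an abandoned resize does not leave the pod permanently flagged for reinspection.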
diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index 56844bc7533..39e13b55223 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -427,10 +427,6 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon metrics.EventedPLEGConnLatency.Observe(duration.Seconds()) } -func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { - return fmt.Errorf("not implemented"), false -} - func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition) } diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index e84d3b99cb2..de5ee494230 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -305,11 +305,9 @@ func (g *GenericPLEG) Relist() { needsReinspection[pid] = pod continue - } else { - if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) { - if !updated { - continue - } + } else if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) { + if !updated { + continue } } @@ -483,15 +481,6 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p return status, g.cache.Set(pod.ID, status, err, timestamp), err } -func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { - ctx := context.Background() - if pod == nil { - return fmt.Errorf("pod cannot be nil"), false - } - _, updated, err := g.updateCache(ctx, pod, pid) - return err, updated -} - func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 3eb94f0251e..4d7ac651745 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -766,7 +766,8 @@ func TestWatchConditions(t *testing.T) { }, }, }} - initialPods := append(pods, &containertest.FakePod{Pod: &kubecontainer.Pod{ + initialPods := pods + initialPods = append(initialPods, &containertest.FakePod{Pod: &kubecontainer.Pod{ Name: "terminated-pod", ID: "terminated", Sandboxes: []*kubecontainer.Container{ @@ -813,8 +814,8 @@ func TestWatchConditions(t *testing.T) { "updating": {version: 1}, }, }, { - name: "non-existant pod", - podUID: "non-existant", + name: "non-existent pod", + podUID: "non-existent", watchConditions: map[string]WatchCondition{ "watching": neverComplete, }, diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index 6edc941dfdc..17e7ec81543 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -66,7 +66,6 @@ type PodLifecycleEventGenerator interface { Start() Watch() chan *PodLifecycleEvent Healthy() (bool, error) - UpdateCache(*kubecontainer.Pod, types.UID) (error, bool) // SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch // condition is met. The condition is keyed so it can be updated before the condition // is met. @@ -83,7 +82,7 @@ type podLifecycleEventGeneratorHandler interface { } // WatchCondition takes the latest PodStatus, and returns whether the condition is met. -type WatchCondition func(*kubecontainer.PodStatus) bool +type WatchCondition = func(*kubecontainer.PodStatus) bool // RunningContainerWatchCondition wraps a condition on the container status to make a pod // WatchCondition. 
If the container is no longer running, the condition is implicitly cleared. From 35bd1e6831ccddcbe6fdf32f2ca256b3f5705c8b Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Mon, 4 Nov 2024 16:51:26 -0800 Subject: [PATCH 4/6] Emit a pod event when WatchConditions are completed --- pkg/kubelet/pleg/generic.go | 5 ++++- pkg/kubelet/pleg/pleg.go | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index de5ee494230..956ef60afc3 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -317,7 +317,10 @@ func (g *GenericPLEG) Relist() { completedConditions = append(completedConditions, condition) } } - g.completeWatchConditions(pid, completedConditions) + if len(completedConditions) > 0 { + g.completeWatchConditions(pid, completedConditions) + events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet}) + } // Update the internal storage and send out the events. g.podRecords.update(pid) diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index 17e7ec81543..08625aba9cc 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -47,6 +47,8 @@ const ( PodSync PodLifeCycleEventType = "PodSync" // ContainerChanged - event type when the new state of container is unknown. ContainerChanged PodLifeCycleEventType = "ContainerChanged" + // ConditionMet - event type triggered when any number of watch conditions are met. + ConditionMet PodLifeCycleEventType = "ConditionMet" ) // PodLifecycleEvent is an event that reflects the change of the pod state. From 7fce6f2317a93284b5c7e434d715de83376bf30b Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Tue, 5 Nov 2024 17:09:48 -0800 Subject: [PATCH 5/6] More comments around PLEG WatchConditions --- .../kuberuntime/kuberuntime_manager.go | 4 +++ pkg/kubelet/pleg/generic.go | 30 ++++++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index ad509c9435c..516cba76a45 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -799,6 +799,9 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res return err } resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName) + + // Watch (poll) the container for the expected resources update. Stop watching once the resources + // match the desired values. resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool { if status.Resources == nil { return false @@ -814,6 +817,7 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res } }) m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition) + // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime. // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources. // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted. 
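As a usage note for the ConditionMet event added in the previous patch, a small sketch of how a consumer of the PLEG event channel might handle it. The syncPod callback is a hypothetical stand-in for the kubelet's pod sync path, not an API from this series:

package example

import (
	"k8s.io/apimachinery/pkg/types"

	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// drainPLEGEvents is a hypothetical consumer loop. A ConditionMet event carries only
// the pod UID; it is emitted at most once per relist for a pod, however many watch
// conditions completed, and it simply requests a pod sync against the freshly
// updated status cache.
func drainPLEGEvents(events <-chan *pleg.PodLifecycleEvent, syncPod func(uid types.UID)) {
	for e := range events {
		switch e.Type {
		case pleg.ConditionMet, pleg.ContainerStarted, pleg.ContainerDied:
			syncPod(e.ID)
		}
	}
}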
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 956ef60afc3..ce0abc6bd64 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -314,11 +314,15 @@ func (g *GenericPLEG) Relist() { var completedConditions []versionedWatchCondition for _, condition := range watchConditions { if condition.condition(status) { + // condition was met: add it to the list of completed conditions. completedConditions = append(completedConditions, condition) } } if len(completedConditions) > 0 { g.completeWatchConditions(pid, completedConditions) + // If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync. + // We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates + // all containers in the pod with the latest status. events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet}) } @@ -484,24 +488,25 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p return status, g.cache.Set(pod.ID, status, err, timestamp), err } +// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch +// condition is met. The condition is keyed so it can be updated before the condition +// is met. func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() conditions, ok := g.watchConditions[podUID] if !ok { - if condition == nil { - return // Condition isn't set, nothing to do. - } conditions = make(map[string]versionedWatchCondition) } versioned, found := conditions[conditionKey] if found { + // Watch condition was already set. Increment its version & update the condition function. versioned.version++ versioned.condition = condition conditions[conditionKey] = versioned - } else if condition != nil { + } else { conditions[conditionKey] = versionedWatchCondition{ key: conditionKey, condition: condition, @@ -516,19 +521,22 @@ func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCo g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() - conditions, ok := g.watchConditions[podUID] + podConditions, ok := g.watchConditions[podUID] if !ok { return nil } - filtered := make([]versionedWatchCondition, 0, len(conditions)) - for _, condition := range conditions { - filtered = append(filtered, condition) + // Flatten the map into a list of conditions. This also serves to create a copy, so the lock can + // be released. + conditions := make([]versionedWatchCondition, 0, len(podConditions)) + for _, condition := range podConditions { + conditions = append(conditions, condition) } - return filtered + return conditions } -// completeWatchConditions clears the completed watch conditions. +// completeWatchConditions removes the completed watch conditions, unless they have been updated +// since the condition was checked. func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() @@ -549,6 +557,8 @@ func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditi g.watchConditions[podUID] = conditions } +// cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from +// the pod records. Events are not emitted for removed pods. 
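+// It runs from Relist right after podRecords.setCurrent, so the check reflects the pods
+// observed in the current relist.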
func (g *GenericPLEG) cleanupOrphanedWatchConditions() { g.watchConditionsLock.Lock() defer g.watchConditionsLock.Unlock() From 24443b67cbada0ad97c01e8c0ad022cf360a54b5 Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Wed, 6 Nov 2024 16:28:44 -0800 Subject: [PATCH 6/6] Expand PLEG SetWatchCondition unit test coverage --- pkg/kubelet/pleg/generic_test.go | 199 ++++++++++++++++++++++++------- 1 file changed, 153 insertions(+), 46 deletions(-) diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 4d7ac651745..7c024834537 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/types" @@ -82,9 +83,10 @@ func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent { return events } -func createTestContainer(ID string, state kubecontainer.State) *kubecontainer.Container { +func createTestContainer(id string, state kubecontainer.State) *kubecontainer.Container { return &kubecontainer.Container{ - ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID}, + ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: id}, + Name: id, State: state, } } @@ -317,14 +319,14 @@ func testReportMissingPods(t *testing.T, numRelists int) { } func newTestGenericPLEGWithRuntimeMock(runtimeMock kubecontainer.Runtime) *GenericPLEG { - pleg := &GenericPLEG{ - relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour}, - runtime: runtimeMock, - eventChannel: make(chan *PodLifecycleEvent, 1000), - podRecords: make(podRecords), - cache: kubecontainer.NewCache(), - clock: clock.RealClock{}, - } + pleg := NewGenericPLEG( + klog.Logger{}, + runtimeMock, + make(chan *PodLifecycleEvent, 1000), + &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour}, + kubecontainer.NewCache(), + clock.RealClock{}, + ).(*GenericPLEG) return pleg } @@ -738,42 +740,46 @@ kubelet_running_pods 2 } func TestWatchConditions(t *testing.T) { - pods := []*containertest.FakePod{{ - Pod: &kubecontainer.Pod{ - Name: "running-pod", - ID: "running", - Sandboxes: []*kubecontainer.Container{ - createTestContainer("s", kubecontainer.ContainerStateRunning), - }, - Containers: []*kubecontainer.Container{ - createTestContainer("c", kubecontainer.ContainerStateRunning), - }, + pods := []*kubecontainer.Pod{{ + Name: "running-pod", + ID: "running", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + Containers: []*kubecontainer.Container{ + createTestContainer("c", kubecontainer.ContainerStateRunning), }, }, { - Pod: &kubecontainer.Pod{ - Name: "terminating-pod", - ID: "terminating", - Sandboxes: []*kubecontainer.Container{ - createTestContainer("s", kubecontainer.ContainerStateExited), - }, + Name: "running-pod-2", + ID: "running-2", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + Containers: []*kubecontainer.Container{ + createTestContainer("c-exited", kubecontainer.ContainerStateExited), + createTestContainer("c-running", kubecontainer.ContainerStateRunning), }, }, { - Pod: &kubecontainer.Pod{ - Name: "reinspect-pod", - ID: "reinspect", - Sandboxes: []*kubecontainer.Container{ - createTestContainer("s", kubecontainer.ContainerStateRunning), - }, + Name: "terminating-pod", + ID: "terminating", + 
Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateExited), + }, + }, { + Name: "reinspect-pod", + ID: "reinspect", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), }, }} initialPods := pods - initialPods = append(initialPods, &containertest.FakePod{Pod: &kubecontainer.Pod{ + initialPods = append(initialPods, &kubecontainer.Pod{ Name: "terminated-pod", ID: "terminated", Sandboxes: []*kubecontainer.Container{ createTestContainer("s", kubecontainer.ContainerStateExited), }, - }}) + }) alwaysComplete := func(_ *kubecontainer.PodStatus) bool { return true @@ -790,13 +796,32 @@ func TestWatchConditions(t *testing.T) { return true } + // resettingCond decrements the version before it completes. + var resettingCond = func(_ *kubecontainer.PodStatus) bool { + versioned := pleg.watchConditions["running"]["resetting"] + versioned.version = 0 + pleg.watchConditions["running"]["resetting"] = versioned + return true + } + + // makeContainerCond returns a RunningContainerWatchCondition that asserts the expected container status + makeContainerCond := func(expectedContainerName string, complete bool) WatchCondition { + return RunningContainerWatchCondition(expectedContainerName, func(status *kubecontainer.Status) bool { + if status.Name != expectedContainerName { + panic(fmt.Sprintf("unexpected container name: got %q, want %q", status.Name, expectedContainerName)) + } + return complete + }) + } + testCases := []struct { - name string - podUID types.UID - watchConditions map[string]WatchCondition - expectEvaluated bool // Whether the watch conditions should be evaluated - expectRemoved bool // Whether podUID should be present in the watch conditions map - expectWatchConditions map[string]versionedWatchCondition // The expected watch conditions for the podUIDa (only key & version checked) + name string + podUID types.UID + watchConditions map[string]WatchCondition + incrementInitialVersion bool // Whether to call SetPodWatchCondition multiple times to increment the version + expectEvaluated bool // Whether the watch conditions should be evaluated + expectRemoved bool // Whether podUID should be present in the watch conditions map + expectWatchConditions map[string]versionedWatchCondition // The expected watch conditions for the podUID (only key & version checked) }{{ name: "no watch conditions", podUID: "running", @@ -813,6 +838,31 @@ func TestWatchConditions(t *testing.T) { "watching": {version: 0}, "updating": {version: 1}, }, + }, { + name: "conditions with incremented versions", + podUID: "running", + incrementInitialVersion: true, + watchConditions: map[string]WatchCondition{ + "completing": alwaysComplete, + "watching": neverComplete, + "updating": updatingCond, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 1}, + "updating": {version: 2}, + }, + }, { + name: "completed watch condition with older version", + podUID: "running", + incrementInitialVersion: true, + watchConditions: map[string]WatchCondition{ + "resetting": resettingCond, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "resetting": {version: 0}, + }, }, { name: "non-existent pod", podUID: "non-existent", @@ -839,22 +889,75 @@ func TestWatchConditions(t *testing.T) { expectWatchConditions: map[string]versionedWatchCondition{ "watching": {version: 0}, }, + }, { + name: "single container conditions", + podUID: "running", + 
watchConditions: map[string]WatchCondition{ + "completing": makeContainerCond("c", true), + "watching": makeContainerCond("c", false), + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 0}, + }, + }, { + name: "multi-container conditions", + podUID: "running-2", + watchConditions: map[string]WatchCondition{ + "completing:exited": makeContainerCond("c-exited", true), + "watching:exited": makeContainerCond("c-exited", false), + "completing:running": makeContainerCond("c-running", true), + "watching:running": makeContainerCond("c-running", false), + "completing:dne": makeContainerCond("c-dne", true), + "watching:dne": makeContainerCond("c-dne", false), + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching:running": {version: 0}, + }, }} for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - testPleg := newTestGenericPLEG() - pleg = testPleg.pleg - runtime := testPleg.runtime - runtime.AllPodList = initialPods - pleg.Relist() // Setup initial pod records. + runtimeMock := containertest.NewMockRuntime(t) + pleg = newTestGenericPLEGWithRuntimeMock(runtimeMock) - runtime.AllPodList = pods // Doesn't have "terminated" pod. + // Mock pod statuses + for _, pod := range initialPods { + podStatus := &kubecontainer.PodStatus{ + ID: pod.ID, + Name: pod.Name, + Namespace: pod.Namespace, + } + for _, c := range pod.Containers { + podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &kubecontainer.Status{ + ID: c.ID, + Name: c.Name, + State: c.State, + }) + } + runtimeMock.EXPECT(). + GetPodStatus(mock.Anything, pod.ID, pod.Name, pod.Namespace). + Return(podStatus, nil).Maybe() + } + + // Setup initial pod records. + runtimeMock.EXPECT().GetPods(mock.Anything, true).Return(initialPods, nil).Once() + pleg.Relist() pleg.podsToReinspect["reinspect"] = nil + // Remove "terminated" pod. + runtimeMock.EXPECT().GetPods(mock.Anything, true).Return(pods, nil).Once() + var evaluatedConditions []string for key, condition := range test.watchConditions { wrappedCondition := func(status *kubecontainer.PodStatus) bool { + defer func() { + if r := recover(); r != nil { + require.Fail(t, "condition error", r) + } + }() + assert.Equal(t, test.podUID, status.ID, "podUID") if !test.expectEvaluated { assert.Fail(t, "conditions should not be evaluated") } else { @@ -863,6 +966,10 @@ func TestWatchConditions(t *testing.T) { return condition(status) } pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition) + if test.incrementInitialVersion { + // Set the watch condition a second time to increment the version. + pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition) + } } pleg.Relist()