diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go
index f20f0a28007..56844bc7533 100644
--- a/pkg/kubelet/pleg/evented.go
+++ b/pkg/kubelet/pleg/evented.go
@@ -430,3 +430,7 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon
 func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
 	return fmt.Errorf("not implemented"), false
 }
+
+func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
+	e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition)
+}
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index 3eafb2550e8..e84d3b99cb2 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -80,6 +80,16 @@ type GenericPLEG struct {
 	podCacheMutex sync.Mutex
 	// logger is used for contextual logging
 	logger klog.Logger
+	// watchConditions tracks pod watch conditions, guarded by watchConditionsLock
+	// watchConditions is a map of pod UID -> condition key -> condition
+	watchConditions     map[types.UID]map[string]versionedWatchCondition
+	watchConditionsLock sync.Mutex
+}
+
+type versionedWatchCondition struct {
+	key       string
+	condition WatchCondition
+	version   uint32
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -125,13 +135,14 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan
 		panic("cache cannot be nil")
 	}
 	return &GenericPLEG{
-		logger:         logger,
-		relistDuration: relistDuration,
-		runtime:        runtime,
-		eventChannel:   eventChannel,
-		podRecords:     make(podRecords),
-		cache:          cache,
-		clock:          clock,
+		logger:          logger,
+		relistDuration:  relistDuration,
+		runtime:         runtime,
+		eventChannel:    eventChannel,
+		podRecords:      make(podRecords),
+		cache:           cache,
+		clock:           clock,
+		watchConditions: make(map[types.UID]map[string]versionedWatchCondition),
 	}
 }
 
@@ -252,6 +263,7 @@ func (g *GenericPLEG) Relist() {
 	// update running pod and container count
 	updateRunningPodAndContainerMetrics(pods)
 	g.podRecords.setCurrent(pods)
+	g.cleanupOrphanedWatchConditions()
 
 	needsReinspection := make(map[types.UID]*kubecontainer.Pod)
 
@@ -267,9 +279,10 @@ func (g *GenericPLEG) Relist() {
 			events = append(events, containerEvents...)
 		}
 
+		watchConditions := g.getPodWatchConditions(pid)
 		_, reinspect := g.podsToReinspect[pid]
 
-		if len(events) == 0 && !reinspect {
+		if len(events) == 0 && len(watchConditions) == 0 && !reinspect {
 			// Nothing else needed for this pod.
 			continue
 		}
@@ -283,7 +296,8 @@
 		// inspecting the pod and getting the PodStatus to update the cache
 		// serially may take a while. We should be aware of this and
 		// parallelize if needed.
-		if err, updated := g.updateCache(ctx, pod, pid); err != nil {
+		status, updated, err := g.updateCache(ctx, pod, pid)
+		if err != nil {
 			// Rely on updateCache calling GetPodStatus to log the actual error.
 			g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
 
@@ -299,6 +313,14 @@
 			}
 		}
 
+		var completedConditions []versionedWatchCondition
+		for _, condition := range watchConditions {
+			if condition.condition(status) {
+				completedConditions = append(completedConditions, condition)
+			}
+		}
+		g.completeWatchConditions(pid, completedConditions)
+
 		// Update the internal storage and send out the events.
 		g.podRecords.update(pid)
@@ -320,8 +342,6 @@
 			if events[i].Type == ContainerDied {
 				// Fill up containerExitCode map for ContainerDied event when first time appeared
 				if len(containerExitCode) == 0 && pod != nil {
-					// Get updated podStatus
-					status, err := g.cache.Get(pod.ID)
 					if err == nil {
 						for _, containerStatus := range status.ContainerStatuses {
 							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
@@ -410,13 +430,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus)
 // updateCache tries to update the pod status in the kubelet cache and returns true if the
 // pod status was actually updated in the cache. It will return false if the pod status
 // was ignored by the cache.
-func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
+func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (*kubecontainer.PodStatus, bool, error) {
 	if pod == nil {
 		// The pod is missing in the current relist. This means that
 		// the pod has no visible (active or inactive) containers.
 		g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
 		g.cache.Delete(pid)
-		return nil, true
+		return nil, true, nil
 	}
 
 	g.podCacheMutex.Lock()
@@ -460,7 +480,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
 		timestamp = status.TimeStamp
 	}
 
-	return err, g.cache.Set(pod.ID, status, err, timestamp)
+	return status, g.cache.Set(pod.ID, status, err, timestamp), err
 }
 
 func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
@@ -468,14 +488,85 @@ func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error,
 	if pod == nil {
 		return fmt.Errorf("pod cannot be nil"), false
 	}
-	return g.updateCache(ctx, pod, pid)
+	_, updated, err := g.updateCache(ctx, pod, pid)
+	return err, updated
 }
 
-func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
-	if e == nil {
+func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	conditions, ok := g.watchConditions[podUID]
+	if !ok {
+		if condition == nil {
+			return // Condition isn't set, nothing to do.
+		}
+		conditions = make(map[string]versionedWatchCondition)
+	}
+
+	versioned, found := conditions[conditionKey]
+	if found {
+		versioned.version++
+		versioned.condition = condition
+		conditions[conditionKey] = versioned
+	} else if condition != nil {
+		conditions[conditionKey] = versionedWatchCondition{
+			key:       conditionKey,
+			condition: condition,
+		}
+	}
+
+	g.watchConditions[podUID] = conditions
+}
+
+// getPodWatchConditions returns a list of the active watch conditions for the pod.
+func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCondition {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	conditions, ok := g.watchConditions[podUID]
+	if !ok {
+		return nil
+	}
+
+	filtered := make([]versionedWatchCondition, 0, len(conditions))
+	for _, condition := range conditions {
+		filtered = append(filtered, condition)
+	}
+	return filtered
+}
+
+// completeWatchConditions clears the completed watch conditions.
+func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	conditions, ok := g.watchConditions[podUID]
+	if !ok {
+		// Pod was deleted, nothing to do.
 		return
 	}
-	eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
+
+	for _, completed := range completedConditions {
+		condition := conditions[completed.key]
+		// Only clear the condition if it has not been updated.
+		if condition.version == completed.version {
+			delete(conditions, completed.key)
+		}
+	}
+	g.watchConditions[podUID] = conditions
+}
+
+func (g *GenericPLEG) cleanupOrphanedWatchConditions() {
+	g.watchConditionsLock.Lock()
+	defer g.watchConditionsLock.Unlock()
+
+	for podUID := range g.watchConditions {
+		if g.podRecords.getCurrent(podUID) == nil {
+			// Pod was deleted, remove it from the watch conditions.
+			delete(g.watchConditions, podUID)
+		}
+	}
 }
 
 func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index 16d5fad4d9e..3eb94f0251e 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -736,3 +736,154 @@ kubelet_running_pods 2
 		})
 	}
 }
+
+func TestWatchConditions(t *testing.T) {
+	pods := []*containertest.FakePod{{
+		Pod: &kubecontainer.Pod{
+			Name: "running-pod",
+			ID:   "running",
+			Sandboxes: []*kubecontainer.Container{
+				createTestContainer("s", kubecontainer.ContainerStateRunning),
+			},
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c", kubecontainer.ContainerStateRunning),
+			},
+		},
+	}, {
+		Pod: &kubecontainer.Pod{
+			Name: "terminating-pod",
+			ID:   "terminating",
+			Sandboxes: []*kubecontainer.Container{
+				createTestContainer("s", kubecontainer.ContainerStateExited),
+			},
+		},
+	}, {
+		Pod: &kubecontainer.Pod{
+			Name: "reinspect-pod",
+			ID:   "reinspect",
+			Sandboxes: []*kubecontainer.Container{
+				createTestContainer("s", kubecontainer.ContainerStateRunning),
+			},
+		},
+	}}
+	initialPods := append(pods, &containertest.FakePod{Pod: &kubecontainer.Pod{
+		Name: "terminated-pod",
+		ID:   "terminated",
+		Sandboxes: []*kubecontainer.Container{
+			createTestContainer("s", kubecontainer.ContainerStateExited),
+		},
+	}})
+
+	alwaysComplete := func(_ *kubecontainer.PodStatus) bool {
+		return true
+	}
+	neverComplete := func(_ *kubecontainer.PodStatus) bool {
+		return false
+	}
+
+	var pleg *GenericPLEG
+	var updatingCond WatchCondition
+	// updatingCond always completes, but updates the condition first.
+	updatingCond = func(_ *kubecontainer.PodStatus) bool {
+		pleg.SetPodWatchCondition("running", "updating", updatingCond)
+		return true
+	}
+
+	testCases := []struct {
+		name                  string
+		podUID                types.UID
+		watchConditions       map[string]WatchCondition
+		expectEvaluated       bool                               // Whether the watch conditions should be evaluated
+		expectRemoved         bool                               // Whether podUID should be removed from the watch conditions map
+		expectWatchConditions map[string]versionedWatchCondition // The expected watch conditions for the podUID (only key & version checked)
+	}{{
+		name:   "no watch conditions",
+		podUID: "running",
+	}, {
+		name:   "running pod with conditions",
+		podUID: "running",
+		watchConditions: map[string]WatchCondition{
+			"completing": alwaysComplete,
+			"watching":   neverComplete,
+			"updating":   updatingCond,
+		},
+		expectEvaluated: true,
+		expectWatchConditions: map[string]versionedWatchCondition{
+			"watching": {version: 0},
+			"updating": {version: 1},
+		},
+	}, {
+		name:   "non-existent pod",
+		podUID: "non-existent",
+		watchConditions: map[string]WatchCondition{
+			"watching": neverComplete,
+		},
+		expectEvaluated: false,
+		expectRemoved:   true,
+	}, {
+		name:   "terminated pod",
+		podUID: "terminated",
+		watchConditions: map[string]WatchCondition{
+			"watching": neverComplete,
+		},
+		expectEvaluated: false,
+		expectRemoved:   true,
+	}, {
+		name:   "reinspecting pod",
+		podUID: "reinspect",
+		watchConditions: map[string]WatchCondition{
+			"watching": neverComplete,
+		},
+		expectEvaluated: true,
+		expectWatchConditions: map[string]versionedWatchCondition{
+			"watching": {version: 0},
+		},
+	}}
+
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			testPleg := newTestGenericPLEG()
+			pleg = testPleg.pleg
+			runtime := testPleg.runtime
+			runtime.AllPodList = initialPods
+			pleg.Relist() // Set up initial pod records.
+
+			runtime.AllPodList = pods // Doesn't have "terminated" pod.
+			pleg.podsToReinspect["reinspect"] = nil
+
+			var evaluatedConditions []string
+			for key, condition := range test.watchConditions {
+				wrappedCondition := func(status *kubecontainer.PodStatus) bool {
+					if !test.expectEvaluated {
+						assert.Fail(t, "conditions should not be evaluated")
+					} else {
+						evaluatedConditions = append(evaluatedConditions, key)
+					}
+					return condition(status)
+				}
+				pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition)
+			}
+			pleg.Relist()
+
+			if test.expectEvaluated {
+				assert.Len(t, evaluatedConditions, len(test.watchConditions), "all conditions should be evaluated")
+			}
+
+			if test.expectRemoved {
+				assert.NotContains(t, pleg.watchConditions, test.podUID, "Pod should be removed from watch conditions")
+			} else {
+				actualConditions := pleg.watchConditions[test.podUID]
+				assert.Len(t, actualConditions, len(test.expectWatchConditions), "expected number of conditions")
+				for key, expected := range test.expectWatchConditions {
+					if !assert.Contains(t, actualConditions, key) {
+						continue
+					}
+					actual := actualConditions[key]
+					assert.Equal(t, key, actual.key)
+					assert.Equal(t, expected.version, actual.version)
+				}
+			}
+
+		})
+	}
+}
diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go
index 0a44745925b..6edc941dfdc 100644
--- a/pkg/kubelet/pleg/pleg.go
+++ b/pkg/kubelet/pleg/pleg.go
@@ -67,6 +67,10 @@ type PodLifecycleEventGenerator interface {
 	Watch() chan *PodLifecycleEvent
 	Healthy() (bool, error)
 	UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
+	// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch
+	// condition is met. The condition is keyed so it can be updated before the condition
+	// is met.
+	SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition)
 }
 
 // podLifecycleEventGeneratorHandler contains functions that are useful for different PLEGs
@@ -77,3 +81,19 @@ type podLifecycleEventGeneratorHandler interface {
 	Update(relistDuration *RelistDuration)
 	Relist()
 }
+
+// WatchCondition takes the latest PodStatus and returns whether the condition is met.
+type WatchCondition func(*kubecontainer.PodStatus) bool
+
+// RunningContainerWatchCondition wraps a condition on the container status to make a pod
+// WatchCondition. If the container is no longer running, the condition is implicitly cleared.
+func RunningContainerWatchCondition(containerName string, condition func(*kubecontainer.Status) bool) WatchCondition {
+	return func(podStatus *kubecontainer.PodStatus) bool {
+		status := podStatus.FindContainerStatusByName(containerName)
+		if status == nil || status.State != kubecontainer.ContainerStateRunning {
+			// Container isn't running. Consider the condition "completed" so it is cleared.
+			return true
+		}
+		return condition(status)
+	}
+}
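Usage sketch (illustrative only, not part of the diff): the snippet below shows how a caller might combine the new SetPodWatchCondition API with the RunningContainerWatchCondition helper, e.g. to keep a pod under re-inspection until a runtime-reported container limit matches a desired value. The function name, the "resize" condition key, and the status fields checked inside the inner condition are assumptions made for this example.

package plegexample

import (
	"k8s.io/apimachinery/pkg/types"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// watchForActuatedCPULimit is a hypothetical caller. It registers a keyed watch
// condition so the PLEG re-inspects the pod on every relist until the container
// reports the expected CPU limit, or the container stops running (in which case
// RunningContainerWatchCondition reports the condition as met so it is cleared).
func watchForActuatedCPULimit(g pleg.PodLifecycleEventGenerator, podUID types.UID, containerName string, wantCPUMilli int64) {
	condition := pleg.RunningContainerWatchCondition(containerName, func(status *kubecontainer.Status) bool {
		// Assumed field layout: Status.Resources carries the runtime-reported
		// resources of the running container.
		return status.Resources != nil &&
			status.Resources.CPULimit != nil &&
			status.Resources.CPULimit.MilliValue() == wantCPUMilli
	})

	// Re-registering under the same key ("resize") replaces the condition and bumps
	// its version, so a completion observed for the older condition during the same
	// relist does not clear the newer one.
	g.SetPodWatchCondition(podUID, "resize", condition)
}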