diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 580ee34893a..fb43305faf2 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -66,6 +66,9 @@ type RuntimeHelper interface { // UnprepareDynamicResources unprepares resources for a a pod. UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error + + // SetPodWatchCondition flags a pod to be inspected until the condition is met. + SetPodWatchCondition(types.UID, string, func(*PodStatus) bool) } // ShouldContainerBeRestarted checks whether a container needs to be restarted. diff --git a/pkg/kubelet/container/testing/fake_runtime_helper.go b/pkg/kubelet/container/testing/fake_runtime_helper.go index b56ea1f2000..47c6bb89edf 100644 --- a/pkg/kubelet/container/testing/fake_runtime_helper.go +++ b/pkg/kubelet/container/testing/fake_runtime_helper.go @@ -114,3 +114,7 @@ func (f *FakeRuntimeHelper) PrepareDynamicResources(ctx context.Context, pod *v1 func (f *FakeRuntimeHelper) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error { return nil } + +func (f *FakeRuntimeHelper) SetPodWatchCondition(_ kubetypes.UID, _ string, _ func(*kubecontainer.PodStatus) bool) { + // Not implemented. +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 94f2a9ded95..f61ddb0378c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2028,17 +2028,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType return false, nil } - if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, podStatus) { - // While resize is in progress, periodically request the latest status from the runtime via - // the PLEG. This is necessary since ordinarily pod status is only fetched when a container - // undergoes a state transition. - runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) - if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil { - klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod)) - return false, err - } - } - return false, nil } @@ -3174,3 +3163,7 @@ func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) { kl.tryReconcileMirrorPods(staticPod, mirrorPod) } } + +func (kl *Kubelet) SetPodWatchCondition(podUID types.UID, conditionKey string, condition pleg.WatchCondition) { + kl.pleg.SetPodWatchCondition(podUID, conditionKey, condition) +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index f7902bd47d8..949024d66e2 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/kubernetes/pkg/kubelet/pleg" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/runtimeclass" "k8s.io/kubernetes/pkg/kubelet/sysctl" @@ -804,6 +805,26 @@ func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, res "pod", format.Pod(pod), "resourceName", resourceName) return err } + resizeKey := fmt.Sprintf("%s:resize:%s", container.Name, resourceName) + + // Watch (poll) the container for the expected resources update. Stop watching once the resources + // match the desired values. 
+ resizeCondition := pleg.RunningContainerWatchCondition(container.Name, func(status *kubecontainer.Status) bool { + if status.Resources == nil { + return false + } + switch resourceName { + case v1.ResourceMemory: + return status.Resources.MemoryLimit.Equal(*container.Resources.Limits.Memory()) + case v1.ResourceCPU: + return status.Resources.CPURequest.Equal(*container.Resources.Requests.Cpu()) && + status.Resources.CPULimit.Equal(*container.Resources.Limits.Cpu()) + default: + return true // Shouldn't happen. + } + }) + m.runtimeHelper.SetPodWatchCondition(pod.UID, resizeKey, resizeCondition) + // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime. // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources. // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted. diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index f20f0a28007..39e13b55223 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -427,6 +427,6 @@ func (e *EventedPLEG) updateLatencyMetric(event *runtimeapi.ContainerEventRespon metrics.EventedPLEGConnLatency.Observe(duration.Seconds()) } -func (e *EventedPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { - return fmt.Errorf("not implemented"), false +func (e *EventedPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { + e.genericPleg.SetPodWatchCondition(podUID, conditionKey, condition) } diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 41acf15ab65..ce0abc6bd64 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -80,6 +80,16 @@ type GenericPLEG struct { podCacheMutex sync.Mutex // logger is used for contextual logging logger klog.Logger + // watchConditions tracks pod watch conditions, guarded by watchConditionsLock + // watchConditions is a map of pod UID -> condition key -> condition + watchConditions map[types.UID]map[string]versionedWatchCondition + watchConditionsLock sync.Mutex +} + +type versionedWatchCondition struct { + key string + condition WatchCondition + version uint32 } // plegContainerState has a one-to-one mapping to the @@ -125,13 +135,14 @@ func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChan panic("cache cannot be nil") } return &GenericPLEG{ - logger: logger, - relistDuration: relistDuration, - runtime: runtime, - eventChannel: eventChannel, - podRecords: make(podRecords), - cache: cache, - clock: clock, + logger: logger, + relistDuration: relistDuration, + runtime: runtime, + eventChannel: eventChannel, + podRecords: make(podRecords), + cache: cache, + clock: clock, + watchConditions: make(map[types.UID]map[string]versionedWatchCondition), } } @@ -252,28 +263,29 @@ func (g *GenericPLEG) Relist() { // update running pod and container count updateRunningPodAndContainerMetrics(pods) g.podRecords.setCurrent(pods) + g.cleanupOrphanedWatchConditions() + + needsReinspection := make(map[types.UID]*kubecontainer.Pod) - // Compare the old and the current pods, and generate events. - eventsByPodID := map[types.UID][]*PodLifecycleEvent{} for pid := range g.podRecords { + // Compare the old and the current pods, and generate events. oldPod := g.podRecords.getOld(pid) pod := g.podRecords.getCurrent(pid) // Get all containers in the old and the new pod. 
allContainers := getContainersFromPods(oldPod, pod) + var events []*PodLifecycleEvent for _, container := range allContainers { - events := computeEvents(g.logger, oldPod, pod, &container.ID) - for _, e := range events { - updateEvents(eventsByPodID, e) - } + containerEvents := computeEvents(g.logger, oldPod, pod, &container.ID) + events = append(events, containerEvents...) } - } - needsReinspection := make(map[types.UID]*kubecontainer.Pod) + watchConditions := g.getPodWatchConditions(pid) + _, reinspect := g.podsToReinspect[pid] - // If there are events associated with a pod, we should update the - // podCache. - for pid, events := range eventsByPodID { - pod := g.podRecords.getCurrent(pid) + if len(events) == 0 && len(watchConditions) == 0 && !reinspect { + // Nothing else needed for this pod. + continue + } // updateCache() will inspect the pod and update the cache. If an // error occurs during the inspection, we want PLEG to retry again @@ -284,7 +296,8 @@ func (g *GenericPLEG) Relist() { // inspecting the pod and getting the PodStatus to update the cache // serially may take a while. We should be aware of this and // parallelize if needed. - if err, updated := g.updateCache(ctx, pod, pid); err != nil { + status, updated, err := g.updateCache(ctx, pod, pid) + if err != nil { // Rely on updateCache calling GetPodStatus to log the actual error. g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name)) @@ -292,18 +305,27 @@ func (g *GenericPLEG) Relist() { needsReinspection[pid] = pod continue - } else { - // this pod was in the list to reinspect and we did so because it had events, so remove it - // from the list (we don't want the reinspection code below to inspect it a second time in - // this relist execution) - delete(g.podsToReinspect, pid) - if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) { - if !updated { - continue - } + } else if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) { + if !updated { + continue } } + var completedConditions []versionedWatchCondition + for _, condition := range watchConditions { + if condition.condition(status) { + // condition was met: add it to the list of completed conditions. + completedConditions = append(completedConditions, condition) + } + } + if len(completedConditions) > 0 { + g.completeWatchConditions(pid, completedConditions) + // If at least 1 condition completed, emit a ConditionMet event to trigger a pod sync. + // We only emit 1 event even if multiple conditions are met, since SyncPod reevaluates + // all containers in the pod with the latest status. + events = append(events, &PodLifecycleEvent{ID: pid, Type: ConditionMet}) + } + // Update the internal storage and send out the events. 
g.podRecords.update(pid) @@ -325,8 +347,6 @@ func (g *GenericPLEG) Relist() { if events[i].Type == ContainerDied { // Fill up containerExitCode map for ContainerDied event when first time appeared if len(containerExitCode) == 0 && pod != nil { - // Get updated podStatus - status, err := g.cache.Get(pod.ID) if err == nil { for _, containerStatus := range status.ContainerStatuses { containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode @@ -342,18 +362,6 @@ func (g *GenericPLEG) Relist() { } } - // reinspect any pods that failed inspection during the previous relist - if len(g.podsToReinspect) > 0 { - g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection") - for pid, pod := range g.podsToReinspect { - if err, _ := g.updateCache(ctx, pod, pid); err != nil { - // Rely on updateCache calling GetPodStatus to log the actual error. - g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name)) - needsReinspection[pid] = pod - } - } - } - // Update the cache timestamp. This needs to happen *after* // all pods have been properly updated in the cache. g.cache.UpdateTime(timestamp) @@ -427,13 +435,13 @@ func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) // updateCache tries to update the pod status in the kubelet cache and returns true if the // pod status was actually updated in the cache. It will return false if the pod status // was ignored by the cache. -func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) { +func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (*kubecontainer.PodStatus, bool, error) { if pod == nil { // The pod is missing in the current relist. This means that // the pod has no visible (active or inactive) containers. g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid)) g.cache.Delete(pid) - return nil, true + return nil, true, nil } g.podCacheMutex.Lock() @@ -477,22 +485,90 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p timestamp = status.TimeStamp } - return err, g.cache.Set(pod.ID, status, err, timestamp) + return status, g.cache.Set(pod.ID, status, err, timestamp), err } -func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) { - ctx := context.Background() - if pod == nil { - return fmt.Errorf("pod cannot be nil"), false +// SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch +// condition is met. The condition is keyed so it can be updated before the condition +// is met. +func (g *GenericPLEG) SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + conditions, ok := g.watchConditions[podUID] + if !ok { + conditions = make(map[string]versionedWatchCondition) } - return g.updateCache(ctx, pod, pid) + + versioned, found := conditions[conditionKey] + if found { + // Watch condition was already set. Increment its version & update the condition function. 
+ versioned.version++ + versioned.condition = condition + conditions[conditionKey] = versioned + } else { + conditions[conditionKey] = versionedWatchCondition{ + key: conditionKey, + condition: condition, + } + } + + g.watchConditions[podUID] = conditions } -func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) { - if e == nil { +// getPodWatchConditions returns a list of the active watch conditions for the pod. +func (g *GenericPLEG) getPodWatchConditions(podUID types.UID) []versionedWatchCondition { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + podConditions, ok := g.watchConditions[podUID] + if !ok { + return nil + } + + // Flatten the map into a list of conditions. This also serves to create a copy, so the lock can + // be released. + conditions := make([]versionedWatchCondition, 0, len(podConditions)) + for _, condition := range podConditions { + conditions = append(conditions, condition) + } + return conditions +} + +// completeWatchConditions removes the completed watch conditions, unless they have been updated +// since the condition was checked. +func (g *GenericPLEG) completeWatchConditions(podUID types.UID, completedConditions []versionedWatchCondition) { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + conditions, ok := g.watchConditions[podUID] + if !ok { + // Pod was deleted, nothing to do. return } - eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e) + + for _, completed := range completedConditions { + condition := conditions[completed.key] + // Only clear the condition if it has not been updated. + if condition.version == completed.version { + delete(conditions, completed.key) + } + } + g.watchConditions[podUID] = conditions +} + +// cleanupOrphanedWatchConditions purges the watchConditions map of any pods that were removed from +// the pod records. Events are not emitted for removed pods. +func (g *GenericPLEG) cleanupOrphanedWatchConditions() { + g.watchConditionsLock.Lock() + defer g.watchConditionsLock.Unlock() + + for podUID := range g.watchConditions { + if g.podRecords.getCurrent(podUID) == nil { + // Pod was deleted, remove it from the watch conditions. 
+ delete(g.watchConditions, podUID) + } + } } func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState { diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 16d5fad4d9e..7c024834537 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/types" @@ -82,9 +83,10 @@ func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent { return events } -func createTestContainer(ID string, state kubecontainer.State) *kubecontainer.Container { +func createTestContainer(id string, state kubecontainer.State) *kubecontainer.Container { return &kubecontainer.Container{ - ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID}, + ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: id}, + Name: id, State: state, } } @@ -317,14 +319,14 @@ func testReportMissingPods(t *testing.T, numRelists int) { } func newTestGenericPLEGWithRuntimeMock(runtimeMock kubecontainer.Runtime) *GenericPLEG { - pleg := &GenericPLEG{ - relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour}, - runtime: runtimeMock, - eventChannel: make(chan *PodLifecycleEvent, 1000), - podRecords: make(podRecords), - cache: kubecontainer.NewCache(), - clock: clock.RealClock{}, - } + pleg := NewGenericPLEG( + klog.Logger{}, + runtimeMock, + make(chan *PodLifecycleEvent, 1000), + &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour}, + kubecontainer.NewCache(), + clock.RealClock{}, + ).(*GenericPLEG) return pleg } @@ -736,3 +738,260 @@ kubelet_running_pods 2 }) } } + +func TestWatchConditions(t *testing.T) { + pods := []*kubecontainer.Pod{{ + Name: "running-pod", + ID: "running", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + Containers: []*kubecontainer.Container{ + createTestContainer("c", kubecontainer.ContainerStateRunning), + }, + }, { + Name: "running-pod-2", + ID: "running-2", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + Containers: []*kubecontainer.Container{ + createTestContainer("c-exited", kubecontainer.ContainerStateExited), + createTestContainer("c-running", kubecontainer.ContainerStateRunning), + }, + }, { + Name: "terminating-pod", + ID: "terminating", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateExited), + }, + }, { + Name: "reinspect-pod", + ID: "reinspect", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateRunning), + }, + }} + initialPods := pods + initialPods = append(initialPods, &kubecontainer.Pod{ + Name: "terminated-pod", + ID: "terminated", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("s", kubecontainer.ContainerStateExited), + }, + }) + + alwaysComplete := func(_ *kubecontainer.PodStatus) bool { + return true + } + neverComplete := func(_ *kubecontainer.PodStatus) bool { + return false + } + + var pleg *GenericPLEG + var updatingCond WatchCondition + // updatingCond always completes, but updates the condition first. 
+ updatingCond = func(_ *kubecontainer.PodStatus) bool { + pleg.SetPodWatchCondition("running", "updating", updatingCond) + return true + } + + // resettingCond decrements the version before it completes. + var resettingCond = func(_ *kubecontainer.PodStatus) bool { + versioned := pleg.watchConditions["running"]["resetting"] + versioned.version = 0 + pleg.watchConditions["running"]["resetting"] = versioned + return true + } + + // makeContainerCond returns a RunningContainerWatchCondition that asserts the expected container status + makeContainerCond := func(expectedContainerName string, complete bool) WatchCondition { + return RunningContainerWatchCondition(expectedContainerName, func(status *kubecontainer.Status) bool { + if status.Name != expectedContainerName { + panic(fmt.Sprintf("unexpected container name: got %q, want %q", status.Name, expectedContainerName)) + } + return complete + }) + } + + testCases := []struct { + name string + podUID types.UID + watchConditions map[string]WatchCondition + incrementInitialVersion bool // Whether to call SetPodWatchCondition multiple times to increment the version + expectEvaluated bool // Whether the watch conditions should be evaluated + expectRemoved bool // Whether podUID should be present in the watch conditions map + expectWatchConditions map[string]versionedWatchCondition // The expected watch conditions for the podUID (only key & version checked) + }{{ + name: "no watch conditions", + podUID: "running", + }, { + name: "running pod with conditions", + podUID: "running", + watchConditions: map[string]WatchCondition{ + "completing": alwaysComplete, + "watching": neverComplete, + "updating": updatingCond, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 0}, + "updating": {version: 1}, + }, + }, { + name: "conditions with incremented versions", + podUID: "running", + incrementInitialVersion: true, + watchConditions: map[string]WatchCondition{ + "completing": alwaysComplete, + "watching": neverComplete, + "updating": updatingCond, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 1}, + "updating": {version: 2}, + }, + }, { + name: "completed watch condition with older version", + podUID: "running", + incrementInitialVersion: true, + watchConditions: map[string]WatchCondition{ + "resetting": resettingCond, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "resetting": {version: 0}, + }, + }, { + name: "non-existent pod", + podUID: "non-existent", + watchConditions: map[string]WatchCondition{ + "watching": neverComplete, + }, + expectEvaluated: false, + expectRemoved: true, + }, { + name: "terminated pod", + podUID: "terminated", + watchConditions: map[string]WatchCondition{ + "watching": neverComplete, + }, + expectEvaluated: false, + expectRemoved: true, + }, { + name: "reinspecting pod", + podUID: "reinspect", + watchConditions: map[string]WatchCondition{ + "watching": neverComplete, + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 0}, + }, + }, { + name: "single container conditions", + podUID: "running", + watchConditions: map[string]WatchCondition{ + "completing": makeContainerCond("c", true), + "watching": makeContainerCond("c", false), + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching": {version: 0}, + }, + }, { + name: "multi-container conditions", 
+ podUID: "running-2", + watchConditions: map[string]WatchCondition{ + "completing:exited": makeContainerCond("c-exited", true), + "watching:exited": makeContainerCond("c-exited", false), + "completing:running": makeContainerCond("c-running", true), + "watching:running": makeContainerCond("c-running", false), + "completing:dne": makeContainerCond("c-dne", true), + "watching:dne": makeContainerCond("c-dne", false), + }, + expectEvaluated: true, + expectWatchConditions: map[string]versionedWatchCondition{ + "watching:running": {version: 0}, + }, + }} + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + runtimeMock := containertest.NewMockRuntime(t) + pleg = newTestGenericPLEGWithRuntimeMock(runtimeMock) + + // Mock pod statuses + for _, pod := range initialPods { + podStatus := &kubecontainer.PodStatus{ + ID: pod.ID, + Name: pod.Name, + Namespace: pod.Namespace, + } + for _, c := range pod.Containers { + podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &kubecontainer.Status{ + ID: c.ID, + Name: c.Name, + State: c.State, + }) + } + runtimeMock.EXPECT(). + GetPodStatus(mock.Anything, pod.ID, pod.Name, pod.Namespace). + Return(podStatus, nil).Maybe() + } + + // Setup initial pod records. + runtimeMock.EXPECT().GetPods(mock.Anything, true).Return(initialPods, nil).Once() + pleg.Relist() + pleg.podsToReinspect["reinspect"] = nil + + // Remove "terminated" pod. + runtimeMock.EXPECT().GetPods(mock.Anything, true).Return(pods, nil).Once() + + var evaluatedConditions []string + for key, condition := range test.watchConditions { + wrappedCondition := func(status *kubecontainer.PodStatus) bool { + defer func() { + if r := recover(); r != nil { + require.Fail(t, "condition error", r) + } + }() + assert.Equal(t, test.podUID, status.ID, "podUID") + if !test.expectEvaluated { + assert.Fail(t, "conditions should not be evaluated") + } else { + evaluatedConditions = append(evaluatedConditions, key) + } + return condition(status) + } + pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition) + if test.incrementInitialVersion { + // Set the watch condition a second time to increment the version. + pleg.SetPodWatchCondition(test.podUID, key, wrappedCondition) + } + } + pleg.Relist() + + if test.expectEvaluated { + assert.Len(t, evaluatedConditions, len(test.watchConditions), "all conditions should be evaluated") + } + + if test.expectRemoved { + assert.NotContains(t, pleg.watchConditions, test.podUID, "Pod should be removed from watch conditions") + } else { + actualConditions := pleg.watchConditions[test.podUID] + assert.Len(t, actualConditions, len(test.expectWatchConditions), "expected number of conditions") + for key, expected := range test.expectWatchConditions { + if !assert.Contains(t, actualConditions, key) { + continue + } + actual := actualConditions[key] + assert.Equal(t, key, actual.key) + assert.Equal(t, expected.version, actual.version) + } + } + + }) + } +} diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index 0a44745925b..08625aba9cc 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -47,6 +47,8 @@ const ( PodSync PodLifeCycleEventType = "PodSync" // ContainerChanged - event type when the new state of container is unknown. ContainerChanged PodLifeCycleEventType = "ContainerChanged" + // ConditionMet - event type triggered when any number of watch conditions are met. + ConditionMet PodLifeCycleEventType = "ConditionMet" ) // PodLifecycleEvent is an event that reflects the change of the pod state. 
@@ -66,7 +68,10 @@ type PodLifecycleEventGenerator interface { Start() Watch() chan *PodLifecycleEvent Healthy() (bool, error) - UpdateCache(*kubecontainer.Pod, types.UID) (error, bool) + // SetPodWatchCondition flags the pod for reinspection on every Relist iteration until the watch + // condition is met. The condition is keyed so it can be updated before the condition + // is met. + SetPodWatchCondition(podUID types.UID, conditionKey string, condition WatchCondition) } // podLifecycleEventGeneratorHandler contains functions that are useful for different PLEGs @@ -77,3 +82,19 @@ type podLifecycleEventGeneratorHandler interface { Update(relistDuration *RelistDuration) Relist() } + +// WatchCondition takes the latest PodStatus, and returns whether the condition is met. +type WatchCondition = func(*kubecontainer.PodStatus) bool + +// RunningContainerWatchCondition wraps a condition on the container status to make a pod +// WatchCondition. If the container is no longer running, the condition is implicitly cleared. +func RunningContainerWatchCondition(containerName string, condition func(*kubecontainer.Status) bool) WatchCondition { + return func(podStatus *kubecontainer.PodStatus) bool { + status := podStatus.FindContainerStatusByName(containerName) + if status == nil || status.State != kubecontainer.ContainerStateRunning { + // Container isn't running. Consider the condition "completed" so it is cleared. + return true + } + return condition(status) + } +}