diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index a8082721496..41acf15ab65 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -121,6 +121,9 @@ type podRecords map[types.UID]*podRecord
 func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
 	relistDuration *RelistDuration, cache kubecontainer.Cache,
 	clock clock.Clock) PodLifecycleEventGenerator {
+	if cache == nil {
+		panic("cache cannot be nil")
+	}
 	return &GenericPLEG{
 		logger:         logger,
 		relistDuration: relistDuration,
@@ -265,45 +268,42 @@ func (g *GenericPLEG) Relist() {
 		}
 	}
 
-	var needsReinspection map[types.UID]*kubecontainer.Pod
-	if g.cacheEnabled() {
-		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
-	}
+	needsReinspection := make(map[types.UID]*kubecontainer.Pod)
 
 	// If there are events associated with a pod, we should update the
 	// podCache.
 	for pid, events := range eventsByPodID {
 		pod := g.podRecords.getCurrent(pid)
-		if g.cacheEnabled() {
-			// updateCache() will inspect the pod and update the cache. If an
-			// error occurs during the inspection, we want PLEG to retry again
-			// in the next relist. To achieve this, we do not update the
-			// associated podRecord of the pod, so that the change will be
-			// detect again in the next relist.
-			// TODO: If many pods changed during the same relist period,
-			// inspecting the pod and getting the PodStatus to update the cache
-			// serially may take a while. We should be aware of this and
-			// parallelize if needed.
-			if err, updated := g.updateCache(ctx, pod, pid); err != nil {
-				// Rely on updateCache calling GetPodStatus to log the actual error.
-				g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
-
-				// make sure we try to reinspect the pod during the next relisting
-				needsReinspection[pid] = pod
+		// updateCache() will inspect the pod and update the cache. If an
+		// error occurs during the inspection, we want PLEG to retry again
+		// in the next relist. To achieve this, we do not update the
+		// associated podRecord of the pod, so that the change will be
+		// detect again in the next relist.
+		// TODO: If many pods changed during the same relist period,
+		// inspecting the pod and getting the PodStatus to update the cache
+		// serially may take a while. We should be aware of this and
+		// parallelize if needed.
+		if err, updated := g.updateCache(ctx, pod, pid); err != nil {
+			// Rely on updateCache calling GetPodStatus to log the actual error.
+			g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
 
-				continue
-			} else {
-				// this pod was in the list to reinspect and we did so because it had events, so remove it
-				// from the list (we don't want the reinspection code below to inspect it a second time in
-				// this relist execution)
-				delete(g.podsToReinspect, pid)
-				if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
-					if !updated {
-						continue
-					}
+			// make sure we try to reinspect the pod during the next relisting
+			needsReinspection[pid] = pod
+
+			continue
+		} else {
+			// this pod was in the list to reinspect and we did so because it had events, so remove it
+			// from the list (we don't want the reinspection code below to inspect it a second time in
+			// this relist execution)
+			delete(g.podsToReinspect, pid)
+			if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+				if !updated {
+					continue
 				}
 			}
 		}
+
 		// Update the internal storage and send out the events.
 		g.podRecords.update(pid)
@@ -324,7 +324,7 @@ func (g *GenericPLEG) Relist() {
 			// Log exit code of containers when they finished in a particular event
 			if events[i].Type == ContainerDied {
 				// Fill up containerExitCode map for ContainerDied event when first time appeared
-				if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
+				if len(containerExitCode) == 0 && pod != nil {
 					// Get updated podStatus
 					status, err := g.cache.Get(pod.ID)
 					if err == nil {
@@ -342,24 +342,22 @@ func (g *GenericPLEG) Relist() {
 		}
 	}
 
-	if g.cacheEnabled() {
-		// reinspect any pods that failed inspection during the previous relist
-		if len(g.podsToReinspect) > 0 {
-			g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
-			for pid, pod := range g.podsToReinspect {
-				if err, _ := g.updateCache(ctx, pod, pid); err != nil {
-					// Rely on updateCache calling GetPodStatus to log the actual error.
-					g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
-					needsReinspection[pid] = pod
-				}
+	// reinspect any pods that failed inspection during the previous relist
+	if len(g.podsToReinspect) > 0 {
+		g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
+		for pid, pod := range g.podsToReinspect {
+			if err, _ := g.updateCache(ctx, pod, pid); err != nil {
+				// Rely on updateCache calling GetPodStatus to log the actual error.
+				g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
+				needsReinspection[pid] = pod
 			}
 		}
-
-		// Update the cache timestamp. This needs to happen *after*
-		// all pods have been properly updated in the cache.
-		g.cache.UpdateTime(timestamp)
 	}
+	// Update the cache timestamp. This needs to happen *after*
+	// all pods have been properly updated in the cache.
+	g.cache.UpdateTime(timestamp)
+
 	// make sure we retain the list of pods that need reinspecting the next time relist is called
 	g.podsToReinspect = needsReinspection
 }
@@ -402,10 +400,6 @@ func computeEvents(logger klog.Logger, oldPod, newPod *kubecontainer.Pod, cid *k
 	return generateEvents(logger, pid, cid.ID, oldState, newState)
 }
 
-func (g *GenericPLEG) cacheEnabled() bool {
-	return g.cache != nil
-}
-
 // getPodIP preserves an older cached status' pod IP if the new status has no pod IPs
 // and its sandboxes have exited
 func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
@@ -488,9 +482,6 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
 
 func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
 	ctx := context.Background()
-	if !g.cacheEnabled() {
-		return fmt.Errorf("pod cache disabled"), false
-	}
 	if pod == nil {
 		return fmt.Errorf("pod cannot be nil"), false
 	}
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index d80848f3d50..16d5fad4d9e 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -32,6 +32,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/component-base/metrics/testutil"
+	"k8s.io/klog/v2"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -57,16 +58,18 @@ func newTestGenericPLEG() *TestGenericPLEG {
 
 func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG {
 	fakeRuntime := &containertest.FakeRuntime{}
+	fakeCache := containertest.NewFakeCache(fakeRuntime)
 	clock := testingclock.NewFakeClock(time.Time{})
 	// The channel capacity should be large enough to hold all events in a
 	// single test.
-	pleg := &GenericPLEG{
-		relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
-		runtime:        fakeRuntime,
-		eventChannel:   make(chan *PodLifecycleEvent, eventChannelCap),
-		podRecords:     make(podRecords),
-		clock:          clock,
-	}
+	pleg := NewGenericPLEG(
+		klog.Logger{},
+		fakeRuntime,
+		make(chan *PodLifecycleEvent, eventChannelCap),
+		&RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
+		fakeCache,
+		clock,
+	).(*GenericPLEG)
 	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
 }
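Reviewer note, not part of the diff above: since NewGenericPLEG now panics when cache is nil, a regression test for the new guard could look roughly like the sketch below. The test name and the recover-based assertion are hypothetical; the constructor arguments simply mirror the ones used in newTestGenericPLEGWithChannelSize.

package pleg

import (
	"testing"
	"time"

	"k8s.io/klog/v2"
	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
	testingclock "k8s.io/utils/clock/testing"
)

// Sketch: with this change, constructing a GenericPLEG without a cache is a
// programming error and should panic rather than silently disable caching.
func TestNewGenericPLEGNilCachePanics(t *testing.T) {
	fakeRuntime := &containertest.FakeRuntime{}
	defer func() {
		if recover() == nil {
			t.Fatal("expected NewGenericPLEG to panic when cache is nil")
		}
	}()
	NewGenericPLEG(
		klog.Logger{},
		fakeRuntime,
		make(chan *PodLifecycleEvent, 100),
		&RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
		nil, // nil cache: the new guard in NewGenericPLEG is expected to panic here
		testingclock.NewFakeClock(time.Time{}),
	)
}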