Merge pull request #128453 from tallclair/cacheless-pleg

Cleanup unused cacheless PLEG code
Kubernetes Prow Robot 2024-11-06 06:59:35 +00:00 committed by GitHub
commit aafcf4e932
2 changed files with 52 additions and 58 deletions
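
Summary: the kubelet always constructs the PLEG with a cache, so the nil-cache branches guarded by `cacheEnabled()` were dead code. This change removes them and makes `NewGenericPLEG` panic when handed a nil cache. A minimal sketch of the constructor contract after this change, using the fake runtime from the kubelet's container testing package (the channel capacity and relist durations here are illustrative, not values from this PR):

```go
package main

import (
	"time"

	"k8s.io/klog/v2"
	"k8s.io/utils/clock"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

func main() {
	fakeRuntime := &containertest.FakeRuntime{}

	// The cache is now required: passing nil panics with "cache cannot be nil".
	cache := kubecontainer.NewCache()

	p := pleg.NewGenericPLEG(
		klog.Background(),
		fakeRuntime,
		make(chan *pleg.PodLifecycleEvent, 100), // capacity is illustrative
		&pleg.RelistDuration{RelistPeriod: time.Second, RelistThreshold: 3 * time.Minute},
		cache,
		clock.RealClock{},
	)
	p.Start() // begins the periodic relist loop
}
```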

pkg/kubelet/pleg/generic.go

@@ -121,6 +121,9 @@ type podRecords map[types.UID]*podRecord
 func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
     relistDuration *RelistDuration, cache kubecontainer.Cache,
     clock clock.Clock) PodLifecycleEventGenerator {
+    if cache == nil {
+        panic("cache cannot be nil")
+    }
     return &GenericPLEG{
         logger:         logger,
         relistDuration: relistDuration,
@@ -265,45 +268,42 @@ func (g *GenericPLEG) Relist() {
         }
     }
 
-    var needsReinspection map[types.UID]*kubecontainer.Pod
-    if g.cacheEnabled() {
-        needsReinspection = make(map[types.UID]*kubecontainer.Pod)
-    }
+    needsReinspection := make(map[types.UID]*kubecontainer.Pod)
 
     // If there are events associated with a pod, we should update the
     // podCache.
     for pid, events := range eventsByPodID {
         pod := g.podRecords.getCurrent(pid)
-        if g.cacheEnabled() {
-            // updateCache() will inspect the pod and update the cache. If an
-            // error occurs during the inspection, we want PLEG to retry again
-            // in the next relist. To achieve this, we do not update the
-            // associated podRecord of the pod, so that the change will be
-            // detect again in the next relist.
-            // TODO: If many pods changed during the same relist period,
-            // inspecting the pod and getting the PodStatus to update the cache
-            // serially may take a while. We should be aware of this and
-            // parallelize if needed.
-            if err, updated := g.updateCache(ctx, pod, pid); err != nil {
-                // Rely on updateCache calling GetPodStatus to log the actual error.
-                g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
-
-                // make sure we try to reinspect the pod during the next relisting
-                needsReinspection[pid] = pod
-
-                continue
-            } else {
-                // this pod was in the list to reinspect and we did so because it had events, so remove it
-                // from the list (we don't want the reinspection code below to inspect it a second time in
-                // this relist execution)
-                delete(g.podsToReinspect, pid)
-                if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
-                    if !updated {
-                        continue
-                    }
-                }
-            }
-        }
+        // updateCache() will inspect the pod and update the cache. If an
+        // error occurs during the inspection, we want PLEG to retry again
+        // in the next relist. To achieve this, we do not update the
+        // associated podRecord of the pod, so that the change will be
+        // detect again in the next relist.
+        // TODO: If many pods changed during the same relist period,
+        // inspecting the pod and getting the PodStatus to update the cache
+        // serially may take a while. We should be aware of this and
+        // parallelize if needed.
+        if err, updated := g.updateCache(ctx, pod, pid); err != nil {
+            // Rely on updateCache calling GetPodStatus to log the actual error.
+            g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
+
+            // make sure we try to reinspect the pod during the next relisting
+            needsReinspection[pid] = pod
+
+            continue
+        } else {
+            // this pod was in the list to reinspect and we did so because it had events, so remove it
+            // from the list (we don't want the reinspection code below to inspect it a second time in
+            // this relist execution)
+            delete(g.podsToReinspect, pid)
+            if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+                if !updated {
+                    continue
+                }
+            }
+        }
 
         // Update the internal storage and send out the events.
         g.podRecords.update(pid)
@@ -324,7 +324,7 @@ func (g *GenericPLEG) Relist() {
             // Log exit code of containers when they finished in a particular event
             if events[i].Type == ContainerDied {
                 // Fill up containerExitCode map for ContainerDied event when first time appeared
-                if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
+                if len(containerExitCode) == 0 && pod != nil {
                     // Get updated podStatus
                     status, err := g.cache.Get(pod.ID)
                     if err == nil {
@@ -342,24 +342,22 @@ func (g *GenericPLEG) Relist() {
         }
     }
 
-    if g.cacheEnabled() {
-        // reinspect any pods that failed inspection during the previous relist
-        if len(g.podsToReinspect) > 0 {
-            g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
-            for pid, pod := range g.podsToReinspect {
-                if err, _ := g.updateCache(ctx, pod, pid); err != nil {
-                    // Rely on updateCache calling GetPodStatus to log the actual error.
-                    g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
-                    needsReinspection[pid] = pod
-                }
-            }
-        }
-
-        // Update the cache timestamp. This needs to happen *after*
-        // all pods have been properly updated in the cache.
-        g.cache.UpdateTime(timestamp)
-    }
+    // reinspect any pods that failed inspection during the previous relist
+    if len(g.podsToReinspect) > 0 {
+        g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
+        for pid, pod := range g.podsToReinspect {
+            if err, _ := g.updateCache(ctx, pod, pid); err != nil {
+                // Rely on updateCache calling GetPodStatus to log the actual error.
+                g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
+                needsReinspection[pid] = pod
+            }
+        }
+    }
+
+    // Update the cache timestamp. This needs to happen *after*
+    // all pods have been properly updated in the cache.
+    g.cache.UpdateTime(timestamp)
 
     // make sure we retain the list of pods that need reinspecting the next time relist is called
     g.podsToReinspect = needsReinspection
 }
@@ -402,10 +400,6 @@ func computeEvents(logger klog.Logger, oldPod, newPod *kubecontainer.Pod, cid *k
     return generateEvents(logger, pid, cid.ID, oldState, newState)
 }
 
-func (g *GenericPLEG) cacheEnabled() bool {
-    return g.cache != nil
-}
-
 // getPodIP preserves an older cached status' pod IP if the new status has no pod IPs
 // and its sandboxes have exited
 func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
@@ -488,9 +482,6 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
 func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
     ctx := context.Background()
-    if !g.cacheEnabled() {
-        return fmt.Errorf("pod cache disabled"), false
-    }
     if pod == nil {
         return fmt.Errorf("pod cannot be nil"), false
     }
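
With the `cacheEnabled()` guard gone, the exported `UpdateCache` can only fail on a nil pod; a missing cache is ruled out at construction time. A hypothetical snippet (not part of this PR, reusing the imports from the sketch above) showing the fail-fast behavior the new panic gives callers:

```go
// mustPanicOnNilCache demonstrates the invariant added above: a nil cache
// is rejected when the PLEG is built, not later inside Relist.
func mustPanicOnNilCache() {
	defer func() {
		fmt.Println("recovered:", recover()) // prints: recovered: cache cannot be nil
	}()
	pleg.NewGenericPLEG(
		klog.Background(),
		&containertest.FakeRuntime{},
		make(chan *pleg.PodLifecycleEvent, 1),
		&pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
		nil, // nil cache -> panic("cache cannot be nil")
		clock.RealClock{},
	)
}
```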

pkg/kubelet/pleg/generic_test.go

@@ -32,6 +32,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/component-base/metrics/testutil"
+    "k8s.io/klog/v2"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -57,16 +58,18 @@ func newTestGenericPLEG() *TestGenericPLEG {
 func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG {
     fakeRuntime := &containertest.FakeRuntime{}
+    fakeCache := containertest.NewFakeCache(fakeRuntime)
     clock := testingclock.NewFakeClock(time.Time{})
     // The channel capacity should be large enough to hold all events in a
     // single test.
-    pleg := &GenericPLEG{
-        relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
-        runtime:        fakeRuntime,
-        eventChannel:   make(chan *PodLifecycleEvent, eventChannelCap),
-        podRecords:     make(podRecords),
-        clock:          clock,
-    }
+    pleg := NewGenericPLEG(
+        klog.Logger{},
+        fakeRuntime,
+        make(chan *PodLifecycleEvent, eventChannelCap),
+        &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
+        fakeCache,
+        clock,
+    ).(*GenericPLEG)
     return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
 }
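
Since the test helper now goes through the public constructor instead of a struct literal, tests exercise the same invariants as production code, and the panic path can be covered directly. A hypothetical regression test (not in this PR) that would fit alongside the helper above:

```go
func TestNewGenericPLEGPanicsOnNilCache(t *testing.T) {
	defer func() {
		if recover() == nil {
			t.Fatal("expected NewGenericPLEG to panic when cache is nil")
		}
	}()
	NewGenericPLEG(
		klog.Logger{},
		&containertest.FakeRuntime{},
		make(chan *PodLifecycleEvent, 10),
		&RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
		nil, // must panic per the new check in NewGenericPLEG
		testingclock.NewFakeClock(time.Time{}),
	)
}
```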