From 76128586a24ffd66d55ac4358c29efd72b7176e4 Mon Sep 17 00:00:00 2001 From: Hironori Shiina Date: Fri, 12 Apr 2024 16:48:37 +0200 Subject: [PATCH 1/5] Pass event created timestamp correctly to cache `CreatedAt` timestamp of `ContainerEventResponse` should be passed as nanoseconds to `time.Unix()`. --- pkg/kubelet/pleg/evented.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index ef44ff5c994..90e880f0dc6 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -264,7 +264,7 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap } shouldSendPLEGEvent = true } else { - if e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0)) { + if e.cache.Set(podID, status, err, time.Unix(0, event.GetCreatedAt())) { shouldSendPLEGEvent = true } } From dbc47341fabfc6dcd6b6bc0854f0a59ce4095827 Mon Sep 17 00:00:00 2001 From: Hironori Shiina Date: Wed, 8 May 2024 09:49:49 +0200 Subject: [PATCH 2/5] Update global cache timestamp more frequently There are some cases where a pod worker is woken up without a cache update by the PLEG such as a pod termination. Then, the worker gets stuck in `cache.GetNewerThan()` till the global cache timestamp is updated by the PLEG. In order to unblock the stuck worker as early as the Generic PLEG, this fix makes the Evented PLEG update the global cache as frequently as the Generic PLEG. --- pkg/kubelet/pleg/evented.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index 90e880f0dc6..eb5cc1d26c7 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -34,7 +34,7 @@ import ( // The frequency with which global timestamp of the cache is to // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan // call, after this period it will be unblocked. -const globalCacheUpdatePeriod = 5 * time.Second +const globalCacheUpdatePeriod = 1 * time.Second var ( eventedPLEGUsage = false From 7aa7062892a8084725b53a6f3395b76245cb94a7 Mon Sep 17 00:00:00 2001 From: Hironori Shiina Date: Mon, 26 Aug 2024 11:01:25 +0200 Subject: [PATCH 3/5] Address feedback --- pkg/kubelet/kubelet.go | 5 ++++- pkg/kubelet/pleg/evented.go | 14 +++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 17179571de7..26b7fe12d11 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -184,6 +184,9 @@ const ( eventedPlegRelistPeriod = time.Second * 300 eventedPlegRelistThreshold = time.Minute * 10 eventedPlegMaxStreamRetries = 5 + // Evented PLEG needs to update the global timestamp of the cache as frequently as Generic PLEG relisting + // in order to wake up pod workers that get stuck in cache.GetNewerThan(). + eventedPlegCacheUpdatePeriod = genericPlegRelistPeriod // backOffPeriod is the period to back off when pod syncing results in an // error. It is also used as the base period for the exponential backoff @@ -736,7 +739,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, RelistThreshold: genericPlegRelistThreshold, } klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel, - klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}) + klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}, eventedPlegCacheUpdatePeriod) if err != nil { return nil, err } diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index eb5cc1d26c7..196bd711e40 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -31,11 +31,6 @@ import ( "k8s.io/utils/clock" ) -// The frequency with which global timestamp of the cache is to -// is to be updated periodically. If pod workers get stuck at cache.GetNewerThan -// call, after this period it will be unblocked. -const globalCacheUpdatePeriod = 1 * time.Second - var ( eventedPLEGUsage = false eventedPLEGUsageMu = sync.RWMutex{} @@ -76,6 +71,10 @@ type EventedPLEG struct { eventedPlegMaxStreamRetries int // Indicates relisting related parameters relistDuration *RelistDuration + // The frequency with which global timestamp of the cache is to + // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan + // call, after this period it will be unblocked. + globalCacheUpdatePeriod time.Duration // Stop the Evented PLEG by closing the channel. stopCh chan struct{} // Stops the periodic update of the cache global timestamp. @@ -87,7 +86,7 @@ type EventedPLEG struct { // NewEventedPLEG instantiates a new EventedPLEG object and return it. func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent, cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int, - relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) { + relistDuration *RelistDuration, clock clock.Clock, cacheUpdatePeriod time.Duration) (PodLifecycleEventGenerator, error) { handler, ok := genericPleg.(podLifecycleEventGeneratorHandler) if !ok { return nil, fmt.Errorf("%v doesn't implement podLifecycleEventGeneratorHandler interface", genericPleg) @@ -101,6 +100,7 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries, relistDuration: relistDuration, clock: clock, + globalCacheUpdatePeriod: cacheUpdatePeriod, }, nil } @@ -125,7 +125,7 @@ func (e *EventedPLEG) Start() { e.stopCh = make(chan struct{}) e.stopCacheUpdateCh = make(chan struct{}) go wait.Until(e.watchEventsChannel, 0, e.stopCh) - go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh) + go wait.Until(e.updateGlobalCache, e.globalCacheUpdatePeriod, e.stopCacheUpdateCh) } // Stop stops the Evented PLEG From 05605d7ca4c97cbb31407413860ad83c62963be4 Mon Sep 17 00:00:00 2001 From: Hironori Shiina Date: Tue, 10 Sep 2024 10:15:22 +0200 Subject: [PATCH 4/5] Revert "Address feedback" This reverts commit 7aa7062892a8084725b53a6f3395b76245cb94a7. --- pkg/kubelet/kubelet.go | 5 +---- pkg/kubelet/pleg/evented.go | 14 +++++++------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 26b7fe12d11..17179571de7 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -184,9 +184,6 @@ const ( eventedPlegRelistPeriod = time.Second * 300 eventedPlegRelistThreshold = time.Minute * 10 eventedPlegMaxStreamRetries = 5 - // Evented PLEG needs to update the global timestamp of the cache as frequently as Generic PLEG relisting - // in order to wake up pod workers that get stuck in cache.GetNewerThan(). - eventedPlegCacheUpdatePeriod = genericPlegRelistPeriod // backOffPeriod is the period to back off when pod syncing results in an // error. It is also used as the base period for the exponential backoff @@ -739,7 +736,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, RelistThreshold: genericPlegRelistThreshold, } klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel, - klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}, eventedPlegCacheUpdatePeriod) + klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}) if err != nil { return nil, err } diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index 196bd711e40..eb5cc1d26c7 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -31,6 +31,11 @@ import ( "k8s.io/utils/clock" ) +// The frequency with which global timestamp of the cache is to +// is to be updated periodically. If pod workers get stuck at cache.GetNewerThan +// call, after this period it will be unblocked. +const globalCacheUpdatePeriod = 1 * time.Second + var ( eventedPLEGUsage = false eventedPLEGUsageMu = sync.RWMutex{} @@ -71,10 +76,6 @@ type EventedPLEG struct { eventedPlegMaxStreamRetries int // Indicates relisting related parameters relistDuration *RelistDuration - // The frequency with which global timestamp of the cache is to - // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan - // call, after this period it will be unblocked. - globalCacheUpdatePeriod time.Duration // Stop the Evented PLEG by closing the channel. stopCh chan struct{} // Stops the periodic update of the cache global timestamp. @@ -86,7 +87,7 @@ type EventedPLEG struct { // NewEventedPLEG instantiates a new EventedPLEG object and return it. func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent, cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int, - relistDuration *RelistDuration, clock clock.Clock, cacheUpdatePeriod time.Duration) (PodLifecycleEventGenerator, error) { + relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) { handler, ok := genericPleg.(podLifecycleEventGeneratorHandler) if !ok { return nil, fmt.Errorf("%v doesn't implement podLifecycleEventGeneratorHandler interface", genericPleg) @@ -100,7 +101,6 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries, relistDuration: relistDuration, clock: clock, - globalCacheUpdatePeriod: cacheUpdatePeriod, }, nil } @@ -125,7 +125,7 @@ func (e *EventedPLEG) Start() { e.stopCh = make(chan struct{}) e.stopCacheUpdateCh = make(chan struct{}) go wait.Until(e.watchEventsChannel, 0, e.stopCh) - go wait.Until(e.updateGlobalCache, e.globalCacheUpdatePeriod, e.stopCacheUpdateCh) + go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh) } // Stop stops the Evented PLEG From 0289216183724d53039a8853fabdadd4600f372e Mon Sep 17 00:00:00 2001 From: Hironori Shiina Date: Tue, 10 Sep 2024 10:15:41 +0200 Subject: [PATCH 5/5] Revert "Update global cache timestamp more frequently" This reverts commit dbc47341fabfc6dcd6b6bc0854f0a59ce4095827. --- pkg/kubelet/pleg/evented.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index eb5cc1d26c7..90e880f0dc6 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -34,7 +34,7 @@ import ( // The frequency with which global timestamp of the cache is to // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan // call, after this period it will be unblocked. -const globalCacheUpdatePeriod = 1 * time.Second +const globalCacheUpdatePeriod = 5 * time.Second var ( eventedPLEGUsage = false