diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 26b7fe12d11..17179571de7 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -184,9 +184,6 @@ const ( eventedPlegRelistPeriod = time.Second * 300 eventedPlegRelistThreshold = time.Minute * 10 eventedPlegMaxStreamRetries = 5 - // Evented PLEG needs to update the global timestamp of the cache as frequently as Generic PLEG relisting - // in order to wake up pod workers that get stuck in cache.GetNewerThan(). - eventedPlegCacheUpdatePeriod = genericPlegRelistPeriod // backOffPeriod is the period to back off when pod syncing results in an // error. It is also used as the base period for the exponential backoff @@ -739,7 +736,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, RelistThreshold: genericPlegRelistThreshold, } klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel, - klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}, eventedPlegCacheUpdatePeriod) + klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}) if err != nil { return nil, err } diff --git a/pkg/kubelet/pleg/evented.go b/pkg/kubelet/pleg/evented.go index 196bd711e40..eb5cc1d26c7 100644 --- a/pkg/kubelet/pleg/evented.go +++ b/pkg/kubelet/pleg/evented.go @@ -31,6 +31,11 @@ import ( "k8s.io/utils/clock" ) +// The frequency with which global timestamp of the cache is to +// is to be updated periodically. If pod workers get stuck at cache.GetNewerThan +// call, after this period it will be unblocked. +const globalCacheUpdatePeriod = 1 * time.Second + var ( eventedPLEGUsage = false eventedPLEGUsageMu = sync.RWMutex{} @@ -71,10 +76,6 @@ type EventedPLEG struct { eventedPlegMaxStreamRetries int // Indicates relisting related parameters relistDuration *RelistDuration - // The frequency with which global timestamp of the cache is to - // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan - // call, after this period it will be unblocked. - globalCacheUpdatePeriod time.Duration // Stop the Evented PLEG by closing the channel. stopCh chan struct{} // Stops the periodic update of the cache global timestamp. @@ -86,7 +87,7 @@ type EventedPLEG struct { // NewEventedPLEG instantiates a new EventedPLEG object and return it. func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent, cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int, - relistDuration *RelistDuration, clock clock.Clock, cacheUpdatePeriod time.Duration) (PodLifecycleEventGenerator, error) { + relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) { handler, ok := genericPleg.(podLifecycleEventGeneratorHandler) if !ok { return nil, fmt.Errorf("%v doesn't implement podLifecycleEventGeneratorHandler interface", genericPleg) @@ -100,7 +101,6 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries, relistDuration: relistDuration, clock: clock, - globalCacheUpdatePeriod: cacheUpdatePeriod, }, nil } @@ -125,7 +125,7 @@ func (e *EventedPLEG) Start() { e.stopCh = make(chan struct{}) e.stopCacheUpdateCh = make(chan struct{}) go wait.Until(e.watchEventsChannel, 0, e.stopCh) - go wait.Until(e.updateGlobalCache, e.globalCacheUpdatePeriod, e.stopCacheUpdateCh) + go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh) } // Stop stops the Evented PLEG