Revert "Address feedback"

This reverts commit 7aa7062892.
This commit is contained in:
Hironori Shiina 2024-09-10 10:15:22 +02:00
parent 7aa7062892
commit 05605d7ca4
2 changed files with 8 additions and 11 deletions

View File

@ -184,9 +184,6 @@ const (
eventedPlegRelistPeriod = time.Second * 300 eventedPlegRelistPeriod = time.Second * 300
eventedPlegRelistThreshold = time.Minute * 10 eventedPlegRelistThreshold = time.Minute * 10
eventedPlegMaxStreamRetries = 5 eventedPlegMaxStreamRetries = 5
// Evented PLEG needs to update the global timestamp of the cache as frequently as Generic PLEG relisting
// in order to wake up pod workers that get stuck in cache.GetNewerThan().
eventedPlegCacheUpdatePeriod = genericPlegRelistPeriod
// backOffPeriod is the period to back off when pod syncing results in an // backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff // error. It is also used as the base period for the exponential backoff
@ -739,7 +736,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
RelistThreshold: genericPlegRelistThreshold, RelistThreshold: genericPlegRelistThreshold,
} }
klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel, klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{}, eventedPlegCacheUpdatePeriod) klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -31,6 +31,11 @@ import (
"k8s.io/utils/clock" "k8s.io/utils/clock"
) )
// The frequency with which global timestamp of the cache is to
// is to be updated periodically. If pod workers get stuck at cache.GetNewerThan
// call, after this period it will be unblocked.
const globalCacheUpdatePeriod = 1 * time.Second
var ( var (
eventedPLEGUsage = false eventedPLEGUsage = false
eventedPLEGUsageMu = sync.RWMutex{} eventedPLEGUsageMu = sync.RWMutex{}
@ -71,10 +76,6 @@ type EventedPLEG struct {
eventedPlegMaxStreamRetries int eventedPlegMaxStreamRetries int
// Indicates relisting related parameters // Indicates relisting related parameters
relistDuration *RelistDuration relistDuration *RelistDuration
// The frequency with which global timestamp of the cache is to
// is to be updated periodically. If pod workers get stuck at cache.GetNewerThan
// call, after this period it will be unblocked.
globalCacheUpdatePeriod time.Duration
// Stop the Evented PLEG by closing the channel. // Stop the Evented PLEG by closing the channel.
stopCh chan struct{} stopCh chan struct{}
// Stops the periodic update of the cache global timestamp. // Stops the periodic update of the cache global timestamp.
@ -86,7 +87,7 @@ type EventedPLEG struct {
// NewEventedPLEG instantiates a new EventedPLEG object and return it. // NewEventedPLEG instantiates a new EventedPLEG object and return it.
func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent, func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int, cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
relistDuration *RelistDuration, clock clock.Clock, cacheUpdatePeriod time.Duration) (PodLifecycleEventGenerator, error) { relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) {
handler, ok := genericPleg.(podLifecycleEventGeneratorHandler) handler, ok := genericPleg.(podLifecycleEventGeneratorHandler)
if !ok { if !ok {
return nil, fmt.Errorf("%v doesn't implement podLifecycleEventGeneratorHandler interface", genericPleg) return nil, fmt.Errorf("%v doesn't implement podLifecycleEventGeneratorHandler interface", genericPleg)
@ -100,7 +101,6 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru
eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries, eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
relistDuration: relistDuration, relistDuration: relistDuration,
clock: clock, clock: clock,
globalCacheUpdatePeriod: cacheUpdatePeriod,
}, nil }, nil
} }
@ -125,7 +125,7 @@ func (e *EventedPLEG) Start() {
e.stopCh = make(chan struct{}) e.stopCh = make(chan struct{})
e.stopCacheUpdateCh = make(chan struct{}) e.stopCacheUpdateCh = make(chan struct{})
go wait.Until(e.watchEventsChannel, 0, e.stopCh) go wait.Until(e.watchEventsChannel, 0, e.stopCh)
go wait.Until(e.updateGlobalCache, e.globalCacheUpdatePeriod, e.stopCacheUpdateCh) go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
} }
// Stop stops the Evented PLEG // Stop stops the Evented PLEG