mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #111244 from p0lyn0mial/cacher-expired-watchers
cacher: indroduce expiredBookmarkWatchers
This commit is contained in:
commit
3992eda8e6
@ -297,6 +297,8 @@ type Cacher struct {
|
||||
watchersToStop []*cacheWatcher
|
||||
// Maintain a timeout queue to send the bookmark event before the watcher times out.
|
||||
bookmarkWatchers *watcherBookmarkTimeBuckets
|
||||
// expiredBookmarkWatchers is a list of watchers that were expired and need to be schedule for a next bookmark event
|
||||
expiredBookmarkWatchers []*cacheWatcher
|
||||
}
|
||||
|
||||
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
|
||||
@ -908,7 +910,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cacher) startDispatchingBookmarkEvents() {
|
||||
func (c *Cacher) startDispatchingBookmarkEventsLocked() {
|
||||
// Pop already expired watchers. However, explicitly ignore stopped ones,
|
||||
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
||||
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
|
||||
@ -919,8 +921,7 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
|
||||
continue
|
||||
}
|
||||
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
||||
// Requeue the watcher for the next bookmark if needed.
|
||||
c.bookmarkWatchers.addWatcher(watcher)
|
||||
c.expiredBookmarkWatchers = append(c.expiredBookmarkWatchers, watcher)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -945,7 +946,7 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
|
||||
c.watchersBuffer = c.watchersBuffer[:0]
|
||||
|
||||
if event.Type == watch.Bookmark {
|
||||
c.startDispatchingBookmarkEvents()
|
||||
c.startDispatchingBookmarkEventsLocked()
|
||||
// return here to reduce following code indentation and diff
|
||||
return
|
||||
}
|
||||
@ -989,6 +990,15 @@ func (c *Cacher) finishDispatching() {
|
||||
watcher.stopLocked()
|
||||
}
|
||||
c.watchersToStop = c.watchersToStop[:0]
|
||||
|
||||
for _, watcher := range c.expiredBookmarkWatchers {
|
||||
if watcher.stopped {
|
||||
continue
|
||||
}
|
||||
// requeue the watcher for the next bookmark if needed.
|
||||
c.bookmarkWatchers.addWatcher(watcher)
|
||||
}
|
||||
c.expiredBookmarkWatchers = c.expiredBookmarkWatchers[:0]
|
||||
}
|
||||
|
||||
func (c *Cacher) terminateAllWatchers() {
|
||||
@ -1245,12 +1255,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
|
||||
|
||||
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
|
||||
// We try to send bookmarks:
|
||||
// (a) roughly every minute
|
||||
// (b) right before the watcher timeout - for now we simply set it 2s before
|
||||
//
|
||||
// (a) right before the watcher timeout - for now we simply set it 2s before
|
||||
// the deadline
|
||||
// The former gives us periodicity if the watch breaks due to unexpected
|
||||
// conditions, the later ensures that on timeout the watcher is as close to
|
||||
//
|
||||
// (b) roughly every minute
|
||||
//
|
||||
// (b) gives us periodicity if the watch breaks due to unexpected
|
||||
// conditions, (a) ensures that on timeout the watcher is as close to
|
||||
// now as possible - this covers 99% of cases.
|
||||
|
||||
heartbeatTime := now.Add(bookmarkFrequency)
|
||||
if c.deadline.IsZero() {
|
||||
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
|
||||
|
Loading…
Reference in New Issue
Block a user