cacher: with expiredBookmarkWatchers

expiredBookmarkWatchers allows us to schedule the next bookmark event after dispatching not before as it was previously.
It opens a new functionality in which a watcher might decide to change when the next bookmark should be delivered based on some internal state.
This commit is contained in:
Lukasz Szaszkiewicz 2022-07-15 15:28:50 +02:00
parent e234917e0a
commit 0576f6a011

View File

@ -297,6 +297,8 @@ type Cacher struct {
watchersToStop []*cacheWatcher watchersToStop []*cacheWatcher
// Maintain a timeout queue to send the bookmark event before the watcher times out. // Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers *watcherBookmarkTimeBuckets bookmarkWatchers *watcherBookmarkTimeBuckets
// expiredBookmarkWatchers is a list of watchers that were expired and need to be schedule for a next bookmark event
expiredBookmarkWatchers []*cacheWatcher
} }
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@ -908,7 +910,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
} }
} }
func (c *Cacher) startDispatchingBookmarkEvents() { func (c *Cacher) startDispatchingBookmarkEventsLocked() {
// Pop already expired watchers. However, explicitly ignore stopped ones, // Pop already expired watchers. However, explicitly ignore stopped ones,
// as we don't delete watcher from bookmarkWatchers when it is stopped. // as we don't delete watcher from bookmarkWatchers when it is stopped.
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() { for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
@ -919,8 +921,7 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
continue continue
} }
c.watchersBuffer = append(c.watchersBuffer, watcher) c.watchersBuffer = append(c.watchersBuffer, watcher)
// Requeue the watcher for the next bookmark if needed. c.expiredBookmarkWatchers = append(c.expiredBookmarkWatchers, watcher)
c.bookmarkWatchers.addWatcher(watcher)
} }
} }
} }
@ -945,7 +946,7 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
c.watchersBuffer = c.watchersBuffer[:0] c.watchersBuffer = c.watchersBuffer[:0]
if event.Type == watch.Bookmark { if event.Type == watch.Bookmark {
c.startDispatchingBookmarkEvents() c.startDispatchingBookmarkEventsLocked()
// return here to reduce following code indentation and diff // return here to reduce following code indentation and diff
return return
} }
@ -989,6 +990,15 @@ func (c *Cacher) finishDispatching() {
watcher.stopLocked() watcher.stopLocked()
} }
c.watchersToStop = c.watchersToStop[:0] c.watchersToStop = c.watchersToStop[:0]
for _, watcher := range c.expiredBookmarkWatchers {
if watcher.stopped {
continue
}
// requeue the watcher for the next bookmark if needed.
c.bookmarkWatchers.addWatcher(watcher)
}
c.expiredBookmarkWatchers = c.expiredBookmarkWatchers[:0]
} }
func (c *Cacher) terminateAllWatchers() { func (c *Cacher) terminateAllWatchers() {
@ -1245,12 +1255,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) { func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
// We try to send bookmarks: // We try to send bookmarks:
// (a) roughly every minute //
// (b) right before the watcher timeout - for now we simply set it 2s before // (a) right before the watcher timeout - for now we simply set it 2s before
// the deadline // the deadline
// The former gives us periodicity if the watch breaks due to unexpected //
// conditions, the later ensures that on timeout the watcher is as close to // (b) roughly every minute
//
// (b) gives us periodicity if the watch breaks due to unexpected
// conditions, (a) ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases. // now as possible - this covers 99% of cases.
heartbeatTime := now.Add(bookmarkFrequency) heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() { if c.deadline.IsZero() {
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by // Timeout is set by our client libraries (e.g. reflector) as well as defaulted by