diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 6df0fd135e0..84b8ba8403b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -51,6 +51,9 @@ const ( // storageWatchListPageSize is the cacher's request chunk size of // initial and resync watch lists to storage. storageWatchListPageSize = int64(10000) + // defaultBookmarkFrequency defines how frequently watch bookmarks should be send + // in addition to sending a bookmark right before watch deadline + defaultBookmarkFrequency = time.Minute ) // Config contains the configuration for a given Cache. @@ -154,24 +157,26 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache // second in a bucket, and pop up them once at the timeout. To be more specific, // if you set fire time at X, you can get the bookmark within (X-1,X+1) period. type watcherBookmarkTimeBuckets struct { - lock sync.Mutex - watchersBuckets map[int64][]*cacheWatcher - startBucketID int64 - clock clock.Clock + lock sync.Mutex + watchersBuckets map[int64][]*cacheWatcher + startBucketID int64 + clock clock.Clock + bookmarkFrequency time.Duration } -func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets { +func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *watcherBookmarkTimeBuckets { return &watcherBookmarkTimeBuckets{ - watchersBuckets: make(map[int64][]*cacheWatcher), - startBucketID: clock.Now().Unix(), - clock: clock, + watchersBuckets: make(map[int64][]*cacheWatcher), + startBucketID: clock.Now().Unix(), + clock: clock, + bookmarkFrequency: bookmarkFrequency, } } // adds a watcher to the bucket, if the deadline is before the start, it will be // added to the first one. func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { - nextTime, ok := w.nextBookmarkTime(t.clock.Now()) + nextTime, ok := w.nextBookmarkTime(t.clock.Now(), t.bookmarkFrequency) if !ok { return false } @@ -339,7 +344,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { stopCh: stopCh, clock: clock, timer: time.NewTimer(time.Duration(0)), - bookmarkWatchers: newTimeBucketWatchers(clock), + bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency), } // Ensure that timer is stopped. @@ -914,9 +919,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() { continue } c.watchersBuffer = append(c.watchersBuffer, watcher) - // Given that we send bookmark event once at deadline-2s, never push again - // after the watcher pops up from the buckets. Once we decide to change the - // strategy to more sophisticated, we may need it here. + // Requeue the watcher for the next bookmark if needed. + c.bookmarkWatchers.addWatcher(watcher) } } } @@ -1219,13 +1223,28 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { } } -func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) { - // For now we return 2s before deadline (and maybe +infinity is now already passed this time) - // but it gives us extensibility for the future(false when deadline is not set). +func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) { + // We try to send bookmarks: + // (a) roughly every minute + // (b) right before the watcher timeout - for now we simply set it 2s before + // the deadline + // The former gives us periodicity if the watch breaks due to unexpected + // conditions, the later ensures that on timeout the watcher is as close to + // now as possible - this covers 99% of cases. + heartbeatTime := now.Add(bookmarkFrequency) if c.deadline.IsZero() { - return c.deadline, false + // Timeout is set by our client libraries (e.g. reflector) as well as defaulted by + // apiserver if properly configured. So this shoudln't happen in practice. + return heartbeatTime, true } - return c.deadline.Add(-2 * time.Second), true + if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) { + heartbeatTime = pretimeoutTime + } + + if heartbeatTime.Before(now) { + return time.Time{}, false + } + return heartbeatTime, true } func getEventObject(object runtime.Object) runtime.Object { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 133e4ccc49d..18abd0a602f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -558,7 +558,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) { } clock := clock.NewFakeClock(time.Now()) - watchers := newTimeBucketWatchers(clock) + watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency) now := clock.Now() watchers.addWatcher(newWatcher(now.Add(10 * time.Second))) watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) @@ -746,6 +746,77 @@ func TestCacherSendBookmarkEvents(t *testing.T) { } } +func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + // Update bookmarkFrequency to speed up test. + // Note that the frequency lower than 1s doesn't change much due to + // resolution how frequency we recompute. + cacher.bookmarkWatchers.bookmarkFrequency = time.Second + + // Wait until cacher is initialized. + cacher.ready.wait() + pred := storage.Everything + pred.AllowWatchBookmarks = true + + makePod := func(index int) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", index), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%v", 100+index), + }, + } + } + + // Create pod to initialize watch cache. + if err := cacher.watchCache.Add(makePod(0)); err != nil { + t.Fatalf("failed to add a pod: %v", err) + } + + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + w, err := cacher.Watch(ctx, "pods/ns", "100", pred) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + // Create one more pod, to ensure that current RV is higher and thus + // bookmarks will be delievere (events are delivered for RV higher + // than the max from init events). + if err := cacher.watchCache.Add(makePod(1)); err != nil { + t.Fatalf("failed to add a pod: %v", err) + } + + timeoutCh := time.After(5 * time.Second) + lastObservedRV := uint64(0) + // Ensure that a watcher gets two bookmarks. + for observedBookmarks := 0; observedBookmarks < 2; { + select { + case event, ok := <-w.ResultChan(): + if !ok { + t.Fatal("Unexpected closed") + } + rv, err := cacher.versioner.ObjectResourceVersion(event.Object) + if err != nil { + t.Errorf("failed to parse resource version from %#v: %v", event.Object, err) + } + if event.Type == watch.Bookmark { + observedBookmarks++ + if rv < lastObservedRV { + t.Errorf("Unexpected bookmark event resource version %v (last %v)", rv, lastObservedRV) + } + } + lastObservedRV = rv + case <-timeoutCh: + t.Fatal("Unexpected timeout to receive bookmark events") + } + } +} + func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() backingStorage := &dummyStorage{} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 040f621d440..c9e0a78bd1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -855,12 +855,12 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) { allowWatchBookmark bool }{ { // test old client won't get Bookmark event - timeout: 2 * time.Second, + timeout: 3 * time.Second, expected: false, allowWatchBookmark: false, }, { - timeout: 2 * time.Second, + timeout: 3 * time.Second, expected: true, allowWatchBookmark: true, }, @@ -909,7 +909,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { pred := storage.Everything pred.AllowWatchBookmarks = true - ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred) if err != nil { t.Fatalf("Unexpected error: %v", err)