From 0bd8104809922aae504fc9be2e853650ecd17b8a Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 29 Sep 2020 12:27:20 +0200 Subject: [PATCH] Watch bookmarks may contain version of objects of other types --- .../k8s.io/apiserver/pkg/storage/cacher/BUILD | 3 - .../apiserver/pkg/storage/cacher/cacher.go | 14 +++- .../storage/cacher/cacher_whitebox_test.go | 70 +++++++++++++++++-- .../pkg/storage/cacher/watch_cache.go | 34 +++++---- 4 files changed, 97 insertions(+), 24 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD index 16254390930..56fe89a25a9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD @@ -65,12 +65,9 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 83874945961..dc1e9456eb7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -793,7 +793,19 @@ func (c *Cacher) dispatchEvents() { if !ok { return } - c.dispatchEvent(&event) + // Don't dispatch bookmarks coming from the storage layer. + // They can be very frequent (even to the level of subseconds) + // to allow efficient watch resumption on kube-apiserver restarts, + // and propagating them down may overload the whole system. + // + // TODO: If at some point we decide the performance and scalability + // footprint is acceptable, this is the place to hook them in. + // However, we then need to check if this was called as a result + // of a bookmark event or regular Add/Update/Delete operation by + // checking if resourceVersion here has changed. + if event.Type != watch.Bookmark { + c.dispatchEvent(&event) + } lastProcessedResourceVersion = event.ResourceVersion case <-bookmarkTimer.C(): bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index c13c1140672..5f5b0a1838f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -41,10 +41,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" - "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" ) var ( @@ -637,7 +634,6 @@ func TestTimeBucketWatchersBasic(t *testing.T) { } func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) if err != nil { @@ -865,7 +861,6 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { } func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) if err != nil { @@ -938,6 +933,71 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { } } +func TestBookmarksOnResourceVersionUpdates(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Ensure that bookmarks are sent more frequently than every 1m. + cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second) + + // Wait until cacher is initialized. + cacher.ready.wait() + + makePod := func(i int) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", i), + }, + } + } + if err := cacher.watchCache.Add(makePod(1000)); err != nil { + t.Errorf("error: %v", err) + } + + pred := storage.Everything + pred.AllowWatchBookmarks = true + + w, err := cacher.Watch(context.TODO(), "/pods/ns", storage.ListOptions{ + ResourceVersion: "1000", + Predicate: pred, + }) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + expectedRV := 2000 + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + event, ok := <-w.ResultChan() + if !ok { + t.Fatalf("Unexpected closed channel") + } + rv, err := cacher.versioner.ObjectResourceVersion(event.Object) + if err != nil { + t.Errorf("failed to parse resource version from %#v: %v", event.Object, err) + } + if event.Type == watch.Bookmark && rv == uint64(expectedRV) { + return + } + } + }() + + // Simulate progress notify event. + cacher.watchCache.UpdateResourceVersion(strconv.Itoa(expectedRV)) + + wg.Wait() +} + func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 1035d4a700a..dafcb399636 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -320,8 +320,9 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd } // Avoid calling event handler under lock. - // This is safe as long as there is at most one call to processEvent in flight - // at any point in time. + // This is safe as long as there is at most one call to Add/Update/Delete and + // UpdateResourceVersion in flight at any point in time, which is true now, + // because reflector calls them synchronously from its main thread. if w.eventHandler != nil { w.eventHandler(wcEvent) } @@ -388,20 +389,23 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) { return } - w.Lock() - defer w.Unlock() - w.resourceVersion = rv + func() { + w.Lock() + defer w.Unlock() + w.resourceVersion = rv + }() - // Don't dispatch bookmarks coming from the storage layer. - // They can be very frequent (even to the level of subseconds) - // to allow efficient watch resumption on kube-apiserver restarts, - // and propagating them down may overload the whole system. - // - // TODO: If at some point we decide the performance and scalability - // footprint is acceptable, this is the place to hook them in. - // However, we then need to check if this was called as a result - // of a bookmark event or regular Add/Update/Delete operation by - // checking if resourceVersion here has changed. + // Avoid calling event handler under lock. + // This is safe as long as there is at most one call to Add/Update/Delete and + // UpdateResourceVersion in flight at any point in time, which is true now, + // because reflector calls them synchronously from its main thread. + if w.eventHandler != nil { + wcEvent := &watchCacheEvent{ + Type: watch.Bookmark, + ResourceVersion: rv, + } + w.eventHandler(wcEvent) + } } // List returns list of pointers to objects.