diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 59dfe8057b0..aadd33d392b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -823,6 +823,37 @@ func (c *Cacher) dispatchEvents() { } } +func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) { + switch event.Type { + case watch.Added, watch.Modified: + if object, err := newCachingObject(event.Object); err == nil { + event.Object = object + } else { + klog.Errorf("couldn't create cachingObject from: %#v", event.Object) + } + // Don't wrap PrevObject for update event (for create events it is nil). + // We only encode those to deliver DELETE watch events, so if + // event.Object is not nil it can be used only for watchers for which + // selector was satisfied for its previous version and is no longer + // satisfied for the current version. + // This is rare enough that it doesn't justify making deep-copy of the + // object (done by newCachingObject) every time. + case watch.Deleted: + // Don't wrap Object for delete events - these are not to deliver any + // events. Only wrap PrevObject. + if object, err := newCachingObject(event.PrevObject); err == nil { + // Update resource version of the underlying object. + // event.PrevObject is used to deliver DELETE watch events and + // for them, we set resourceVersion to instead of + // the resourceVersion of the last modification of the object. + updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion) + event.PrevObject = object + } else { + klog.Errorf("couldn't create cachingObject from: %#v", event.Object) + } + } +} + func (c *Cacher) dispatchEvent(event *watchCacheEvent) { c.startDispatching(event) defer c.finishDispatching() @@ -836,6 +867,23 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { watcher.nonblockingAdd(event) } } else { + // Set up caching of object serializations only for dispatching this event. + // + // Storing serializations in memory would result in increased memory usage, + // but it would help for caching encodings for watches started from old + // versions. However, we still don't have a convincing data that the gain + // from it justifies increased memory usage, so for now we drop the cached + // serializations after dispatching this event. + // + // Given the deep-copies that are done to create cachingObjects, + // we try to cache serializations only if there are at least 3 watchers. + if len(c.watchersBuffer) >= 3 { + // Make a shallow copy to allow overwriting Object and PrevObject. + wcEvent := *event + setCachingObjects(&wcEvent, c.versioner) + event = &wcEvent + } + c.blockedWatchers = c.blockedWatchers[:0] for _, watcher := range c.watchersBuffer { if !watcher.nonblockingAdd(event) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 31984b9bc2e..cd5d0f5f3a7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -998,3 +998,104 @@ func TestCachingDeleteEvents(t *testing.T) { verifyEvents(t, fooEventsWatcher, fooEvents) verifyEvents(t, barEventsWatcher, barEvents) } + +func testCachingObjects(t *testing.T, watchersCount int) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage, 10) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + dispatchedEvents := []*watchCacheEvent{} + cacher.watchCache.eventHandler = func(event *watchCacheEvent) { + dispatchedEvents = append(dispatchedEvents, event) + cacher.processEvent(event) + } + + watchers := make([]watch.Interface, 0, watchersCount) + for i := 0; i < watchersCount; i++ { + w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + defer w.Stop() + watchers = append(watchers, w) + } + + makePod := func(name, rv string) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "ns", + ResourceVersion: rv, + }, + } + } + pod1 := makePod("pod", "1001") + pod2 := makePod("pod", "1002") + pod3 := makePod("pod", "1003") + + cacher.watchCache.Add(pod1) + cacher.watchCache.Update(pod2) + cacher.watchCache.Delete(pod3) + + // At this point, we already have dispatchedEvents fully propagated. + + verifyEvents := func(w watch.Interface) { + var event watch.Event + for index := range dispatchedEvents { + select { + case event = <-w.ResultChan(): + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout watiching for the event") + } + + var object runtime.Object + if watchersCount >= 3 { + if _, ok := event.Object.(runtime.CacheableObject); !ok { + t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object) + } + object = event.Object.(runtime.CacheableObject).GetObject() + } else { + if _, ok := event.Object.(runtime.CacheableObject); ok { + t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object) + } + object = event.Object.DeepCopyObject() + } + + if event.Type == watch.Deleted { + resourceVersion, err := cacher.versioner.ObjectResourceVersion(cacher.watchCache.cache[index].PrevObject) + if err != nil { + t.Fatalf("Failed to parse resource version: %v", err) + } + updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion) + } + + var e runtime.Object + switch event.Type { + case watch.Added, watch.Modified: + e = cacher.watchCache.cache[index].Object + case watch.Deleted: + e = cacher.watchCache.cache[index].PrevObject + default: + t.Errorf("unexpected watch event: %#v", event) + } + if a := object; !reflect.DeepEqual(a, e) { + t.Errorf("event object messed up for %s: %#v, expected: %#v", event.Type, a, e) + } + } + } + + for i := range watchers { + verifyEvents(watchers[i]) + } +} + +func TestCachingObjects(t *testing.T) { + t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) }) + t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 65c6b31f882..0c33b57fe42 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -210,37 +210,6 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob return object, resourceVersion, nil } -func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) { - switch event.Type { - case watch.Added, watch.Modified: - if object, err := newCachingObject(event.Object); err == nil { - event.Object = object - } else { - klog.Errorf("couldn't create cachingObject from: %#v", event.Object) - } - // Don't wrap PrevObject for update event (for create events it is nil). - // We only encode those to deliver DELETE watch events, so if - // event.Object is not nil it can be used only for watchers for which - // selector was satisfied for its previous version and is no longer - // satisfied for the current version. - // This is rare enough that it doesn't justify making deep-copy of the - // object (done by newCachingObject) every time. - case watch.Deleted: - // Don't wrap Object for delete events - these are not to deliver any - // events. Only wrap PrevObject. - if object, err := newCachingObject(event.PrevObject); err == nil { - // Update resource version of the underlying object. - // event.PrevObject is used to deliver DELETE watch events and - // for them, we set resourceVersion to instead of - // the resourceVersion of the last modification of the object. - updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion) - event.PrevObject = object - } else { - klog.Errorf("couldn't create cachingObject from: %#v", event.Object) - } - } -} - // processEvent is safe as long as there is at most one call to it in flight // at any point in time. func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { @@ -295,18 +264,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd // This is safe as long as there is at most one call to processEvent in flight // at any point in time. if w.eventHandler != nil { - // Set up caching of object serializations only for dispatching this event. - // - // Storing serializations in memory would result in increased memory usage, - // but it would help for caching encodings for watches started from old - // versions. However, we still don't have a convincing data that the gain - // from it justifies increased memory usage, so for now we drop the cached - // serializations after dispatching this event. - - // Make a shallow copy to allow overwriting Object and PrevObject. - wce := *wcEvent - setCachingObjects(&wce, w.versioner) - w.eventHandler(&wce) + w.eventHandler(wcEvent) } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 860fba0b8a3..4f40134a1b4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -18,7 +18,6 @@ package cacher import ( "fmt" - "reflect" "strconv" "strings" "testing" @@ -436,53 +435,3 @@ func TestReflectorForWatchCache(t *testing.T) { } } } - -func TestCachingObjects(t *testing.T) { - store := newTestWatchCache(5) - - index := 0 - store.eventHandler = func(event *watchCacheEvent) { - switch event.Type { - case watch.Added, watch.Modified: - if _, ok := event.Object.(runtime.CacheableObject); !ok { - t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object) - } - if _, ok := event.PrevObject.(runtime.CacheableObject); ok { - t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object) - } - case watch.Deleted: - if _, ok := event.Object.(runtime.CacheableObject); ok { - t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object) - } - if _, ok := event.PrevObject.(runtime.CacheableObject); !ok { - t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object) - } - } - - // Verify that delivered event is the same as cached one modulo Object/PrevObject. - switch event.Type { - case watch.Added, watch.Modified: - event.Object = event.Object.(runtime.CacheableObject).GetObject() - case watch.Deleted: - event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject() - // In events store in watchcache, we also don't update ResourceVersion. - // So we need to ensure that we don't fail on it. - resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject) - if err != nil { - t.Fatalf("Failed to parse resource version: %v", err) - } - updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion) - } - if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) { - t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e) - } - index++ - } - - pod1 := makeTestPod("pod", 1) - pod2 := makeTestPod("pod", 2) - pod3 := makeTestPod("pod", 3) - store.Add(pod1) - store.Update(pod2) - store.Delete(pod3) -}