From 25a728ae5eb0ae067d21679fad915c0555242470 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Sun, 25 Aug 2019 09:55:52 +0200 Subject: [PATCH] Cache serializations --- .../apiserver/pkg/storage/cacher/cacher.go | 50 ++++++-- .../storage/cacher/cacher_whitebox_test.go | 110 +++++++++++++++++- .../pkg/storage/cacher/watch_cache.go | 54 ++++++++- .../pkg/storage/cacher/watch_cache_test.go | 51 ++++++++ .../pkg/storage/tests/cacher_test.go | 20 +++- 5 files changed, 262 insertions(+), 23 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 69a9b236d1f..696999bf902 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -754,17 +754,25 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) { return c.storage.Count(pathPrefix) } -func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { +// baseObjectThreadUnsafe omits locking for cachingObject. +func baseObjectThreadUnsafe(object runtime.Object) runtime.Object { + if co, ok := object.(*cachingObject); ok { + return co.object + } + return object +} + +func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bool) { if c.indexedTrigger == nil { return nil, false } result := make([]string, 0, 2) - result = append(result, c.indexedTrigger.indexerFunc(event.Object)) + result = append(result, c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.Object))) if event.PrevObject == nil { return result, true } - prevTriggerValue := c.indexedTrigger.indexerFunc(event.PrevObject) + prevTriggerValue := c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.PrevObject)) if result[0] != prevTriggerValue { result = append(result, prevTriggerValue) } @@ -892,7 +900,10 @@ func (c *Cacher) startDispatchingBookmarkEvents() { // startDispatching chooses watchers potentially interested in a given event // a marks dispatching as true. func (c *Cacher) startDispatching(event *watchCacheEvent) { - triggerValues, supported := c.triggerValues(event) + // It is safe to call triggerValuesThreadUnsafe here, because at this + // point only this thread can access this event (we create a separate + // watchCacheEvent for every dispatch). + triggerValues, supported := c.triggerValuesThreadUnsafe(event) c.Lock() defer c.Unlock() @@ -1165,7 +1176,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. - klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String()) + klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String()) c.forget() } @@ -1193,6 +1204,25 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) { return c.deadline.Add(-2 * time.Second), true } +func getEventObject(object runtime.Object) runtime.Object { + if _, ok := object.(runtime.CacheableObject); ok { + // It is safe to return without deep-copy, because the underlying + // object was already deep-copied during construction. + return object + } + return object.DeepCopyObject() +} + +func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { + if _, ok := object.(*cachingObject); ok { + // We assume that for cachingObject resourceVersion was already propagated before. + return + } + if err := versioner.UpdateObject(object, resourceVersion); err != nil { + utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err)) + } +} + func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { if event.Type == watch.Bookmark { return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()} @@ -1210,15 +1240,13 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event switch { case curObjPasses && !oldObjPasses: - return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()} + return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)} case curObjPasses && oldObjPasses: - return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} + return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)} case !curObjPasses && oldObjPasses: // return a delete event with the previous object content, but with the event's resource version - oldObj := event.PrevObject.DeepCopyObject() - if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil { - utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err)) - } + oldObj := getEventObject(event.PrevObject) + updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion) return &watch.Event{Type: watch.Deleted, Object: oldObj} } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 845b6238107..e73e81ce080 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -20,12 +20,14 @@ import ( "context" "fmt" "reflect" + goruntime "runtime" "strconv" "sync" "testing" "time" v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -265,7 +267,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er Versioner: testVersioner{}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, - GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil }, + GetAttrsFunc: storage.DefaultNamespaceScopedAttr, NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), @@ -452,7 +454,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { shouldContinue = false break } - rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion) + rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion()) if err != nil { t.Errorf("unexpected parsing error: %v", err) } else { @@ -906,3 +908,107 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount) } } + +func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) { + _, _, line, _ := goruntime.Caller(1) + for _, expectedEvent := range events { + select { + case event := <-w.ResultChan(): + if e, a := expectedEvent.Type, event.Type; e != a { + t.Logf("(called from line %d)", line) + t.Errorf("Expected: %s, got: %s", e, a) + } + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) { + t.Logf("(called from line %d)", line) + t.Errorf("Expected: %#v, got: %#v", e, a) + } + case <-time.After(wait.ForeverTestTimeout): + t.Logf("(called from line %d)", line) + t.Errorf("Timed out waiting for an event") + } + } +} + +func TestCachingDeleteEvents(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + fooPredicate := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{"foo": "true"}), + Field: fields.Everything(), + } + barPredicate := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{"bar": "true"}), + Field: fields.Everything(), + } + + createWatch := func(pred storage.SelectionPredicate) watch.Interface { + w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + return w + } + + allEventsWatcher := createWatch(storage.Everything) + defer allEventsWatcher.Stop() + fooEventsWatcher := createWatch(fooPredicate) + defer fooEventsWatcher.Stop() + barEventsWatcher := createWatch(barPredicate) + defer barEventsWatcher.Stop() + + makePod := func(labels map[string]string, rv string) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: labels, + ResourceVersion: rv, + }, + } + } + pod1 := makePod(map[string]string{"foo": "true", "bar": "true"}, "1001") + pod2 := makePod(map[string]string{"foo": "true"}, "1002") + pod3 := makePod(map[string]string{}, "1003") + pod4 := makePod(map[string]string{}, "1004") + pod1DeletedAt2 := pod1.DeepCopyObject().(*examplev1.Pod) + pod1DeletedAt2.ResourceVersion = "1002" + pod2DeletedAt3 := pod2.DeepCopyObject().(*examplev1.Pod) + pod2DeletedAt3.ResourceVersion = "1003" + + allEvents := []watch.Event{ + {Type: watch.Added, Object: pod1.DeepCopy()}, + {Type: watch.Modified, Object: pod2.DeepCopy()}, + {Type: watch.Modified, Object: pod3.DeepCopy()}, + {Type: watch.Deleted, Object: pod4.DeepCopy()}, + } + fooEvents := []watch.Event{ + {Type: watch.Added, Object: pod1.DeepCopy()}, + {Type: watch.Modified, Object: pod2.DeepCopy()}, + {Type: watch.Deleted, Object: pod2DeletedAt3.DeepCopy()}, + } + barEvents := []watch.Event{ + {Type: watch.Added, Object: pod1.DeepCopy()}, + {Type: watch.Deleted, Object: pod1DeletedAt2.DeepCopy()}, + } + + cacher.watchCache.Add(pod1) + cacher.watchCache.Update(pod2) + cacher.watchCache.Update(pod3) + cacher.watchCache.Delete(pod4) + + verifyEvents(t, allEventsWatcher, allEvents) + verifyEvents(t, fooEventsWatcher, fooEvents) + verifyEvents(t, barEventsWatcher, barEvents) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 332aacd98d5..709e352f57a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -206,6 +206,37 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob return object, resourceVersion, nil } +func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) { + switch event.Type { + case watch.Added, watch.Modified: + if object, err := newCachingObject(event.Object); err == nil { + event.Object = object + } else { + klog.Errorf("couldn't create cachingObject from: %#v", event.Object) + } + // Don't wrap PrevObject for update event (for create events it is nil). + // We only encode those to deliver DELETE watch events, so if + // event.Object is not nil it can be used only for watchers for which + // selector was satisfied for its previous version and is no longer + // satisfied for the current version. + // This is rare enough that it doesn't justify making deep-copy of the + // object (done by newCachingObject) every time. + case watch.Deleted: + // Don't wrap Object for delete events - these are not to deliver any + // events. Only wrap PrevObject. + if object, err := newCachingObject(event.PrevObject); err == nil { + // Update resource version of the underlying object. + // event.PrevObject is used to deliver DELETE watch events and + // for them, we set resourceVersion to instead of + // the resourceVersion of the last modification of the object. + updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion) + event.PrevObject = object + } else { + klog.Errorf("couldn't create cachingObject from: %#v", event.Object) + } + } +} + // processEvent is safe as long as there is at most one call to it in flight // at any point in time. func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { @@ -219,7 +250,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd return err } - watchCacheEvent := &watchCacheEvent{ + wcEvent := &watchCacheEvent{ Type: event.Type, Object: elem.Object, ObjLabels: elem.Labels, @@ -242,12 +273,12 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd } if exists { previousElem := previous.(*storeElement) - watchCacheEvent.PrevObject = previousElem.Object - watchCacheEvent.PrevObjLabels = previousElem.Labels - watchCacheEvent.PrevObjFields = previousElem.Fields + wcEvent.PrevObject = previousElem.Object + wcEvent.PrevObjLabels = previousElem.Labels + wcEvent.PrevObjFields = previousElem.Fields } - w.updateCache(watchCacheEvent) + w.updateCache(wcEvent) w.resourceVersion = resourceVersion defer w.cond.Broadcast() @@ -260,7 +291,18 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd // This is safe as long as there is at most one call to processEvent in flight // at any point in time. if w.eventHandler != nil { - w.eventHandler(watchCacheEvent) + // Set up caching of object serializations only for dispatching this event. + // + // Storing serializations in memory would result in increased memory usage, + // but it would help for caching encodings for watches started from old + // versions. However, we still don't have a convincing data that the gain + // from it justifies increased memory usage, so for now we drop the cached + // serializations after dispatching this event. + + // Make a shallow copy to allow overwriting Object and PrevObject. + wce := *wcEvent + setCachingObjects(&wce, w.versioner) + w.eventHandler(&wce) } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index e1f30de83e2..9883def730f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -18,6 +18,7 @@ package cacher import ( "fmt" + "reflect" "strconv" "strings" "testing" @@ -432,3 +433,53 @@ func TestReflectorForWatchCache(t *testing.T) { } } } + +func TestCachingObjects(t *testing.T) { + store := newTestWatchCache(5) + + index := 0 + store.eventHandler = func(event *watchCacheEvent) { + switch event.Type { + case watch.Added, watch.Modified: + if _, ok := event.Object.(runtime.CacheableObject); !ok { + t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object) + } + if _, ok := event.PrevObject.(runtime.CacheableObject); ok { + t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object) + } + case watch.Deleted: + if _, ok := event.Object.(runtime.CacheableObject); ok { + t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object) + } + if _, ok := event.PrevObject.(runtime.CacheableObject); !ok { + t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object) + } + } + + // Verify that delivered event is the same as cached one modulo Object/PrevObject. + switch event.Type { + case watch.Added, watch.Modified: + event.Object = event.Object.(runtime.CacheableObject).GetObject() + case watch.Deleted: + event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject() + // In events store in watchcache, we also don't update ResourceVersion. + // So we need to ensure that we don't fail on it. + resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject) + if err != nil { + t.Fatalf("Failed to parse resource version: %v", err) + } + updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion) + } + if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) { + t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e) + } + index++ + } + + pod1 := makeTestPod("pod", 1) + pod2 := makeTestPod("pod", 2) + pod3 := makeTestPod("pod", 3) + store.Add(pod1) + store.Update(pod2) + store.Delete(pod3) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 8c2cb0611f9..80eeea09ebf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -357,7 +357,11 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType t.Logf("(called from line %d)", line) t.Errorf("Expected: %s, got: %s", eventType, event.Type) } - if e, a := eventObject, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) { t.Logf("(called from line %d)", line) t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) } @@ -606,7 +610,11 @@ func TestStartingResourceVersion(t *testing.T) { select { case e := <-watcher.ResultChan(): - pod := e.Object.(*example.Pod) + object := e.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + pod := object.(*example.Pod) podRV, err := v.ParseResourceVersion(pod.ResourceVersion) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -725,7 +733,11 @@ func TestRandomWatchDeliver(t *testing.T) { if !ok { break } - if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { t.Errorf("Unexpected object watched: %s, expected %s", a, e) } watched++ @@ -911,7 +923,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { pod := fmt.Sprintf("foo-%d", i) err := createPod(etcdStorage, makeTestPod(pod)) if err != nil { - t.Fatalf("failed to create pod %v", pod) + t.Fatalf("failed to create pod %v: %v", pod, err) } time.Sleep(time.Second / 100) }