From 7e434682e450e28d36f0ee4787e7b4672e8eb255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 21 Feb 2022 15:05:35 +0100 Subject: [PATCH 1/3] Prepare apiserver for operating on cached objects by not modifying them --- staging/src/k8s.io/apimachinery/pkg/watch/watch.go | 4 ++-- .../apiserver/pkg/endpoints/handlers/response.go | 10 ++++++++-- .../k8s.io/apiserver/pkg/endpoints/handlers/rest.go | 10 ---------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/watch.go b/staging/src/k8s.io/apimachinery/pkg/watch/watch.go index fd0550e4a7a..b6c7bbfa8fa 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/watch.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/watch.go @@ -27,11 +27,11 @@ import ( // Interface can be implemented by anything that knows how to watch and report changes. type Interface interface { - // Stops watching. Will close the channel returned by ResultChan(). Releases + // Stop stops watching. Will close the channel returned by ResultChan(). Releases // any resources used by the watch. Stop() - // Returns a chan which will receive all the events. If an error occurs + // ResultChan returns a chan which will receive all the events. If an error occurs // or Stop() is called, the implementation will close this channel and // release any resources used by the watch. ResultChan() <-chan Event diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go index 640378d9a74..55b83af0c3b 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go @@ -59,8 +59,14 @@ func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{} if _, ok := obj.(*metav1.Status); ok { return obj, nil } - if err := ensureNonNilItems(obj); err != nil { - return nil, err + + // ensure that for empty lists we don't return items. + // This is safe to modify without deep-copying the object, as + // List objects themselves are never cached. + if meta.IsListType(obj) && meta.LenList(obj) == 0 { + if err := meta.SetList(obj, []runtime.Object{}); err != nil { + return nil, err + } } switch target := mediaType.Convert; { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index b40423dac34..15e3d9f7f1e 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -356,16 +356,6 @@ func dedupOwnerReferencesAndAddWarning(obj runtime.Object, requestContext contex } } -// ensureNonNilItems ensures that for empty lists we don't return items. -func ensureNonNilItems(obj runtime.Object) error { - if meta.IsListType(obj) && meta.LenList(obj) == 0 { - if err := meta.SetList(obj, []runtime.Object{}); err != nil { - return err - } - } - return nil -} - func summarizeData(data []byte, maxLength int) string { switch { case len(data) == 0: From 0cb3a02bbede2f547ddd20e23e2bbc782ac3956c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 21 Feb 2022 15:18:25 +0100 Subject: [PATCH 2/3] Avoid perfoming deep copies in watchcache if not needed --- .../apiserver/pkg/storage/cacher/cacher.go | 29 ++++++++++--------- .../storage/cacher/cacher_whitebox_test.go | 2 +- .../pkg/storage/cacher/caching_object.go | 23 +++++++++++++-- .../pkg/storage/cacher/caching_object_test.go | 27 +++++++++++++++++ 4 files changed, 64 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 231c999b4a4..fd462dcbae3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -818,11 +818,11 @@ func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) { // Don't wrap Object for delete events - these are not to deliver any // events. Only wrap PrevObject. if object, err := newCachingObject(event.PrevObject); err == nil { - // Update resource version of the underlying object. + // Update resource version of the object. // event.PrevObject is used to deliver DELETE watch events and // for them, we set resourceVersion to instead of // the resourceVersion of the last modification of the object. - updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion) + updateResourceVersion(object, versioner, event.ResourceVersion) event.PrevObject = object } else { klog.Errorf("couldn't create cachingObject from: %#v", event.Object) @@ -1255,20 +1255,17 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du return heartbeatTime, true } -func getEventObject(object runtime.Object) runtime.Object { - if _, ok := object.(runtime.CacheableObject); ok { +func getMutableObject(object runtime.Object) runtime.Object { + if _, ok := object.(*cachingObject); ok { // It is safe to return without deep-copy, because the underlying - // object was already deep-copied during construction. + // object will lazily perform deep-copy on the first try to change + // any of its fields. return object } return object.DeepCopyObject() } -func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { - if _, ok := object.(*cachingObject); ok { - // We assume that for cachingObject resourceVersion was already propagated before. - return - } +func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { if err := versioner.UpdateObject(object, resourceVersion); err != nil { utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err)) } @@ -1291,13 +1288,17 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event switch { case curObjPasses && !oldObjPasses: - return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)} + return &watch.Event{Type: watch.Added, Object: event.Object} case curObjPasses && oldObjPasses: - return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)} + return &watch.Event{Type: watch.Modified, Object: event.Object} case !curObjPasses && oldObjPasses: // return a delete event with the previous object content, but with the event's resource version - oldObj := getEventObject(event.PrevObject) - updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion) + oldObj := getMutableObject(event.PrevObject) + // We know that if oldObj is cachingObject (which can only be set via + // setCachingObjects), its resourceVersion is already set correctly and + // we don't need to update it. However, since cachingObject efficiently + // handles noop updates, we avoid this microoptimization here. + updateResourceVersion(oldObj, c.versioner, event.ResourceVersion) return &watch.Event{Type: watch.Deleted, Object: oldObj} } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index a359e5befd7..03ea945691e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1387,7 +1387,7 @@ func testCachingObjects(t *testing.T, watchersCount int) { if err != nil { t.Fatalf("Failed to parse resource version: %v", err) } - updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion) + updateResourceVersion(object, cacher.versioner, resourceVersion) } var e runtime.Object diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go index 1ce675a1ed5..91a22cb459f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go @@ -64,6 +64,16 @@ type serializationsCache map[runtime.Identifier]*serializationResult type cachingObject struct { lock sync.RWMutex + // deepCopied defines whether the object below has already been + // deep copied. The operation is performed lazily on the first + // setXxx operation. + // + // The lazy deep-copy make is useful, as effectively the only + // case when we are setting some fields are ResourceVersion for + // DELETE events, so in all other cases we can effectively avoid + // performing any deep copies. + deepCopied bool + // Object for which serializations are cached. object metaRuntimeInterface @@ -79,7 +89,10 @@ type cachingObject struct { // metav1.Object type. func newCachingObject(object runtime.Object) (*cachingObject, error) { if obj, ok := object.(metaRuntimeInterface); ok { - result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)} + result := &cachingObject{ + object: obj, + deepCopied: false, + } result.serializations.Store(make(serializationsCache)) return result, nil } @@ -156,7 +169,9 @@ func (o *cachingObject) DeepCopyObject() runtime.Object { // DeepCopyObject on cachingObject is not expected to be called anywhere. // However, to be on the safe-side, we implement it, though given the // cache is only an optimization we ignore copying it. - result := &cachingObject{} + result := &cachingObject{ + deepCopied: true, + } result.serializations.Store(make(serializationsCache)) o.lock.RLock() @@ -214,6 +229,10 @@ func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) { if isNoop() { return } + if !o.deepCopied { + o.object = o.object.DeepCopyObject().(metaRuntimeInterface) + o.deepCopied = true + } o.invalidateCacheLocked() set() } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go index f992573bab3..34fee69934f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -162,3 +163,29 @@ func TestCachingObjectRaces(t *testing.T) { } wg.Wait() } + +func TestCachingObjectLazyDeepCopy(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + ResourceVersion: "123", + }, + } + object, err := newCachingObject(pod) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + + if object.deepCopied != false { + t.Errorf("object deep-copied without the need") + } + + object.SetResourceVersion("123") + if object.deepCopied != false { + t.Errorf("object deep-copied on no-op change") + } + object.SetResourceVersion("234") + if object.deepCopied != true { + t.Errorf("object not deep-copied on change") + } +} From 779f157ecfb24d0ee944f18e481bfa8cc8c94f6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 22 Feb 2022 17:14:15 +0100 Subject: [PATCH 3/3] Fix potential race in dispatching watch event --- .../apiserver/pkg/storage/cacher/cacher.go | 20 +++++++++---------- .../storage/cacher/cacher_whitebox_test.go | 13 +++--------- .../pkg/storage/cacher/caching_object.go | 4 ++++ 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index fd462dcbae3..84940ab0cd7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -851,14 +851,14 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { // from it justifies increased memory usage, so for now we drop the cached // serializations after dispatching this event. // - // Given the deep-copies that are done to create cachingObjects, - // we try to cache serializations only if there are at least 3 watchers. - if len(c.watchersBuffer) >= 3 { - // Make a shallow copy to allow overwriting Object and PrevObject. - wcEvent := *event - setCachingObjects(&wcEvent, c.versioner) - event = &wcEvent - } + // Given that CachingObject is just wrapping the object and not perfoming + // deep-copying (until some field is explicitly being modified), we create + // it unconditionally to ensure safety and reduce deep-copying. + // + // Make a shallow copy to allow overwriting Object and PrevObject. + wcEvent := *event + setCachingObjects(&wcEvent, c.versioner) + event = &wcEvent c.blockedWatchers = c.blockedWatchers[:0] for _, watcher := range c.watchersBuffer { @@ -1288,9 +1288,9 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event switch { case curObjPasses && !oldObjPasses: - return &watch.Event{Type: watch.Added, Object: event.Object} + return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)} case curObjPasses && oldObjPasses: - return &watch.Event{Type: watch.Modified, Object: event.Object} + return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)} case !curObjPasses && oldObjPasses: // return a delete event with the previous object content, but with the event's resource version oldObj := getMutableObject(event.PrevObject) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 03ea945691e..b71260102e2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1370,17 +1370,10 @@ func testCachingObjects(t *testing.T, watchersCount int) { } var object runtime.Object - if watchersCount >= 3 { - if _, ok := event.Object.(runtime.CacheableObject); !ok { - t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object) - } - object = event.Object.(runtime.CacheableObject).GetObject() - } else { - if _, ok := event.Object.(runtime.CacheableObject); ok { - t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object) - } - object = event.Object.DeepCopyObject() + if _, ok := event.Object.(runtime.CacheableObject); !ok { + t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object) } + object = event.Object.(runtime.CacheableObject).GetObject() if event.Type == watch.Deleted { resourceVersion, err := cacher.versioner.ObjectResourceVersion(cacher.watchCache.cache[index].PrevObject) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go index 91a22cb459f..9ee5c951f11 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go @@ -137,6 +137,10 @@ func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.O result := o.getSerializationResult(id) result.once.Do(func() { buffer := bytes.NewBuffer(nil) + // TODO(wojtek-t): This is currently making a copy to avoid races + // in cases where encoding is making subtle object modifications, + // e.g. #82497 + // Figure out if we can somehow avoid this under some conditions. result.err = encode(o.GetObject(), buffer) result.raw = buffer.Bytes() })