diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 231c999b4a4..fd462dcbae3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -818,11 +818,11 @@ func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) { // Don't wrap Object for delete events - these are not to deliver any // events. Only wrap PrevObject. if object, err := newCachingObject(event.PrevObject); err == nil { - // Update resource version of the underlying object. + // Update resource version of the object. // event.PrevObject is used to deliver DELETE watch events and // for them, we set resourceVersion to instead of // the resourceVersion of the last modification of the object. - updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion) + updateResourceVersion(object, versioner, event.ResourceVersion) event.PrevObject = object } else { klog.Errorf("couldn't create cachingObject from: %#v", event.Object) @@ -1255,20 +1255,17 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du return heartbeatTime, true } -func getEventObject(object runtime.Object) runtime.Object { - if _, ok := object.(runtime.CacheableObject); ok { +func getMutableObject(object runtime.Object) runtime.Object { + if _, ok := object.(*cachingObject); ok { // It is safe to return without deep-copy, because the underlying - // object was already deep-copied during construction. + // object will lazily perform deep-copy on the first try to change + // any of its fields. return object } return object.DeepCopyObject() } -func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { - if _, ok := object.(*cachingObject); ok { - // We assume that for cachingObject resourceVersion was already propagated before. - return - } +func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { if err := versioner.UpdateObject(object, resourceVersion); err != nil { utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err)) } @@ -1291,13 +1288,17 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event switch { case curObjPasses && !oldObjPasses: - return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)} + return &watch.Event{Type: watch.Added, Object: event.Object} case curObjPasses && oldObjPasses: - return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)} + return &watch.Event{Type: watch.Modified, Object: event.Object} case !curObjPasses && oldObjPasses: // return a delete event with the previous object content, but with the event's resource version - oldObj := getEventObject(event.PrevObject) - updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion) + oldObj := getMutableObject(event.PrevObject) + // We know that if oldObj is cachingObject (which can only be set via + // setCachingObjects), its resourceVersion is already set correctly and + // we don't need to update it. However, since cachingObject efficiently + // handles noop updates, we avoid this microoptimization here. + updateResourceVersion(oldObj, c.versioner, event.ResourceVersion) return &watch.Event{Type: watch.Deleted, Object: oldObj} } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index a359e5befd7..03ea945691e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1387,7 +1387,7 @@ func testCachingObjects(t *testing.T, watchersCount int) { if err != nil { t.Fatalf("Failed to parse resource version: %v", err) } - updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion) + updateResourceVersion(object, cacher.versioner, resourceVersion) } var e runtime.Object diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go index 1ce675a1ed5..91a22cb459f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go @@ -64,6 +64,16 @@ type serializationsCache map[runtime.Identifier]*serializationResult type cachingObject struct { lock sync.RWMutex + // deepCopied defines whether the object below has already been + // deep copied. The operation is performed lazily on the first + // setXxx operation. + // + // The lazy deep-copy make is useful, as effectively the only + // case when we are setting some fields are ResourceVersion for + // DELETE events, so in all other cases we can effectively avoid + // performing any deep copies. + deepCopied bool + // Object for which serializations are cached. object metaRuntimeInterface @@ -79,7 +89,10 @@ type cachingObject struct { // metav1.Object type. func newCachingObject(object runtime.Object) (*cachingObject, error) { if obj, ok := object.(metaRuntimeInterface); ok { - result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)} + result := &cachingObject{ + object: obj, + deepCopied: false, + } result.serializations.Store(make(serializationsCache)) return result, nil } @@ -156,7 +169,9 @@ func (o *cachingObject) DeepCopyObject() runtime.Object { // DeepCopyObject on cachingObject is not expected to be called anywhere. // However, to be on the safe-side, we implement it, though given the // cache is only an optimization we ignore copying it. - result := &cachingObject{} + result := &cachingObject{ + deepCopied: true, + } result.serializations.Store(make(serializationsCache)) o.lock.RLock() @@ -214,6 +229,10 @@ func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) { if isNoop() { return } + if !o.deepCopied { + o.object = o.object.DeepCopyObject().(metaRuntimeInterface) + o.deepCopied = true + } o.invalidateCacheLocked() set() } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go index f992573bab3..34fee69934f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -162,3 +163,29 @@ func TestCachingObjectRaces(t *testing.T) { } wg.Wait() } + +func TestCachingObjectLazyDeepCopy(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + ResourceVersion: "123", + }, + } + object, err := newCachingObject(pod) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + + if object.deepCopied != false { + t.Errorf("object deep-copied without the need") + } + + object.SetResourceVersion("123") + if object.deepCopied != false { + t.Errorf("object deep-copied on no-op change") + } + object.SetResourceVersion("234") + if object.deepCopied != true { + t.Errorf("object not deep-copied on change") + } +}