mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Avoid perfoming deep copies in watchcache if not needed
This commit is contained in:
parent
7e434682e4
commit
0cb3a02bbe
@ -818,11 +818,11 @@ func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
|
||||
// Don't wrap Object for delete events - these are not to deliver any
|
||||
// events. Only wrap PrevObject.
|
||||
if object, err := newCachingObject(event.PrevObject); err == nil {
|
||||
// Update resource version of the underlying object.
|
||||
// Update resource version of the object.
|
||||
// event.PrevObject is used to deliver DELETE watch events and
|
||||
// for them, we set resourceVersion to <current> instead of
|
||||
// the resourceVersion of the last modification of the object.
|
||||
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
|
||||
updateResourceVersion(object, versioner, event.ResourceVersion)
|
||||
event.PrevObject = object
|
||||
} else {
|
||||
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
||||
@ -1255,20 +1255,17 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
|
||||
return heartbeatTime, true
|
||||
}
|
||||
|
||||
func getEventObject(object runtime.Object) runtime.Object {
|
||||
if _, ok := object.(runtime.CacheableObject); ok {
|
||||
func getMutableObject(object runtime.Object) runtime.Object {
|
||||
if _, ok := object.(*cachingObject); ok {
|
||||
// It is safe to return without deep-copy, because the underlying
|
||||
// object was already deep-copied during construction.
|
||||
// object will lazily perform deep-copy on the first try to change
|
||||
// any of its fields.
|
||||
return object
|
||||
}
|
||||
return object.DeepCopyObject()
|
||||
}
|
||||
|
||||
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
||||
if _, ok := object.(*cachingObject); ok {
|
||||
// We assume that for cachingObject resourceVersion was already propagated before.
|
||||
return
|
||||
}
|
||||
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
||||
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
|
||||
}
|
||||
@ -1291,13 +1288,17 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
|
||||
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
|
||||
return &watch.Event{Type: watch.Added, Object: event.Object}
|
||||
case curObjPasses && oldObjPasses:
|
||||
return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
|
||||
return &watch.Event{Type: watch.Modified, Object: event.Object}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
// return a delete event with the previous object content, but with the event's resource version
|
||||
oldObj := getEventObject(event.PrevObject)
|
||||
updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
|
||||
oldObj := getMutableObject(event.PrevObject)
|
||||
// We know that if oldObj is cachingObject (which can only be set via
|
||||
// setCachingObjects), its resourceVersion is already set correctly and
|
||||
// we don't need to update it. However, since cachingObject efficiently
|
||||
// handles noop updates, we avoid this microoptimization here.
|
||||
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
|
||||
return &watch.Event{Type: watch.Deleted, Object: oldObj}
|
||||
}
|
||||
|
||||
|
@ -1387,7 +1387,7 @@ func testCachingObjects(t *testing.T, watchersCount int) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse resource version: %v", err)
|
||||
}
|
||||
updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion)
|
||||
updateResourceVersion(object, cacher.versioner, resourceVersion)
|
||||
}
|
||||
|
||||
var e runtime.Object
|
||||
|
@ -64,6 +64,16 @@ type serializationsCache map[runtime.Identifier]*serializationResult
|
||||
type cachingObject struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
// deepCopied defines whether the object below has already been
|
||||
// deep copied. The operation is performed lazily on the first
|
||||
// setXxx operation.
|
||||
//
|
||||
// The lazy deep-copy make is useful, as effectively the only
|
||||
// case when we are setting some fields are ResourceVersion for
|
||||
// DELETE events, so in all other cases we can effectively avoid
|
||||
// performing any deep copies.
|
||||
deepCopied bool
|
||||
|
||||
// Object for which serializations are cached.
|
||||
object metaRuntimeInterface
|
||||
|
||||
@ -79,7 +89,10 @@ type cachingObject struct {
|
||||
// metav1.Object type.
|
||||
func newCachingObject(object runtime.Object) (*cachingObject, error) {
|
||||
if obj, ok := object.(metaRuntimeInterface); ok {
|
||||
result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)}
|
||||
result := &cachingObject{
|
||||
object: obj,
|
||||
deepCopied: false,
|
||||
}
|
||||
result.serializations.Store(make(serializationsCache))
|
||||
return result, nil
|
||||
}
|
||||
@ -156,7 +169,9 @@ func (o *cachingObject) DeepCopyObject() runtime.Object {
|
||||
// DeepCopyObject on cachingObject is not expected to be called anywhere.
|
||||
// However, to be on the safe-side, we implement it, though given the
|
||||
// cache is only an optimization we ignore copying it.
|
||||
result := &cachingObject{}
|
||||
result := &cachingObject{
|
||||
deepCopied: true,
|
||||
}
|
||||
result.serializations.Store(make(serializationsCache))
|
||||
|
||||
o.lock.RLock()
|
||||
@ -214,6 +229,10 @@ func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) {
|
||||
if isNoop() {
|
||||
return
|
||||
}
|
||||
if !o.deepCopied {
|
||||
o.object = o.object.DeepCopyObject().(metaRuntimeInterface)
|
||||
o.deepCopied = true
|
||||
}
|
||||
o.invalidateCacheLocked()
|
||||
set()
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
@ -162,3 +163,29 @@ func TestCachingObjectRaces(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestCachingObjectLazyDeepCopy(t *testing.T) {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
ResourceVersion: "123",
|
||||
},
|
||||
}
|
||||
object, err := newCachingObject(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't create cachingObject: %v", err)
|
||||
}
|
||||
|
||||
if object.deepCopied != false {
|
||||
t.Errorf("object deep-copied without the need")
|
||||
}
|
||||
|
||||
object.SetResourceVersion("123")
|
||||
if object.deepCopied != false {
|
||||
t.Errorf("object deep-copied on no-op change")
|
||||
}
|
||||
object.SetResourceVersion("234")
|
||||
if object.deepCopied != true {
|
||||
t.Errorf("object not deep-copied on change")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user