mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #108252 from wojtek-t/avoid_object_deep_copies
Avoid deep-copying object when possible on kube-apiserver watch path
This commit is contained in:
commit
9946b5364e
@ -27,11 +27,11 @@ import (
|
||||
|
||||
// Interface can be implemented by anything that knows how to watch and report changes.
|
||||
type Interface interface {
|
||||
// Stops watching. Will close the channel returned by ResultChan(). Releases
|
||||
// Stop stops watching. Will close the channel returned by ResultChan(). Releases
|
||||
// any resources used by the watch.
|
||||
Stop()
|
||||
|
||||
// Returns a chan which will receive all the events. If an error occurs
|
||||
// ResultChan returns a chan which will receive all the events. If an error occurs
|
||||
// or Stop() is called, the implementation will close this channel and
|
||||
// release any resources used by the watch.
|
||||
ResultChan() <-chan Event
|
||||
|
@ -59,8 +59,14 @@ func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}
|
||||
if _, ok := obj.(*metav1.Status); ok {
|
||||
return obj, nil
|
||||
}
|
||||
if err := ensureNonNilItems(obj); err != nil {
|
||||
return nil, err
|
||||
|
||||
// ensure that for empty lists we don't return <nil> items.
|
||||
// This is safe to modify without deep-copying the object, as
|
||||
// List objects themselves are never cached.
|
||||
if meta.IsListType(obj) && meta.LenList(obj) == 0 {
|
||||
if err := meta.SetList(obj, []runtime.Object{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
switch target := mediaType.Convert; {
|
||||
|
@ -356,16 +356,6 @@ func dedupOwnerReferencesAndAddWarning(obj runtime.Object, requestContext contex
|
||||
}
|
||||
}
|
||||
|
||||
// ensureNonNilItems ensures that for empty lists we don't return <nil> items.
|
||||
func ensureNonNilItems(obj runtime.Object) error {
|
||||
if meta.IsListType(obj) && meta.LenList(obj) == 0 {
|
||||
if err := meta.SetList(obj, []runtime.Object{}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func summarizeData(data []byte, maxLength int) string {
|
||||
switch {
|
||||
case len(data) == 0:
|
||||
|
@ -818,11 +818,11 @@ func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
|
||||
// Don't wrap Object for delete events - these are not to deliver any
|
||||
// events. Only wrap PrevObject.
|
||||
if object, err := newCachingObject(event.PrevObject); err == nil {
|
||||
// Update resource version of the underlying object.
|
||||
// Update resource version of the object.
|
||||
// event.PrevObject is used to deliver DELETE watch events and
|
||||
// for them, we set resourceVersion to <current> instead of
|
||||
// the resourceVersion of the last modification of the object.
|
||||
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
|
||||
updateResourceVersion(object, versioner, event.ResourceVersion)
|
||||
event.PrevObject = object
|
||||
} else {
|
||||
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
||||
@ -851,14 +851,14 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||
// from it justifies increased memory usage, so for now we drop the cached
|
||||
// serializations after dispatching this event.
|
||||
//
|
||||
// Given the deep-copies that are done to create cachingObjects,
|
||||
// we try to cache serializations only if there are at least 3 watchers.
|
||||
if len(c.watchersBuffer) >= 3 {
|
||||
// Make a shallow copy to allow overwriting Object and PrevObject.
|
||||
wcEvent := *event
|
||||
setCachingObjects(&wcEvent, c.versioner)
|
||||
event = &wcEvent
|
||||
}
|
||||
// Given that CachingObject is just wrapping the object and not perfoming
|
||||
// deep-copying (until some field is explicitly being modified), we create
|
||||
// it unconditionally to ensure safety and reduce deep-copying.
|
||||
//
|
||||
// Make a shallow copy to allow overwriting Object and PrevObject.
|
||||
wcEvent := *event
|
||||
setCachingObjects(&wcEvent, c.versioner)
|
||||
event = &wcEvent
|
||||
|
||||
c.blockedWatchers = c.blockedWatchers[:0]
|
||||
for _, watcher := range c.watchersBuffer {
|
||||
@ -1255,20 +1255,17 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
|
||||
return heartbeatTime, true
|
||||
}
|
||||
|
||||
func getEventObject(object runtime.Object) runtime.Object {
|
||||
if _, ok := object.(runtime.CacheableObject); ok {
|
||||
func getMutableObject(object runtime.Object) runtime.Object {
|
||||
if _, ok := object.(*cachingObject); ok {
|
||||
// It is safe to return without deep-copy, because the underlying
|
||||
// object was already deep-copied during construction.
|
||||
// object will lazily perform deep-copy on the first try to change
|
||||
// any of its fields.
|
||||
return object
|
||||
}
|
||||
return object.DeepCopyObject()
|
||||
}
|
||||
|
||||
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
||||
if _, ok := object.(*cachingObject); ok {
|
||||
// We assume that for cachingObject resourceVersion was already propagated before.
|
||||
return
|
||||
}
|
||||
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
||||
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
|
||||
}
|
||||
@ -1291,13 +1288,17 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
|
||||
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
|
||||
return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
|
||||
case curObjPasses && oldObjPasses:
|
||||
return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
|
||||
return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
// return a delete event with the previous object content, but with the event's resource version
|
||||
oldObj := getEventObject(event.PrevObject)
|
||||
updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
|
||||
oldObj := getMutableObject(event.PrevObject)
|
||||
// We know that if oldObj is cachingObject (which can only be set via
|
||||
// setCachingObjects), its resourceVersion is already set correctly and
|
||||
// we don't need to update it. However, since cachingObject efficiently
|
||||
// handles noop updates, we avoid this microoptimization here.
|
||||
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
|
||||
return &watch.Event{Type: watch.Deleted, Object: oldObj}
|
||||
}
|
||||
|
||||
|
@ -1370,24 +1370,17 @@ func testCachingObjects(t *testing.T, watchersCount int) {
|
||||
}
|
||||
|
||||
var object runtime.Object
|
||||
if watchersCount >= 3 {
|
||||
if _, ok := event.Object.(runtime.CacheableObject); !ok {
|
||||
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
object = event.Object.(runtime.CacheableObject).GetObject()
|
||||
} else {
|
||||
if _, ok := event.Object.(runtime.CacheableObject); ok {
|
||||
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
object = event.Object.DeepCopyObject()
|
||||
if _, ok := event.Object.(runtime.CacheableObject); !ok {
|
||||
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
object = event.Object.(runtime.CacheableObject).GetObject()
|
||||
|
||||
if event.Type == watch.Deleted {
|
||||
resourceVersion, err := cacher.versioner.ObjectResourceVersion(cacher.watchCache.cache[index].PrevObject)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse resource version: %v", err)
|
||||
}
|
||||
updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion)
|
||||
updateResourceVersion(object, cacher.versioner, resourceVersion)
|
||||
}
|
||||
|
||||
var e runtime.Object
|
||||
|
@ -64,6 +64,16 @@ type serializationsCache map[runtime.Identifier]*serializationResult
|
||||
type cachingObject struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
// deepCopied defines whether the object below has already been
|
||||
// deep copied. The operation is performed lazily on the first
|
||||
// setXxx operation.
|
||||
//
|
||||
// The lazy deep-copy make is useful, as effectively the only
|
||||
// case when we are setting some fields are ResourceVersion for
|
||||
// DELETE events, so in all other cases we can effectively avoid
|
||||
// performing any deep copies.
|
||||
deepCopied bool
|
||||
|
||||
// Object for which serializations are cached.
|
||||
object metaRuntimeInterface
|
||||
|
||||
@ -79,7 +89,10 @@ type cachingObject struct {
|
||||
// metav1.Object type.
|
||||
func newCachingObject(object runtime.Object) (*cachingObject, error) {
|
||||
if obj, ok := object.(metaRuntimeInterface); ok {
|
||||
result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)}
|
||||
result := &cachingObject{
|
||||
object: obj,
|
||||
deepCopied: false,
|
||||
}
|
||||
result.serializations.Store(make(serializationsCache))
|
||||
return result, nil
|
||||
}
|
||||
@ -124,6 +137,10 @@ func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.O
|
||||
result := o.getSerializationResult(id)
|
||||
result.once.Do(func() {
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
// TODO(wojtek-t): This is currently making a copy to avoid races
|
||||
// in cases where encoding is making subtle object modifications,
|
||||
// e.g. #82497
|
||||
// Figure out if we can somehow avoid this under some conditions.
|
||||
result.err = encode(o.GetObject(), buffer)
|
||||
result.raw = buffer.Bytes()
|
||||
})
|
||||
@ -156,7 +173,9 @@ func (o *cachingObject) DeepCopyObject() runtime.Object {
|
||||
// DeepCopyObject on cachingObject is not expected to be called anywhere.
|
||||
// However, to be on the safe-side, we implement it, though given the
|
||||
// cache is only an optimization we ignore copying it.
|
||||
result := &cachingObject{}
|
||||
result := &cachingObject{
|
||||
deepCopied: true,
|
||||
}
|
||||
result.serializations.Store(make(serializationsCache))
|
||||
|
||||
o.lock.RLock()
|
||||
@ -214,6 +233,10 @@ func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) {
|
||||
if isNoop() {
|
||||
return
|
||||
}
|
||||
if !o.deepCopied {
|
||||
o.object = o.object.DeepCopyObject().(metaRuntimeInterface)
|
||||
o.deepCopied = true
|
||||
}
|
||||
o.invalidateCacheLocked()
|
||||
set()
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
@ -162,3 +163,29 @@ func TestCachingObjectRaces(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestCachingObjectLazyDeepCopy(t *testing.T) {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
ResourceVersion: "123",
|
||||
},
|
||||
}
|
||||
object, err := newCachingObject(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't create cachingObject: %v", err)
|
||||
}
|
||||
|
||||
if object.deepCopied != false {
|
||||
t.Errorf("object deep-copied without the need")
|
||||
}
|
||||
|
||||
object.SetResourceVersion("123")
|
||||
if object.deepCopied != false {
|
||||
t.Errorf("object deep-copied on no-op change")
|
||||
}
|
||||
object.SetResourceVersion("234")
|
||||
if object.deepCopied != true {
|
||||
t.Errorf("object not deep-copied on change")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user