mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #84043 from wojtek-t/tweak_serialize_object_once
Don't use CachingObject if the number of watchers is small
This commit is contained in:
commit
f9acca889c
@ -823,6 +823,37 @@ func (c *Cacher) dispatchEvents() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified:
|
||||||
|
if object, err := newCachingObject(event.Object); err == nil {
|
||||||
|
event.Object = object
|
||||||
|
} else {
|
||||||
|
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
||||||
|
}
|
||||||
|
// Don't wrap PrevObject for update event (for create events it is nil).
|
||||||
|
// We only encode those to deliver DELETE watch events, so if
|
||||||
|
// event.Object is not nil it can be used only for watchers for which
|
||||||
|
// selector was satisfied for its previous version and is no longer
|
||||||
|
// satisfied for the current version.
|
||||||
|
// This is rare enough that it doesn't justify making deep-copy of the
|
||||||
|
// object (done by newCachingObject) every time.
|
||||||
|
case watch.Deleted:
|
||||||
|
// Don't wrap Object for delete events - these are not to deliver any
|
||||||
|
// events. Only wrap PrevObject.
|
||||||
|
if object, err := newCachingObject(event.PrevObject); err == nil {
|
||||||
|
// Update resource version of the underlying object.
|
||||||
|
// event.PrevObject is used to deliver DELETE watch events and
|
||||||
|
// for them, we set resourceVersion to <current> instead of
|
||||||
|
// the resourceVersion of the last modification of the object.
|
||||||
|
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
|
||||||
|
event.PrevObject = object
|
||||||
|
} else {
|
||||||
|
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||||
c.startDispatching(event)
|
c.startDispatching(event)
|
||||||
defer c.finishDispatching()
|
defer c.finishDispatching()
|
||||||
@ -836,6 +867,23 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
|||||||
watcher.nonblockingAdd(event)
|
watcher.nonblockingAdd(event)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// Set up caching of object serializations only for dispatching this event.
|
||||||
|
//
|
||||||
|
// Storing serializations in memory would result in increased memory usage,
|
||||||
|
// but it would help for caching encodings for watches started from old
|
||||||
|
// versions. However, we still don't have a convincing data that the gain
|
||||||
|
// from it justifies increased memory usage, so for now we drop the cached
|
||||||
|
// serializations after dispatching this event.
|
||||||
|
//
|
||||||
|
// Given the deep-copies that are done to create cachingObjects,
|
||||||
|
// we try to cache serializations only if there are at least 3 watchers.
|
||||||
|
if len(c.watchersBuffer) >= 3 {
|
||||||
|
// Make a shallow copy to allow overwriting Object and PrevObject.
|
||||||
|
wcEvent := *event
|
||||||
|
setCachingObjects(&wcEvent, c.versioner)
|
||||||
|
event = &wcEvent
|
||||||
|
}
|
||||||
|
|
||||||
c.blockedWatchers = c.blockedWatchers[:0]
|
c.blockedWatchers = c.blockedWatchers[:0]
|
||||||
for _, watcher := range c.watchersBuffer {
|
for _, watcher := range c.watchersBuffer {
|
||||||
if !watcher.nonblockingAdd(event) {
|
if !watcher.nonblockingAdd(event) {
|
||||||
|
@ -998,3 +998,104 @@ func TestCachingDeleteEvents(t *testing.T) {
|
|||||||
verifyEvents(t, fooEventsWatcher, fooEvents)
|
verifyEvents(t, fooEventsWatcher, fooEvents)
|
||||||
verifyEvents(t, barEventsWatcher, barEvents)
|
verifyEvents(t, barEventsWatcher, barEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCachingObjects(t *testing.T, watchersCount int) {
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
cacher, _, err := newTestCacher(backingStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
// Wait until cacher is initialized.
|
||||||
|
cacher.ready.wait()
|
||||||
|
|
||||||
|
dispatchedEvents := []*watchCacheEvent{}
|
||||||
|
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
|
||||||
|
dispatchedEvents = append(dispatchedEvents, event)
|
||||||
|
cacher.processEvent(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
watchers := make([]watch.Interface, 0, watchersCount)
|
||||||
|
for i := 0; i < watchersCount; i++ {
|
||||||
|
w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create watch: %v", err)
|
||||||
|
}
|
||||||
|
defer w.Stop()
|
||||||
|
watchers = append(watchers, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
makePod := func(name, rv string) *examplev1.Pod {
|
||||||
|
return &examplev1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Namespace: "ns",
|
||||||
|
ResourceVersion: rv,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pod1 := makePod("pod", "1001")
|
||||||
|
pod2 := makePod("pod", "1002")
|
||||||
|
pod3 := makePod("pod", "1003")
|
||||||
|
|
||||||
|
cacher.watchCache.Add(pod1)
|
||||||
|
cacher.watchCache.Update(pod2)
|
||||||
|
cacher.watchCache.Delete(pod3)
|
||||||
|
|
||||||
|
// At this point, we already have dispatchedEvents fully propagated.
|
||||||
|
|
||||||
|
verifyEvents := func(w watch.Interface) {
|
||||||
|
var event watch.Event
|
||||||
|
for index := range dispatchedEvents {
|
||||||
|
select {
|
||||||
|
case event = <-w.ResultChan():
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Fatalf("timeout watiching for the event")
|
||||||
|
}
|
||||||
|
|
||||||
|
var object runtime.Object
|
||||||
|
if watchersCount >= 3 {
|
||||||
|
if _, ok := event.Object.(runtime.CacheableObject); !ok {
|
||||||
|
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
object = event.Object.(runtime.CacheableObject).GetObject()
|
||||||
|
} else {
|
||||||
|
if _, ok := event.Object.(runtime.CacheableObject); ok {
|
||||||
|
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
|
||||||
|
}
|
||||||
|
object = event.Object.DeepCopyObject()
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Type == watch.Deleted {
|
||||||
|
resourceVersion, err := cacher.versioner.ObjectResourceVersion(cacher.watchCache.cache[index].PrevObject)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to parse resource version: %v", err)
|
||||||
|
}
|
||||||
|
updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
var e runtime.Object
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified:
|
||||||
|
e = cacher.watchCache.cache[index].Object
|
||||||
|
case watch.Deleted:
|
||||||
|
e = cacher.watchCache.cache[index].PrevObject
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected watch event: %#v", event)
|
||||||
|
}
|
||||||
|
if a := object; !reflect.DeepEqual(a, e) {
|
||||||
|
t.Errorf("event object messed up for %s: %#v, expected: %#v", event.Type, a, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range watchers {
|
||||||
|
verifyEvents(watchers[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCachingObjects(t *testing.T) {
|
||||||
|
t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) })
|
||||||
|
t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) })
|
||||||
|
}
|
||||||
|
@ -210,37 +210,6 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
|
|||||||
return object, resourceVersion, nil
|
return object, resourceVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added, watch.Modified:
|
|
||||||
if object, err := newCachingObject(event.Object); err == nil {
|
|
||||||
event.Object = object
|
|
||||||
} else {
|
|
||||||
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
|
||||||
}
|
|
||||||
// Don't wrap PrevObject for update event (for create events it is nil).
|
|
||||||
// We only encode those to deliver DELETE watch events, so if
|
|
||||||
// event.Object is not nil it can be used only for watchers for which
|
|
||||||
// selector was satisfied for its previous version and is no longer
|
|
||||||
// satisfied for the current version.
|
|
||||||
// This is rare enough that it doesn't justify making deep-copy of the
|
|
||||||
// object (done by newCachingObject) every time.
|
|
||||||
case watch.Deleted:
|
|
||||||
// Don't wrap Object for delete events - these are not to deliver any
|
|
||||||
// events. Only wrap PrevObject.
|
|
||||||
if object, err := newCachingObject(event.PrevObject); err == nil {
|
|
||||||
// Update resource version of the underlying object.
|
|
||||||
// event.PrevObject is used to deliver DELETE watch events and
|
|
||||||
// for them, we set resourceVersion to <current> instead of
|
|
||||||
// the resourceVersion of the last modification of the object.
|
|
||||||
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
|
|
||||||
event.PrevObject = object
|
|
||||||
} else {
|
|
||||||
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// processEvent is safe as long as there is at most one call to it in flight
|
// processEvent is safe as long as there is at most one call to it in flight
|
||||||
// at any point in time.
|
// at any point in time.
|
||||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
||||||
@ -295,18 +264,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||||||
// This is safe as long as there is at most one call to processEvent in flight
|
// This is safe as long as there is at most one call to processEvent in flight
|
||||||
// at any point in time.
|
// at any point in time.
|
||||||
if w.eventHandler != nil {
|
if w.eventHandler != nil {
|
||||||
// Set up caching of object serializations only for dispatching this event.
|
w.eventHandler(wcEvent)
|
||||||
//
|
|
||||||
// Storing serializations in memory would result in increased memory usage,
|
|
||||||
// but it would help for caching encodings for watches started from old
|
|
||||||
// versions. However, we still don't have a convincing data that the gain
|
|
||||||
// from it justifies increased memory usage, so for now we drop the cached
|
|
||||||
// serializations after dispatching this event.
|
|
||||||
|
|
||||||
// Make a shallow copy to allow overwriting Object and PrevObject.
|
|
||||||
wce := *wcEvent
|
|
||||||
setCachingObjects(&wce, w.versioner)
|
|
||||||
w.eventHandler(&wce)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ package cacher
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -436,53 +435,3 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCachingObjects(t *testing.T) {
|
|
||||||
store := newTestWatchCache(5)
|
|
||||||
|
|
||||||
index := 0
|
|
||||||
store.eventHandler = func(event *watchCacheEvent) {
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added, watch.Modified:
|
|
||||||
if _, ok := event.Object.(runtime.CacheableObject); !ok {
|
|
||||||
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
|
|
||||||
}
|
|
||||||
if _, ok := event.PrevObject.(runtime.CacheableObject); ok {
|
|
||||||
t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object)
|
|
||||||
}
|
|
||||||
case watch.Deleted:
|
|
||||||
if _, ok := event.Object.(runtime.CacheableObject); ok {
|
|
||||||
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
|
|
||||||
}
|
|
||||||
if _, ok := event.PrevObject.(runtime.CacheableObject); !ok {
|
|
||||||
t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that delivered event is the same as cached one modulo Object/PrevObject.
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added, watch.Modified:
|
|
||||||
event.Object = event.Object.(runtime.CacheableObject).GetObject()
|
|
||||||
case watch.Deleted:
|
|
||||||
event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject()
|
|
||||||
// In events store in watchcache, we also don't update ResourceVersion.
|
|
||||||
// So we need to ensure that we don't fail on it.
|
|
||||||
resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to parse resource version: %v", err)
|
|
||||||
}
|
|
||||||
updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion)
|
|
||||||
}
|
|
||||||
if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) {
|
|
||||||
t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e)
|
|
||||||
}
|
|
||||||
index++
|
|
||||||
}
|
|
||||||
|
|
||||||
pod1 := makeTestPod("pod", 1)
|
|
||||||
pod2 := makeTestPod("pod", 2)
|
|
||||||
pod3 := makeTestPod("pod", 3)
|
|
||||||
store.Add(pod1)
|
|
||||||
store.Update(pod2)
|
|
||||||
store.Delete(pod3)
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user