Cache serializations

Wojciech Tyczynski 2019-08-25 09:55:52 +02:00 committed by wojtekt
parent 4cd81549f9
commit 25a728ae5e
5 changed files with 262 additions and 23 deletions


@@ -754,17 +754,25 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
     return c.storage.Count(pathPrefix)
 }
 
-func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
+// baseObjectThreadUnsafe omits locking for cachingObject.
+func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
+    if co, ok := object.(*cachingObject); ok {
+        return co.object
+    }
+    return object
+}
+
+func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bool) {
     if c.indexedTrigger == nil {
         return nil, false
     }
     result := make([]string, 0, 2)
-    result = append(result, c.indexedTrigger.indexerFunc(event.Object))
+    result = append(result, c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.Object)))
     if event.PrevObject == nil {
         return result, true
     }
-    prevTriggerValue := c.indexedTrigger.indexerFunc(event.PrevObject)
+    prevTriggerValue := c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.PrevObject))
     if result[0] != prevTriggerValue {
         result = append(result, prevTriggerValue)
     }
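Worth noting for the helper above: trigger indexer functions only read the object, so unwrapping the cachingObject and indexing its payload directly skips locking that would be pure overhead on a value no other thread can see yet. A minimal, self-contained sketch of that unwrap-before-read pattern (the lockedObject/pod types below are illustrative stand-ins, not from the diff):

package main

import (
    "fmt"
    "sync"
)

type object interface{ Label() string }

type pod struct{ label string }

func (p *pod) Label() string { return p.label }

// lockedObject is a hypothetical stand-in for cachingObject: it
// guards its payload with a mutex for concurrent access paths.
type lockedObject struct {
    mu     sync.Mutex
    object *pod
}

func (l *lockedObject) Label() string {
    l.mu.Lock()
    defer l.mu.Unlock()
    return l.object.Label()
}

// baseObject mirrors baseObjectThreadUnsafe: a caller that is the
// only one able to see the object can skip the lock entirely.
func baseObject(o object) object {
    if lo, ok := o.(*lockedObject); ok {
        return lo.object
    }
    return o
}

func main() {
    wrapped := &lockedObject{object: &pod{label: "foo=true"}}
    // An indexer function can read the label without taking the lock.
    fmt.Println(baseObject(wrapped).Label())
}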
@@ -892,7 +900,10 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
 // startDispatching chooses watchers potentially interested in a given event
 // and marks dispatching as true.
 func (c *Cacher) startDispatching(event *watchCacheEvent) {
-    triggerValues, supported := c.triggerValues(event)
+    // It is safe to call triggerValuesThreadUnsafe here, because at this
+    // point only this thread can access this event (we create a separate
+    // watchCacheEvent for every dispatch).
+    triggerValues, supported := c.triggerValuesThreadUnsafe(event)
     c.Lock()
     defer c.Unlock()
@@ -1165,7 +1176,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
         // This means that we couldn't send an event to that watcher.
         // Since we don't want to block on it infinitely,
         // we simply terminate it.
-        klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
+        klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
         c.forget()
     }
@@ -1193,6 +1204,25 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
     return c.deadline.Add(-2 * time.Second), true
 }
 
+func getEventObject(object runtime.Object) runtime.Object {
+    if _, ok := object.(runtime.CacheableObject); ok {
+        // It is safe to return without deep-copying, because the underlying
+        // object was already deep-copied during construction.
+        return object
+    }
+    return object.DeepCopyObject()
+}
+
+func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
+    if _, ok := object.(*cachingObject); ok {
+        // We assume that for cachingObject the resourceVersion was already propagated before.
+        return
+    }
+    if err := versioner.UpdateObject(object, resourceVersion); err != nil {
+        utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
+    }
+}
+
 func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
     if event.Type == watch.Bookmark {
         return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
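The conditional copy in getEventObject carries the main saving of this change: an object that implements runtime.CacheableObject was already deep-copied once at construction and is treated as immutable afterwards, so it can be shared across watchers, while any other object still pays a per-watcher DeepCopyObject. A sketch of that idea under hypothetical Copyable/Immutable interfaces (not the real runtime API):

package main

import "fmt"

// Copyable stands in for runtime.Object.
type Copyable interface {
    DeepCopy() Copyable
}

// Immutable marks objects that are safe to share without copying,
// standing in for runtime.CacheableObject.
type Immutable interface {
    Copyable
    immutable()
}

type pod struct{ name string }

func (p *pod) DeepCopy() Copyable { return &pod{name: p.name} }

// frozenPod was deep-copied once at construction and is never
// mutated afterwards, so sharing it between watchers is safe.
type frozenPod struct{ pod }

func (f *frozenPod) immutable() {}

// eventObject mirrors getEventObject: skip the copy when the
// object is known to be immutable, copy otherwise.
func eventObject(obj Copyable) Copyable {
    if _, ok := obj.(Immutable); ok {
        return obj // shared, no copy
    }
    return obj.DeepCopy() // per-watcher copy
}

func main() {
    f := &frozenPod{pod{name: "nginx"}}
    fmt.Println(eventObject(f) == Copyable(f)) // true: same instance is shared
    p := &pod{name: "etcd"}
    fmt.Println(eventObject(p) == Copyable(p)) // false: a fresh copy was made
}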
@@ -1210,15 +1240,13 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
     switch {
     case curObjPasses && !oldObjPasses:
-        return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
+        return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
     case curObjPasses && oldObjPasses:
-        return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
+        return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
     case !curObjPasses && oldObjPasses:
         // return a delete event with the previous object content, but with the event's resource version
-        oldObj := event.PrevObject.DeepCopyObject()
-        if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
-            utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
-        }
+        oldObj := getEventObject(event.PrevObject)
+        updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
         return &watch.Event{Type: watch.Deleted, Object: oldObj}
     }
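The switch above derives the outgoing watch event type from whether the watcher's selector matches the current and previous versions of the object; in particular, an object that stops matching is delivered as a DELETED event built from PrevObject. That truth table, restated as a tiny runnable sketch with illustrative names:

package main

import "fmt"

type eventType string

const (
    added    eventType = "ADDED"
    modified eventType = "MODIFIED"
    deleted  eventType = "DELETED"
)

// eventFor restates convertToWatchEvent's selector truth table:
// an object entering the selection looks like an add, one staying
// in it looks like a modification, and one leaving it looks like
// a deletion (delivered with the previous object's content).
func eventFor(curMatches, prevMatches bool) (eventType, bool) {
    switch {
    case curMatches && !prevMatches:
        return added, true
    case curMatches && prevMatches:
        return modified, true
    case !curMatches && prevMatches:
        return deleted, true
    }
    return "", false // the watcher never sees this object
}

func main() {
    for _, tc := range []struct{ cur, prev bool }{
        {true, false}, {true, true}, {false, true}, {false, false},
    } {
        t, ok := eventFor(tc.cur, tc.prev)
        fmt.Printf("cur=%v prev=%v -> %q (deliver=%v)\n", tc.cur, tc.prev, t, ok)
    }
}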


@@ -20,12 +20,14 @@ import (
     "context"
     "fmt"
     "reflect"
+    goruntime "runtime"
     "strconv"
     "sync"
     "testing"
     "time"
 
     v1 "k8s.io/api/core/v1"
+    apiequality "k8s.io/apimachinery/pkg/api/equality"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
@@ -265,7 +267,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, error) {
         Versioner:      testVersioner{},
         ResourcePrefix: prefix,
         KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
-        GetAttrsFunc:   func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil },
+        GetAttrsFunc:   storage.DefaultNamespaceScopedAttr,
         NewFunc:        func() runtime.Object { return &example.Pod{} },
         NewListFunc:    func() runtime.Object { return &example.PodList{} },
         Codec:          codecs.LegacyCodec(examplev1.SchemeGroupVersion),
@@ -452,7 +454,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
             shouldContinue = false
             break
         }
-        rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion)
+        rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
         if err != nil {
             t.Errorf("unexpected parsing error: %v", err)
         } else {
@@ -906,3 +908,107 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
         t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount)
     }
 }
+
+func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) {
+    _, _, line, _ := goruntime.Caller(1)
+    for _, expectedEvent := range events {
+        select {
+        case event := <-w.ResultChan():
+            if e, a := expectedEvent.Type, event.Type; e != a {
+                t.Logf("(called from line %d)", line)
+                t.Errorf("Expected: %s, got: %s", e, a)
+            }
+            object := event.Object
+            if co, ok := object.(runtime.CacheableObject); ok {
+                object = co.GetObject()
+            }
+            if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) {
+                t.Logf("(called from line %d)", line)
+                t.Errorf("Expected: %#v, got: %#v", e, a)
+            }
+        case <-time.After(wait.ForeverTestTimeout):
+            t.Logf("(called from line %d)", line)
+            t.Errorf("Timed out waiting for an event")
+        }
+    }
+}
+
+func TestCachingDeleteEvents(t *testing.T) {
+    backingStorage := &dummyStorage{}
+    cacher, _, err := newTestCacher(backingStorage, 1000)
+    if err != nil {
+        t.Fatalf("Couldn't create cacher: %v", err)
+    }
+    defer cacher.Stop()
+
+    // Wait until cacher is initialized.
+    cacher.ready.wait()
+
+    fooPredicate := storage.SelectionPredicate{
+        Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
+        Field: fields.Everything(),
+    }
+    barPredicate := storage.SelectionPredicate{
+        Label: labels.SelectorFromSet(map[string]string{"bar": "true"}),
+        Field: fields.Everything(),
+    }
+
+    createWatch := func(pred storage.SelectionPredicate) watch.Interface {
+        w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred)
+        if err != nil {
+            t.Fatalf("Failed to create watch: %v", err)
+        }
+        return w
+    }
+
+    allEventsWatcher := createWatch(storage.Everything)
+    defer allEventsWatcher.Stop()
+    fooEventsWatcher := createWatch(fooPredicate)
+    defer fooEventsWatcher.Stop()
+    barEventsWatcher := createWatch(barPredicate)
+    defer barEventsWatcher.Stop()
+
+    makePod := func(labels map[string]string, rv string) *examplev1.Pod {
+        return &examplev1.Pod{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:            "pod",
+                Namespace:       "ns",
+                Labels:          labels,
+                ResourceVersion: rv,
+            },
+        }
+    }
+    pod1 := makePod(map[string]string{"foo": "true", "bar": "true"}, "1001")
+    pod2 := makePod(map[string]string{"foo": "true"}, "1002")
+    pod3 := makePod(map[string]string{}, "1003")
+    pod4 := makePod(map[string]string{}, "1004")
+    pod1DeletedAt2 := pod1.DeepCopyObject().(*examplev1.Pod)
+    pod1DeletedAt2.ResourceVersion = "1002"
+    pod2DeletedAt3 := pod2.DeepCopyObject().(*examplev1.Pod)
+    pod2DeletedAt3.ResourceVersion = "1003"
+
+    allEvents := []watch.Event{
+        {Type: watch.Added, Object: pod1.DeepCopy()},
+        {Type: watch.Modified, Object: pod2.DeepCopy()},
+        {Type: watch.Modified, Object: pod3.DeepCopy()},
+        {Type: watch.Deleted, Object: pod4.DeepCopy()},
+    }
+    fooEvents := []watch.Event{
+        {Type: watch.Added, Object: pod1.DeepCopy()},
+        {Type: watch.Modified, Object: pod2.DeepCopy()},
+        {Type: watch.Deleted, Object: pod2DeletedAt3.DeepCopy()},
+    }
+    barEvents := []watch.Event{
+        {Type: watch.Added, Object: pod1.DeepCopy()},
+        {Type: watch.Deleted, Object: pod1DeletedAt2.DeepCopy()},
+    }
+
+    cacher.watchCache.Add(pod1)
+    cacher.watchCache.Update(pod2)
+    cacher.watchCache.Update(pod3)
+    cacher.watchCache.Delete(pod4)
+
+    verifyEvents(t, allEventsWatcher, allEvents)
+    verifyEvents(t, fooEventsWatcher, fooEvents)
+    verifyEvents(t, barEventsWatcher, barEvents)
+}
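A side note on verifyEvents above: it captures the caller's line once via goruntime.Caller(1) so that failures reported inside the shared helper point back to the test that invoked it. The same trick in isolation (checkPositive is a made-up helper for the example):

package main

import (
    "fmt"
    "runtime"
)

// checkPositive is a shared helper; Caller(1) skips its own frame,
// so the reported location is the call site, not the assertion
// line inside this helper.
func checkPositive(n int) {
    _, file, line, ok := runtime.Caller(1)
    if n <= 0 {
        if ok {
            fmt.Printf("(called from %s:%d) ", file, line)
        }
        fmt.Printf("expected positive value, got %d\n", n)
    }
}

func main() {
    checkPositive(5)
    checkPositive(-1) // the failure is reported against this line
}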


@@ -206,6 +206,37 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
     return object, resourceVersion, nil
 }
 
+func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
+    switch event.Type {
+    case watch.Added, watch.Modified:
+        if object, err := newCachingObject(event.Object); err == nil {
+            event.Object = object
+        } else {
+            klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
+        }
+        // Don't wrap PrevObject for update events (for create events it is nil).
+        // We only encode PrevObject to deliver DELETE watch events, so if it is
+        // non-nil here it can be used only by watchers for which the selector
+        // was satisfied for its previous version and is no longer satisfied
+        // for the current version.
+        // This is rare enough that it doesn't justify making a deep copy of
+        // the object (done by newCachingObject) every time.
+    case watch.Deleted:
+        // Don't wrap Object for delete events - it is not used to deliver any
+        // events. Only wrap PrevObject.
+        if object, err := newCachingObject(event.PrevObject); err == nil {
+            // Update the resource version of the underlying object.
+            // event.PrevObject is used to deliver DELETE watch events and
+            // for them, we set resourceVersion to <current> instead of
+            // the resourceVersion of the last modification of the object.
+            updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
+            event.PrevObject = object
+        } else {
+            klog.Errorf("couldn't create cachingObject from: %#v", event.PrevObject)
+        }
+    }
+}
+
 // processEvent is safe as long as there is at most one call to it in flight
 // at any point in time.
 func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
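setCachingObjects wraps only the field that will actually be encoded on the hot path: Object for ADDED/MODIFIED deliveries, and PrevObject for DELETED deliveries, since the remaining combinations are rare enough that the extra deep copy inside newCachingObject would not pay off. Restated as a small illustrative helper (names are made up, and event types are plain strings here):

package main

import "fmt"

// wrapPolicy summarizes which field of a watch cache event gets a
// serialization-caching wrapper: only the field that is encoded on
// the hot path for that event type.
func wrapPolicy(eventType string) (wrapObject, wrapPrevObject bool) {
    switch eventType {
    case "ADDED", "MODIFIED":
        return true, false // Object is encoded for every matching watcher
    case "DELETED":
        return false, true // only PrevObject is ever encoded
    }
    return false, false
}

func main() {
    for _, t := range []string{"ADDED", "MODIFIED", "DELETED"} {
        o, p := wrapPolicy(t)
        fmt.Printf("%-8s wrap Object=%v wrap PrevObject=%v\n", t, o, p)
    }
}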
@@ -219,7 +250,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
         return err
     }
 
-    watchCacheEvent := &watchCacheEvent{
+    wcEvent := &watchCacheEvent{
         Type:      event.Type,
         Object:    elem.Object,
         ObjLabels: elem.Labels,
@@ -242,12 +273,12 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
     }
 
     if exists {
         previousElem := previous.(*storeElement)
-        watchCacheEvent.PrevObject = previousElem.Object
-        watchCacheEvent.PrevObjLabels = previousElem.Labels
-        watchCacheEvent.PrevObjFields = previousElem.Fields
+        wcEvent.PrevObject = previousElem.Object
+        wcEvent.PrevObjLabels = previousElem.Labels
+        wcEvent.PrevObjFields = previousElem.Fields
     }
 
-    w.updateCache(watchCacheEvent)
+    w.updateCache(wcEvent)
     w.resourceVersion = resourceVersion
     defer w.cond.Broadcast()
@@ -260,7 +291,18 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
     // This is safe as long as there is at most one call to processEvent in flight
     // at any point in time.
     if w.eventHandler != nil {
-        w.eventHandler(watchCacheEvent)
+        // Set up caching of object serializations only for dispatching this event.
+        //
+        // Storing serializations in memory would result in increased memory usage,
+        // but it would help with caching encodings for watches started from old
+        // versions. However, we still don't have convincing data that the gain
+        // would justify the increased memory usage, so for now we drop the cached
+        // serializations after dispatching this event.
+        //
+        // Make a shallow copy to allow overwriting Object and PrevObject.
+        wce := *wcEvent
+        setCachingObjects(&wce, w.versioner)
+        w.eventHandler(&wce)
     }
     return nil
 }
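The shallow copy right before dispatch is what keeps the caching wrappers scoped to a single delivery: copying the struct duplicates its pointer fields, so reassigning Object/PrevObject on the copy never touches the event kept in the watch cache. In miniature, under illustrative types:

package main

import "fmt"

type obj struct{ name string }

type cachedEvent struct {
    Type       string
    Object     *obj
    PrevObject *obj
}

func main() {
    stored := cachedEvent{Type: "MODIFIED", Object: &obj{"v2"}, PrevObject: &obj{"v1"}}

    // Shallow copy: field values (including pointers) are copied,
    // so overwriting a pointer field affects only the copy.
    wce := stored
    wce.Object = &obj{"v2-wrapped"} // stands in for newCachingObject(...)

    fmt.Println(stored.Object.name) // "v2": the cached event is untouched
    fmt.Println(wce.Object.name)    // "v2-wrapped": only this dispatch sees the wrapper
}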


@@ -18,6 +18,7 @@ package cacher
 
 import (
     "fmt"
     "reflect"
+    "strconv"
     "strings"
     "testing"
@@ -432,3 +433,53 @@ func TestReflectorForWatchCache(t *testing.T) {
         }
     }
 }
+
+func TestCachingObjects(t *testing.T) {
+    store := newTestWatchCache(5)
+
+    index := 0
+    store.eventHandler = func(event *watchCacheEvent) {
+        switch event.Type {
+        case watch.Added, watch.Modified:
+            if _, ok := event.Object.(runtime.CacheableObject); !ok {
+                t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
+            }
+            if _, ok := event.PrevObject.(runtime.CacheableObject); ok {
+                t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.PrevObject)
+            }
+        case watch.Deleted:
+            if _, ok := event.Object.(runtime.CacheableObject); ok {
+                t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
+            }
+            if _, ok := event.PrevObject.(runtime.CacheableObject); !ok {
+                t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.PrevObject)
+            }
+        }
+
+        // Verify that the delivered event is the same as the cached one,
+        // modulo Object/PrevObject.
+        switch event.Type {
+        case watch.Added, watch.Modified:
+            event.Object = event.Object.(runtime.CacheableObject).GetObject()
+        case watch.Deleted:
+            event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject()
+            // For events stored in the watchCache, we also don't update the
+            // ResourceVersion, so we need to ensure that we don't fail on it.
+            resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject)
+            if err != nil {
+                t.Fatalf("Failed to parse resource version: %v", err)
+            }
+            updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion)
+        }
+        if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) {
+            t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e)
+        }
+        index++
+    }
+
+    pod1 := makeTestPod("pod", 1)
+    pod2 := makeTestPod("pod", 2)
+    pod3 := makeTestPod("pod", 3)
+
+    store.Add(pod1)
+    store.Update(pod2)
+    store.Delete(pod3)
+}


@@ -357,7 +357,11 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
         t.Logf("(called from line %d)", line)
         t.Errorf("Expected: %s, got: %s", eventType, event.Type)
     }
-    if e, a := eventObject, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
+    object := event.Object
+    if co, ok := object.(runtime.CacheableObject); ok {
+        object = co.GetObject()
+    }
+    if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) {
         t.Logf("(called from line %d)", line)
         t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
     }
@@ -606,7 +610,11 @@ func TestStartingResourceVersion(t *testing.T) {
     select {
     case e := <-watcher.ResultChan():
-        pod := e.Object.(*example.Pod)
+        object := e.Object
+        if co, ok := object.(runtime.CacheableObject); ok {
+            object = co.GetObject()
+        }
+        pod := object.(*example.Pod)
         podRV, err := v.ParseResourceVersion(pod.ResourceVersion)
         if err != nil {
             t.Fatalf("unexpected error: %v", err)
@@ -725,7 +733,11 @@ func TestRandomWatchDeliver(t *testing.T) {
         if !ok {
             break
         }
-        if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
+        object := event.Object
+        if co, ok := object.(runtime.CacheableObject); ok {
+            object = co.GetObject()
+        }
+        if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
             t.Errorf("Unexpected object watched: %s, expected %s", a, e)
         }
         watched++
@@ -911,7 +923,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
         pod := fmt.Sprintf("foo-%d", i)
         err := createPod(etcdStorage, makeTestPod(pod))
         if err != nil {
-            t.Fatalf("failed to create pod %v", pod)
+            t.Fatalf("failed to create pod %v: %v", pod, err)
         }
         time.Sleep(time.Second / 100)
     }