From 3a71eb1bcc34bef98ed63273d80df0a8ee942eb9 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 14 Aug 2015 12:14:09 +0200 Subject: [PATCH] Store previous value in WatchCache for filtering --- pkg/client/cache/watch_cache.go | 49 ++++++--- pkg/client/cache/watch_cache_test.go | 60 +++++++++++ pkg/storage/cacher.go | 36 ++++--- pkg/storage/cacher_test.go | 142 +++++++++++++++++++++++++++ 4 files changed, 264 insertions(+), 23 deletions(-) diff --git a/pkg/client/cache/watch_cache.go b/pkg/client/cache/watch_cache.go index 60e86f7b9b2..7a1aa7c1ce8 100644 --- a/pkg/client/cache/watch_cache.go +++ b/pkg/client/cache/watch_cache.go @@ -27,12 +27,27 @@ import ( "k8s.io/kubernetes/pkg/watch" ) +// TODO(wojtek-t): All structure in this file should be private to +// pkg/storage package. We should remove the reference to WatchCache +// from Reflector (by changing the Replace method signature in Store +// interface to take resource version too) and move it under pkg/storage. + +// WatchCacheEvent is a single "watch event" that is send to users of +// WatchCache. Additionally to a typical "watch.Event" it contains +// the previous value of the object to enable proper filtering in the +// upper layers. +type WatchCacheEvent struct { + Type watch.EventType + Object runtime.Object + PrevObject runtime.Object +} + // watchCacheElement is a single "watch event" stored in a cache. // It contains the resource version of the object and the object // itself. type watchCacheElement struct { resourceVersion uint64 - event watch.Event + watchCacheEvent WatchCacheEvent } // WatchCache implements a Store interface. @@ -66,8 +81,9 @@ type WatchCache struct { // This handler is run at the end of every successful Replace() method. onReplace func() - // This handler is run at the end of every Add/Update/Delete method. - onEvent func(watch.Event) + // This handler is run at the end of every Add/Update/Delete method + // and additionally gets the previous value of the object. + onEvent func(WatchCacheEvent) } func NewWatchCache(capacity int) *WatchCache { @@ -140,16 +156,27 @@ func parseResourceVersion(resourceVersion string) (uint64, error) { func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { w.Lock() defer w.Unlock() - if w.onEvent != nil { - w.onEvent(event) + previous, exists, err := w.store.Get(event.Object) + if err != nil { + return err } - w.updateCache(resourceVersion, event) + var prevObject runtime.Object + if exists { + prevObject = previous.(runtime.Object) + } else { + prevObject = nil + } + watchCacheEvent := WatchCacheEvent{event.Type, event.Object, prevObject} + if w.onEvent != nil { + w.onEvent(watchCacheEvent) + } + w.updateCache(resourceVersion, watchCacheEvent) w.resourceVersion = resourceVersion return updateFunc(event.Object) } // Assumes that lock is already held for write. -func (w *WatchCache) updateCache(resourceVersion uint64, event watch.Event) { +func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) { if w.endIndex == w.startIndex+w.capacity { // Cache is full - remove the oldest element. w.startIndex++ @@ -219,13 +246,13 @@ func (w *WatchCache) SetOnReplace(onReplace func()) { w.onReplace = onReplace } -func (w *WatchCache) SetOnEvent(onEvent func(watch.Event)) { +func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) { w.Lock() defer w.Unlock() w.onEvent = onEvent } -func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, error) { +func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { w.RLock() defer w.RUnlock() @@ -244,9 +271,9 @@ func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, e return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion } first := sort.Search(size, f) - result := make([]watch.Event, size-first) + result := make([]WatchCacheEvent, size-first) for i := 0; i < size-first; i++ { - result[i] = w.cache[(w.startIndex+first+i)%w.capacity].event + result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent } return result, nil } diff --git a/pkg/client/cache/watch_cache_test.go b/pkg/client/cache/watch_cache_test.go index d910a505a6f..8f10c197cf4 100644 --- a/pkg/client/cache/watch_cache_test.go +++ b/pkg/client/cache/watch_cache_test.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/watch" ) func makeTestPod(name string, resourceVersion uint64) *api.Pod { @@ -108,6 +109,34 @@ func TestEvents(t *testing.T) { store := NewWatchCache(5) store.Add(makeTestPod("pod", 2)) + + // Test for Added event. + { + _, err := store.GetAllEventsSince(1) + if err == nil { + t.Errorf("expected error too old") + } + } + { + result, err := store.GetAllEventsSince(2) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 1 { + t.Fatalf("unexpected events: %v", result) + } + if result[0].Type != watch.Added { + t.Errorf("unexpected event type: %v", result[0].Type) + } + pod := makeTestPod("pod", uint64(2)) + if !api.Semantic.DeepEqual(pod, result[0].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) + } + if result[0].PrevObject != nil { + t.Errorf("unexpected item: %v", result[0].PrevObject) + } + } + store.Update(makeTestPod("pod", 3)) store.Update(makeTestPod("pod", 4)) @@ -127,10 +156,17 @@ func TestEvents(t *testing.T) { t.Fatalf("unexpected events: %v", result) } for i := 0; i < 2; i++ { + if result[i].Type != watch.Modified { + t.Errorf("unexpected event type: %v", result[i].Type) + } pod := makeTestPod("pod", uint64(i+3)) if !api.Semantic.DeepEqual(pod, result[i].Object) { t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) } + prevPod := makeTestPod("pod", uint64(i+2)) + if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) { + t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod) + } } } @@ -160,4 +196,28 @@ func TestEvents(t *testing.T) { } } } + + // Test for delete event. + store.Delete(makeTestPod("pod", uint64(9))) + + { + result, err := store.GetAllEventsSince(9) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 1 { + t.Fatalf("unexpected events: %v", result) + } + if result[0].Type != watch.Deleted { + t.Errorf("unexpected event type: %v", result[0].Type) + } + pod := makeTestPod("pod", uint64(9)) + if !api.Semantic.DeepEqual(pod, result[0].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) + } + prevPod := makeTestPod("pod", uint64(8)) + if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) { + t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod) + } + } } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 13cbd885851..c817c23f3ab 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -184,7 +184,7 @@ func (c *Cacher) List(key string, listObj runtime.Object) error { return nil } -func (c *Cacher) processEvent(event watch.Event) { +func (c *Cacher) processEvent(event cache.WatchCacheEvent) { c.Lock() defer c.Unlock() for _, watcher := range c.watchers { @@ -271,16 +271,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e // cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex - input chan watch.Event + input chan cache.WatchCacheEvent result chan watch.Event filter FilterFunc stopped bool forget func() } -func newCacheWatcher(initEvents []watch.Event, filter FilterFunc, forget func()) *cacheWatcher { +func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { watcher := &cacheWatcher{ - input: make(chan watch.Event, 10), + input: make(chan cache.WatchCacheEvent, 10), result: make(chan watch.Event, 10), filter: filter, stopped: false, @@ -310,15 +310,29 @@ func (c *cacheWatcher) stop() { } } -func (c *cacheWatcher) add(event watch.Event) { +func (c *cacheWatcher) add(event cache.WatchCacheEvent) { c.input <- event } -func (c *cacheWatcher) process(initEvents []watch.Event) { +func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { + curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) + oldObjPasses := false + if event.PrevObject != nil { + oldObjPasses = c.filter(event.PrevObject) + } + switch { + case curObjPasses && !oldObjPasses: + c.result <- watch.Event{watch.Added, event.Object} + case curObjPasses && oldObjPasses: + c.result <- watch.Event{watch.Modified, event.Object} + case !curObjPasses && oldObjPasses: + c.result <- watch.Event{watch.Deleted, event.Object} + } +} + +func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) { for _, event := range initEvents { - if c.filter(event.Object) { - c.result <- event - } + c.sendWatchCacheEvent(event) } defer close(c.result) defer c.Stop() @@ -327,8 +341,6 @@ func (c *cacheWatcher) process(initEvents []watch.Event) { if !ok { return } - if c.filter(event.Object) { - c.result <- event - } + c.sendWatchCacheEvent(event) } } diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 763dcf0b172..0603ec3465d 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -25,7 +25,9 @@ import ( "github.com/coreos/go-etcd/etcd" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -300,6 +302,146 @@ func TestWatch(t *testing.T) { close(fakeClient.WatchResponse) } +func TestFiltering(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + prefixedKey := etcdtest.AddPrefix("pods") + fakeClient.ExpectNotFoundGet(prefixedKey) + cacher := newTestCacher(fakeClient) + fakeClient.WaitForWatchCompletion() + + podFoo := makeTestPod("foo") + podFoo.ObjectMeta.Labels = map[string]string{"filter": "foo"} + podFooFiltered := makeTestPod("foo") + + testCases := []struct { + object *api.Pod + etcdResponse *etcd.Response + filtered bool + event watch.EventType + }{ + { + object: podFoo, + etcdResponse: &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + filtered: true, + event: watch.Added, + }, + { + object: podFooFiltered, + etcdResponse: &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFooFiltered)), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + filtered: true, + // Deleted, because the new object doesn't match filter. + event: watch.Deleted, + }, + { + object: podFoo, + etcdResponse: &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 3, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFooFiltered)), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + }, + filtered: true, + // Added, because the previous object didn't match filter. + event: watch.Added, + }, + { + object: podFoo, + etcdResponse: &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 4, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 3, + }, + }, + filtered: true, + event: watch.Modified, + }, + { + object: podFoo, + etcdResponse: &etcd.Response{ + Action: "delete", + Node: &etcd.Node{ + CreatedIndex: 1, + ModifiedIndex: 5, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 4, + }, + }, + filtered: true, + event: watch.Deleted, + }, + } + + // Set up Watch for object "podFoo" with label filter set. + selector := labels.SelectorFromSet(labels.Set{"filter": "foo"}) + filter := func(obj runtime.Object) bool { + metadata, err := meta.Accessor(obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + return false + } + return selector.Matches(labels.Set(metadata.Labels())) + } + watcher, err := cacher.Watch("pods/ns/foo", 1, filter) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + for _, test := range testCases { + fakeClient.WatchResponse <- test.etcdResponse + if test.filtered { + event := <-watcher.ResultChan() + if e, a := test.event, event.Type; e != a { + t.Errorf("%v %v", e, a) + } + // unset fields that are set by the infrastructure + obj := event.Object.(*api.Pod) + obj.ObjectMeta.ResourceVersion = "" + obj.ObjectMeta.CreationTimestamp = util.Time{} + if e, a := test.object, obj; !reflect.DeepEqual(e, a) { + t.Errorf("expected: %#v, got: %#v", e, a) + } + } + } + + close(fakeClient.WatchResponse) +} + func TestStorageError(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) prefixedKey := etcdtest.AddPrefix("pods")