From 52e3af4e9358cb59852544cf0e6d90a3611ee58c Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 28 Jul 2015 08:26:53 +0200 Subject: [PATCH] Implement watchCache structure. --- pkg/client/cache/reflector.go | 14 +- pkg/client/cache/reflector_test.go | 41 ++++- pkg/client/cache/watch_cache.go | 245 +++++++++++++++++++++++++++ pkg/client/cache/watch_cache_test.go | 163 ++++++++++++++++++ 4 files changed, 453 insertions(+), 10 deletions(-) create mode 100644 pkg/client/cache/watch_cache.go create mode 100644 pkg/client/cache/watch_cache_test.go diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 5e6563b81b4..216f49efc34 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -135,13 +135,13 @@ outer: // Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a goroutine and returns immediately. func (r *Reflector) Run() { - go util.Forever(func() { r.listAndWatch(util.NeverStop) }, r.period) + go util.Forever(func() { r.ListAndWatch(util.NeverStop) }, r.period) } // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. // RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. func (r *Reflector) RunUntil(stopCh <-chan struct{}) { - go util.Until(func() { r.listAndWatch(stopCh) }, r.period, stopCh) + go util.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh) } var ( @@ -170,7 +170,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { return t.C, t.Stop } -func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { +func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) { var resourceVersion string resyncCh, cleanup := r.resyncChan() defer cleanup() @@ -191,7 +191,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)) return } - if err := r.syncWith(items); err != nil { + if err := r.syncWith(items, resourceVersion); err != nil { util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)) return } @@ -232,12 +232,16 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { } // syncWith replaces the store's items with the given list. -func (r *Reflector) syncWith(items []runtime.Object) error { +func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found := make([]interface{}, 0, len(items)) for _, item := range items { found = append(found, item) } + myStore, ok := r.store.(*WatchCache) + if ok { + return myStore.ReplaceWithVersion(found, resourceVersion) + } return r.store.Replace(found) } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index d1fb7f6ed84..e592638cd41 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -51,7 +51,7 @@ func TestCloseWatchChannelOnError(t *testing.T) { return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil }, } - go r.listAndWatch(util.NeverStop) + go r.ListAndWatch(util.NeverStop) fw.Error(pod) select { case _, ok := <-fw.ResultChan(): @@ -214,7 +214,7 @@ func TestReflectorStopWatch(t *testing.T) { } } -func TestReflector_listAndWatch(t *testing.T) { +func TestReflector_ListAndWatch(t *testing.T) { createdFakes := make(chan *watch.FakeWatcher) // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc @@ -239,7 +239,7 @@ func TestReflector_listAndWatch(t *testing.T) { } s := NewFIFO(MetaNamespaceKeyFunc) r := NewReflector(lw, &api.Pod{}, s, 0) - go r.listAndWatch(util.NeverStop) + go r.ListAndWatch(util.NeverStop) ids := []string{"foo", "bar", "baz", "qux", "zoo"} var fw *watch.FakeWatcher @@ -272,7 +272,7 @@ func TestReflector_listAndWatch(t *testing.T) { } } -func TestReflector_listAndWatchWithErrors(t *testing.T) { +func TestReflector_ListAndWatchWithErrors(t *testing.T) { mkPod := func(id string, rv string) *api.Pod { return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, ResourceVersion: rv}} } @@ -356,6 +356,37 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { }, } r := NewReflector(lw, &api.Pod{}, s, 0) - r.listAndWatch(util.NeverStop) + r.ListAndWatch(util.NeverStop) + } +} + +func TestReflectorForWatchCache(t *testing.T) { + store := NewWatchCache(5) + + { + _, version := store.ListWithVersion() + if version != 0 { + t.Errorf("unexpected resource version: %d", version) + } + } + + lw := &testLW{ + WatchFunc: func(rv string) (watch.Interface, error) { + fw := watch.NewFake() + go fw.Stop() + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil + }, + } + r := NewReflector(lw, &api.Pod{}, store, 0) + r.ListAndWatch(util.NeverStop) + + { + _, version := store.ListWithVersion() + if version != 10 { + t.Errorf("unexpected resource version: %d", version) + } } } diff --git a/pkg/client/cache/watch_cache.go b/pkg/client/cache/watch_cache.go new file mode 100644 index 00000000000..2adbea62f90 --- /dev/null +++ b/pkg/client/cache/watch_cache.go @@ -0,0 +1,245 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sort" + "strconv" + "sync" + + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// watchCacheElement is a single "watch event" stored in a cache. +// It contains the resource version of the object and the object +// itself. +type watchCacheElement struct { + resourceVersion uint64 + event watch.Event +} + +// WatchCache implements a Store interface. +// However, it depends on the elements implementing runtime.Object interface. +// +// WatchCache is a "sliding window" (with a limitted capacity) of objects +// observed from a watch. +type WatchCache struct { + sync.RWMutex + + // Maximum size of history window. + capacity int + + // cache is used a cyclic buffer - its first element (with the smallest + // resourceVersion) is defined by startIndex, its last element is defined + // by endIndex (if cache is full it will be startIndex + capacity). + // Both startIndex and endIndex can be greater than buffer capacity - + // you should always apply modulo capacity to get an index in cache array. + cache []watchCacheElement + startIndex int + endIndex int + + // store will effectively support LIST operation from the "end of cache + // history" i.e. from the moment just after the newest cached watched event. + // It is necessary to effectively allow clients to start watching at now. + store Store + + // ResourceVersion up to which the WatchCache is propagated. + resourceVersion uint64 + + // This handler is run at the end of every successful Replace() method. + onReplace func() + + // This handler is run at the end of every Add/Update/Delete method. + onEvent func(watch.Event) +} + +func NewWatchCache(capacity int) *WatchCache { + return &WatchCache{ + capacity: capacity, + cache: make([]watchCacheElement, capacity), + startIndex: 0, + endIndex: 0, + store: NewStore(MetaNamespaceKeyFunc), + resourceVersion: 0, + } +} + +func (w *WatchCache) Add(obj interface{}) error { + object, resourceVersion, err := objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{watch.Added, object} + + f := func(obj runtime.Object) error { return w.store.Add(obj) } + return w.processEvent(event, resourceVersion, f) +} + +func (w *WatchCache) Update(obj interface{}) error { + object, resourceVersion, err := objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{watch.Modified, object} + + f := func(obj runtime.Object) error { return w.store.Update(obj) } + return w.processEvent(event, resourceVersion, f) +} + +func (w *WatchCache) Delete(obj interface{}) error { + object, resourceVersion, err := objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{watch.Deleted, object} + + f := func(obj runtime.Object) error { return w.store.Delete(obj) } + return w.processEvent(event, resourceVersion, f) +} + +func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) { + object, ok := obj.(runtime.Object) + if !ok { + return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) + } + meta, err := meta.Accessor(object) + if err != nil { + return nil, 0, err + } + resourceVersion, err := strconv.ParseUint(meta.ResourceVersion(), 10, 64) + if err != nil { + return nil, 0, err + } + return object, resourceVersion, nil +} + +func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { + w.Lock() + defer w.Unlock() + if w.onEvent != nil { + w.onEvent(event) + } + w.updateCache(resourceVersion, event) + w.resourceVersion = resourceVersion + return updateFunc(event.Object) +} + +// Assumes that lock is already held for write. +func (w *WatchCache) updateCache(resourceVersion uint64, event watch.Event) { + if w.endIndex == w.startIndex+w.capacity { + // Cache is full - remove the oldest element. + w.startIndex++ + } + w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} + w.endIndex++ +} + +func (w *WatchCache) List() []interface{} { + w.RLock() + defer w.RUnlock() + return w.store.List() +} + +func (w *WatchCache) ListWithVersion() ([]interface{}, uint64) { + w.RLock() + defer w.RUnlock() + return w.store.List(), w.resourceVersion +} + +func (w *WatchCache) ListKeys() []string { + w.RLock() + defer w.RUnlock() + return w.store.ListKeys() +} + +func (w *WatchCache) Get(obj interface{}) (interface{}, bool, error) { + w.RLock() + defer w.RUnlock() + return w.store.Get(obj) +} + +func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) { + w.RLock() + defer w.RUnlock() + return w.store.GetByKey(key) +} + +func (w *WatchCache) Replace(objs []interface{}) error { + return w.ReplaceWithVersion(objs, "0") +} + +func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error { + version, err := strconv.ParseUint(resourceVersion, 10, 64) + if err != nil { + return err + } + + w.Lock() + defer w.Unlock() + + w.startIndex = 0 + w.endIndex = 0 + if err := w.store.Replace(objs); err != nil { + return err + } + w.resourceVersion = version + if w.onReplace != nil { + w.onReplace() + } + return nil +} + +func (w *WatchCache) SetOnReplace(onReplace func()) { + w.Lock() + defer w.Unlock() + w.onReplace = onReplace +} + +func (w *WatchCache) SetOnEvent(onEvent func(watch.Event)) { + w.Lock() + defer w.Unlock() + w.onEvent = onEvent +} + +func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, error) { + w.RLock() + defer w.RUnlock() + + size := w.endIndex - w.startIndex + oldest := w.resourceVersion + if size > 0 { + oldest = w.cache[w.startIndex%w.capacity].resourceVersion + } + + // Binary seatch the smallest index at which resourceVersion is not smaller than + // the given one. + f := func(i int) bool { + return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion + } + if size > 0 && resourceVersion < oldest { + return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest) + } + first := sort.Search(size, f) + result := make([]watch.Event, size-first) + for i := 0; i < size-first; i++ { + result[i] = w.cache[(w.startIndex+first+i)%w.capacity].event + } + return result, nil +} diff --git a/pkg/client/cache/watch_cache_test.go b/pkg/client/cache/watch_cache_test.go new file mode 100644 index 00000000000..d910a505a6f --- /dev/null +++ b/pkg/client/cache/watch_cache_test.go @@ -0,0 +1,163 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "strconv" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util" +) + +func makeTestPod(name string, resourceVersion uint64) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: "ns", + Name: name, + ResourceVersion: strconv.FormatUint(resourceVersion, 10), + }, + } +} + +func TestWatchCacheBasic(t *testing.T) { + store := NewWatchCache(2) + + // Test Add/Update/Delete. + pod1 := makeTestPod("pod", 1) + if err := store.Add(pod1); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item, ok, _ := store.Get(pod1); !ok { + t.Errorf("didn't find pod") + } else { + if !api.Semantic.DeepEqual(pod1, item) { + t.Errorf("expected %v, got %v", pod1, item) + } + } + pod2 := makeTestPod("pod", 2) + if err := store.Update(pod2); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item, ok, _ := store.Get(pod2); !ok { + t.Errorf("didn't find pod") + } else { + if !api.Semantic.DeepEqual(pod2, item) { + t.Errorf("expected %v, got %v", pod1, item) + } + } + pod3 := makeTestPod("pod", 3) + if err := store.Delete(pod3); err != nil { + t.Errorf("unexpected error: %v", err) + } + if _, ok, _ := store.Get(pod3); ok { + t.Errorf("found pod") + } + + // Test List. + store.Add(makeTestPod("pod1", 4)) + store.Add(makeTestPod("pod2", 5)) + store.Add(makeTestPod("pod3", 6)) + { + podNames := util.StringSet{} + for _, item := range store.List() { + podNames.Insert(item.(*api.Pod).ObjectMeta.Name) + } + if !podNames.HasAll("pod1", "pod2", "pod3") { + t.Errorf("missing pods, found %v", podNames) + } + if len(podNames) != 3 { + t.Errorf("found missing/extra items") + } + } + + // Test Replace. + store.Replace([]interface{}{ + makeTestPod("pod4", 7), + makeTestPod("pod5", 8), + }) + { + podNames := util.StringSet{} + for _, item := range store.List() { + podNames.Insert(item.(*api.Pod).ObjectMeta.Name) + } + if !podNames.HasAll("pod4", "pod5") { + t.Errorf("missing pods, found %v", podNames) + } + if len(podNames) != 2 { + t.Errorf("found missing/extra items") + } + } +} + +func TestEvents(t *testing.T) { + store := NewWatchCache(5) + + store.Add(makeTestPod("pod", 2)) + store.Update(makeTestPod("pod", 3)) + store.Update(makeTestPod("pod", 4)) + + // Test with not full cache. + { + _, err := store.GetAllEventsSince(1) + if err == nil { + t.Errorf("expected error too old") + } + } + { + result, err := store.GetAllEventsSince(3) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 2 { + t.Fatalf("unexpected events: %v", result) + } + for i := 0; i < 2; i++ { + pod := makeTestPod("pod", uint64(i+3)) + if !api.Semantic.DeepEqual(pod, result[i].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) + } + } + } + + for i := 5; i < 9; i++ { + store.Update(makeTestPod("pod", uint64(i))) + } + + // Test with full cache - there should be elements from 4 to 8. + { + _, err := store.GetAllEventsSince(3) + if err == nil { + t.Errorf("expected error too old") + } + } + { + result, err := store.GetAllEventsSince(4) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 5 { + t.Fatalf("unexpected events: %v", result) + } + for i := 0; i < 5; i++ { + pod := makeTestPod("pod", uint64(i+4)) + if !api.Semantic.DeepEqual(pod, result[i].Object) { + t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) + } + } + } +}