From e424da7d0d89e36f005bbb37ee8119ad187e49ca Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 28 Jul 2015 08:26:53 +0200 Subject: [PATCH] Implement Cacher for watch in apiserver --- pkg/client/cache/reflector.go | 24 ++- pkg/client/cache/watch_cache.go | 17 +- pkg/storage/cacher.go | 334 +++++++++++++++++++++++++++++++ pkg/storage/cacher_test.go | 337 ++++++++++++++++++++++++++++++++ pkg/storage/util.go | 9 + 5 files changed, 703 insertions(+), 18 deletions(-) create mode 100644 pkg/storage/cacher.go create mode 100644 pkg/storage/cacher_test.go diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 216f49efc34..a8d74c9f14d 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -170,30 +170,27 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { return t.C, t.Stop } -func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) { +// Returns error if ListAndWatch didn't even tried to initialize watch. +func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { var resourceVersion string resyncCh, cleanup := r.resyncChan() defer cleanup() list, err := r.listerWatcher.List() if err != nil { - util.HandleError(fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)) - return + return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } meta, err := meta.Accessor(list) if err != nil { - util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)) - return + return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list) } resourceVersion = meta.ResourceVersion() items, err := runtime.ExtractList(list) if err != nil { - util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)) - return + return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } if err := r.syncWith(items, resourceVersion); err != nil { - util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)) - return + return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } r.setLastSyncResourceVersion(resourceVersion) @@ -220,13 +217,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) { } } } - return + return nil } if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { if err != errorResyncRequested && err != errorStopRequested { glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } - return + return nil } } } @@ -277,6 +274,7 @@ loop: util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) continue } + newResourceVersion := meta.ResourceVersion() switch event.Type { case watch.Added: r.store.Add(event.Object) @@ -290,8 +288,8 @@ loop: default: util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } - *resourceVersion = meta.ResourceVersion() - r.setLastSyncResourceVersion(*resourceVersion) + *resourceVersion = newResourceVersion + r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } diff --git a/pkg/client/cache/watch_cache.go b/pkg/client/cache/watch_cache.go index c6d606eb9c3..60e86f7b9b2 100644 --- a/pkg/client/cache/watch_cache.go +++ b/pkg/client/cache/watch_cache.go @@ -123,13 +123,20 @@ func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, er if err != nil { return nil, 0, err } - resourceVersion, err := strconv.ParseUint(meta.ResourceVersion(), 10, 64) + resourceVersion, err := parseResourceVersion(meta.ResourceVersion()) if err != nil { return nil, 0, err } return object, resourceVersion, nil } +func parseResourceVersion(resourceVersion string) (uint64, error) { + if resourceVersion == "" { + return 0, nil + } + return strconv.ParseUint(resourceVersion, 10, 64) +} + func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { w.Lock() defer w.Unlock() @@ -186,7 +193,7 @@ func (w *WatchCache) Replace(objs []interface{}) error { } func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error { - version, err := strconv.ParseUint(resourceVersion, 10, 64) + version, err := parseResourceVersion(resourceVersion) if err != nil { return err } @@ -227,15 +234,15 @@ func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, e if size > 0 { oldest = w.cache[w.startIndex%w.capacity].resourceVersion } + if resourceVersion < oldest { + return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest) + } // Binary seatch the smallest index at which resourceVersion is not smaller than // the given one. f := func(i int) bool { return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion } - if size > 0 && resourceVersion < oldest { - return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest) - } first := sort.Search(size, f) result := make([]watch.Event, size-first) for i := 0; i < size-first; i++ { diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go new file mode 100644 index 00000000000..13cbd885851 --- /dev/null +++ b/pkg/storage/cacher.go @@ -0,0 +1,334 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "sync" + + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/conversion" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +// CacherConfig contains the configuration for a given Cache. +type CacherConfig struct { + // Maximum size of the history cached in memory. + CacheCapacity int + + // An underlying storage.Interface. + Storage Interface + + // An underlying storage.Versioner. + Versioner Versioner + + // The Cache will be caching objects of a given Type and assumes that they + // are all stored under ResourcePrefix directory in the underlying database. + Type interface{} + ResourcePrefix string + + // KeyFunc is used to get a key in the underyling storage for a given object. + KeyFunc func(runtime.Object) (string, error) + + // NewList is a function that creates new empty object storing a list of + // objects of type Type. + NewListFunc func() runtime.Object + + // Cacher will be stopped when the StopChannel will be closed. + StopChannel <-chan struct{} +} + +// Cacher is responsible for serving WATCH and LIST requests for a given +// resource from its internal cache and updating its cache in the background +// based on the underlying storage contents. +type Cacher struct { + sync.RWMutex + + // Whether Cacher is initialized. + initialized sync.WaitGroup + initOnce sync.Once + + // "sliding window" of recent changes of objects and the current state. + watchCache *cache.WatchCache + reflector *cache.Reflector + + // Registered watchers. + watcherIdx int + watchers map[int]*cacheWatcher + + // Versioner is used to handle resource versions. + versioner Versioner + + // keyFunc is used to get a key in the underyling storage for a given object. + keyFunc func(runtime.Object) (string, error) +} + +// Create a new Cacher responsible from service WATCH and LIST requests from its +// internal cache and updating its cache in the background based on the given +// configuration. +func NewCacher(config CacherConfig) *Cacher { + watchCache := cache.NewWatchCache(config.CacheCapacity) + listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) + + cacher := &Cacher{ + initialized: sync.WaitGroup{}, + watchCache: watchCache, + reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), + watcherIdx: 0, + watchers: make(map[int]*cacheWatcher), + versioner: config.Versioner, + keyFunc: config.KeyFunc, + } + cacher.initialized.Add(1) + // See startCaching method for why explanation on it. + watchCache.SetOnReplace(func() { + cacher.initOnce.Do(func() { cacher.initialized.Done() }) + cacher.Unlock() + }) + watchCache.SetOnEvent(cacher.processEvent) + + stopCh := config.StopChannel + go util.Until(func() { cacher.startCaching(stopCh) }, 0, stopCh) + cacher.initialized.Wait() + return cacher +} + +func (c *Cacher) startCaching(stopChannel <-chan struct{}) { + c.Lock() + c.terminateAllWatchers() + // We explicitly do NOT Unlock() in this method. + // This is because we do not want to allow any WATCH/LIST methods before + // the cache is initialized. Once the underlying cache is propagated, + // onReplace handler will be called, which will do the Unlock() as + // configured in NewCacher(). + // Note: the same bahavior is also triggered every time we fall out of + // backen storage (e.g. etcd's) watch event window. + // Note that since onReplace may be not called due to errors, we explicitly + // need to retry it on errors under lock. + for { + err := c.reflector.ListAndWatch(stopChannel) + if err == nil { + break + } + } +} + +// Implements Watch (signature from storage.Interface). +func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { + c.Lock() + defer c.Unlock() + + initEvents, err := c.watchCache.GetAllEventsSince(resourceVersion) + if err != nil { + return nil, err + } + watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) + c.watchers[c.watcherIdx] = watcher + c.watcherIdx++ + return watcher, nil +} + +// Implements WatchList (signature from storage.Interface). +func (c *Cacher) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { + return c.Watch(key, resourceVersion, filter) +} + +// Implements List (signature from storage.Interface). +func (c *Cacher) List(key string, listObj runtime.Object) error { + listPtr, err := runtime.GetItemsPtr(listObj) + if err != nil { + return err + } + listVal, err := conversion.EnforcePtr(listPtr) + if err != nil || listVal.Kind() != reflect.Slice { + return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) + } + filter := filterFunction(key, c.keyFunc, Everything) + + objs, resourceVersion := c.watchCache.ListWithVersion() + for _, obj := range objs { + object, ok := obj.(runtime.Object) + if !ok { + return fmt.Errorf("non runtime.Object returned from storage: %v", obj) + } + if filter(object) { + listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) + } + } + if c.versioner != nil { + if err := c.versioner.UpdateList(listObj, resourceVersion); err != nil { + return err + } + } + return nil +} + +func (c *Cacher) processEvent(event watch.Event) { + c.Lock() + defer c.Unlock() + for _, watcher := range c.watchers { + watcher.add(event) + } +} + +func (c *Cacher) terminateAllWatchers() { + for key, watcher := range c.watchers { + delete(c.watchers, key) + watcher.stop() + } +} + +func forgetWatcher(c *Cacher, index int) func() { + return func() { + c.Lock() + defer c.Unlock() + // It's possible that the watcher is already not in the map (e.g. in case of + // simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything. + delete(c.watchers, index) + } +} + +func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc { + return func(obj runtime.Object) bool { + objKey, err := keyFunc(obj) + if err != nil { + glog.Errorf("Invalid object for filter: %v", obj) + return false + } + if !strings.HasPrefix(objKey, key) { + return false + } + return filter(obj) + } +} + +// Returns resource version to which the underlying cache is synced. +func (c *Cacher) LastSyncResourceVersion() (uint64, error) { + c.RLock() + defer c.RUnlock() + + resourceVersion := c.reflector.LastSyncResourceVersion() + if resourceVersion == "" { + return 0, nil + } + return strconv.ParseUint(resourceVersion, 10, 64) +} + +// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. +type cacherListerWatcher struct { + storage Interface + resourcePrefix string + newListFunc func() runtime.Object +} + +func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { + return &cacherListerWatcher{ + storage: storage, + resourcePrefix: resourcePrefix, + newListFunc: newListFunc, + } +} + +// Implements cache.ListerWatcher interface. +func (lw *cacherListerWatcher) List() (runtime.Object, error) { + list := lw.newListFunc() + if err := lw.storage.List(lw.resourcePrefix, list); err != nil { + return nil, err + } + return list, nil +} + +// Implements cache.ListerWatcher interface. +func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, error) { + version, err := ParseWatchResourceVersion(resourceVersion, lw.resourcePrefix) + if err != nil { + return nil, err + } + return lw.storage.WatchList(lw.resourcePrefix, version, Everything) +} + +// cacherWatch implements watch.Interface +type cacheWatcher struct { + sync.Mutex + input chan watch.Event + result chan watch.Event + filter FilterFunc + stopped bool + forget func() +} + +func newCacheWatcher(initEvents []watch.Event, filter FilterFunc, forget func()) *cacheWatcher { + watcher := &cacheWatcher{ + input: make(chan watch.Event, 10), + result: make(chan watch.Event, 10), + filter: filter, + stopped: false, + forget: forget, + } + go watcher.process(initEvents) + return watcher +} + +// Implements watch.Interface. +func (c *cacheWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Implements watch.Interface. +func (c *cacheWatcher) Stop() { + c.forget() + c.stop() +} + +func (c *cacheWatcher) stop() { + c.Lock() + defer c.Unlock() + if !c.stopped { + c.stopped = true + close(c.input) + } +} + +func (c *cacheWatcher) add(event watch.Event) { + c.input <- event +} + +func (c *cacheWatcher) process(initEvents []watch.Event) { + for _, event := range initEvents { + if c.filter(event.Object) { + c.result <- event + } + } + defer close(c.result) + defer c.Stop() + for { + event, ok := <-c.input + if !ok { + return + } + if c.filter(event.Object) { + c.result <- event + } + } +} diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go new file mode 100644 index 00000000000..763dcf0b172 --- /dev/null +++ b/pkg/storage/cacher_test.go @@ -0,0 +1,337 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage_test + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/tools" + "k8s.io/kubernetes/pkg/tools/etcdtest" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" +) + +func newTestCacher(client tools.EtcdClient) *storage.Cacher { + prefix := "pods" + config := storage.CacherConfig{ + CacheCapacity: 10, + Versioner: etcdstorage.APIObjectVersioner{}, + Storage: etcdstorage.NewEtcdStorage(client, testapi.Codec(), etcdtest.PathPrefix()), + Type: &api.Pod{}, + ResourcePrefix: prefix, + KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + StopChannel: util.NeverStop, + } + return storage.NewCacher(config) +} + +func makeTestPod(name string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: name}, + Spec: api.PodSpec{ + DNSPolicy: api.DNSClusterFirst, + RestartPolicy: api.RestartPolicyAlways, + }, + } +} + +func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error { + ready := func() (bool, error) { + result, err := cacher.LastSyncResourceVersion() + if err != nil { + return false, err + } + return result == resourceVersion, nil + } + return wait.Poll(10*time.Millisecond, 100*time.Millisecond, ready) +} + +func TestList(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + prefixedKey := etcdtest.AddPrefix("pods") + fakeClient.ExpectNotFoundGet(prefixedKey) + cacher := newTestCacher(fakeClient) + fakeClient.WaitForWatchCompletion() + + podFoo := makeTestPod("foo") + podBar := makeTestPod("bar") + podBaz := makeTestPod("baz") + + podFooPrime := makeTestPod("foo") + podFooPrime.Spec.NodeName = "fakeNode" + + testCases := []*etcd.Response{ + { + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + { + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podBar)), + CreatedIndex: 2, + ModifiedIndex: 2, + }, + }, + { + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podBaz)), + CreatedIndex: 3, + ModifiedIndex: 3, + }, + }, + { + Action: "set", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFooPrime)), + CreatedIndex: 1, + ModifiedIndex: 4, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + { + Action: "delete", + Node: &etcd.Node{ + CreatedIndex: 1, + ModifiedIndex: 5, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podBar)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + } + + // Propagate some data to etcd. + for _, test := range testCases { + fakeClient.WatchResponse <- test + } + if err := waitForUpToDateCache(cacher, 5); err != nil { + t.Errorf("watch cache didn't propagated correctly: %v", err) + } + + result := &api.PodList{} + if err := cacher.List("pods/ns", result); err != nil { + t.Errorf("unexpected error: %v", err) + } + if result.ListMeta.ResourceVersion != "5" { + t.Errorf("incorrect resource version: %v", result.ListMeta.ResourceVersion) + } + if len(result.Items) != 2 { + t.Errorf("unexpected list result: %d", len(result.Items)) + } + keys := util.StringSet{} + for _, item := range result.Items { + keys.Insert(item.ObjectMeta.Name) + } + if !keys.HasAll("foo", "baz") { + t.Errorf("unexpected list result: %#v", result) + } + for _, item := range result.Items { + // unset fields that are set by the infrastructure + item.ObjectMeta.ResourceVersion = "" + item.ObjectMeta.CreationTimestamp = util.Time{} + + var expected *api.Pod + switch item.ObjectMeta.Name { + case "foo": + expected = podFooPrime + case "baz": + expected = podBaz + default: + t.Errorf("unexpected item: %v", item) + } + if e, a := *expected, item; !reflect.DeepEqual(e, a) { + t.Errorf("expected: %#v, got: %#v", e, a) + } + } + + close(fakeClient.WatchResponse) +} + +func TestWatch(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + prefixedKey := etcdtest.AddPrefix("pods") + fakeClient.ExpectNotFoundGet(prefixedKey) + cacher := newTestCacher(fakeClient) + fakeClient.WaitForWatchCompletion() + + podFoo := makeTestPod("foo") + podBar := makeTestPod("bar") + + testCases := []struct { + object *api.Pod + etcdResponse *etcd.Response + event watch.EventType + filtered bool + }{ + { + object: podFoo, + etcdResponse: &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + event: watch.Added, + filtered: true, + }, + { + object: podBar, + etcdResponse: &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podBar)), + CreatedIndex: 2, + ModifiedIndex: 2, + }, + }, + event: watch.Added, + filtered: false, + }, + { + object: podFoo, + etcdResponse: &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 3, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + event: watch.Modified, + filtered: true, + }, + } + + // Set up Watch for object "podFoo". + watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + for _, test := range testCases { + fakeClient.WatchResponse <- test.etcdResponse + if test.filtered { + event := <-watcher.ResultChan() + if e, a := test.event, event.Type; e != a { + t.Errorf("%v %v", e, a) + } + // unset fields that are set by the infrastructure + obj := event.Object.(*api.Pod) + obj.ObjectMeta.ResourceVersion = "" + obj.ObjectMeta.CreationTimestamp = util.Time{} + if e, a := test.object, obj; !reflect.DeepEqual(e, a) { + t.Errorf("expected: %#v, got: %#v", e, a) + } + } + } + + // Check whether we get too-old error. + _, err = cacher.Watch("pods/ns/foo", 0, storage.Everything) + if err == nil { + t.Errorf("expected 'error too old' error") + } + + // Now test watch with initial state. + initialWatcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + for _, test := range testCases { + if test.filtered { + event := <-initialWatcher.ResultChan() + if e, a := test.event, event.Type; e != a { + t.Errorf("%v %v", e, a) + } + // unset fields that are set by the infrastructure + obj := event.Object.(*api.Pod) + obj.ObjectMeta.ResourceVersion = "" + obj.ObjectMeta.CreationTimestamp = util.Time{} + if e, a := test.object, obj; !reflect.DeepEqual(e, a) { + t.Errorf("expected: %#v, got: %#v", e, a) + } + } + } + + close(fakeClient.WatchResponse) +} + +func TestStorageError(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + prefixedKey := etcdtest.AddPrefix("pods") + fakeClient.ExpectNotFoundGet(prefixedKey) + cacher := newTestCacher(fakeClient) + fakeClient.WaitForWatchCompletion() + + podFoo := makeTestPod("foo") + + // Set up Watch for object "podFoo". + watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + } + _ = <-watcher.ResultChan() + + // Injecting error is simulating error from etcd. + // This is almost the same what would happen e.g. in case of + // "error too old" when reconnecting to etcd watch. + fakeClient.WatchInjectError <- fmt.Errorf("fake error") + + _, ok := <-watcher.ResultChan() + if ok { + t.Errorf("unexpected event") + } +} diff --git a/pkg/storage/util.go b/pkg/storage/util.go index e1efca9bc7c..518b31b2537 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -20,6 +20,7 @@ import ( "strconv" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/fielderrors" ) @@ -49,3 +50,11 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { } return version + 1, nil } + +func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", err + } + return prefix + "/" + meta.Namespace() + "/" + meta.Name(), nil +}