diff --git a/pkg/client/unversioned/cache/reflector_test.go b/pkg/client/unversioned/cache/reflector_test.go index e592638cd41..c57b723ae7b 100644 --- a/pkg/client/unversioned/cache/reflector_test.go +++ b/pkg/client/unversioned/cache/reflector_test.go @@ -359,34 +359,3 @@ func TestReflector_ListAndWatchWithErrors(t *testing.T) { r.ListAndWatch(util.NeverStop) } } - -func TestReflectorForWatchCache(t *testing.T) { - store := NewWatchCache(5) - - { - _, version := store.ListWithVersion() - if version != 0 { - t.Errorf("unexpected resource version: %d", version) - } - } - - lw := &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { - fw := watch.NewFake() - go fw.Stop() - return fw, nil - }, - ListFunc: func() (runtime.Object, error) { - return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil - }, - } - r := NewReflector(lw, &api.Pod{}, store, 0) - r.ListAndWatch(util.NeverStop) - - { - _, version := store.ListWithVersion() - if version != 10 { - t.Errorf("unexpected resource version: %d", version) - } - } -} diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index c3a8c7ae975..ee76b029a1f 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -86,7 +86,7 @@ type Cacher struct { storage Interface // "sliding window" of recent changes of objects and the current state. - watchCache *cache.WatchCache + watchCache *watchCache reflector *cache.Reflector // Registered watchers. @@ -104,7 +104,7 @@ type Cacher struct { // internal cache and updating its cache in the background based on the given // configuration. func NewCacher(config CacherConfig) *Cacher { - watchCache := cache.NewWatchCache(config.CacheCapacity) + watchCache := newWatchCache(config.CacheCapacity) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) cacher := &Cacher{ @@ -272,7 +272,7 @@ func (c *Cacher) Codec() runtime.Codec { return c.storage.Codec() } -func (c *Cacher) processEvent(event cache.WatchCacheEvent) { +func (c *Cacher) processEvent(event watchCacheEvent) { c.Lock() defer c.Unlock() for _, watcher := range c.watchers { @@ -361,16 +361,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e // cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex - input chan cache.WatchCacheEvent + input chan watchCacheEvent result chan watch.Event filter FilterFunc stopped bool forget func() } -func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { +func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { watcher := &cacheWatcher{ - input: make(chan cache.WatchCacheEvent, 10), + input: make(chan watchCacheEvent, 10), result: make(chan watch.Event, 10), filter: filter, stopped: false, @@ -400,11 +400,11 @@ func (c *cacheWatcher) stop() { } } -func (c *cacheWatcher) add(event cache.WatchCacheEvent) { +func (c *cacheWatcher) add(event watchCacheEvent) { c.input <- event } -func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { +func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) oldObjPasses := false if event.PrevObject != nil { @@ -430,7 +430,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) { +func (c *cacheWatcher) process(initEvents []watchCacheEvent) { for _, event := range initEvents { c.sendWatchCacheEvent(event) } diff --git a/pkg/client/unversioned/cache/watch_cache.go b/pkg/storage/watch_cache.go similarity index 76% rename from pkg/client/unversioned/cache/watch_cache.go rename to pkg/storage/watch_cache.go index ae49f0f851f..d41a2b7960b 100644 --- a/pkg/client/unversioned/cache/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cache +package storage import ( "fmt" @@ -23,20 +23,16 @@ import ( "sync" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/client/unversioned/cache" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" ) -// TODO(wojtek-t): All structure in this file should be private to -// pkg/storage package. We should remove the reference to WatchCache -// from Reflector (by changing the Replace method signature in Store -// interface to take resource version too) and move it under pkg/storage. - -// WatchCacheEvent is a single "watch event" that is send to users of -// WatchCache. Additionally to a typical "watch.Event" it contains +// watchCacheEvent is a single "watch event" that is send to users of +// watchCache. Additionally to a typical "watch.Event" it contains // the previous value of the object to enable proper filtering in the // upper layers. -type WatchCacheEvent struct { +type watchCacheEvent struct { Type watch.EventType Object runtime.Object PrevObject runtime.Object @@ -47,15 +43,15 @@ type WatchCacheEvent struct { // itself. type watchCacheElement struct { resourceVersion uint64 - watchCacheEvent WatchCacheEvent + watchCacheEvent watchCacheEvent } -// WatchCache implements a Store interface. +// watchCache implements a Store interface. // However, it depends on the elements implementing runtime.Object interface. // -// WatchCache is a "sliding window" (with a limitted capacity) of objects +// watchCache is a "sliding window" (with a limitted capacity) of objects // observed from a watch. -type WatchCache struct { +type watchCache struct { sync.RWMutex // Maximum size of history window. @@ -73,9 +69,9 @@ type WatchCache struct { // store will effectively support LIST operation from the "end of cache // history" i.e. from the moment just after the newest cached watched event. // It is necessary to effectively allow clients to start watching at now. - store Store + store cache.Store - // ResourceVersion up to which the WatchCache is propagated. + // ResourceVersion up to which the watchCache is propagated. resourceVersion uint64 // This handler is run at the end of every successful Replace() method. @@ -83,21 +79,21 @@ type WatchCache struct { // This handler is run at the end of every Add/Update/Delete method // and additionally gets the previous value of the object. - onEvent func(WatchCacheEvent) + onEvent func(watchCacheEvent) } -func NewWatchCache(capacity int) *WatchCache { - return &WatchCache{ +func newWatchCache(capacity int) *watchCache { + return &watchCache{ capacity: capacity, cache: make([]watchCacheElement, capacity), startIndex: 0, endIndex: 0, - store: NewStore(MetaNamespaceKeyFunc), + store: cache.NewStore(cache.MetaNamespaceKeyFunc), resourceVersion: 0, } } -func (w *WatchCache) Add(obj interface{}) error { +func (w *watchCache) Add(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { return err @@ -108,7 +104,7 @@ func (w *WatchCache) Add(obj interface{}) error { return w.processEvent(event, resourceVersion, f) } -func (w *WatchCache) Update(obj interface{}) error { +func (w *watchCache) Update(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { return err @@ -119,7 +115,7 @@ func (w *WatchCache) Update(obj interface{}) error { return w.processEvent(event, resourceVersion, f) } -func (w *WatchCache) Delete(obj interface{}) error { +func (w *watchCache) Delete(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { return err @@ -153,7 +149,7 @@ func parseResourceVersion(resourceVersion string) (uint64, error) { return strconv.ParseUint(resourceVersion, 10, 64) } -func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { +func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { w.Lock() defer w.Unlock() previous, exists, err := w.store.Get(event.Object) @@ -166,7 +162,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd } else { prevObject = nil } - watchCacheEvent := WatchCacheEvent{event.Type, event.Object, prevObject} + watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject} if w.onEvent != nil { w.onEvent(watchCacheEvent) } @@ -176,7 +172,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd } // Assumes that lock is already held for write. -func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) { +func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) { if w.endIndex == w.startIndex+w.capacity { // Cache is full - remove the oldest element. w.startIndex++ @@ -185,37 +181,37 @@ func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) w.endIndex++ } -func (w *WatchCache) List() []interface{} { +func (w *watchCache) List() []interface{} { w.RLock() defer w.RUnlock() return w.store.List() } -func (w *WatchCache) ListWithVersion() ([]interface{}, uint64) { +func (w *watchCache) ListWithVersion() ([]interface{}, uint64) { w.RLock() defer w.RUnlock() return w.store.List(), w.resourceVersion } -func (w *WatchCache) ListKeys() []string { +func (w *watchCache) ListKeys() []string { w.RLock() defer w.RUnlock() return w.store.ListKeys() } -func (w *WatchCache) Get(obj interface{}) (interface{}, bool, error) { +func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) { w.RLock() defer w.RUnlock() return w.store.Get(obj) } -func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) { +func (w *watchCache) GetByKey(key string) (interface{}, bool, error) { w.RLock() defer w.RUnlock() return w.store.GetByKey(key) } -func (w *WatchCache) Replace(objs []interface{}, resourceVersion string) error { +func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { version, err := parseResourceVersion(resourceVersion) if err != nil { return err @@ -236,19 +232,19 @@ func (w *WatchCache) Replace(objs []interface{}, resourceVersion string) error { return nil } -func (w *WatchCache) SetOnReplace(onReplace func()) { +func (w *watchCache) SetOnReplace(onReplace func()) { w.Lock() defer w.Unlock() w.onReplace = onReplace } -func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) { +func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) { w.Lock() defer w.Unlock() w.onEvent = onEvent } -func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]WatchCacheEvent, error) { +func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) { size := w.endIndex - w.startIndex oldest := w.resourceVersion if size > 0 { @@ -264,14 +260,14 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]Wa return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion } first := sort.Search(size, f) - result := make([]WatchCacheEvent, size-first) + result := make([]watchCacheEvent, size-first) for i := 0; i < size-first; i++ { result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent } return result, nil } -func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { +func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) { w.RLock() defer w.RUnlock() return w.GetAllEventsSinceThreadUnsafe(resourceVersion) diff --git a/pkg/client/unversioned/cache/watch_cache_test.go b/pkg/storage/watch_cache_test.go similarity index 83% rename from pkg/client/unversioned/cache/watch_cache_test.go rename to pkg/storage/watch_cache_test.go index be827ba556f..f94e35d94f4 100644 --- a/pkg/client/unversioned/cache/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -14,13 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cache +package storage import ( "strconv" "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/unversioned/cache" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" ) @@ -36,7 +38,7 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod { } func TestWatchCacheBasic(t *testing.T) { - store := NewWatchCache(2) + store := newWatchCache(2) // Test Add/Update/Delete. pod1 := makeTestPod("pod", 1) @@ -106,7 +108,7 @@ func TestWatchCacheBasic(t *testing.T) { } func TestEvents(t *testing.T) { - store := NewWatchCache(5) + store := newWatchCache(5) store.Add(makeTestPod("pod", 2)) @@ -221,3 +223,44 @@ func TestEvents(t *testing.T) { } } } + +type testLW struct { + ListFunc func() (runtime.Object, error) + WatchFunc func(resourceVersion string) (watch.Interface, error) +} + +func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() } +func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { + return t.WatchFunc(resourceVersion) +} + +func TestReflectorForWatchCache(t *testing.T) { + store := newWatchCache(5) + + { + _, version := store.ListWithVersion() + if version != 0 { + t.Errorf("unexpected resource version: %d", version) + } + } + + lw := &testLW{ + WatchFunc: func(rv string) (watch.Interface, error) { + fw := watch.NewFake() + go fw.Stop() + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil + }, + } + r := cache.NewReflector(lw, &api.Pod{}, store, 0) + r.ListAndWatch(util.NeverStop) + + { + _, version := store.ListWithVersion() + if version != 10 { + t.Errorf("unexpected resource version: %d", version) + } + } +}